Skip to content

Commit

Permalink
Merge pull request #101 from csbrown/master
Browse files Browse the repository at this point in the history
adding ability for core file to have a hash index
  • Loading branch information
niconoe authored Nov 13, 2023
2 parents b5b2bea + 01104a3 commit accbf19
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 9 deletions.
17 changes: 8 additions & 9 deletions dwca/files.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from typing import List, Union, IO, Dict, Optional

from dwca.descriptors import DataFileDescriptor
from dwca.rows import CoreRow, ExtensionRow
from dwca.rows import CoreRow, ExtensionRow, Row


class CSVDataFile(object):
Expand Down Expand Up @@ -105,11 +105,6 @@ def coreid_index(self) -> Dict[str, array]:
Creating this index can be time and memory consuming for large archives, so it's created on the fly
at first access.
"""
if self.file_descriptor.represents_corefile:
raise AttributeError(
"coreid_index is only available for extension data files"
)

if self._coreid_index is None:
self._coreid_index = self._build_coreid_index()

Expand All @@ -120,14 +115,18 @@ def _build_coreid_index(self) -> Dict[str, List[int]]:
index = {} # type: Dict[str, array[int]]

for position, row in enumerate(self):
tmp = ExtensionRow(row, position, self.file_descriptor)
index.setdefault(tmp.core_id, array('L')).append(position)
if self.file_descriptor.represents_corefile:
tmp = CoreRow(row, position, self.file_descriptor)
index.setdefault(tmp.id, array('L')).append(position)
else:
tmp = ExtensionRow(row, position, self.file_descriptor)
index.setdefault(tmp.core_id, array('L')).append(position)

return index

# TODO: For ExtensionRow and a specific field only, generalize ?
# TODO: What happens if called on a Core Row?
def get_all_rows_by_coreid(self, core_id: int) -> List[ExtensionRow]:
def get_all_rows_by_coreid(self, core_id: int) -> List[Row]:
        """Return a list of :class:`dwca.rows.Row` (core or extension rows) whose id/core id field matches `core_id`."""
if core_id not in self.coreid_index:
return []
Expand Down
58 changes: 58 additions & 0 deletions dwca/star_record.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
from dwca.files import CSVDataFile
from typing import List, Literal
import itertools


class StarRecordIterator(object):
    """Iterate over multiple DwCA data files joined on the core id.

    Each item produced is one "star record": an iterable of one row per
    participating file, all sharing the same core id. When a core id maps to
    several rows in a file, every combination of matching rows is produced
    (a cross product, like a relational join).

    :param files_to_join: the :class:`dwca.files.CSVDataFile`\\ s to join.
        May or may not include the core file (the core file is not treated
        in a special way).
    :param how: type of join. ``"inner"`` and ``"outer"`` correspond vaguely
        to inner and full joins. The outer join includes rows that don't
        match on all files, but does not create null fields for files where
        the core id is absent. Attempts to conform to the
        pandas.DataFrame.merge API.
    :raises ValueError: if `how` is not ``"inner"`` or ``"outer"``.
    """

    # Sentinel used to detect exhaustion of the per-coreid cross product
    # without confusing it with any legitimate tuple of positions.
    _EXHAUSTED = object()

    def __init__(self, files_to_join: List["CSVDataFile"], how: Literal["inner", "outer"] = "inner"):
        if how not in ("inner", "outer"):
            raise ValueError("how must be 'inner' or 'outer', got {!r}".format(how))

        self.files_to_join = files_to_join

        # Gather the core ids we want to join over. An empty file list
        # simply yields no star records instead of raising IndexError.
        self.common_core = set(files_to_join[0].coreid_index) if files_to_join else set()
        for data_file in self.files_to_join[1:]:
            if how == "inner":
                # inner join: core id must be present in all files
                self.common_core &= set(data_file.coreid_index)
            else:
                # outer join: core id may be present in any file
                self.common_core |= set(data_file.coreid_index)

        # Iterator state: the core ids left to visit, and the cross product
        # of row positions for the core id currently being visited.
        self._common_core_iterator = iter(self.common_core)
        self._cross_product_iterator = iter(())
        self._files_with_current_coreid = []  # type: List[CSVDataFile]

    def __next__(self):
        """Return the next combination of rows sharing a core id.

        :raises StopIteration: when all core ids have been exhausted.
        """
        while True:
            next_positions = next(self._cross_product_iterator, self._EXHAUSTED)
            if next_positions is not self._EXHAUSTED:
                # Zip this combination of row positions back to their files.
                return (
                    csv_file.get_row_by_position(position)
                    for position, csv_file in zip(next_positions, self._files_with_current_coreid)
                )

            # Finished all combinations for the current core id: advance to
            # the next one (propagates StopIteration when none are left).
            self._current_coreid = next(self._common_core_iterator)
            self._files_with_current_coreid = [
                csv_file for csv_file in self.files_to_join
                if self._current_coreid in csv_file.coreid_index
            ]
            # All combinations of matching row positions, one per file.
            self._cross_product_iterator = itertools.product(
                *(
                    csv_file.coreid_index[self._current_coreid]
                    for csv_file in self._files_with_current_coreid
                )
            )

    def __iter__(self):
        return self
4 changes: 4 additions & 0 deletions dwca/test/test_datafile.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,13 @@ def test_coreid_index(self):
with DwCAReader(sample_data_path("dwca-2extensions.zip")) as dwca:
extension_files = dwca.extension_files

core_txt = dwca.core_file
description_txt = extension_files[0]
vernacular_txt = extension_files[1]

expected_core = {'1': array('L', [0]), '2': array('L', [1]), '3': array('L', [2]), '4': array('L', [3])}
assert core_txt.coreid_index == expected_core

expected_vernacular = {"1": array('L', [0, 1, 2]), "2": array('L', [3])}
assert vernacular_txt.coreid_index == expected_vernacular

Expand Down
55 changes: 55 additions & 0 deletions dwca/test/test_star_record.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
from dwca.read import DwCAReader
from dwca.rows import CoreRow
from dwca.star_record import StarRecordIterator
from .helpers import sample_data_path
import pytest
import unittest

class TestStarRecordIterator(unittest.TestCase):
    """Integration tests for StarRecordIterator over a sample archive."""

    @staticmethod
    def _summarize(star_records):
        """Reduce star records to a frozenset of frozensets of
        (core id, row position, short row type) triples, so joins can be
        compared order-independently."""
        return frozenset(
            frozenset(
                (row.id if isinstance(row, CoreRow) else row.core_id,
                 row.position,
                 row.rowtype.split('/')[-1])
                for row in star_record
            )
            for star_record in star_records
        )

    def test_inner_join(self):
        expected_inner_join = frozenset({
            frozenset({('1', 0, 'Description'), ('1', 0, 'Taxon'), ('1', 0, 'VernacularName')}),
            frozenset({('1', 0, 'Description'), ('1', 0, 'Taxon'), ('1', 1, 'VernacularName')}),
            frozenset({('1', 0, 'Description'), ('1', 0, 'Taxon'), ('1', 2, 'VernacularName')}),
            frozenset({('1', 1, 'Description'), ('1', 0, 'Taxon'), ('1', 0, 'VernacularName')}),
            frozenset({('1', 1, 'Description'), ('1', 0, 'Taxon'), ('1', 1, 'VernacularName')}),
            frozenset({('1', 1, 'Description'), ('1', 0, 'Taxon'), ('1', 2, 'VernacularName')})
        })

        with DwCAReader(sample_data_path("dwca-2extensions.zip")) as dwca:
            joined = StarRecordIterator(dwca.extension_files + [dwca.core_file], how="inner")
            assert self._summarize(joined) == expected_inner_join

    def test_outer_join(self):
        expected_outer_join = frozenset({
            frozenset({('4', 2, 'Description'), ('4', 3, 'Taxon')}),
            frozenset({('1', 0, 'Description'), ('1', 0, 'Taxon'), ('1', 0, 'VernacularName')}),
            frozenset({('1', 0, 'Description'), ('1', 0, 'Taxon'), ('1', 1, 'VernacularName')}),
            frozenset({('1', 0, 'Description'), ('1', 0, 'Taxon'), ('1', 2, 'VernacularName')}),
            frozenset({('1', 1, 'Description'), ('1', 0, 'Taxon'), ('1', 0, 'VernacularName')}),
            frozenset({('1', 1, 'Description'), ('1', 0, 'Taxon'), ('1', 1, 'VernacularName')}),
            frozenset({('1', 1, 'Description'), ('1', 0, 'Taxon'), ('1', 2, 'VernacularName')}),
            frozenset({('3', 2, 'Taxon')}),
            frozenset({('2', 1, 'Taxon'), ('2', 3, 'VernacularName')})
        })

        with DwCAReader(sample_data_path("dwca-2extensions.zip")) as dwca:
            joined = StarRecordIterator(dwca.extension_files + [dwca.core_file], how="outer")
            assert self._summarize(joined) == expected_outer_join

0 comments on commit accbf19

Please sign in to comment.