-
Notifications
You must be signed in to change notification settings - Fork 3
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
8f3df96
commit 69f0116
Showing
8 changed files
with
126 additions
and
168 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,75 +1,94 @@ | ||
from collections import defaultdict | ||
|
||
import pandas as pd | ||
|
||
from flora.pylib.rules.part import Part | ||
|
||
from . import writer_utils as w_utils | ||
|
||
PARTS_SET = {*Part.labels, "multiple_parts"} | ||
|
||
|
||
class CsvWriter: | ||
def __init__(self, csv_file, csv_min=0, first=None): | ||
self.csv_file = csv_file | ||
self.csv_min = csv_min | ||
self.csv_rows = [] | ||
self.first = first if first else ["taxon"] | ||
|
||
def write(self, treatments, size_units="centimeters"): | ||
csv_rows = self.format_all_rows(treatments) | ||
df = pd.DataFrame(csv_rows) | ||
df = self.sort_df(df) | ||
|
||
with self.csv_file.open("w") as out_file: | ||
out_file.write(f"** All sizes are given in {size_units}. **\n") | ||
df.to_csv(out_file, index=False) | ||
|
||
def format_all_rows(self, treatments): | ||
csv_rows = [self.format_row(r) for r in treatments] | ||
return csv_rows | ||
|
||
def format_row(self, treatment): | ||
csv_row = {"taxon": treatment.taxon} | ||
return self.row_builder(treatment, csv_row) | ||
|
||
def row_builder(self, treatment, csv_row): | ||
by_header = defaultdict(list) | ||
for trait in treatment.traits: | ||
if trait["trait"] in PARTS_SET: | ||
continue | ||
|
||
key_set = set(trait.keys()) | ||
|
||
if not (PARTS_SET & key_set): | ||
continue | ||
|
||
base_header = w_utils.html_label(trait) | ||
|
||
self.group_values_by_header(by_header, trait, base_header) | ||
self.number_columns(by_header, csv_row) | ||
return csv_row | ||
|
||
def sort_df(self, df): | ||
rest = [ | ||
c | ||
for c in df.columns | ||
if c not in self.first and df[c].notna().sum() >= self.csv_min | ||
] | ||
|
||
columns = self.first + sorted(rest) | ||
df = df[columns] | ||
return df | ||
|
||
@staticmethod | ||
def group_values_by_header(by_header, trait, base_header): | ||
filtered = {k: v for k, v in trait.items() if k not in w_utils.COLUMN_SKIPS} | ||
by_header[base_header].append(filtered) | ||
|
||
@staticmethod | ||
def number_columns(by_header, csv_row): | ||
for unnumbered_header, trait_list in by_header.items(): | ||
for i, trait in enumerate(trait_list, 1): | ||
for key, value in trait.items(): | ||
header = f"{unnumbered_header}.{i}.{key}" | ||
csv_row[header] = value | ||
from traiter.pylib.darwin_core import DarwinCore | ||
|
||
from flora.pylib.treatments import Treatments | ||
|
||
TAXON = "dwc:scientificName" | ||
|
||
|
||
def write_csv(treatments: Treatments): | ||
rows = [] | ||
for treatment in treatments: | ||
grouped = group_traits(treatment) | ||
flattened = flatten_traits(grouped) | ||
formatted = remove_duplicates(flattened) | ||
add_row_fields(treatment, formatted) | ||
rows.append(formatted) | ||
|
||
max_indexes = get_max_indexes(rows) | ||
rows = number_columns(rows, max_indexes) | ||
|
||
df = pd.DataFrame(rows) | ||
print(df.head()) | ||
# sort columns | ||
# output data frame | ||
|
||
|
||
def number_columns(rows, max_indexes): | ||
new_rows = [] | ||
for row in rows: | ||
new_row = {} | ||
for (key, i), value in row.items(): | ||
suffix = f"_{i}" if max_indexes[key] > 1 else "" | ||
for col, val in value.items(): | ||
new_row[col + suffix] = val | ||
|
||
new_rows.append(new_row) | ||
return new_rows | ||
|
||
|
||
def get_max_indexes(rows): | ||
max_index = defaultdict(int) | ||
for row in rows: | ||
for key, i in row: | ||
if i > max_index[key]: | ||
max_index[key] = i | ||
return max_index | ||
|
||
|
||
def remove_duplicates(flattened): | ||
cleaned = {} | ||
for key, values in flattened.items(): | ||
i = 0 | ||
used = set() | ||
for val in values: | ||
as_tuple = tuple(val.items()) | ||
if as_tuple not in used: | ||
i += 1 | ||
used.add(as_tuple) | ||
cleaned[(key, i)] = val | ||
return cleaned | ||
|
||
|
||
def flatten_traits(grouped): | ||
flattened = defaultdict(list) | ||
for name, dwc_list in grouped.items(): | ||
for dwc_value in dwc_list: | ||
new = {} | ||
flat = dwc_value.flatten() | ||
for key, value in flat.items(): | ||
if isinstance(value, dict): | ||
for field, val in value.items(): | ||
new[f"{key}_{field}"] = val | ||
else: | ||
new[key] = value | ||
flattened[name].append(new) | ||
return flattened | ||
|
||
|
||
def group_traits(treatment): | ||
grouped: dict[str, list[DarwinCore]] = defaultdict(list) | ||
for trait in treatment.traits: | ||
dwc = DarwinCore() | ||
dwc_trait = trait.to_dwc(dwc) | ||
grouped[trait.key].append(dwc_trait) | ||
return grouped | ||
|
||
|
||
def add_row_fields(treatment, formatted: dict[tuple, dict]): | ||
taxon = formatted.get((TAXON, 1)) | ||
taxon = taxon[TAXON] if taxon else "unknown" | ||
formatted[("taxon", 1)] = {"taxon": taxon} | ||
formatted[("treatment", 1)] = {"treatment": treatment.path.stem} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file was deleted.
Oops, something went wrong.
This file was deleted.
Oops, something went wrong.