Skip to content

Commit

Permalink
fix empty integer error using schema
Browse files Browse the repository at this point in the history
  • Loading branch information
patrick-troy committed Feb 12, 2024
1 parent f320775 commit 51d1a57
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 2 deletions.
6 changes: 5 additions & 1 deletion liiatools/cin_census_pipeline/stream_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from liiatools.common.data import FileLocator, ProcessResult, DataContainer
from liiatools.common import stream_filters as stream_functions
from liiatools.common.stream_parse import dom_parse
from liiatools.common.stream_pipeline import to_dataframe_xml

from liiatools.cin_census_pipeline import stream_record

Expand Down Expand Up @@ -49,6 +50,9 @@ def task_cleanfile(
dataset = dataset_holder.value
errors = error_holder.value

dataset = DataContainer({k: pd.DataFrame(v) for k, v in dataset.items()})
# dataset = DataContainer({k: pd.DataFrame(v) for k, v in dataset.items()})
dataset = DataContainer(
{k: to_dataframe_xml(v, schema_path) for k, v in dataset.items()}
)

return ProcessResult(data=dataset, errors=errors)
5 changes: 5 additions & 0 deletions liiatools/common/data/__container.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ def _export_parquet(self, fs: FS, basename: str):
def _write(self, fs: FS, path: str, data: Any):
    """Write ``data`` to ``path`` on ``fs``, blanking textual null markers.

    String payloads are opened in text mode (``"wt"``); anything else is
    opened in binary mode (``"wb"``) and written unchanged.

    For string payloads the markers ``<NA>`` and ``nan`` are removed before
    writing. ``nan`` is only removed when it stands alone, i.e. it is not
    preceded or followed by a letter, digit or underscore. A plain substring
    replacement would also mangle ordinary values that merely contain those
    three letters (for example ``finance`` would become ``fice``).

    NOTE(review): the markers presumably originate from missing values
    rendered as text upstream - confirm against the exporters that call this.

    :param fs: Filesystem object exposing ``open(path, mode)`` as a context
        manager.
    :param path: Destination path on ``fs``.
    :param data: Text (``str``) or binary payload to write.
    """
    import re

    if isinstance(data, str):
        mode = "wt"
        # Single pass: drop "<NA>" anywhere, and "nan" only as a whole token.
        data = re.sub(r"<NA>|(?<!\w)nan(?!\w)", "", data)
    else:
        mode = "wb"

    with fs.open(path, mode) as f:
        f.write(data)


Expand Down
28 changes: 28 additions & 0 deletions liiatools/common/stream_pipeline.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from typing import Dict, List
import pandas as pd
import xml.etree.ElementTree as ET

from liiatools.common.spec.__data_schema import Column

Expand All @@ -13,4 +14,31 @@ def to_dataframe(data: List[Dict], table_config: Dict[str, Column]) -> pd.DataFr
elif column_spec.type == "category":
# set type to categorical
df[column_name] = df[column_name].astype("category")
elif column_spec.type == "numeric":
if column_spec.numeric.type == "integer":
# set type to Int64
df[column_name] = pd.to_numeric(df[column_name], errors="raise").astype('Int64')
return df


def to_dataframe_xml(data: List[Dict], table_config) -> pd.DataFrame:
    """Build a DataFrame from ``data`` and cast columns using an XSD schema.

    ``table_config`` is handed to ``xml.etree.ElementTree.parse``. For every
    column, the schema is searched for an ``element`` declaration whose
    ``name`` attribute equals the column name. Columns with no matching
    declaration, or whose declaration has no ``type`` attribute, are left
    untouched. Otherwise the declared type decides the conversion:

    * ``xs:date`` - parsed with ``pd.to_datetime`` and reduced to dates
    * ``positiveintegertype`` - parsed with ``pd.to_numeric`` and cast to
      the nullable ``Int64`` dtype
    * any other type name ending in ``type`` - cast to ``category``

    Date and numeric parsing use ``errors="raise"``, so unparseable values
    propagate as exceptions.

    :param data: Records used to construct the DataFrame.
    :param table_config: Source of the XSD schema, as accepted by ``ET.parse``.
    :return: The DataFrame with converted columns.
    """
    frame = pd.DataFrame(data)
    schema = ET.parse(table_config)

    for column_name in frame.columns:
        declaration = schema.find(
            f".//{{http://www.w3.org/2001/XMLSchema}}element[@name='{column_name}']"
        )
        if declaration is None:
            continue

        declared_type = declaration.attrib.get("type", None)
        if declared_type is None:
            continue

        if declared_type == "xs:date":
            # Keep plain date objects rather than timestamps
            parsed = pd.to_datetime(frame[column_name], errors="raise")
            frame[column_name] = parsed.dt.date
        elif declared_type == "positiveintegertype":
            # Nullable integer dtype so missing values do not force floats
            numbers = pd.to_numeric(frame[column_name], errors="raise")
            frame[column_name] = numbers.astype("Int64")
        elif declared_type.endswith("type"):
            # Remaining schema-defined "...type" columns become categoricals
            frame[column_name] = frame[column_name].astype("category")

    return frame
2 changes: 1 addition & 1 deletion tests/cin_census/test_reports.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ def test_s47_journeys():
"S47ActualStartDate",
"CPPstartDate",
"DateOfInitialCPC",
"YEAR",
"Year",
],
)

Expand Down

0 comments on commit 51d1a57

Please sign in to comment.