fix: make sure that no inferred schemas ever hit a delta lake #247
Conversation
(force-pushed 75824a9 to d539fd3)
```python
# fsspec is atomic per-transaction.
# If an error occurs inside the transaction, partial writes will be discarded.
# But we only want a transaction if we're writing - read transactions may error out.
stack.enter_context(root.fs.transaction)
```
I don't know why we weren't hitting this before - I was getting test errors on (I think) fake memory/buffer input files that didn't expect certain fsspec semantics - but it makes sense to skip the transaction on read anyway.
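The write-only transaction pattern described here can be sketched with `contextlib.ExitStack`. This is a hypothetical outline, not the actual cumulus-etl code: `FakeFS` is an invented stand-in for an fsspec-style filesystem whose `.transaction` attribute is a context manager.

```python
import contextlib


class FakeFS:
    """Hypothetical stand-in for an fsspec filesystem with a .transaction context manager."""

    def __init__(self):
        self.in_transaction = False

    @property
    @contextlib.contextmanager
    def transaction(self):
        # Mark that we are inside a transaction for the duration of the context
        self.in_transaction = True
        try:
            yield
        finally:
            self.in_transaction = False


def process(fs, writing: bool) -> bool:
    """Enter the transaction only when writing; reads skip it entirely."""
    with contextlib.ExitStack() as stack:
        if writing:
            # fsspec is atomic per-transaction: partial writes are discarded on error
            stack.enter_context(fs.transaction)
        # ... do the actual read or write work here ...
        return fs.in_transaction
```

The `ExitStack` makes the transaction conditional without duplicating the body of the `with` block.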
(force-pushed d539fd3 to ee892b7)
just one or two comments of questionable use while you dig into deeper debugging.
cumulus_etl/etl/tasks/base.py
Outdated
```python
def _uniquify_batch(self, batch: list[dict]) -> list[dict]:
    """
    Drop duplicates inside the batch to guarantee to the formatter that the "id" column is unique.

    This does not fix uniqueness across batches, but formatters that care about that can control for it.

    For context:
    - We have seen duplicates inside and across files generated by Cerner bulk exports. So this is a real
      concern found in the wild, and we can't just trust input data to be "clean."
    - The deltalake backend in particular would prefer the ID to be at least unique inside a batch, so that
      it can more easily handle merge logic. Duplicate IDs across batches will be naturally overwritten as
      new batches are merged in.
    - Other backends like ndjson can currently just live with duplicates across batches, that's fine.
    """
    id_set = set()

    def is_unique(row):
        if row["id"] in id_set:
            return False
        id_set.add(row["id"])
        return True

    return [row for row in batch if is_unique(row)]
```
this feels like a good memory profiling target to me.
This costs one extra batch size plus a set of IDs, and is temporary. It's definitely adding to the problem, but this is (in my mind) an acceptable memory cost since it is reaped back so quickly. That is, my ideal ETL would peak at roughly 2x the batch size in memory.
Whether it's the most efficient, time- or memory-wise, is certainly an open topic.
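As a standalone illustration of the dedupe semantics (first occurrence of an "id" wins), here is a minimal sketch assuming rows are plain dicts with an `"id"` key, written without the method context:

```python
def uniquify_batch(batch: list[dict]) -> list[dict]:
    """Keep only the first row seen for each "id" (same semantics as _uniquify_batch above)."""
    id_set = set()
    unique = []
    for row in batch:
        if row["id"] not in id_set:
            id_set.add(row["id"])
            unique.append(row)
    return unique


batch = [{"id": "a", "v": 1}, {"id": "b", "v": 2}, {"id": "a", "v": 3}]
assert uniquify_batch(batch) == [{"id": "a", "v": 1}, {"id": "b", "v": 2}]
```

Note the output list holds references to the same row dicts, so the extra cost is roughly one list of pointers plus the set of IDs, matching the "one extra batch size plus a set of IDs" estimate.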
cumulus_etl/fhir/fhir_schemas.py
Outdated
```python
def pyarrow_schema_from_resource_batch(resource_type: str, batch: list[dict]) -> pyarrow.Schema:
    # Switch batch from a list of rows to a list of columns (which lets pyarrow infer full unionized
    # struct types across all column values -- it seems to normally only want to infer from the first
    # row if you use any of its row-oriented APIs, but if you give it columns, it looks at the whole set).
    column_names = get_all_column_names(resource_type)
    columns = {col_name: pyarrow.array(row.get(col_name) for row in batch) for col_name in column_names}
    inferred_schema = pyarrow.struct({name: array.type for name, array in columns.items()})

    return create_pyarrow_schema_for_resource(resource_type, inferred_schema)
```
is it possible to create a schema from the expected FHIR datatype without inspecting the batch?
Yes - and that's basically what we did before (the "wide" version).
But this code is adding the feature of "wide plus any deep fields in the batch," which requires inspecting the batch. The quick and dumb way I did it was to use pyarrow's inferring support, but for that to work, it needs to examine columnar data.
Instead, what I could do is write some code that inspects every row/column of the batch and discovers the maximal "shape" of the struct values for those columns, so we know what fields to include in the final schema. Which is basically what pyarrow is doing, but with some memory cost.
I ended up writing that code that inspects the shape manually (rather than converting the row-oriented data to column-oriented data). It wasn't so bad. Look for merge_shape_of_dict() in fhir_schemas.py.
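A minimal sketch of what such a recursive shape-merging helper could look like. This is a hypothetical reconstruction, not the actual `merge_shape_of_dict()` from fhir_schemas.py, which may represent shapes differently:

```python
def merge_shape_of_dict(total, item):
    """Recursively merge the key "shape" of item into total.

    Dicts merge key-by-key, list elements are all merged together,
    and scalar leaves are recorded as True. Hypothetical sketch only.
    """
    if isinstance(item, dict):
        shape = total if isinstance(total, dict) else {}
        for key, value in item.items():
            shape[key] = merge_shape_of_dict(shape.get(key), value)
        return shape
    if isinstance(item, list):
        # Lists of structs contribute the union of all their elements' shapes
        shape = total
        for element in item:
            shape = merge_shape_of_dict(shape, element)
        return shape
    return total if total is not None else True
```

Folding every row of a batch through this yields the union of all fields actually present, which can then be combined with the wide per-spec schema.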
cumulus_etl/formats/deltalake.py
Outdated
```python
def pyarrow_to_spark(self, table: pyarrow.Table) -> pyspark.sql.DataFrame | None:
    """Transforms a pyarrow table to a spark DF"""
    # There must be a better way?
```
oh dear, one only hopes.
In the grand scheme of things, this is a small oddity - we were writing pandas out to parquet and reading it back into spark before (to get proper schemas by way of pyarrow). And now that we have pyarrow directly, I'm just swapping in pyarrow for pandas.
Searching for how to do a direct pyarrow -> pyspark conversion yields people talking about doing this (a parquet intermediate) or going through pandas as an intermediate (but that way lies bad schemas, which is why even when we used pandas, we wrote to parquet).
The native-rust python bindings seem to be very pyarrow focused, so that's promising for the future...
I added a fuller comment to this now that deltalake.py is taking in a list of dicts rather than a pyarrow Table. I know of a better way to do this, but I'm leaving it for a future improvement:

```python
# This is the quick and dirty way - write batch to parquet with pyarrow and read it back.
# But a more direct way would be to convert the pyarrow schema to a pyspark schema and just
# call self.spark.createDataFrame(batch.rows, schema=pyspark_schema). A future improvement.
```
(force-pushed ee892b7 to f35fe21)
@dogversioning OK I re-wrote portions of this PR to keep the incoming data stream as a simple list of dictionaries for as long as possible. The conversions to and from pyarrow Tables were adding too many memory spikes. So we now only convert data when we need to, at the last mile. Please re-review when you can. But this PR no longer makes our memory use worse.
cumulus_etl/fhir/fhir_schemas.py
Outdated
```python
    return create_pyarrow_schema_for_resource(resource_type, batch_shape)


def merge_shape_of_dict(total: dict | None, item) -> dict:
```
i'm bouncing off this name a bit - perhaps get_shape_of_merged_dict? does this function need a docstring?
Yeah OK for sure a docstring, and I'll try to think of a better name - my brain said merge because it's a recursive thing that is always merging changes back into the master dict. But that can be clearer / better worded. And yeah, a docstring. Good call out.
OK done - hopefully this is a little clearer (and I included an example in the docstring)
cumulus_etl/formats/batch.py
Outdated
for the record, i am pretty sure this is not an issue today - do we want to give this a slightly more descriptive name? like, if we end up with a different batching system for some reason in the future, is this too generic? or do we make that change when we come to it?
I'll lean toward deferring a different name, just because I'm lazy but also because I (and the ETL by extension) have been using "batch" as a sort of term of art for "a chunk of input data that winds its way through the ETL, whose size is controlled by --batch-size."
Maybe at minimum, this deserves a nice docstring saying at least that.
This commit fixes a couple oversights with inferred schemas:
- The type might clash with the wide schema we have per-spec, even though the types are really compatible (inferred int for a float field does happen)
- The wrong type might get through unnoticed if it is a deep field that our per-spec schema didn't catch.

Here's a summary of changes to make that happen:
- Drop all use of pandas. It's too loose with the types. Instead, switch to pyarrow schemas, which were used under the covers of pandas anyway.
- Add schema earlier in the process (at Task batching time, not at Formatter writing time). This means that all formatters get to see the same nice schema, though only deltalake uses it.
(force-pushed f35fe21 to 5265647)
🚨 I am worried that some of the things here will increase memory use (i.e. create multiple copies of batches). Like how I create a per-column version of the data to get pyarrow to detect its shape. I did a quick and dirty check of peak memory before and after this PR, and we went from 11x batch size to 20x batch size. So like, twice the memory? But neither situation is good. I'd hoped we'd be at like 2x batch size at worst. But maybe my test is bad? I think I need to look into peak memory use and keep a unit test to avoid regressions. But for now, I think higher memory use but safe schemas is better than not... 🚨

Checklist
- Consider whether documentation (in docs/) needs to be updated
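The peak-memory regression check contemplated above could be sketched with the stdlib tracemalloc module. This is a hypothetical harness, not an existing cumulus-etl test:

```python
import tracemalloc


def peak_memory_of(fn, *args) -> int:
    """Return peak bytes allocated by Python objects while running fn(*args)."""
    tracemalloc.start()
    try:
        fn(*args)
        _, peak = tracemalloc.get_traced_memory()
        return peak
    finally:
        tracemalloc.stop()


# Example: measure a copy-heavy batch operation, then a unit test could
# assert the peak stays under some multiple of the batch's size.
batch = [{"id": str(i), "value": "x" * 100} for i in range(1000)]
peak = peak_memory_of(lambda b: [dict(row) for row in b], batch)
assert peak > 0
```

Note that tracemalloc only tracks Python-level allocations, so memory held inside pyarrow or JVM buffers would need a different tool (e.g. RSS sampling).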