Add process_input and process_chunk hooks #65

Merged
merged 5 commits into from
Jan 28, 2021

davidbrochart
Contributor

This PR adds a process_input parameter to NetCDFtoZarrSequentialRecipe, which is a function optionally called after each input has been opened. It takes in the name of the opened file and the dataset, and returns a dataset.
The motivation is that it is very likely that we need to tweak the input data for the rest of the flow. For instance, some datasets have their timestamp encoded in the file name, so if you want to concatenate on a time dimension, you need to extract the timestamp from the file name and put it in a time coordinate.
If you think this can go in, I'll write a test.
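
As a minimal sketch of the use case described above (a timestamp encoded in the file name), a `process_input` hook could look like the following. The `data_YYYY_MM_DD.nc` naming scheme and the helper `timestamp_from_fname` are hypothetical and not part of this PR; the `(ds, fname)` argument order follows the signature given later in this thread, and `ds` is assumed to be an `xarray.Dataset` without an existing `time` dimension.

```python
import re
from datetime import datetime

# Hypothetical naming scheme, for illustration only: data_YYYY_MM_DD.nc
FNAME_PATTERN = re.compile(r"data_(\d{4})_(\d{2})_(\d{2})\.nc$")


def timestamp_from_fname(fname):
    """Parse the date encoded in a file name such as 'data_2021_01_28.nc'."""
    found = FNAME_PATTERN.search(fname)
    if found is None:
        raise ValueError(f"no date found in file name {fname!r}")
    year, month, day = (int(part) for part in found.groups())
    return datetime(year, month, day)


def process_input(ds, fname):
    """Hook sketch: add a length-1 time dimension taken from the file name.

    `ds` is assumed to be an xarray.Dataset; expand_dims accepts the
    coordinate values of the new dimension as a keyword argument.
    """
    return ds.expand_dims(time=[timestamp_from_fname(fname)])
```

With a hook like this, each opened input would carry its own `time` coordinate, which is what concatenating along a time dimension requires.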

@martindurant
Contributor

This kind of functionality would be nice to have in Intake catalogues in general, and then you could express your inputs as usual Data Sources. We have that, e.g., for the CSV case (intake.source.base.PatternMixin), so the template syntax used there might be a good model to adhere to.

@rabernat
Contributor

Thanks David! In general this is great and partially addresses #48. If you can also add a process_chunk option, then it will close #48 entirely.

We will need tests. I would prefer to have these things parametrized such that we are testing both with and without the option. I was puzzled why this didn't trigger codecov (see #61), but I realized I had not activated the webhook. So if you push again, we should get a coverage report.

@rabernat
Contributor

> so the template syntax used there might be a good model to adhere to.

@martindurant could you link us directly to the code you're talking about?

@martindurant
Contributor

@davidbrochart
Contributor Author

Awesome, thanks @martindurant and @rabernat.

@rabernat
Contributor

See #63 (comment) for a discussion of how we imagine refactoring this monster class.

@rabernat rabernat linked an issue Jan 28, 2021 that may be closed by this pull request
@codecov-io

codecov-io commented Jan 28, 2021

Codecov Report

Merging #65 (5ec76b4) into master (266318d) will increase coverage by 0.16%.
The diff coverage is 100.00%.

@@            Coverage Diff             @@
##           master      #65      +/-   ##
==========================================
+ Coverage   91.07%   91.24%   +0.16%     
==========================================
  Files           4        4              
  Lines         269      274       +5     
  Branches       20       22       +2     
==========================================
+ Hits          245      250       +5     
  Misses         21       21              
  Partials        3        3              
Impacted Files            Coverage Δ
pangeo_forge/recipe.py    90.60% <100.00%> (+0.32%) ⬆️
pangeo_forge/utils.py     100.00% <0.00%> (ø)

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 46b465c...5ec76b4. Read the comment docs.

@davidbrochart
Contributor Author

I think the pattern matching in the file name mentioned by Martin could be implemented in another PR. We could provide decorators that could be applied to the hooks, like this:

@match(fname="data_{year}_{month}_{day}.nc")
def process_input(ds):
    # do some processing on ds using year, month and day
    return ds

What do you think?
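
One possible reading of this proposal, as a rough sketch only: the `match` decorator below is hypothetical (it does not exist in pangeo-forge), and passing the parsed fields to the hook as keyword arguments is an assumption about how `year`, `month` and `day` would reach the function body. It converts the `{field}` template into a regular expression and wraps the hook so that it still exposes the two-argument `(ds, filename)` signature.

```python
import functools
import re


def match(fname):
    """Hypothetical decorator: parse `{field}` placeholders out of a file name."""
    # re.split with a capturing group alternates literal text and field names:
    # "data_{year}.nc" -> ["data_", "year", ".nc"]
    pieces = re.split(r"\{(\w+)\}", fname)
    regex = "".join(
        re.escape(piece) if index % 2 == 0 else f"(?P<{piece}>.+?)"
        for index, piece in enumerate(pieces)
    )
    pattern = re.compile(regex)

    def decorator(func):
        @functools.wraps(func)
        def hook(ds, filename):
            # Only the last path component is matched against the template.
            found = pattern.fullmatch(filename.rsplit("/", 1)[-1])
            if found is None:
                raise ValueError(f"{filename!r} does not match {fname!r}")
            # Parsed fields are passed to the wrapped function as strings.
            return func(ds, **found.groupdict())

        return hook

    return decorator


@match(fname="data_{year}_{month}_{day}.nc")
def process_input(ds, year, month, day):
    # do some processing on ds using year, month and day
    return ds
```

Keeping the parsing in a decorator would leave the hook signature itself unchanged, so this could indeed be added in a separate PR.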

Contributor

@rabernat rabernat left a comment

🎉 This is fantastic @davidbrochart!

@@ -8,6 +8,13 @@
dummy_fnames = ["a.nc", "b.nc", "c.nc"]


def incr_date(ds, fname=""):
Contributor

Can you explain the point of the fname kwarg?

Contributor Author

We use incr_date for both process_input and process_chunk in the test, but they have a different signature:

  • process_input(ds, fname) -> ds
  • process_chunk(ds) -> ds

Would you like to have different hooks?
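
To illustrate the two signatures listed above, the sketch below shows how a single function with a defaulted `fname` argument can serve as both hooks. `run_hooks` is a hypothetical stand-in for the recipe internals, and plain dicts stand in for `xarray.Dataset` objects; neither is taken from the PR. In the recipe, the chunk hook would presumably receive the combined chunk rather than a single input.

```python
def incr_date(ds, fname=""):
    """Toy hook: shift every time value by one day.

    `ds` is a plain dict standing in for a dataset; `fname` is accepted but
    unused, so the function fits both hook signatures.
    """
    return {**ds, "time": [t + 1 for t in ds["time"]]}


def run_hooks(ds, fname, process_input=None, process_chunk=None):
    """Hypothetical driver showing how each hook would be called."""
    if process_input is not None:
        ds = process_input(ds, fname)  # process_input(ds, fname) -> ds
    if process_chunk is not None:
        ds = process_chunk(ds)  # process_chunk(ds) -> ds
    return ds
```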

Contributor

Ok, yes I see now. Adding the fname argument in the process_input makes sense I guess. If we were opening files directly with xarray (rather than first with fsspec) this information would also be available in the xarray .encoding attributes. But that's not the case here.

Contributor

I was also confused on this point. Can we rename fname to filename? I read fname as function name.

# check that the process_chunk hook made some changes
assert not ds_target.identical(ds_expected)
# apply these changes to the expected dataset
ds_expected = process_chunk(ds_expected)
Contributor

Great test.

@davidbrochart davidbrochart changed the title [WIP] Add process_input parameter Add process_input parameter Jan 28, 2021
@davidbrochart davidbrochart changed the title Add process_input parameter Add process_input and process_chunk hooks Jan 28, 2021
@rabernat
Contributor

rabernat commented Jan 28, 2021

It would be nice to get #6 in and then rebase so we could see the API documentation built from this PR.

@davidbrochart
Contributor Author

I think you meant #46, I'm looking at it.

@rabernat
Contributor

Docs look great! Thanks so much for your work on this David!

I'd like to get one more set of eyes on this. I've tagged a few potential reviewers.

Co-authored-by: Tom Augspurger <1312546+TomAugspurger@users.noreply.github.com>
@@ -156,6 +160,8 @@ class NetCDFtoZarrSequentialRecipe(BaseRecipe):
xarray_concat_kwargs: dict = field(default_factory=dict)
delete_input_encoding: bool = True
fsspec_open_kwargs: dict = field(default_factory=dict)
process_input: Optional[Callable[[xr.Dataset, str], xr.Dataset] = None
Contributor

@TomAugspurger TomAugspurger Jan 28, 2021

Whoops, sorry my suggestion was missing a ].

Might want to make sure typing.Callable is imported as well.
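
For reference, a self-contained sketch of the annotation with the bracket closed and the typing imports in place. `RecipeSketch` is a hypothetical stand-in for the recipe class, and the `process_chunk` annotation is inferred from the `process_chunk(ds) -> ds` signature mentioned earlier in the review rather than copied from the diff.

```python
from dataclasses import dataclass
from typing import TYPE_CHECKING, Callable, Optional

if TYPE_CHECKING:
    # Imported for type checkers only, so the sketch runs without xarray.
    import xarray as xr


@dataclass
class RecipeSketch:
    """Hypothetical stand-in for the recipe class, showing only the two hooks."""

    # Note the closing bracket for Optional[...] after the Callable[...] type.
    process_input: Optional[Callable[["xr.Dataset", str], "xr.Dataset"]] = None
    process_chunk: Optional[Callable[["xr.Dataset"], "xr.Dataset"]] = None
```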

Contributor Author

Thanks @TomAugspurger!

@rabernat rabernat merged commit aa5f8a9 into pangeo-forge:master Jan 28, 2021
@davidbrochart
Contributor Author

I see we're still having an issue with the HTTP server in the CI?

@rabernat
Contributor

> I see we're still having an issue with the HTTP server in the CI?

Yes. The short term fix is maybe to just bump the sleep call again. The long term one is maybe to use this: https://pypi.org/project/pytest-httpserver/

@davidbrochart
Contributor Author

Not sure pytest-httpserver will help since the issue doesn't seem to be with pytest but with the CI (I've never seen the tests hang locally).

@martindurant
Contributor

I commonly have something like this to wait for servers:

    timeout = 5
    while True:
        try:
            r = requests.get(endpoint_uri)
            if r.ok:
                break
        except requests.exceptions.RequestException:
            pass  # server not accepting connections yet; retry
        timeout -= 0.1
        assert timeout > 0, "Timed out waiting for server"
        time.sleep(0.1)
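
A self-contained variant of the same polling idea, as a sketch only: it swaps `requests` for the standard library's `urllib`, tracks the deadline with `time.monotonic()` instead of decrementing a counter, and raises `TimeoutError` rather than asserting. The function name `wait_for_server` and its parameters are hypothetical and not taken from the test suite.

```python
import http.client
import time
import urllib.request


def wait_for_server(url, timeout=5.0, interval=0.1):
    """Poll `url` until it answers with HTTP 200, or raise TimeoutError."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            with urllib.request.urlopen(url, timeout=1.0) as response:
                if response.status == 200:
                    return
        except (OSError, http.client.HTTPException):
            # Connection refused, HTTP error status, etc.: not ready yet.
            pass
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Timed out waiting for server at {url}")
        time.sleep(interval)
```

A test fixture could call `wait_for_server(url)` right after starting the server process, in place of a fixed `sleep`.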

@davidbrochart
Contributor Author

Thanks Martin, that's much better than waiting for an arbitrary amount of time! I'll open a PR.

@rabernat
Contributor

> (I've never seen the tests hang locally).

I have! On my os-x laptop.

This was referenced Jan 29, 2021

Successfully merging this pull request may close these issues.

Input / chunk preprocessing
5 participants