fsspec HDF5 read issue with noaa-oisst-avhrr-only. #106

Open
sharkinsspatial opened this issue Apr 16, 2021 · 9 comments

@sharkinsspatial
Contributor

@rabernat I'll investigate a bit more, but I wanted to flag this here since you had logged a similar issue previously. When running a subset of the noaa-oisst-avhrr-only recipe with a single worker node, I hit the following error during processing:

Unexpected error: ValueError('I/O operation on closed file.')
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 865, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/usr/local/lib/python3.8/site-packages/prefect/utilities/executors.py", line 299, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "/usr/local/lib/python3.8/site-packages/rechunker/executors/prefect.py", line 30, in run
    return self.stage.func(key)
  File "/Users/seanharkins/projects/pangeo-forge-flow-registration/venv/lib/python3.8/site-packages/pangeo_forge/recipe.py", line 347, in _store_chunk
  File "/usr/local/lib/python3.8/contextlib.py", line 113, in __enter__
    return next(self.gen)
  File "/usr/local/lib/python3.8/site-packages/pangeo_forge/recipe.py", line 454, in open_chunk
    dsets = [stack.enter_context(self.open_input(i)) for i in inputs]
  File "/usr/local/lib/python3.8/site-packages/pangeo_forge/recipe.py", line 454, in <listcomp>
    dsets = [stack.enter_context(self.open_input(i)) for i in inputs]
  File "/usr/local/lib/python3.8/contextlib.py", line 425, in enter_context
    result = _cm_type.__enter__(cm)
  File "/usr/local/lib/python3.8/contextlib.py", line 113, in __enter__
    return next(self.gen)
  File "/usr/local/lib/python3.8/site-packages/pangeo_forge/recipe.py", line 421, in open_input
    ds = xr.open_dataset(f, **self.xarray_open_kwargs)
  File "/usr/local/lib/python3.8/site-packages/xarray/backends/api.py", line 500, in open_dataset
    backend_ds = backend.open_dataset(
  File "/usr/local/lib/python3.8/site-packages/xarray/backends/h5netcdf_.py", line 383, in open_dataset
    ds = store_entrypoint.open_dataset(
  File "/usr/local/lib/python3.8/site-packages/xarray/backends/store.py", line 25, in open_dataset
    vars, attrs, coord_names = conventions.decode_cf_variables(
  File "/usr/local/lib/python3.8/site-packages/xarray/conventions.py", line 512, in decode_cf_variables
    new_vars[k] = decode_cf_variable(
  File "/usr/local/lib/python3.8/site-packages/xarray/conventions.py", line 360, in decode_cf_variable
    var = times.CFDatetimeCoder(use_cftime=use_cftime).decode(var, name=name)
  File "/usr/local/lib/python3.8/site-packages/xarray/coding/times.py", line 522, in decode
    dtype = _decode_cf_datetime_dtype(data, units, calendar, self.use_cftime)
  File "/usr/local/lib/python3.8/site-packages/xarray/coding/times.py", line 139, in _decode_cf_datetime_dtype
    [first_n_items(values, 1) or [0], last_item(values) or [0]]
  File "/usr/local/lib/python3.8/site-packages/xarray/core/formatting.py", line 71, in first_n_items
    return np.asarray(array).flat[:n_desired]
  File "/usr/local/lib/python3.8/site-packages/numpy/core/_asarray.py", line 102, in asarray
    return array(a, dtype, copy=False, order=order)
  File "/usr/local/lib/python3.8/site-packages/xarray/core/indexing.py", line 503, in __array__
    return np.asarray(self.array, dtype=dtype)
  File "/usr/local/lib/python3.8/site-packages/numpy/core/_asarray.py", line 102, in asarray
    return array(a, dtype, copy=False, order=order)
  File "/usr/local/lib/python3.8/site-packages/xarray/core/indexing.py", line 568, in __array__
    return np.asarray(array[self.key], dtype=None)
  File "/usr/local/lib/python3.8/site-packages/xarray/backends/h5netcdf_.py", line 44, in __getitem__
    return indexing.explicit_indexing_adapter(
  File "/usr/local/lib/python3.8/site-packages/xarray/core/indexing.py", line 857, in explicit_indexing_adapter
    result = raw_indexing_method(raw_key.tuple)
  File "/usr/local/lib/python3.8/site-packages/xarray/backends/h5netcdf_.py", line 54, in _getitem
    return array[key]
  File "/usr/local/lib/python3.8/site-packages/h5netcdf/core.py", line 157, in __getitem__
    return self._h5ds[key]
  File "h5py/_objects.pyx", line 54, in h5py._objects.with_phil.wrapper
  File "h5py/_objects.pyx", line 55, in h5py._objects.with_phil.wrapper
  File "/usr/local/lib/python3.8/site-packages/h5py/_hl/dataset.py", line 790, in __getitem__
    self.id.read(mspace, fspace, arr, mtype, dxpl=self._dxpl)
  File "h5py/_objects.pyx", line 54, in h5py._objects.with_phil.wrapper
  File "h5py/_objects.pyx", line 55, in h5py._objects.with_phil.wrapper
  File "h5py/h5d.pyx", line 192, in h5py.h5d.DatasetID.read
  File "h5py/_proxy.pyx", line 112, in h5py._proxy.dset_rw
  File "h5py/h5fd.pyx", line 162, in h5py.h5fd.H5FD_fileobj_read
  File "/usr/local/lib/python3.8/site-packages/fsspec/spec.py", line 1457, in readinto
    data = self.read(out.nbytes)
  File "/usr/local/lib/python3.8/site-packages/fsspec/spec.py", line 1442, in read
    raise ValueError("I/O operation on closed file.")
ValueError: I/O operation on closed file.
@rabernat
Contributor

Thanks for the issue.

Can you give a little more context? What code produced this error?

@sharkinsspatial
Contributor Author

sharkinsspatial commented Apr 16, 2021

Running this recipe, the failure occurs in open_dataset. I'm currently running on a cluster with a single worker node using the following versions (the xarray PR branch is needed to support NetCDFToZarrRecipe's use of safe_chunks):

"botocore==1.19.52",
"s3fs==0.6.0",
"boto3==1.16.52",
"xarray@git+https://github.com/pydata/xarray.git@refs/pull/5065/merge#egg=xarray",
"dask-cloudprovider==2021.3.1",
"rechunker@git+https://github.com/pangeo-data/rechunker#egg=rechunker",
"pangeo_forge@git+https://github.com/pangeo-forge/pangeo-forge#egg=pangeo_forge",

@rabernat
Contributor

Ok, and how are you "running" it? Via the Prefect executor? Can you share the code that is loading and executing the recipe?

We need to get to a reproducer that we can pass around, for example, to @martindurant, to dig into the fsspec part of this. Clearly the recipe works in other contexts, e.g. https://pangeo-forge.readthedocs.io/en/latest/tutorials/netcdf_zarr_sequential.html
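
For a standalone reproducer, something along these lines might be a starting point (a minimal sketch only; the S3 path is hypothetical and would need to point at one of the cached OISST NetCDF files, and it reuses the same fsspec cache settings as the recipe run):

import xarray as xr
from s3fs import S3FileSystem

fs = S3FileSystem(anon=False, default_cache_type="none", default_fill_cache=False)
url = "s3://<cache-bucket>/cache/<recipe-id>/<oisst-file>.nc"  # hypothetical path

with fs.open(url, mode="rb") as f:
    # h5py reads through the fsspec file object here, which is the layer where
    # the "I/O operation on closed file" error surfaced in the traceback above.
    ds = xr.open_dataset(f, engine="h5netcdf")
    print(ds["time"].values[:1])  # force a read of the time variable, as in the traceback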

@sharkinsspatial
Contributor Author

@rabernat This is being run using the PrefectPipelineExecutor against our AWS bakery, with a Prefect agent and a dynamically created Dask cluster running on our Fargate infrastructure and S3 storage. For @martindurant to view detailed logs, it may be most straightforward for me to add him to our development Prefect Cloud account directly. This is the code which loads and executes the recipe from the meta.yaml, using hardcoded values to instantiate the DaskExecutor:

import yaml
import os
import importlib.util  # used below for spec_from_file_location / module_from_spec
from rechunker.executors import PrefectPipelineExecutor
import dask.distributed as distributed
from distributed import PipInstall
from prefect import storage
from prefect.executors import DaskExecutor
from prefect.run_configs import ECSRun
from typing import TypedDict, Literal
from s3fs import S3FileSystem
from pangeo_forge.storage import FSSpecTarget, CacheFSSpecTarget

# ECS task definition used for the Prefect flow run
definition = yaml.safe_load(
    """
    networkMode: awsvpc
    cpu: 1024
    memory: 2048
    containerDefinitions:
        - name: flow
    """
)
definition["executionRoleArn"] = "arn:aws:iam::552819999234:role/pangeo-forge-aws-bakery-p-prefectecstaskexecutionr-5ZGMH0A8LVXF"
worker_image = "552819999234.dkr.ecr.us-west-2.amazonaws.com/pangeo-forge-aws-bakery-worker"

with open("meta.yaml") as f:
    meta = yaml.load(f, Loader=yaml.FullLoader)
    for recipe_meta in meta["recipes"]:
        module_path = os.path.abspath(recipe_meta["module"])
        spec = importlib.util.spec_from_file_location(recipe_meta["name"], module_path)
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)
        recipe = module.recipe
        fs = S3FileSystem(anon=False, default_cache_type='none', default_fill_cache=False,)
        target_path = f"s3://pangeo-forge-aws-bakery-flowcachebucketpangeofor-196cpck7y0pbl/target/{recipe_meta['id']}"
        target = FSSpecTarget(fs, target_path)
        recipe.target = target
        cache_path = f"s3://pangeo-forge-aws-bakery-flowcachebucketpangeofor-196cpck7y0pbl/cache/{recipe_meta['id']}"
        cache_target = CacheFSSpecTarget(fs, cache_path)
        recipe.input_cache = cache_target
        recipe.metadata_cache = target

        pipeline = recipe.to_pipelines()

        # Dask cluster is created on demand on Fargate for each flow run
        dask_executor = DaskExecutor(
            cluster_class="dask_cloudprovider.aws.FargateCluster",
            cluster_kwargs={
                "image": worker_image,
                "vpc": "vpc-0e519fd83fa521d72",
                "cluster_arn": "arn:aws:ecs:us-west-2:552819999234:cluster/pangeo-forge-aws-bakery-pangeo-forge-dask-bakeryclusterpangeoforgedask71B831F8-BTL3Vmp8cuso",
                "task_role_arn": "arn:aws:iam::552819999234:role/pangeo-forge-aws-bakery-p-prefectecstaskrolepangeo-3R73K3Z1XU70",
                "execution_role_arn": "arn:aws:iam::552819999234:role/pangeo-forge-aws-bakery-p-prefectecstaskexecutionr-5ZGMH0A8LVXF",
                "security_groups": ["sg-0c4d6e997637c801d"],
                "n_workers": 1,
                "scheduler_cpu": 1024,
                "scheduler_mem": 2048,
                "worker_cpu": 1024,
                "worker_mem": 4096,
                "scheduler_timeout": "15 minutes",
                "tags": {
                    "Project": "pangeo-forge",
                    "Recipe": recipe_meta["id"],
                }
            },
        )
        # Convert the recipe pipeline to a Prefect flow for registration with Prefect Cloud
        executor = PrefectPipelineExecutor()
        flow = executor.pipelines_to_plan(pipeline)
        flow.storage = storage.S3(bucket="pangeo-forge-aws-bakery-flowstoragebucketpangeof-71w6gsnambj9")
        flow.run_config = ECSRun(
            image=worker_image,
            labels=["dask_test"],
            task_definition=definition,
            run_task_kwargs={
                "tags": [
                    {"key": "Project", "value": "pangeo-forge"},
                    {"key": "Recipe", "value": recipe_meta["id"]}
                ]
            },
        )
        flow.executor = dask_executor
        flow.name = recipe_meta["id"]
        flow.register(project_name="pangeo-forge-aws-bakery")

For testing purposes I have been running flows via the Prefect Cloud console to view log aggregation, rather than using execute_plan.
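
For reference, the local alternative would be roughly the following sketch, reusing the executor and flow objects from the script above:

executor.execute_plan(flow)

which should run the flow directly instead of registering it with Prefect Cloud.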

@rabernat
Contributor

Sorry to keep asking questions! ... but can you link me to the worker logs? Do you have the pangeo_forge logs turned on?

@sharkinsspatial
Contributor Author

sharkinsspatial commented Apr 16, 2021

@rabernat Let me kick off a new run with the log level set to debug and collect the worker logs for you.
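
For reference, a minimal sketch of how the debug output could be enabled in the worker startup (assuming the package logs under a logger named "pangeo_forge"):

import logging

logger = logging.getLogger("pangeo_forge")  # logger name is an assumption
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s"))
logger.addHandler(handler)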

@sharkinsspatial
Contributor Author

@rabernat I attempted 4 more full runs without reproducing the error, so I'm unsure what the root cause may have been. Regardless, it was a good exercise, as I was able to investigate some of the logging points raised in #92 and develop a promising approach. I'll post my notes in #92.

@rabernat
Contributor

Ok, glad it's resolved, but we should nevertheless keep an eye on this. Intermittent bugs are the hardest to squash!

@cisaacstern
Member

Do we think #171 fixed this?
