
Asynchronous, parallel/concurrent data fetching from a single Argo data server? #365

Open · gmaze opened this issue Jun 14, 2024 · 3 comments · May be fixed by #392
Labels: internals (Internal machinery), performance, stale (No activity over the last 90 days)

Comments

gmaze (Member) commented Jun 14, 2024

This may be a design already implemented in the test_data CLI used to populate CI test data in mocked http servers.

However, I wonder if we should do this when fetching a large number of files from one of the GDAC servers (https and s3)?

The fsspec http store is already asynchronous, but I don't quite understand how parallelisation is implemented for multi-file downloads:

import fsspec

fs = fsspec.filesystem("http")
out = fs.cat([url1, url2, url3])  # fetches data concurrently
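
For reference, fsspec also exposes this machinery through an explicit asynchronous interface; a minimal sketch (not from the original comment, assuming the aiohttp-backed HTTP filesystem):

import asyncio
import fsspec

async def fetch_all(urls):
    # with asynchronous=True, the underscore-prefixed methods are coroutines
    fs = fsspec.filesystem("http", asynchronous=True)
    session = await fs.set_session()  # create the underlying aiohttp session
    data = await fs._cat(urls)        # all requests are gathered concurrently
    await session.close()
    return data                       # dict mapping each url to its bytes

# data = asyncio.run(fetch_all(urls))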

Our current option is to use multithreading via the parallel option of the data fetcher, i.e. in httpstore.open_mfdataset. With this design, we apply the pre/post-processing of Argo data on chunks in parallel, but that is different from downloading in parallel and then processing in parallel (possibly with another mechanism); see the sketch below.
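
A minimal sketch of that alternative design, not argopy's current implementation: download everything concurrently first, then decode and pre-process in a thread pool (preprocess and max_workers are hypothetical parameters for illustration):

from concurrent.futures import ThreadPoolExecutor

import fsspec
import xarray as xr

def fetch_then_process(urls, preprocess=None, max_workers=8):
    fs = fsspec.filesystem("http")
    raw = fs.cat(urls)  # step 1: concurrent download (asynchronous under the hood)

    def _open(blob):
        ds = xr.open_dataset(blob)  # step 2: decode each in-memory netCDF file
        return preprocess(ds) if preprocess else ds

    # step 3: run decoding / Argo pre-processing in a thread pool
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(_open, raw.values()))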

e.g. https://stackoverflow.com/questions/57126286/fastest-parallel-requests-in-python

import asyncio
import time

import aiohttp

async def get(url, session):
    try:
        async with session.get(url=url) as response:
            resp = await response.read()
            print("Successfully got url {} with resp of length {}.".format(url, len(resp)))
            return resp
    except Exception as e:
        print("Unable to get url {} due to {}.".format(url, e.__class__))


async def main(urls):
    async with aiohttp.ClientSession() as session:
        ret = await asyncio.gather(*(get(url, session) for url in urls))
    print("Finalized all. Return is a list of len {} outputs.".format(len(ret)))


urls = websites.split("\n")  # `websites` is assumed to be a newline-separated string of URLs
start = time.time()
asyncio.run(main(urls))
end = time.time()
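
Since all of these requests would hit a single GDAC server, it may also make sense to cap how many are in flight at once; a minimal sketch using asyncio.Semaphore (the limit of 10 is arbitrary, not a value recommended in this thread):

import asyncio

import aiohttp

async def get_bounded(url, session, sem):
    # the semaphore limits the number of concurrent requests to the server
    async with sem:
        async with session.get(url) as response:
            return await response.read()

async def main_bounded(urls, limit=10):
    sem = asyncio.Semaphore(limit)
    async with aiohttp.ClientSession() as session:
        return await asyncio.gather(*(get_bounded(u, session, sem) for u in urls))

# results = asyncio.run(main_bounded(urls, limit=10))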
gmaze (Member Author) commented Jun 14, 2024

One small test:

import numpy as np

from argopy import ArgoIndex
import fsspec
import xarray as xr
from argopy.stores import httpstore

idx = ArgoIndex(index_file='bgc-s').load().search_wmo_cyc(6903091, np.arange(1, 45))
urls = [idx.host + "/dac/" + str(f) for f in idx.search['file']]

Method 1:

%%time
fs = fsspec.filesystem("http")
out = fs.cat(urls)  # fetches data concurrently
results = []
for url in out:
    results.append(xr.open_dataset(out[url]))
>>> CPU times: user 1.2 s, sys: 240 ms, total: 1.44 s
>>> Wall time: 6.95 s

Method 2:

%%time
results = httpstore().open_mfdataset(urls, concat=False)
>>> CPU times: user 1.52 s, sys: 255 ms, total: 1.78 s
>>> Wall time: 5.3 s

gmaze (Member Author) commented Jun 14, 2024

What's taking time is the creation of the xarray datasets, not the data download, so maybe this is not where to look for performance improvements. One way to check is sketched below.
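
A sketch (not from the original thread) that times the concurrent download and the xarray dataset creation separately, reusing the urls list from the snippet above:

import time

import fsspec
import xarray as xr

fs = fsspec.filesystem("http")

t0 = time.perf_counter()
raw = fs.cat(urls)  # concurrent download only
t1 = time.perf_counter()
datasets = [xr.open_dataset(b) for b in raw.values()]  # dataset creation only
t2 = time.perf_counter()

print("download: {:.2f} s, xarray decoding: {:.2f} s".format(t1 - t0, t2 - t1))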

This issue was marked as stale automatically because it has not seen any activity in 90 days.

github-actions bot added the stale (No activity over the last 90 days) label Sep 14, 2024
gmaze linked a pull request (#392) Sep 18, 2024 that will close this issue