Simpler Dask array construction, w/ optional batching #462
Conversation
The simpler task graph seems to run faster
CodSpeed Performance Report: Merging #462 will improve performance by 26.63%.
This was the idea, yes, although as time has progressed I don't think it's a good idea. In theory we could chunk down to the individual compressed tiles in the FITS file, but that would make the Dask graph insanely large, with insanely small chunk sizes, so I don't think we are likely to do it.
@svank If we agreed to get rid of the sub-file loading chunksize (I think this would mean dropping chunksize handling completely), would you be willing to get this PR mergeable? I like the idea of being able to batch chunks. Did you try out that functionality with the distributed scheduler? I wonder if it would make more of a difference there?
```python
# Specifies that each chunk occupies a space of 1 pixel in the first
# dimension, and all the pixels in the others
chunks = ((1,) * loader_array.size,) + tuple((s,) for s in file_shape)
```
I think this is only true for some arrays? i.e. VISP and not VBI?
I think at this stage, the array is just `(n_chunks, *chunk_size)`, and a few lines down the actual data cube shape is imposed. I think that approach seemed easier than figuring out how to assemble the loaders into the actual data cube shape from the beginning.
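To illustrate (a hypothetical sketch; the sizes are made up, not taken from a real VISP or VBI dataset): with six files of shape (2048, 2048), the expression above produces one chunk per file along a new leading axis.

```python
loader_array_size = 6       # six files (hypothetical)
file_shape = (2048, 2048)   # per-file shape (hypothetical)

chunks = ((1,) * loader_array_size,) + tuple((s,) for s in file_shape)
# chunks == ((1, 1, 1, 1, 1, 1), (2048,), (2048,))
# i.e. a (6, 2048, 2048) stack with one chunk per file; the real
# data-cube shape is imposed by a reshape a few lines later.
```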
Yeah, if we drop the idea of individual files being split into chunks, I think this PR becomes easy to finish. I think the only remaining large task is figuring out how to expose the batch size to the user, if that's something we want exposed. I think it was with the distributed scheduler that I was finding that the batching didn't actually help at all, maybe because of the extra work of concatenating the multiple files into one larger array. I don't know if a benchmark that does more substantial processing on the data after loading would start to see enough benefit to offset that cost.
I'm unsure if this PR is worth it, but it offers some extra analysis on the optimization/chunk size front.
This PR speeds up computation with the distributed scheduler a bit by changing the code for building the Dask array to simplify the task graph (or so I believe). Instead of several iterations of `da.stack` calls, it directly generates Dask tasks for each chunk of the final array, following the example here. This shaves a bit of time off when using the distributed scheduler (36 to 32 seconds), but the "processes" scheduler is still faster and is almost unaffected by this change (from a consistent 20.5 to 19.7 seconds). I'm using the same test case as in #455.
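For the flavor of that construction, here is a minimal sketch of building a Dask array directly from a low-level task graph; the file names, loader function, and dtype are placeholders, not the PR's actual code:

```python
import numpy as np
import dask.array as da
from astropy.io import fits

# Hypothetical inputs
filenames = [f"frame_{i:03d}.fits" for i in range(6)]
file_shape = (2048, 2048)

def load_file(filename):
    # One task = one file, returned with a leading length-1 axis so it
    # fills exactly one chunk of the stacked array.
    with fits.open(filename) as hdul:
        return hdul[0].data[np.newaxis]

name = "load-fits-stack"
# One key per chunk: (name, i, 0, 0) is chunk i along the stacking axis.
graph = {(name, i, 0, 0): (load_file, fn) for i, fn in enumerate(filenames)}
chunks = ((1,) * len(filenames), (file_shape[0],), (file_shape[1],))
arr = da.Array(graph, name, chunks, dtype=np.float32)
```

Because every chunk is produced by a single task, there are no intermediate stack/concatenate layers in the graph.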
This PR currently needs more thought on handling non-default chunksizes (the argument to `stack_loader_array`, which it looks like can be set in the ASDF file?). I guess I'm not sure what the intended use case is for that. In `main` it looks like this option can only sub-divide files into smaller chunks. Are there DKIST files out there that are too big to be a single chunk? Or is the idea to subdivide files when only a portion of each file has to be loaded? As this PR is now, each file gets fully loaded and then split up, which might be less efficient. But before this PR, I'm not sure if that was also the case, or if `hdu.data[slice]` loads just the sliced portion of the file. If the latter, changing this PR to make use of that would, I think, require manually creating tasks for each chunk of each file, which seems complicated.
In #455, there was discussion of rechunking to larger chunk sizes. At least for that test case, using `.rechunk((1, 100, -1, -1))` actually slows down the total computation (I'm seeing it go from 36 to 45 seconds), I assume from the overhead of gathering and combining arrays. This PR's design makes it easy to try larger chunk sizes more efficiently, as one "loading" task can load multiple files and concatenate them immediately, rather than adding a separate layer to the task graph. This is implemented in this PR, in the `Docstring` commit and before. A "batch size" can be set, and each Dask task loads that many files and concatenates them. (The batch size has to be adjusted in the code; I didn't know how to expose it as an option for `load_dataset`.) I found that with a batch size of 100 (producing chunks of a few hundred MB, Dask's recommended size), I was actually getting a very slightly longer runtime (going from 24 to 27 seconds), but it's less extreme than using `.rechunk()`. My guess is that the `np.concatenate` call copying all the data arrays offsets any efficiency gains from the larger chunks. (Maybe if I were doing a lot more with the data, it would be worth it.) I didn't see a way to have Astropy FITS load a file directly into a view of a larger output array, unfortunately.
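A minimal sketch of that batched variant, continuing the hypothetical names from the sketch above (`load_batch`, `batch_size`, and the file list are illustrative, not the PR's real interface):

```python
def load_batch(batch_filenames):
    # One task loads several files and concatenates them into a single
    # larger chunk, trading graph size for an extra in-memory copy.
    return np.concatenate([load_file(fn) for fn in batch_filenames])

batch_size = 100  # hardcoded for the experiment; not yet exposed via load_dataset
batches = [filenames[i:i + batch_size]
           for i in range(0, len(filenames), batch_size)]

name = "load-fits-batched"
graph = {(name, j, 0, 0): (load_batch, batch) for j, batch in enumerate(batches)}
chunks = (tuple(len(b) for b in batches), (file_shape[0],), (file_shape[1],))
arr = da.Array(graph, name, chunks, dtype=np.float32)
```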
Here's the task stream (top is before, bottom is this PR):

[image: task stream comparison, before (top) vs. this PR (bottom)]
And here's a portion of the task graph for each (left is before, right is this PR). This is from `dataset.data.visualize()`, and I made an asdf file with only a handful of FITS files so the images wouldn't be unrenderably large.

[image: task graph comparison, before (left) vs. this PR (right)]

Right now my thinking is "just use the `processes` scheduler", where the gains from this PR are small enough that I'm not sure it's worth more work to handle the `chunksize` argument better.
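For reference, the scheduler choice is standard Dask usage, independent of this PR:

```python
# Single-machine process pool:
result = dataset.data.compute(scheduler="processes")

# Or the distributed scheduler, via a local cluster:
from dask.distributed import Client
client = Client()
result = dataset.data.compute()
```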