feat: CometNativeWriteExec support with native scan as a child #2839
Conversation
Codecov Report
✅ All modified and coverable lines are covered by tests.
@@ Coverage Diff @@
## main #2839 +/- ##
============================================
+ Coverage 56.12% 59.17% +3.05%
- Complexity 976 1490 +514
============================================
Files 119 167 +48
Lines 11743 15274 +3531
Branches 2251 2524 +273
============================================
+ Hits 6591 9039 +2448
- Misses 4012 4945 +933
- Partials 1140 1290 +150
comphead left a comment
wondering if there could be a test for this? 🤔
Added.
// Perform native write
df.write.parquet(outputPath)

// Wait for listener to be called with timeout
nit: use sparkContext.listenerBus.waitUntilEmpty() or org.scalatest.concurrent.Eventually#eventually
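A minimal sketch of the suggestion, assuming `df`, `spark`, and `outputPath` as in the quoted test, with a hypothetical `listenerCalled` flag set by the test's SparkListener; both waiting styles are shown:

```scala
import java.util.concurrent.atomic.AtomicBoolean

import org.scalatest.concurrent.Eventually.eventually
import org.scalatest.concurrent.PatienceConfiguration.{Interval, Timeout}
import org.scalatest.time.{Millis, Seconds, Span}

// Hypothetical flag the test's SparkListener sets when the write event arrives.
val listenerCalled = new AtomicBoolean(false)

df.write.parquet(outputPath)

// Option 1: drain every queued listener event before asserting
// (listenerBus is private[spark], so this only compiles from test code
// living in a Spark package).
spark.sparkContext.listenerBus.waitUntilEmpty()

// Option 2: poll the assertion instead of sleeping for a fixed timeout.
eventually(Timeout(Span(10, Seconds)), Interval(Span(100, Millis))) {
  assert(listenerCalled.get(), "Expected the listener to observe the native write")
}
```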
Thanks, I'll update the tests soon to use this approach.
wForget left a comment
Thanks @mbutrovich, LGTM
val outputDir = new File(outputPath)
val partFiles = outputDir.listFiles().filter(_.getName.startsWith("part-"))
// With 1000 rows and default parallelism, we should get multiple partitions
assert(partFiles.length > 1, "Expected multiple part files to be created")
should we check the exact number of partitions? example: if you write a df hash-partitioned by 50, we should have 50 files
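For reference, a sketch of that stricter check (assuming `spark` and `outputPath` as in the test above, not the PR's actual test):

```scala
import java.io.File

import org.apache.spark.sql.functions.col

// Hash-partition into a known number of partitions before writing.
val df = spark.range(1000).repartition(50, col("id"))
df.write.parquet(outputPath)

val partFiles = new File(outputPath)
  .listFiles()
  .filter(_.getName.startsWith("part-"))

// Each non-empty partition produces exactly one part file; with 1000 rows
// hashed across 50 partitions, all 50 should be non-empty.
assert(partFiles.length == 50, s"Expected 50 part files, got ${partFiles.length}")
```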
I just moved that logic. Since this is a pretty early proof-of-concept feature from @andygrove, I'm not too inclined to change test behavior in this PR.
andygrove left a comment
Thanks @mbutrovich @wForget
Which issue does this PR close?
Closes #.
Rationale for this change
Support native scan (tested with COMET_PARQUET_SCAN_IMPL=native_datafusion) as a child. Previously, the native scan child was never converted.
What changes are included in this PR?
One change to reset the firstNativeOp flag, and a lot of documentation to explain why.
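Roughly, the fix follows this pattern (a hypothetical sketch with assumed names such as `toNativeWrite` and `toNativeScan`, not Comet's actual code): the write marks the start of a new native section, so the flag must be reset before its child is visited.

```scala
import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}
import org.apache.spark.sql.execution.command.DataWritingCommandExec

var firstNativeOp: Boolean = true

// Hypothetical converters, stubbed purely for illustration.
def toNativeWrite(w: DataWritingCommandExec, children: Seq[SparkPlan]): SparkPlan = ???
def toNativeScan(s: FileSourceScanExec): SparkPlan = ???

def convert(plan: SparkPlan): SparkPlan = plan match {
  case write: DataWritingCommandExec =>
    // The native write begins a fresh native section. Without resetting the
    // flag here, the scan below would never be considered for conversion.
    firstNativeOp = true
    toNativeWrite(write, write.children.map(convert))
  case scan: FileSourceScanExec if firstNativeOp =>
    firstNativeOp = false
    toNativeScan(scan)
  case other =>
    other.withNewChildren(other.children.map(convert))
}
```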
Existing test, but with COMET_PARQUET_SCAN_IMPL=native_datafusion.
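For example, the existing suite can be re-run under the native scan. A sketch, assuming the suite mixes in Spark's SQLHelper for `withSQLConf` and that the config key is `spark.comet.scan.impl` (check CometConf for the exact key):

```scala
withSQLConf("spark.comet.scan.impl" -> "native_datafusion") {
  // Run the existing write test body; the scan child of the native write
  // should now be planned as a native DataFusion scan.
  spark.range(1000).write.parquet(outputPath)
}
```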