refactor(iceberg): support eq delete merge on read with hash join #19126
base: main
Conversation
| GitGuardian id | GitGuardian status | Secret | Commit | Filename |
|---|---|---|---|---|
| 9425213 | Triggered | Generic Password | 0ec5b56 | ci/scripts/e2e-source-test.sh |
🛠 Guidelines to remediate hardcoded secrets
- Understand the implications of revoking this secret by investigating where it is used in your code.
- Replace and store your secret safely, following best practices.
- Revoke and rotate this secret.
- If possible, rewrite git history. Rewriting git history is not a trivial act. You might completely break other contributing developers' workflow and you risk accidentally deleting legitimate data.
To avoid such incidents in the future, consider:
- following best practices for managing and storing secrets, including API keys and other credentials
- installing secret detection on pre-commit to catch secrets before they leave your machine and to ease remediation.
🦉 GitGuardian detects secrets in your source code to help developers and security teams secure the modern development process. You are seeing this because you or someone else with access to this repository has authorized GitGuardian to scan your pull request.
Is this PR ready to review or just a draft?

ready to review

What about adding some description? Thanks. Also, what does "mor" mean in the title? Is it "more"?
```rust
let delete_column_names = std::thread::spawn(move || {
    FRONTEND_RUNTIME.block_on(s.get_all_delete_column_names())
})
.join()
.unwrap()?;
```
Use `tokio::task::block_in_place` to wrap it. See:

```rust
let schema = tokio::task::block_in_place(|| {
```
`get_all_delete_column_names()` internally calls iceberg `plan_files()`, so there will be at least two calls when we execute an iceberg query. I think we should consider moving the `plan_files` from `IcebergSplitEnumerator` to here after supporting the position delete hash join in the future.
src/frontend/src/optimizer/plan_rewriter/iceberg_source_rewriter.rs (outdated, resolved)
```diff
@@ -266,7 +344,6 @@ impl IcebergSplitEnumerator {
             data_files.push(IcebergFileScanTaskJsonStr::serialize(&task));
         }
         iceberg::spec::DataContentType::EqualityDeletes => {
-            task.project_field_ids = task.equality_ids.clone();
```
Why remove these field ids?
Because after this PR, we can make sure that the field id from the `select()` step is the final id, so it doesn't need to be changed here.
```diff
 impl LogicalIcebergScan {
-    pub fn new(logical_source: &LogicalSource) -> Self {
+    pub fn new(logical_source: &LogicalSource, iceberg_scan_type: IcebergScanType) -> Self {
```
In my mind, different iceberg scan types should have different schemas. For example, equality delete should contain the exact delete columns that appear in the files instead of all columns of the iceberg table. Maybe we should change the core to have a `generic::IcebergScan` instead of `generic::Source`.
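To illustrate the reviewer's point, here is a std-only sketch of how a scan's output schema could depend on the scan type. The names (`IcebergScanType`, `scan_schema`, the column lists) are illustrative, not RisingWave's actual types:

```rust
// Illustrative only: an equality-delete scan exposes just the equality
// columns recorded in the delete files, while a data scan exposes the
// full table schema.
#[derive(Debug, Clone, Copy, PartialEq)]
enum IcebergScanType {
    DataScan,
    EqualityDeleteScan,
}

fn scan_schema(
    table_columns: &[&'static str],
    equality_delete_columns: &[&'static str],
    scan_type: IcebergScanType,
) -> Vec<&'static str> {
    match scan_type {
        // Data files carry every column of the table.
        IcebergScanType::DataScan => table_columns.to_vec(),
        // Equality-delete files carry only the equality columns.
        IcebergScanType::EqualityDeleteScan => equality_delete_columns.to_vec(),
    }
}

fn main() {
    let table = ["id", "name", "ts"];
    let eq_cols = ["id"];
    assert_eq!(
        scan_schema(&table, &eq_cols, IcebergScanType::DataScan),
        vec!["id", "name", "ts"]
    );
    assert_eq!(
        scan_schema(&table, &eq_cols, IcebergScanType::EqualityDeleteScan),
        vec!["id"]
    );
}
```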
```diff
@@ -247,7 +245,6 @@ macro_rules! for_all_rules {
             , { BatchPushLimitToScanRule }
             , { PullUpCorrelatedPredicateAggRule }
             , { SourceToKafkaScanRule }
-            , { SourceToIcebergScanRule }
```
Why is this removed?
Because the logic of this rule is now implemented in `IcebergMergeOnReadRewriter`.
```diff
@@ -334,149 +367,3 @@ impl PositionDeleteFilter {
         self.position_delete_file_path_pos_map.remove(file_path);
     }
 }
-
-struct EqualityDeleteFilter {
```
Is the following logic moved elsewhere? Or is it invalidated for some reason?
After this PR, this logic is no longer valid; we have implemented the same functionality using the hash join executor.
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
In PR #18448, we added support for iceberg merge on read. However, it still has some problems: when there is too much delete data, we may OOM. In this PR, we use hash join to implement merge on read for equality deletes.
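Conceptually, applying equality deletes during merge-on-read amounts to a left anti join: build a hash table over the delete rows' equality key, probe with each data row, and keep rows whose key is absent. A std-only sketch of that semantics (function and field names are illustrative; real Iceberg MoR also compares sequence numbers, omitted here for brevity):

```rust
use std::collections::HashSet;

// Sketch: equality-delete merge-on-read as a hash (left anti) join.
// `id` stands in for the equality column(s) recorded in the delete files.
fn apply_equality_deletes(
    data_rows: Vec<(i64, &'static str)>, // (id, payload) from data files
    delete_keys: Vec<i64>,               // delete rows projected to the equality column
) -> Vec<(i64, &'static str)> {
    // Build side: hash the delete keys.
    let deleted: HashSet<i64> = delete_keys.into_iter().collect();
    // Probe side: keep data rows with no matching delete key.
    data_rows
        .into_iter()
        .filter(|(id, _)| !deleted.contains(id))
        .collect()
}

fn main() {
    let data = vec![(1, "a"), (2, "b"), (3, "c")];
    let deletes = vec![2];
    assert_eq!(apply_equality_deletes(data, deletes), vec![(1, "a"), (3, "c")]);
}
```

Delegating this to the hash join executor, rather than an in-memory filter, is what lets large delete sets spill or stream instead of blowing up memory.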
Checklist
- `./risedev check` (or alias, `./risedev c`)

Documentation
Release note
If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.