Conversation

wwj6591812
Contributor

@wwj6591812 wwj6591812 commented Sep 24, 2025

Purpose

In our company, we have a business requirement to compute the primary keys of new (+I) and deleted (-D) records between today's and yesterday's partitions of an ODPS (Alibaba Cloud MaxCompute) table using Paimon.
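To make the requirement concrete, here is a minimal sketch of the result we are after, assuming the primary keys of both partitions fit in memory as sets. The class and method names are hypothetical and are not part of Paimon or of this PR; the real job derives the same information from the changelog instead.

```java
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical illustration only; not part of Paimon or this PR. */
final class PartitionKeyDiff {

    /** Keys present today but absent yesterday: the new (+I) records. */
    static Set<String> inserted(Set<String> yesterday, Set<String> today) {
        Set<String> result = new TreeSet<>(today);
        result.removeAll(yesterday);
        return result;
    }

    /** Keys present yesterday but absent today: the deleted (-D) records. */
    static Set<String> deleted(Set<String> yesterday, Set<String> today) {
        Set<String> result = new TreeSet<>(yesterday);
        result.removeAll(today);
        return result;
    }

    public static void main(String[] args) {
        Set<String> yesterday = Set.of("k1", "k2", "k3");
        Set<String> today = Set.of("k2", "k3", "k4");
        System.out.println("+I " + inserted(yesterday, today)); // +I [k4]
        System.out.println("-D " + deleted(yesterday, today));  // -D [k1]
    }
}
```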

We ran a POC in which a daily Flink batch job reads one partition from the ODPS table and writes it to a Paimon primary key table. The job is configured with the following parameters:
'full-compaction.delta-commits'='1', 'changelog-producer'='full-compaction', 'tag.automatic-creation' = 'batch', 'tag.batch.customized-name'='ds=20250922'
This gives us two snapshots and one tag daily.

We found that we can read the changelog between two snapshots by providing snapshot IDs, as in the SQL below, but we cannot do the same with tags.
SELECT rowkind, item_id, sku_id, ds
FROM alake.omega_alake.kk_invalid_sku_prediction_v4$audit_log
/*+ OPTIONS('scan.parallelism'='128', 'incremental-between-scan-mode'='changelog', 'incremental-between'='2,4') */
WHERE rowkind = '+U' OR rowkind = '-U'
LIMIT 100;

So, this PR adds support for reading the incremental changelog and delta between two specified tags.
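As a rough sketch of how a job might build the tag-based query once this is supported: the helper below only assembles a SQL string that passes two tag names to `incremental-between`, mirroring the snapshot-ID query above. `TagChangelogQuery` and its methods are hypothetical helpers, the `+I` / `-D` filter is taken from the stated requirement, and the exact hint accepted by a given Paimon version should be checked against its documentation.

```java
/** Hypothetical helper; it only builds a query string and calls no Paimon API. */
final class TagChangelogQuery {

    /** Builds the OPTIONS hint that selects the changelog between two tags. */
    static String incrementalBetweenHint(String startTag, String endTag) {
        // The option value is comma-separated, so a comma inside a tag name would be ambiguous.
        if (startTag.contains(",") || endTag.contains(",")) {
            throw new IllegalArgumentException("Tag names must not contain ','");
        }
        return "/*+ OPTIONS('incremental-between-scan-mode'='changelog', "
                + "'incremental-between'='" + startTag + "," + endTag + "') */";
    }

    /** Builds a query for the new (+I) and deleted (-D) rows on the audit_log system table. */
    static String auditLogQuery(String table, String startTag, String endTag) {
        return "SELECT rowkind, item_id, sku_id, ds FROM " + table + "$audit_log "
                + incrementalBetweenHint(startTag, endTag)
                + " WHERE rowkind = '+I' OR rowkind = '-D'";
    }

    public static void main(String[] args) {
        // Tag names follow the 'tag.batch.customized-name' pattern used in the POC.
        System.out.println(auditLogQuery(
                "alake.omega_alake.kk_invalid_sku_prediction_v4",
                "ds=20250921",
                "ds=20250922"));
    }
}
```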

Linked issue: close #xxx

Tests

API and Format

Documentation

@wwj6591812 wwj6591812 force-pushed the support_read_tag_changelog_0924 branch from 6fa4eef to 8763494 Compare September 24, 2025 09:56
@wwj6591812 wwj6591812 changed the title [Core] Support read increment tag changelog. [WIP][Core] Support read increment tag changelog. Sep 25, 2025
@wwj6591812 wwj6591812 force-pushed the support_read_tag_changelog_0924 branch 2 times, most recently from 14b4f23 to ca25cdd Compare September 25, 2025 11:23
@wwj6591812 wwj6591812 changed the title [WIP][Core] Support read increment tag changelog. [Core] Support read increment tag changelog. Sep 25, 2025
@wwj6591812 wwj6591812 changed the title [Core] Support read increment tag changelog. [Core] Support read increment changelog and delta between two tag. Sep 29, 2025
CoreOptions.IncrementalBetweenScanMode scanMode =
        options.incrementalBetweenScanMode();
if (scanMode == CHANGELOG || scanMode == DELTA) {
    return IncrementalDeltaStartingScanner.betweenSnapshotIds(
Contributor

What if the snapshot corresponding to the tag does not exist?

Contributor Author

@wwj6591812 wwj6591812 Oct 10, 2025

In that case, an exception is thrown in IncrementalDeltaStartingScanner#betweenSnapshotIds. Reading the delta / changelog between tags is not supported when the tag's snapshot no longer exists.
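The behavior discussed in this thread can be modeled roughly as follows. This is a simplified, self-contained sketch: `TagRangeResolver`, its two collections, and the exception types and messages are hypothetical and do not mirror Paimon's actual classes or the real check inside `IncrementalDeltaStartingScanner#betweenSnapshotIds`. It assumes a tag can outlive the snapshot it was created from.

```java
import java.util.Map;
import java.util.Set;

/** Hypothetical, simplified model; not Paimon's actual implementation. */
final class TagRangeResolver {
    private final Map<String, Long> tagToSnapshotId; // tag name -> snapshot id recorded in the tag
    private final Set<Long> liveSnapshotIds;         // ids of snapshots that still exist

    TagRangeResolver(Map<String, Long> tagToSnapshotId, Set<Long> liveSnapshotIds) {
        this.tagToSnapshotId = tagToSnapshotId;
        this.liveSnapshotIds = liveSnapshotIds;
    }

    /** Returns {startSnapshotId, endSnapshotId}, or throws if either tag cannot be used. */
    long[] resolve(String startTag, String endTag) {
        return new long[] {snapshotIdFor(startTag), snapshotIdFor(endTag)};
    }

    private long snapshotIdFor(String tag) {
        Long id = tagToSnapshotId.get(tag);
        if (id == null) {
            throw new IllegalArgumentException("Unknown tag: " + tag);
        }
        if (!liveSnapshotIds.contains(id)) {
            // The tag exists but its snapshot is gone, so a snapshot-based
            // changelog / delta read between tags cannot be served.
            throw new IllegalStateException(
                    "Snapshot " + id + " of tag '" + tag + "' does not exist");
        }
        return id;
    }
}
```

With tags `ds=20250921 -> 2` and `ds=20250922 -> 4` and live snapshots `{2, 4}`, `resolve` returns `{2, 4}`; if snapshot 2 has expired, the call fails instead of silently falling back to another read mode.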

@wwj6591812 wwj6591812 force-pushed the support_read_tag_changelog_0924 branch from ca25cdd to 63e69fd Compare October 10, 2025 08:07
@wwj6591812 wwj6591812 force-pushed the support_read_tag_changelog_0924 branch from 63e69fd to 5f901eb Compare October 10, 2025 08:28
@wwj6591812
Contributor Author

@JingsongLi
Hi, please help review this PR.
The failing UT / IT cases were not caused by my changes.
Thx.

@JingsongLi
Contributor

+1

@JingsongLi JingsongLi merged commit 537c625 into apache:master Oct 13, 2025
23 of 25 checks passed