Skip to content

Commit

Permalink
[FLINK-36093][transform] Fix preTransformoperator wrongly filters col…
Browse files Browse the repository at this point in the history
…umns belong to different transforms

This closes #3572
  • Loading branch information
MOBIN-F authored Nov 14, 2024
1 parent 13248fa commit 5db16ba
Show file tree
Hide file tree
Showing 5 changed files with 387 additions and 114 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,105 @@ void testMultipleDispatchTransform(ValuesDataSink.SinkApi sinkApi) throws Except
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, Derrida, 25, student, OLD], after=[], op=DELETE, meta=()}"));
}

@ParameterizedTest
@EnumSource
void testMultipleTransformWithDiffRefColumn(ValuesDataSink.SinkApi sinkApi) throws Exception {
runGenericTransformTest(
sinkApi,
Arrays.asList(
new TransformDef(
"default_namespace.default_schema.\\.*",
"id,age,'Juvenile' AS roleName",
"age < 18",
null,
null,
null,
null),
new TransformDef(
"default_namespace.default_schema.\\.*",
"id,age,name AS roleName",
"age >= 18",
null,
null,
null,
null)),
Arrays.asList(
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`age` INT,`roleName` STRING}, primaryKeys=id, options=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, 18, Alice], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, 20, Bob], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, 20, Bob], after=[2, 30, Bob], op=UPDATE, meta=()}",
"CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT,`age` TINYINT,`roleName` STRING}, primaryKeys=id, options=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[3, 15, Juvenile], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[4, 25, Derrida], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, 25, Derrida], after=[], op=DELETE, meta=()}"));
}

@ParameterizedTest
@EnumSource
void testMultiTransformWithAsterisk(ValuesDataSink.SinkApi sinkApi) throws Exception {
runGenericTransformTest(
sinkApi,
Arrays.asList(
new TransformDef(
"default_namespace.default_schema.mytable2",
"*,'Juvenile' AS roleName",
"age < 18",
null,
null,
null,
null),
new TransformDef(
"default_namespace.default_schema.mytable2",
"id,name,age,description,name AS roleName",
"age >= 18",
null,
null,
null,
null)),
Arrays.asList(
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT}, primaryKeys=id, options=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 18], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Bob, 20], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, Bob, 20], after=[2, Bob, 30], op=UPDATE, meta=()}",
"CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT,`name` VARCHAR(255),`age` TINYINT,`description` STRING,`roleName` STRING}, primaryKeys=id, options=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[3, Carol, 15, student, Juvenile], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[4, Derrida, 25, student, Derrida], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, Derrida, 25, student, Derrida], after=[], op=DELETE, meta=()}"));
}

@ParameterizedTest
@EnumSource
void testMultiTransformMissingProjection(ValuesDataSink.SinkApi sinkApi) throws Exception {
runGenericTransformTest(
sinkApi,
Arrays.asList(
new TransformDef(
"default_namespace.default_schema.mytable2",
null,
"age < 18",
null,
null,
null,
null),
new TransformDef(
"default_namespace.default_schema.mytable2",
"id,UPPER(name) AS name,age,description",
"age >= 18",
null,
null,
null,
null)),
Arrays.asList(
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT}, primaryKeys=id, options=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 18], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Bob, 20], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, Bob, 20], after=[2, Bob, 30], op=UPDATE, meta=()}",
"CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT,`name` STRING,`age` TINYINT,`description` STRING}, primaryKeys=id, options=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[3, Carol, 15, student], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[4, DERRIDA, 25, student], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, DERRIDA, 25, student], after=[], op=DELETE, meta=()}"));
}

/** This tests if transform generates metadata info correctly. */
@ParameterizedTest
@EnumSource
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -597,6 +597,82 @@ public void testMultipleHittingTable() throws Exception {
"DataChangeEvent{tableId=%s.TABLEBETA, before=[2011, 11, Big Sur, 21, Eva, 3011, 11], after=[], op=DELETE, meta=()}");
}

@Test
public void testMultipleTransformWithDiffRefColumn() throws Exception {
String pipelineJob =
String.format(
"source:\n"
+ " type: mysql\n"
+ " hostname: %s\n"
+ " port: 3306\n"
+ " username: %s\n"
+ " password: %s\n"
+ " tables: %s.TABLEALPHA\n"
+ " server-id: 5400-5404\n"
+ " server-time-zone: UTC\n"
+ "sink:\n"
+ " type: values\n"
+ "transform:\n"
+ " - source-table: %s.TABLEALPHA\n"
+ " projection: ID, VERSION, PRICEALPHA, AGEALPHA, 'Juvenile' AS ROLENAME\n"
+ " filter: AGEALPHA < 18\n"
+ " - source-table: %s.TABLEALPHA\n"
+ " projection: ID, VERSION, PRICEALPHA, AGEALPHA, NAMEALPHA AS ROLENAME\n"
+ " filter: AGEALPHA >= 18\n"
+ "pipeline:\n"
+ " parallelism: %d",
INTER_CONTAINER_MYSQL_ALIAS,
MYSQL_TEST_USER,
MYSQL_TEST_PASSWORD,
transformTestDatabase.getDatabaseName(),
transformTestDatabase.getDatabaseName(),
transformTestDatabase.getDatabaseName(),
parallelism);
Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar");
Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
submitPipelineJob(pipelineJob, mysqlCdcJar, valuesCdcJar, mysqlDriverJar);
waitUntilJobRunning(Duration.ofSeconds(30));
LOG.info("Pipeline job is running");

waitUntilSpecificEvent(
String.format(
"CreateTableEvent{tableId=%s.TABLEALPHA, schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17),`PRICEALPHA` INT,`AGEALPHA` INT,`ROLENAME` STRING}, primaryKeys=ID, options=()}",
transformTestDatabase.getDatabaseName()),
60000L);

validateEvents(
"DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[1008, 8, 199, 17, Juvenile], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[1009, 8.1, 0, 18, Bob], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[1010, 10, 99, 19, Carol], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[1011, 11, 59, 20, Dave], op=INSERT, meta=()}");

LOG.info("Begin incremental reading stage.");
// generate binlogs
String mysqlJdbcUrl =
String.format(
"jdbc:mysql://%s:%s/%s",
MYSQL.getHost(),
MYSQL.getDatabasePort(),
transformTestDatabase.getDatabaseName());
try (Connection conn =
DriverManager.getConnection(
mysqlJdbcUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD);
Statement stat = conn.createStatement()) {
stat.execute("UPDATE TABLEALPHA SET VERSION='100' WHERE id=1009;");
stat.execute("INSERT INTO TABLEALPHA VALUES (3007, '7', 79, 25, 'IINA');");
stat.execute("DELETE FROM TABLEALPHA WHERE id=1011;");
} catch (SQLException e) {
LOG.error("Update table for CDC failed.", e);
throw e;
}

validateEvents(
"DataChangeEvent{tableId=%s.TABLEALPHA, before=[1009, 8.1, 0, 18, Bob], after=[1009, 100, 0, 18, Bob], op=UPDATE, meta=()}",
"DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[3007, 7, 79, 25, IINA], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.TABLEALPHA, before=[1011, 11, 59, 20, Dave], after=[], op=DELETE, meta=()}");
}

@Test
public void testTransformWithCast() throws Exception {
String pipelineJob =
Expand Down
Loading

0 comments on commit 5db16ba

Please sign in to comment.