[FLINK-35067][cdc-connector][postgres] Adding metadata 'row_kind' for Postgres CDC Connector. #3716

Status: Open · wants to merge 5 commits into base: master
PostgreSQLReadableMetadata.java
@@ -18,15 +18,17 @@
 package org.apache.flink.cdc.connectors.postgres.table;

 import org.apache.flink.cdc.debezium.table.MetadataConverter;
+import org.apache.flink.cdc.debezium.table.RowDataMetadataConverter;
 import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.types.DataType;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;

 import io.debezium.connector.AbstractSourceInfo;
 import io.debezium.data.Envelope;
-import org.apache.kafka.connect.data.Struct;
-import org.apache.kafka.connect.source.SourceRecord;

 /** Defines the supported metadata columns for {@link PostgreSQLTableSource}. */
 public enum PostgreSQLReadableMetadata {
@@ -95,6 +97,28 @@ public Object read(SourceRecord record) {
                     return TimestampData.fromEpochMillis(
                             (Long) sourceStruct.get(AbstractSourceInfo.TIMESTAMP_KEY));
                 }
-            });
+            }),
+
+    /**
+     * It indicates the row kind of the changelog. '+I' means INSERT message, '-D' means DELETE
+     * message, '-U' means UPDATE_BEFORE message, and '+U' means UPDATE_AFTER message.
+     */
+    ROW_KIND(
+            "row_kind",
+            DataTypes.STRING().notNull(),
+            new RowDataMetadataConverter() {
+                private static final long serialVersionUID = 1L;
+
+                @Override
+                public Object read(RowData rowData) {
+                    return StringData.fromString(rowData.getRowKind().shortString());
+                }
+
+                @Override
+                public Object read(SourceRecord record) {
+                    throw new UnsupportedOperationException(
+                            "Please call read(RowData rowData) method instead.");
+                }
+            });

     private final String key;
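Note that the new converter overrides read(RowData) rather than read(SourceRecord): the row kind is already present on each emitted RowData, so the converter simply echoes it back as a string. For reference, a self-contained sketch of the label mapping that RowKind.shortString() produces (plain Flink API, not part of this change):

import org.apache.flink.types.RowKind;

public class RowKindLabels {
    public static void main(String[] args) {
        // RowKind.shortString() yields the compact changelog labels that the
        // 'row_kind' metadata column surfaces: INSERT -> "+I",
        // UPDATE_BEFORE -> "-U", UPDATE_AFTER -> "+U", DELETE -> "-D".
        for (RowKind kind : RowKind.values()) {
            System.out.println(kind + " -> " + kind.shortString());
        }
    }
}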
PostgreSQLConnectorITCase.java
@@ -17,6 +17,21 @@

 package org.apache.flink.cdc.connectors.postgres.table;

+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.testcontainers.containers.PostgreSQLContainer.POSTGRESQL_PORT;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.cdc.connectors.postgres.PostgresTestBase;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -29,28 +44,12 @@
 import org.apache.flink.types.RowUtils;
 import org.apache.flink.util.CloseableIterator;
 import org.apache.flink.util.ExceptionUtils;

 import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;

-import java.sql.Connection;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.ExecutionException;
-
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.testcontainers.containers.PostgreSQLContainer.POSTGRESQL_PORT;
-
 /** Integration tests for PostgreSQL Table source. */
 @RunWith(Parameterized.class)
 public class PostgreSQLConnectorITCase extends PostgresTestBase {
@@ -469,6 +468,7 @@ public void testMetadataColumns() throws Throwable {
                         + " db_name STRING METADATA FROM 'database_name' VIRTUAL,"
                         + " schema_name STRING METADATA VIRTUAL,"
                         + " table_name STRING METADATA VIRTUAL,"
+                        + " row_kind STRING METADATA FROM 'row_kind' VIRTUAL,"
                         + " id INT NOT NULL,"
                         + " name STRING,"
                         + " description STRING,"
@@ -501,6 +501,7 @@ public void testMetadataColumns() throws Throwable {
                         + " database_name STRING,"
                         + " schema_name STRING,"
                         + " table_name STRING,"
+                        + " row_kind STRING,"
                         + " id INT,"
                         + " name STRING,"
                         + " description STRING,"
@@ -546,52 +547,52 @@ public void testMetadataColumns() throws Throwable {
                 Arrays.asList(
                         "+I("
                                 + databaseName
-                                + ",inventory,products,101,scooter,Small 2-wheel scooter,3.140)",
+                                + ",inventory,products,+I,101,scooter,Small 2-wheel scooter,3.140)",
                         "+I("
                                 + databaseName
-                                + ",inventory,products,102,car battery,12V car battery,8.100)",
+                                + ",inventory,products,+I,102,car battery,12V car battery,8.100)",
                         "+I("
                                 + databaseName
-                                + ",inventory,products,103,12-pack drill bits,12-pack of drill bits with sizes ranging from #40 to #3,0.800)",
+                                + ",inventory,products,+I,103,12-pack drill bits,12-pack of drill bits with sizes ranging from #40 to #3,0.800)",
                         "+I("
                                 + databaseName
-                                + ",inventory,products,104,hammer,12oz carpenter's hammer,0.750)",
+                                + ",inventory,products,+I,104,hammer,12oz carpenter's hammer,0.750)",
                         "+I("
                                 + databaseName
-                                + ",inventory,products,105,hammer,14oz carpenter's hammer,0.875)",
+                                + ",inventory,products,+I,105,hammer,14oz carpenter's hammer,0.875)",
                         "+I("
                                 + databaseName
-                                + ",inventory,products,106,hammer,16oz carpenter's hammer,1.000)",
+                                + ",inventory,products,+I,106,hammer,16oz carpenter's hammer,1.000)",
                         "+I("
                                 + databaseName
-                                + ",inventory,products,107,rocks,box of assorted rocks,5.300)",
+                                + ",inventory,products,+I,107,rocks,box of assorted rocks,5.300)",
                         "+I("
                                 + databaseName
-                                + ",inventory,products,108,jacket,water resistent black wind breaker,0.100)",
+                                + ",inventory,products,+I,108,jacket,water resistent black wind breaker,0.100)",
                         "+I("
                                 + databaseName
-                                + ",inventory,products,109,spare tire,24 inch spare tire,22.200)",
+                                + ",inventory,products,+I,109,spare tire,24 inch spare tire,22.200)",
                         "+I("
                                 + databaseName
-                                + ",inventory,products,110,jacket,water resistent white wind breaker,0.200)",
+                                + ",inventory,products,+I,110,jacket,water resistent white wind breaker,0.200)",
                         "+I("
                                 + databaseName
-                                + ",inventory,products,111,scooter,Big 2-wheel scooter ,5.180)",
+                                + ",inventory,products,+I,111,scooter,Big 2-wheel scooter ,5.180)",
                         "+U("
                                 + databaseName
-                                + ",inventory,products,106,hammer,18oz carpenter hammer,1.000)",
+                                + ",inventory,products,+U,106,hammer,18oz carpenter hammer,1.000)",
                         "+U("
                                 + databaseName
-                                + ",inventory,products,107,rocks,box of assorted rocks,5.100)",
+                                + ",inventory,products,+U,107,rocks,box of assorted rocks,5.100)",
                         "+U("
                                 + databaseName
-                                + ",inventory,products,110,jacket,new water resistent white wind breaker,0.500)",
+                                + ",inventory,products,+U,110,jacket,new water resistent white wind breaker,0.500)",
                         "+U("
                                 + databaseName
-                                + ",inventory,products,111,scooter,Big 2-wheel scooter ,5.170)",
+                                + ",inventory,products,+U,111,scooter,Big 2-wheel scooter ,5.170)",
                         "-D("
                                 + databaseName
-                                + ",inventory,products,111,scooter,Big 2-wheel scooter ,5.170)");
+                                + ",inventory,products,-D,111,scooter,Big 2-wheel scooter ,5.170)");
         List<String> actual = TestValuesTableFactory.getRawResults("sink");
         Collections.sort(actual);
         Collections.sort(expected);
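Beyond the test harness, the same column works in any Flink SQL table backed by the postgres-cdc connector. A minimal sketch follows; the connection options are placeholders for a local PostgreSQL with logical decoding enabled, not values taken from this PR:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class RowKindQuickstart {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // 'row_kind' is declared like any other virtual metadata column.
        // All connection options below are placeholders for a local setup.
        tEnv.executeSql(
                "CREATE TABLE products ("
                        + "  row_kind STRING METADATA FROM 'row_kind' VIRTUAL,"
                        + "  id INT NOT NULL,"
                        + "  name STRING,"
                        + "  PRIMARY KEY (id) NOT ENFORCED"
                        + ") WITH ("
                        + "  'connector' = 'postgres-cdc',"
                        + "  'hostname' = 'localhost',"
                        + "  'port' = '5432',"
                        + "  'username' = 'postgres',"
                        + "  'password' = 'postgres',"
                        + "  'database-name' = 'postgres',"
                        + "  'schema-name' = 'inventory',"
                        + "  'table-name' = 'products',"
                        + "  'slot.name' = 'flink',"
                        + "  'decoding.plugin.name' = 'pgoutput'"
                        + ")");

        // Every change event now carries its changelog label: inserts arrive
        // as +I, update retractions as -U, update images as +U, deletes as -D.
        tEnv.executeSql("SELECT row_kind, id, name FROM products").print();
    }
}

Because the column is VIRTUAL it is read-only and excluded from the insert schema, which is why the test above re-declares row_kind as a plain STRING column on the values sink.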
PostgreSQLTableFactoryTest.java
@@ -50,19 +50,19 @@
 import java.util.Map;
 import java.util.Properties;

-import static org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions.CONNECTION_POOL_SIZE;
-import static org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions.CONNECT_MAX_RETRIES;
-import static org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions.CONNECT_TIMEOUT;
-import static org.apache.flink.cdc.connectors.base.options.SourceOptions.CHUNK_META_GROUP_SIZE;
 import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED;
 import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP;
-import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
 import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED;
+import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.CHUNK_META_GROUP_SIZE;
+import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.CONNECTION_POOL_SIZE;
+import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.CONNECT_MAX_RETRIES;
+import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.CONNECT_TIMEOUT;
-import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
-import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND;
-import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND;
 import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.HEARTBEAT_INTERVAL;
+import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
 import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SCAN_LSN_COMMIT_CHECKPOINTS_DELAY;
+import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
+import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND;
+import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND;
 import static org.apache.flink.cdc.connectors.utils.AssertUtils.assertProducedTypeOfSourceFunction;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -100,6 +100,7 @@ public class PostgreSQLTableFactoryTest {
                     Column.physical("name", DataTypes.STRING()),
                     Column.physical("count", DataTypes.DECIMAL(38, 18)),
                     Column.metadata("time", DataTypes.TIMESTAMP_LTZ(3), "op_ts", true),
+                    Column.metadata("row_kind", DataTypes.STRING(), "row_kind", true),
                     Column.metadata(
                             "database_name", DataTypes.STRING(), "database_name", true),
                     Column.metadata("schema_name", DataTypes.STRING(), "schema_name", true),
@@ -211,7 +212,7 @@ public void testMetadataColumns() {
         DynamicTableSource actualSource = createTableSource(SCHEMA_WITH_METADATA, properties);
         PostgreSQLTableSource postgreSQLTableSource = (PostgreSQLTableSource) actualSource;
         postgreSQLTableSource.applyReadableMetadata(
-                Arrays.asList("op_ts", "database_name", "schema_name", "table_name"),
+                Arrays.asList("row_kind", "op_ts", "database_name", "schema_name", "table_name"),
                 SCHEMA_WITH_METADATA.toSourceRowDataType());
         actualSource = postgreSQLTableSource.copy();
         PostgreSQLTableSource expectedSource =
@@ -246,7 +247,7 @@ public void testMetadataColumns() {
                         SCAN_LSN_COMMIT_CHECKPOINTS_DELAY.defaultValue());
         expectedSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType();
         expectedSource.metadataKeys =
-                Arrays.asList("op_ts", "database_name", "schema_name", "table_name");
+                Arrays.asList("row_kind", "op_ts", "database_name", "schema_name", "table_name");

         assertEquals(expectedSource, actualSource);
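For orientation, the round trip this test pins down follows Flink's SupportsReadingMetadata ability: the planner collects the metadata keys the query references and hands them back to the source together with the widened row type. A stand-alone stand-in of that handshake (class and field names here are illustrative, not the PR's code):

import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative stand-in for a metadata-aware table source; it mirrors the
// applyReadableMetadata contract the test exercises without pulling in
// Flink's planner classes.
class MetadataHandshakeSketch {
    List<String> metadataKeys;

    // Keys the connector advertises, exactly as users reference them in DDL.
    Map<String, String> listReadableMetadata() {
        Map<String, String> metadata = new LinkedHashMap<>();
        metadata.put("row_kind", "STRING NOT NULL");
        metadata.put("op_ts", "TIMESTAMP_LTZ(3) NOT NULL");
        metadata.put("database_name", "STRING NOT NULL");
        return metadata;
    }

    // The planner calls back with only the keys the query selected; the
    // source stores them so the runtime can append those fields to each row.
    void applyReadableMetadata(List<String> selectedKeys) {
        this.metadataKeys = selectedKeys;
    }

    public static void main(String[] args) {
        MetadataHandshakeSketch source = new MetadataHandshakeSketch();
        source.applyReadableMetadata(
                Arrays.asList("row_kind", "op_ts", "database_name", "schema_name", "table_name"));
        System.out.println(source.metadataKeys); // the list the test's assertEquals checks
    }
}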