From a521525d4c115012c6d8fb78c02bdc2c9cbea18b Mon Sep 17 00:00:00 2001 From: sunxiaojian Date: Thu, 16 Nov 2023 00:01:54 +0800 Subject: [PATCH] fixed --- .../rabbitmq/RabbitmqActionITCaseBase.java | 1 + ...> RabbitmqCanalSyncTableActionITCase.java} | 4 +- ...RabbitmqDebeziumSyncTableActionITCase.java | 144 ++++++++++++++++++ .../cdc/rabbitmq/RabbitmqSchemaITCase.java | 73 +++++++++ 4 files changed, 219 insertions(+), 3 deletions(-) rename paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/rabbitmq/{RabbitmqSyncTableActionITCase.java => RabbitmqCanalSyncTableActionITCase.java} (98%) create mode 100644 paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/rabbitmq/RabbitmqDebeziumSyncTableActionITCase.java create mode 100644 paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/rabbitmq/RabbitmqSchemaITCase.java diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/rabbitmq/RabbitmqActionITCaseBase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/rabbitmq/RabbitmqActionITCaseBase.java index a1773e1e816a4..98d2005ad450e 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/rabbitmq/RabbitmqActionITCaseBase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/rabbitmq/RabbitmqActionITCaseBase.java @@ -55,6 +55,7 @@ /** Base test class for Rabbitmq synchronization. */ public class RabbitmqActionITCaseBase extends CdcActionITCaseBase { private static final Logger LOG = LoggerFactory.getLogger(RabbitmqActionITCaseBase.class); + protected final String exchange = "schema_exchange"; private static final Slf4jLogConsumer LOG_CONSUMER = new Slf4jLogConsumer(LOG); private static final SimpleStringSchema SCHEMA = new SimpleStringSchema(); public static final String RABBITMQ = "rabbitmq:3.9.8-management-alpine"; diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/rabbitmq/RabbitmqSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/rabbitmq/RabbitmqCanalSyncTableActionITCase.java similarity index 98% rename from paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/rabbitmq/RabbitmqSyncTableActionITCase.java rename to paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/rabbitmq/RabbitmqCanalSyncTableActionITCase.java index de1a6b452873c..8bc44ef3b781d 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/rabbitmq/RabbitmqSyncTableActionITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/rabbitmq/RabbitmqCanalSyncTableActionITCase.java @@ -37,9 +37,7 @@ import static org.apache.paimon.flink.action.cdc.rabbitmq.RabbitmqActionUtils.VALUE_FORMAT; /** IT cases for {@link RabbitmqSyncTableAction}. */ -public class RabbitmqSyncTableActionITCase extends RabbitmqActionITCaseBase { - final String exchange = "schema_exchange"; - +public class RabbitmqCanalSyncTableActionITCase extends RabbitmqActionITCaseBase { @Test @Timeout(300) public void testSchemaEvolution() throws Exception { diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/rabbitmq/RabbitmqDebeziumSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/rabbitmq/RabbitmqDebeziumSyncTableActionITCase.java new file mode 100644 index 0000000000000..34c760117bae4 --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/rabbitmq/RabbitmqDebeziumSyncTableActionITCase.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.action.cdc.rabbitmq; + +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowType; + +import com.rabbitmq.client.BuiltinExchangeType; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import static org.apache.paimon.flink.action.cdc.rabbitmq.RabbitmqActionUtils.EXCHANGE; +import static org.apache.paimon.flink.action.cdc.rabbitmq.RabbitmqActionUtils.EXCHANGE_TYPE; +import static org.apache.paimon.flink.action.cdc.rabbitmq.RabbitmqActionUtils.QUEUE_NAME; + +/** IT cases for {@link RabbitmqDebeziumSyncTableActionITCase}. */ +public class RabbitmqDebeziumSyncTableActionITCase extends RabbitmqActionITCaseBase { + + @Test + @Timeout(120) + public void testSchemaEvolution() throws Exception { + runSingleTableSchemaEvolution("schemaevolution"); + } + + private void runSingleTableSchemaEvolution(String sourceDir) throws Exception { + final String queue = "schema_evolution"; + createQueue(exchange, queue); + // ---------- Write the debezium json into rabbitmq ------------------- + sendMessages( + exchange, + queue, + getMessages( + String.format("kafka/debezium/table/%s/debezium-data-1.txt", sourceDir))); + + Map rabbitmqConfig = getBasicRabbitmqConfig(); + rabbitmqConfig.put(QUEUE_NAME.key(), queue); + rabbitmqConfig.put(EXCHANGE.key(), exchange); + rabbitmqConfig.put(EXCHANGE_TYPE.key(), BuiltinExchangeType.FANOUT.name()); + rabbitmqConfig.put(RabbitmqActionUtils.VALUE_FORMAT.key(), "debezium-json"); + + RabbitmqSyncTableAction action = + syncTableActionBuilder(rabbitmqConfig) + .withPrimaryKeys("id") + .withTableConfig(getBasicTableConfig()) + .build(); + runActionWithDefaultEnv(action); + + testSchemaEvolutionImpl(queue, sourceDir); + } + + private void testSchemaEvolutionImpl(String topic, String sourceDir) throws Exception { + FileStoreTable table = getFileStoreTable(tableName); + + RowType rowType = + RowType.of( + new DataType[] { + DataTypes.STRING().notNull(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING() + }, + new String[] {"id", "name", "description", "weight"}); + List primaryKeys = Collections.singletonList("id"); + List expected = + Arrays.asList( + "+I[101, scooter, Small 2-wheel scooter, 3.14]", + "+I[102, car battery, 12V car battery, 8.1]"); + waitForResult(expected, table, rowType, primaryKeys); + + // ---------- Write the debezium json into rabbitmq ------------------- + sendMessages( + exchange, + topic, + getMessages( + String.format("kafka/debezium/table/%s/debezium-data-2.txt", sourceDir))); + + rowType = + RowType.of( + new DataType[] { + DataTypes.STRING().notNull(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING() + }, + new String[] {"id", "name", "description", "weight", "age"}); + expected = + Arrays.asList( + "+I[101, scooter, Small 2-wheel scooter, 3.14, NULL]", + "+I[102, car battery, 12V car battery, 8.1, NULL]", + "+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8, 18]", + "+I[104, hammer, 12oz carpenter's hammer, 0.75, 24]"); + waitForResult(expected, table, rowType, primaryKeys); + + sendMessages( + exchange, + topic, + getMessages( + String.format("kafka/debezium/table/%s/debezium-data-3.txt", sourceDir))); + + rowType = + RowType.of( + new DataType[] { + DataTypes.STRING().notNull(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING() + }, + new String[] {"id", "name", "description", "weight", "age", "address"}); + expected = + Arrays.asList( + "+I[102, car battery, 12V car battery, 8.1, NULL, NULL]", + "+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8, 18, NULL]", + "+I[104, hammer, 12oz carpenter's hammer, 0.75, 24, NULL]", + "+I[105, hammer, 14oz carpenter's hammer, 0.875, NULL, Beijing]", + "+I[107, rocks, box of assorted rocks, 5.3, NULL, NULL]"); + waitForResult(expected, table, rowType, primaryKeys); + } +} diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/rabbitmq/RabbitmqSchemaITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/rabbitmq/RabbitmqSchemaITCase.java new file mode 100644 index 0000000000000..baab11fe00603 --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/rabbitmq/RabbitmqSchemaITCase.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.action.cdc.rabbitmq; + +import org.apache.paimon.flink.action.cdc.MessageQueueSchemaUtils; +import org.apache.paimon.flink.action.cdc.TypeMapping; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DataTypes; + +import com.rabbitmq.client.BuiltinExchangeType; +import org.apache.flink.configuration.Configuration; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +import java.util.ArrayList; +import java.util.List; + +import static org.apache.paimon.flink.action.cdc.rabbitmq.RabbitmqActionUtils.EXCHANGE; +import static org.apache.paimon.flink.action.cdc.rabbitmq.RabbitmqActionUtils.EXCHANGE_TYPE; +import static org.apache.paimon.flink.action.cdc.rabbitmq.RabbitmqActionUtils.QUEUE_NAME; +import static org.apache.paimon.flink.action.cdc.rabbitmq.RabbitmqActionUtils.VALUE_FORMAT; +import static org.apache.paimon.flink.action.cdc.rabbitmq.RabbitmqActionUtils.getDataFormat; +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for building schema from Rabbitmq. */ +public class RabbitmqSchemaITCase extends RabbitmqActionITCaseBase { + + @Test + @Timeout(120) + public void testPulsarSchema() throws Exception { + String queue = "test_rabbitmq_schema"; + createQueue(exchange, queue); + + // ---------- Write the Canal json into rabbitmq ------------------- + List messages = getMessages("kafka/canal/table/schemaevolution/canal-data-1.txt"); + sendMessages(exchange, queue, messages); + + Configuration rabbitmqConfig = Configuration.fromMap(getBasicRabbitmqConfig()); + rabbitmqConfig.set(QUEUE_NAME, queue); + rabbitmqConfig.set(EXCHANGE, exchange); + rabbitmqConfig.set(EXCHANGE_TYPE, BuiltinExchangeType.FANOUT.name()); + rabbitmqConfig.set(VALUE_FORMAT, "canal-json"); + + Schema rabbitmqSchema = + MessageQueueSchemaUtils.getSchema( + RabbitmqActionUtils.createRabbitmqConsumer(rabbitmqConfig, queue), + queue, + getDataFormat(rabbitmqConfig), + TypeMapping.defaultMapping()); + List fields = new ArrayList<>(); + fields.add(new DataField(0, "pt", DataTypes.INT())); + fields.add(new DataField(1, "_id", DataTypes.INT().notNull())); + fields.add(new DataField(2, "v1", DataTypes.VARCHAR(10))); + assertThat(rabbitmqSchema.fields()).isEqualTo(fields); + } +}