From 4667774a6c0e1b8f90e2f00b66731ff658bd204d Mon Sep 17 00:00:00 2001 From: sunxiaojian Date: Wed, 15 Nov 2023 00:30:51 +0800 Subject: [PATCH] add e2e test --- paimon-flink/paimon-flink-cdc/pom.xml | 7 + .../action/cdc/rabbitmq/QueueingConsumer.java | 115 ---------- .../cdc/rabbitmq/RabbitmqActionUtils.java | 98 +++++--- .../action/cdc/rabbitmq/RabbitmqSource.java | 57 +++++ .../rabbitmq/RabbitmqActionITCaseBase.java | 213 ++++++++++++++++++ .../RabbitmqCanalSyncTableActionITCase.java | 195 ++++++++++++++++ ...RabbitmqDebeziumSyncTableActionITCase.java | 144 ++++++++++++ .../cdc/rabbitmq/RabbitmqSchemaITCase.java | 73 ++++++ 8 files changed, 760 insertions(+), 142 deletions(-) delete mode 100644 paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/rabbitmq/QueueingConsumer.java create mode 100644 paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/rabbitmq/RabbitmqSource.java create mode 100644 paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/rabbitmq/RabbitmqActionITCaseBase.java create mode 100644 paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/rabbitmq/RabbitmqCanalSyncTableActionITCase.java create mode 100644 paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/rabbitmq/RabbitmqDebeziumSyncTableActionITCase.java create mode 100644 paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/rabbitmq/RabbitmqSchemaITCase.java diff --git a/paimon-flink/paimon-flink-cdc/pom.xml b/paimon-flink/paimon-flink-cdc/pom.xml index 4d4d5dc74ab5..3afe2b82c09b 100644 --- a/paimon-flink/paimon-flink-cdc/pom.xml +++ b/paimon-flink/paimon-flink-cdc/pom.xml @@ -290,6 +290,13 @@ under the License. test + + org.testcontainers + rabbitmq + ${testcontainers.version} + test + + org.testcontainers mongodb diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/rabbitmq/QueueingConsumer.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/rabbitmq/QueueingConsumer.java deleted file mode 100644 index 868e43cf7d39..000000000000 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/rabbitmq/QueueingConsumer.java +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.flink.action.cdc.rabbitmq; - -import com.rabbitmq.client.AMQP; -import com.rabbitmq.client.Channel; -import com.rabbitmq.client.Connection; -import com.rabbitmq.client.ConsumerCancelledException; -import com.rabbitmq.client.DefaultConsumer; -import com.rabbitmq.client.Delivery; -import com.rabbitmq.client.Envelope; -import com.rabbitmq.client.ShutdownSignalException; -import com.rabbitmq.utility.Utility; - -import java.io.IOException; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -public class QueueingConsumer extends DefaultConsumer { - - private final Connection connection; - private final Channel channel; - private final BlockingQueue queue; - private volatile ShutdownSignalException shutdown; - private volatile ConsumerCancelledException cancelled; - private static final Delivery POISON = new Delivery(null, null, null); - - public QueueingConsumer(Connection connection, Channel channel) { - this(connection, channel, Integer.MAX_VALUE); - } - - public QueueingConsumer(Connection connection, Channel channel, int capacity) { - super(channel); - this.connection = connection; - this.channel = channel; - this.queue = new LinkedBlockingQueue(capacity); - } - - private void checkShutdown() { - if (this.shutdown != null) { - throw Utility.fixStackTrace(this.shutdown); - } - } - - private Delivery handle(Delivery delivery) { - if (delivery == POISON - || delivery == null && (this.shutdown != null || this.cancelled != null)) { - if (delivery == POISON) { - this.queue.add(POISON); - if (this.shutdown == null && this.cancelled == null) { - throw new IllegalStateException( - "POISON in queue, but null shutdown and null cancelled. This should never happen, please report as a BUG"); - } - } - - if (null != this.shutdown) { - throw Utility.fixStackTrace(this.shutdown); - } - - if (null != this.cancelled) { - throw Utility.fixStackTrace(this.cancelled); - } - } - - return delivery; - } - - public Delivery nextDelivery(long timeout, TimeUnit unit) - throws InterruptedException, ShutdownSignalException, ConsumerCancelledException { - return this.handle(this.queue.poll(timeout, unit)); - } - - public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) { - this.shutdown = sig; - this.queue.add(POISON); - } - - public void handleCancel(String consumerTag) { - this.cancelled = new ConsumerCancelledException(); - this.queue.add(POISON); - } - - public void handleDelivery( - String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) { - this.checkShutdown(); - this.queue.add(new Delivery(envelope, properties, body)); - } - - public void close() throws IOException, TimeoutException { - if (channel != null) { - channel.close(); - } - if (connection != null) { - connection.close(); - } - } -} diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/rabbitmq/RabbitmqActionUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/rabbitmq/RabbitmqActionUtils.java index 9a0bbdfa9416..a0abe8a50c5f 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/rabbitmq/RabbitmqActionUtils.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/rabbitmq/RabbitmqActionUtils.java @@ -20,22 +20,22 @@ import org.apache.paimon.flink.action.cdc.MessageQueueSchemaUtils; import org.apache.paimon.flink.action.cdc.format.DataFormat; +import org.apache.paimon.utils.StringUtils; +import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; -import com.rabbitmq.client.Delivery; +import com.rabbitmq.client.GetResponse; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.connectors.rabbitmq.RMQSource; import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.Collections; import java.util.List; -import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import static org.apache.paimon.utils.Preconditions.checkArgument; @@ -48,6 +48,14 @@ public class RabbitmqActionUtils { .stringType() .noDefaultValue() .withDescription("Defines the format identifier for encoding value data."); + static final ConfigOption EXCHANGE = + ConfigOptions.key("exchange").stringType().noDefaultValue().withDescription("."); + + static final ConfigOption EXCHANGE_TYPE = + ConfigOptions.key("exchange.type").stringType().noDefaultValue().withDescription("."); + + static final ConfigOption ROUTING_KEY = + ConfigOptions.key("routing.key").stringType().noDefaultValue().withDescription("."); static final ConfigOption QUEUE_NAME = ConfigOptions.key("queue.name") @@ -148,11 +156,32 @@ public class RabbitmqActionUtils { .withDescription( "Retrieve the message delivery timeout used in the queueing consumer. If not specified explicitly, the default value of 30000 milliseconds will be returned."); - static RMQSource buildRabbitmqSource(Configuration rabbitmqConfig) { + static RabbitmqSource buildRabbitmqSource(Configuration rabbitmqConfig) { validateRabbitmqConfig(rabbitmqConfig); String queueName = rabbitmqConfig.get(QUEUE_NAME); + String exchange = null; + if (rabbitmqConfig.contains(EXCHANGE)) { + exchange = rabbitmqConfig.get(EXCHANGE); + } + + BuiltinExchangeType exchangeType = null; + if (rabbitmqConfig.contains(EXCHANGE_TYPE)) { + exchangeType = BuiltinExchangeType.valueOf(rabbitmqConfig.get(EXCHANGE_TYPE)); + } + + String routingKey = null; + if (rabbitmqConfig.contains(ROUTING_KEY)) { + routingKey = rabbitmqConfig.get(ROUTING_KEY); + } + RMQConnectionConfig connectionConfig = setOptionConfig(rabbitmqConfig); - return new RMQSource<>(connectionConfig, queueName, new SimpleStringSchema()); + return new RabbitmqSource( + connectionConfig, + exchange, + exchangeType, + routingKey, + queueName, + new SimpleStringSchema()); } private static RMQConnectionConfig setOptionConfig(Configuration rabbitmqConfig) { @@ -229,32 +258,44 @@ private static void validateRabbitmqConfig(Configuration rabbitmqConfig) { static MessageQueueSchemaUtils.ConsumerWrapper createRabbitmqConsumer( Configuration rabbitmqConfig, String queueName) { RMQConnectionConfig connectionConfig = setOptionConfig(rabbitmqConfig); + try { Connection connection = connectionConfig.getConnectionFactory().newConnection(); - Channel channel = setupChannel(connectionConfig, connection); + Channel channel = setupChannel(connection); if (channel == null) { throw new RuntimeException("None of RabbitMQ channels are available"); } - setupQueue(channel, queueName); - QueueingConsumer consumer = new QueueingConsumer(connection, channel); - channel.basicConsume(queueName, true, consumer); - return new RabbitmqConsumerWrapper(consumer); + setupQueue(channel, rabbitmqConfig, queueName); + return new RabbitmqConsumerWrapper(connection, channel); } catch (Exception e) { throw new RuntimeException(e); } } - private static void setupQueue(Channel channel, String queueName) throws IOException { + private static void setupQueue(Channel channel, Configuration rabbitmqConfig, String queueName) + throws IOException { + ; channel.queueDeclare(queueName, true, false, false, null); + String exchange = null; + if (rabbitmqConfig.contains(EXCHANGE)) { + exchange = rabbitmqConfig.get(EXCHANGE); + } + BuiltinExchangeType exchangeType = null; + if (rabbitmqConfig.contains(EXCHANGE_TYPE)) { + exchangeType = BuiltinExchangeType.valueOf(rabbitmqConfig.get(EXCHANGE_TYPE)); + } + String routingKey = ""; + if (rabbitmqConfig.contains(ROUTING_KEY)) { + routingKey = rabbitmqConfig.get(ROUTING_KEY); + } + if (!StringUtils.isBlank(exchange) && exchangeType != null) { + channel.exchangeDeclare(exchange, BuiltinExchangeType.FANOUT); + channel.queueBind(queueName, exchange, routingKey); + } } - private static Channel setupChannel( - RMQConnectionConfig rmqConnectionConfig, Connection connection) throws Exception { - Channel chan = connection.createChannel(); - if (rmqConnectionConfig.getPrefetchCount().isPresent()) { - chan.basicQos(rmqConnectionConfig.getPrefetchCount().get(), true); - } - return chan; + private static Channel setupChannel(Connection connection) throws Exception { + return connection.createChannel(); } static DataFormat getDataFormat(Configuration rabbitmqConfig) { @@ -264,28 +305,31 @@ static DataFormat getDataFormat(Configuration rabbitmqConfig) { private static class RabbitmqConsumerWrapper implements MessageQueueSchemaUtils.ConsumerWrapper { - private final QueueingConsumer consumer; + private final Channel channel; + private final Connection connection; - RabbitmqConsumerWrapper(QueueingConsumer consumer) { - this.consumer = consumer; + RabbitmqConsumerWrapper(Connection connection, Channel channel) { + this.channel = channel; + this.connection = connection; } @Override - public List getRecords(String topic, int pollTimeOutMills) { + public List getRecords(String queue, int pollTimeOutMills) { try { - Delivery delivery = consumer.nextDelivery(pollTimeOutMills, TimeUnit.MILLISECONDS); - return delivery == null + GetResponse response = channel.basicGet(queue, false); + return response == null ? Collections.emptyList() : Collections.singletonList( - new String(delivery.getBody(), StandardCharsets.UTF_8)); - } catch (InterruptedException e) { + new String(response.getBody(), StandardCharsets.UTF_8)); + } catch (IOException e) { throw new RuntimeException(e); } } @Override public void close() throws IOException, TimeoutException { - consumer.close(); + channel.close(); + connection.close(); } } } diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/rabbitmq/RabbitmqSource.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/rabbitmq/RabbitmqSource.java new file mode 100644 index 000000000000..0dbdefdead9b --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/rabbitmq/RabbitmqSource.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.action.cdc.rabbitmq; + +import org.apache.paimon.utils.StringUtils; + +import com.rabbitmq.client.BuiltinExchangeType; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.streaming.connectors.rabbitmq.RMQSource; +import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig; + +import java.io.IOException; + +public class RabbitmqSource extends RMQSource { + private String exchange; + private BuiltinExchangeType exchangeType; + private String routingKey; + + public RabbitmqSource( + RMQConnectionConfig rmqConnectionConfig, + String exchange, + BuiltinExchangeType exchangeType, + String routingKey, + String queueName, + DeserializationSchema deserializationSchema) { + super(rmqConnectionConfig, queueName, deserializationSchema); + this.exchange = exchange; + this.exchangeType = exchangeType; + this.routingKey = routingKey; + } + + @Override + protected void setupQueue() throws IOException { + super.setupQueue(); + if (!StringUtils.isBlank(exchange) && exchangeType != null) { + channel.exchangeDeclare(exchange, exchangeType); + channel.queueBind( + queueName, exchange, StringUtils.isBlank(routingKey) ? "" : routingKey); + } + } +} diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/rabbitmq/RabbitmqActionITCaseBase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/rabbitmq/RabbitmqActionITCaseBase.java new file mode 100644 index 000000000000..98d2005ad450 --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/rabbitmq/RabbitmqActionITCaseBase.java @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.action.cdc.rabbitmq; + +import org.apache.paimon.flink.action.cdc.CdcActionITCaseBase; +import org.apache.paimon.utils.StringUtils; + +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper; + +import com.rabbitmq.client.BuiltinExchangeType; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.MessageProperties; +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.api.java.utils.MultipleParameterTool; +import org.junit.ClassRule; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.RabbitMQContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.utility.DockerImageName; + +import java.io.File; +import java.io.IOException; +import java.net.URL; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeoutException; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Base test class for Rabbitmq synchronization. */ +public class RabbitmqActionITCaseBase extends CdcActionITCaseBase { + private static final Logger LOG = LoggerFactory.getLogger(RabbitmqActionITCaseBase.class); + protected final String exchange = "schema_exchange"; + private static final Slf4jLogConsumer LOG_CONSUMER = new Slf4jLogConsumer(LOG); + private static final SimpleStringSchema SCHEMA = new SimpleStringSchema(); + public static final String RABBITMQ = "rabbitmq:3.9.8-management-alpine"; + private static final int RABBITMQ_PORT = 5672; + private final ObjectMapper objectMapper = new ObjectMapper(); + + private List queues = new ArrayList<>(); + + @ClassRule + private static final RabbitMQContainer RMQ_CONTAINER = + new RabbitMQContainer(DockerImageName.parse(RABBITMQ)) + .withExposedPorts(RABBITMQ_PORT) + .withLogConsumer(LOG_CONSUMER); + + private Connection connection; + private Channel channel; + + @BeforeEach + public void setup() throws Exception { + RMQ_CONTAINER.start(); + connection = getRMQConnection(); + channel = connection.createChannel(); + } + + private static Connection getRMQConnection() throws IOException, TimeoutException { + ConnectionFactory factory = new ConnectionFactory(); + factory.setHost(RMQ_CONTAINER.getHost()); + factory.setVirtualHost("/"); + factory.setPort(RMQ_CONTAINER.getAmqpPort()); + factory.setUsername(RMQ_CONTAINER.getAdminUsername()); + factory.setPassword(RMQ_CONTAINER.getAdminPassword()); + return factory.newConnection(); + } + + @AfterEach + public void after() throws Exception { + super.after(); + // Delete topics for avoid reusing topics of pulsar cluster + deleteTopics(); + channel.close(); + connection.close(); + RMQ_CONTAINER.close(); + } + + protected Map getBasicRabbitmqConfig() { + Map config = new HashMap<>(); + config.put(RabbitmqActionUtils.RABBITMQ_USERNAME.key(), RMQ_CONTAINER.getAdminUsername()); + config.put(RabbitmqActionUtils.RABBITMQ_PASSWORD.key(), RMQ_CONTAINER.getAdminPassword()); + config.put(RabbitmqActionUtils.RABBITMQ_VIRTUAL_HOST.key(), "/"); + config.put(RabbitmqActionUtils.RABBITMQ_HOST.key(), RMQ_CONTAINER.getHost()); + config.put(RabbitmqActionUtils.RABBITMQ_PORT.key(), RMQ_CONTAINER.getAmqpPort().toString()); + return config; + } + + protected void createQueue(String exchange, String queueName) { + try { + channel.exchangeDeclare(exchange, BuiltinExchangeType.FANOUT); + channel.queueDeclare(queueName, true, false, false, null); + channel.queueBind(queueName, exchange, ""); + queues.add(queueName); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private void deleteTopics() throws Exception { + for (String queue : queues) { + channel.queueDelete(queue); + } + queues = new ArrayList<>(); + } + + protected List getMessages(String resource) throws IOException { + URL url = RabbitmqActionITCaseBase.class.getClassLoader().getResource(resource); + assertThat(url).isNotNull(); + java.nio.file.Path path = new File(url.getFile()).toPath(); + List lines = Files.readAllLines(path); + List messages = new ArrayList<>(); + for (String line : lines) { + try { + objectMapper.readTree(line); + if (!StringUtils.isEmpty(line)) { + messages.add(line); + } + } catch (Exception e) { + // ignore + } + } + + return messages; + } + + /** + * Send a list messages to Rabbitmq. + * + * @param queueName The name of the queue. + * @param messages The records need to be sent. + */ + public static void sendMessages(String exchange, String queueName, List messages) + throws IOException, TimeoutException { + try (Connection rmqConnection = getRMQConnection(); + Channel channel = rmqConnection.createChannel()) { + for (String msg : messages) { + channel.basicPublish( + exchange, + queueName, + MessageProperties.PERSISTENT_TEXT_PLAIN, + SCHEMA.serialize(msg)); + } + } + } + + protected RabbitmqSyncTableActionBuilder syncTableActionBuilder( + Map rabbitmqConfig) { + return new RabbitmqSyncTableActionBuilder(rabbitmqConfig); + } + + /** Builder to build {@link RabbitmqSyncTableAction} from action arguments. */ + protected class RabbitmqSyncTableActionBuilder + extends SyncTableActionBuilder { + + public RabbitmqSyncTableActionBuilder(Map rabbitmqConfig) { + super(rabbitmqConfig); + } + + public RabbitmqSyncTableAction build() { + List args = + new ArrayList<>( + Arrays.asList( + "--warehouse", + warehouse, + "--database", + database, + "--table", + tableName)); + + args.addAll(mapToArgs("--rabbitmq-conf", sourceConfig)); + args.addAll(mapToArgs("--catalog-conf", catalogConfig)); + args.addAll(mapToArgs("--table-conf", tableConfig)); + + args.addAll(listToArgs("--partition-keys", partitionKeys)); + args.addAll(listToArgs("--primary-keys", primaryKeys)); + args.addAll(listToArgs("--type-mapping", typeMappingModes)); + + args.addAll(listToMultiArgs("--computed-column", computedColumnArgs)); + + MultipleParameterTool params = + MultipleParameterTool.fromArgs(args.toArray(args.toArray(new String[0]))); + return (RabbitmqSyncTableAction) + new RabbitmqSyncTableActionFactory() + .create(params) + .orElseThrow(RuntimeException::new); + } + } +} diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/rabbitmq/RabbitmqCanalSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/rabbitmq/RabbitmqCanalSyncTableActionITCase.java new file mode 100644 index 000000000000..8bc44ef3b781 --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/rabbitmq/RabbitmqCanalSyncTableActionITCase.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.action.cdc.rabbitmq; + +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowType; + +import com.rabbitmq.client.BuiltinExchangeType; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +import static org.apache.paimon.flink.action.cdc.rabbitmq.RabbitmqActionUtils.EXCHANGE; +import static org.apache.paimon.flink.action.cdc.rabbitmq.RabbitmqActionUtils.EXCHANGE_TYPE; +import static org.apache.paimon.flink.action.cdc.rabbitmq.RabbitmqActionUtils.QUEUE_NAME; +import static org.apache.paimon.flink.action.cdc.rabbitmq.RabbitmqActionUtils.VALUE_FORMAT; + +/** IT cases for {@link RabbitmqSyncTableAction}. */ +public class RabbitmqCanalSyncTableActionITCase extends RabbitmqActionITCaseBase { + @Test + @Timeout(300) + public void testSchemaEvolution() throws Exception { + runSingleTableSchemaEvolution("schemaevolution"); + } + + private void runSingleTableSchemaEvolution(String sourceDir) throws Exception { + final String queue = "schema_evolution"; + createQueue(exchange, queue); + // ---------- Write the Canal json into Rabbitmq ------------------- + sendMessages( + exchange, + queue, + getMessages(String.format("kafka/canal/table/%s/canal-data-1.txt", sourceDir))); + + Map rabbitmqConfig = getBasicRabbitmqConfig(); + rabbitmqConfig.put(QUEUE_NAME.key(), queue); + rabbitmqConfig.put(EXCHANGE.key(), exchange); + rabbitmqConfig.put(EXCHANGE_TYPE.key(), BuiltinExchangeType.FANOUT.name()); + rabbitmqConfig.put(VALUE_FORMAT.key(), "canal-json"); + + RabbitmqSyncTableAction action = + syncTableActionBuilder(rabbitmqConfig) + .withPartitionKeys("pt") + .withPrimaryKeys("pt", "_id") + .withTableConfig(getBasicTableConfig()) + .build(); + runActionWithDefaultEnv(action); + + testSchemaEvolutionImpl(queue, sourceDir); + } + + private void testSchemaEvolutionImpl(String queue, String sourceDir) throws Exception { + FileStoreTable table = getFileStoreTable(tableName); + + RowType rowType = + RowType.of( + new DataType[] { + DataTypes.INT().notNull(), + DataTypes.INT().notNull(), + DataTypes.VARCHAR(10) + }, + new String[] {"pt", "_id", "v1"}); + List primaryKeys = Arrays.asList("pt", "_id"); + List expected = Arrays.asList("+I[1, 1, one]", "+I[1, 2, two]", "+I[2, 4, four]"); + waitForResult(expected, table, rowType, primaryKeys); + + sendMessages( + exchange, + queue, + getMessages(String.format("kafka/canal/table/%s/canal-data-2.txt", sourceDir))); + + rowType = + RowType.of( + new DataType[] { + DataTypes.INT().notNull(), + DataTypes.INT().notNull(), + DataTypes.VARCHAR(10), + DataTypes.INT() + }, + new String[] {"pt", "_id", "v1", "v2"}); + expected = + Arrays.asList( + "+I[1, 1, one, NULL]", + "+I[1, 2, second, NULL]", + "+I[2, 3, three, 30]", + "+I[2, 4, four, NULL]", + "+I[1, 5, five, 50]", + "+I[1, 6, six, 60]"); + waitForResult(expected, table, rowType, primaryKeys); + + sendMessages( + exchange, + queue, + getMessages(String.format("kafka/canal/table/%s/canal-data-3.txt", sourceDir))); + + rowType = + RowType.of( + new DataType[] { + DataTypes.INT().notNull(), + DataTypes.INT().notNull(), + DataTypes.VARCHAR(10), + DataTypes.BIGINT() + }, + new String[] {"pt", "_id", "v1", "v2"}); + expected = + Arrays.asList( + "+I[1, 1, one, NULL]", + "+I[1, 2, second, NULL]", + "+I[2, 3, three, 30000000000]", + "+I[2, 4, four, NULL]", + "+I[1, 6, six, 60]", + "+I[2, 7, seven, 70000000000]", + "+I[2, 8, eight, 80000000000]"); + waitForResult(expected, table, rowType, primaryKeys); + + sendMessages( + exchange, + queue, + getMessages(String.format("kafka/canal/table/%s/canal-data-4.txt", sourceDir))); + + rowType = + RowType.of( + new DataType[] { + DataTypes.INT().notNull(), + DataTypes.INT().notNull(), + DataTypes.VARCHAR(20), + DataTypes.BIGINT(), + DataTypes.DECIMAL(8, 3), + DataTypes.VARBINARY(10), + DataTypes.FLOAT() + }, + new String[] {"pt", "_id", "v1", "v2", "v3", "v4", "v5"}); + expected = + Arrays.asList( + "+I[1, 1, one, NULL, NULL, NULL, NULL]", + "+I[1, 2, second, NULL, NULL, NULL, NULL]", + "+I[2, 3, three, 30000000000, NULL, NULL, NULL]", + "+I[2, 4, four, NULL, NULL, NULL, NULL]", + "+I[1, 6, six, 60, NULL, NULL, NULL]", + "+I[2, 7, seven, 70000000000, NULL, NULL, NULL]", + "+I[2, 8, very long string, 80000000000, NULL, NULL, NULL]", + "+I[1, 9, nine, 90000000000, 99999.999, [110, 105, 110, 101, 46, 98, 105, 110], 9.9]"); + waitForResult(expected, table, rowType, primaryKeys); + + sendMessages( + exchange, + queue, + getMessages(String.format("kafka/canal/table/%s/canal-data-5.txt", sourceDir))); + + rowType = + RowType.of( + new DataType[] { + DataTypes.INT().notNull(), + DataTypes.INT().notNull(), + DataTypes.VARCHAR(20), + DataTypes.BIGINT(), + DataTypes.DECIMAL(8, 3), + DataTypes.VARBINARY(20), + DataTypes.DOUBLE() + }, + new String[] {"pt", "_id", "v1", "v2", "v3", "v4", "v5"}); + expected = + Arrays.asList( + "+I[1, 1, one, NULL, NULL, NULL, NULL]", + "+I[1, 2, second, NULL, NULL, NULL, NULL]", + "+I[2, 3, three, 30000000000, NULL, NULL, NULL]", + "+I[2, 4, four, NULL, NULL, [102, 111, 117, 114, 46, 98, 105, 110, 46, 108, 111, 110, 103], 4.00000000004]", + "+I[1, 6, six, 60, NULL, NULL, NULL]", + "+I[2, 7, seven, 70000000000, NULL, NULL, NULL]", + "+I[2, 8, very long string, 80000000000, NULL, NULL, NULL]", + "+I[1, 9, nine, 90000000000, 99999.999, [110, 105, 110, 101, 46, 98, 105, 110, 46, 108, 111, 110, 103], 9.00000000009]"); + waitForResult(expected, table, rowType, primaryKeys); + } +} diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/rabbitmq/RabbitmqDebeziumSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/rabbitmq/RabbitmqDebeziumSyncTableActionITCase.java new file mode 100644 index 000000000000..34c760117bae --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/rabbitmq/RabbitmqDebeziumSyncTableActionITCase.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.action.cdc.rabbitmq; + +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowType; + +import com.rabbitmq.client.BuiltinExchangeType; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import static org.apache.paimon.flink.action.cdc.rabbitmq.RabbitmqActionUtils.EXCHANGE; +import static org.apache.paimon.flink.action.cdc.rabbitmq.RabbitmqActionUtils.EXCHANGE_TYPE; +import static org.apache.paimon.flink.action.cdc.rabbitmq.RabbitmqActionUtils.QUEUE_NAME; + +/** IT cases for {@link RabbitmqDebeziumSyncTableActionITCase}. */ +public class RabbitmqDebeziumSyncTableActionITCase extends RabbitmqActionITCaseBase { + + @Test + @Timeout(120) + public void testSchemaEvolution() throws Exception { + runSingleTableSchemaEvolution("schemaevolution"); + } + + private void runSingleTableSchemaEvolution(String sourceDir) throws Exception { + final String queue = "schema_evolution"; + createQueue(exchange, queue); + // ---------- Write the debezium json into rabbitmq ------------------- + sendMessages( + exchange, + queue, + getMessages( + String.format("kafka/debezium/table/%s/debezium-data-1.txt", sourceDir))); + + Map rabbitmqConfig = getBasicRabbitmqConfig(); + rabbitmqConfig.put(QUEUE_NAME.key(), queue); + rabbitmqConfig.put(EXCHANGE.key(), exchange); + rabbitmqConfig.put(EXCHANGE_TYPE.key(), BuiltinExchangeType.FANOUT.name()); + rabbitmqConfig.put(RabbitmqActionUtils.VALUE_FORMAT.key(), "debezium-json"); + + RabbitmqSyncTableAction action = + syncTableActionBuilder(rabbitmqConfig) + .withPrimaryKeys("id") + .withTableConfig(getBasicTableConfig()) + .build(); + runActionWithDefaultEnv(action); + + testSchemaEvolutionImpl(queue, sourceDir); + } + + private void testSchemaEvolutionImpl(String topic, String sourceDir) throws Exception { + FileStoreTable table = getFileStoreTable(tableName); + + RowType rowType = + RowType.of( + new DataType[] { + DataTypes.STRING().notNull(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING() + }, + new String[] {"id", "name", "description", "weight"}); + List primaryKeys = Collections.singletonList("id"); + List expected = + Arrays.asList( + "+I[101, scooter, Small 2-wheel scooter, 3.14]", + "+I[102, car battery, 12V car battery, 8.1]"); + waitForResult(expected, table, rowType, primaryKeys); + + // ---------- Write the debezium json into rabbitmq ------------------- + sendMessages( + exchange, + topic, + getMessages( + String.format("kafka/debezium/table/%s/debezium-data-2.txt", sourceDir))); + + rowType = + RowType.of( + new DataType[] { + DataTypes.STRING().notNull(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING() + }, + new String[] {"id", "name", "description", "weight", "age"}); + expected = + Arrays.asList( + "+I[101, scooter, Small 2-wheel scooter, 3.14, NULL]", + "+I[102, car battery, 12V car battery, 8.1, NULL]", + "+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8, 18]", + "+I[104, hammer, 12oz carpenter's hammer, 0.75, 24]"); + waitForResult(expected, table, rowType, primaryKeys); + + sendMessages( + exchange, + topic, + getMessages( + String.format("kafka/debezium/table/%s/debezium-data-3.txt", sourceDir))); + + rowType = + RowType.of( + new DataType[] { + DataTypes.STRING().notNull(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING() + }, + new String[] {"id", "name", "description", "weight", "age", "address"}); + expected = + Arrays.asList( + "+I[102, car battery, 12V car battery, 8.1, NULL, NULL]", + "+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8, 18, NULL]", + "+I[104, hammer, 12oz carpenter's hammer, 0.75, 24, NULL]", + "+I[105, hammer, 14oz carpenter's hammer, 0.875, NULL, Beijing]", + "+I[107, rocks, box of assorted rocks, 5.3, NULL, NULL]"); + waitForResult(expected, table, rowType, primaryKeys); + } +} diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/rabbitmq/RabbitmqSchemaITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/rabbitmq/RabbitmqSchemaITCase.java new file mode 100644 index 000000000000..baab11fe0060 --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/rabbitmq/RabbitmqSchemaITCase.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.action.cdc.rabbitmq; + +import org.apache.paimon.flink.action.cdc.MessageQueueSchemaUtils; +import org.apache.paimon.flink.action.cdc.TypeMapping; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DataTypes; + +import com.rabbitmq.client.BuiltinExchangeType; +import org.apache.flink.configuration.Configuration; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +import java.util.ArrayList; +import java.util.List; + +import static org.apache.paimon.flink.action.cdc.rabbitmq.RabbitmqActionUtils.EXCHANGE; +import static org.apache.paimon.flink.action.cdc.rabbitmq.RabbitmqActionUtils.EXCHANGE_TYPE; +import static org.apache.paimon.flink.action.cdc.rabbitmq.RabbitmqActionUtils.QUEUE_NAME; +import static org.apache.paimon.flink.action.cdc.rabbitmq.RabbitmqActionUtils.VALUE_FORMAT; +import static org.apache.paimon.flink.action.cdc.rabbitmq.RabbitmqActionUtils.getDataFormat; +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for building schema from Rabbitmq. */ +public class RabbitmqSchemaITCase extends RabbitmqActionITCaseBase { + + @Test + @Timeout(120) + public void testPulsarSchema() throws Exception { + String queue = "test_rabbitmq_schema"; + createQueue(exchange, queue); + + // ---------- Write the Canal json into rabbitmq ------------------- + List messages = getMessages("kafka/canal/table/schemaevolution/canal-data-1.txt"); + sendMessages(exchange, queue, messages); + + Configuration rabbitmqConfig = Configuration.fromMap(getBasicRabbitmqConfig()); + rabbitmqConfig.set(QUEUE_NAME, queue); + rabbitmqConfig.set(EXCHANGE, exchange); + rabbitmqConfig.set(EXCHANGE_TYPE, BuiltinExchangeType.FANOUT.name()); + rabbitmqConfig.set(VALUE_FORMAT, "canal-json"); + + Schema rabbitmqSchema = + MessageQueueSchemaUtils.getSchema( + RabbitmqActionUtils.createRabbitmqConsumer(rabbitmqConfig, queue), + queue, + getDataFormat(rabbitmqConfig), + TypeMapping.defaultMapping()); + List fields = new ArrayList<>(); + fields.add(new DataField(0, "pt", DataTypes.INT())); + fields.add(new DataField(1, "_id", DataTypes.INT().notNull())); + fields.add(new DataField(2, "v1", DataTypes.VARCHAR(10))); + assertThat(rabbitmqSchema.fields()).isEqualTo(fields); + } +}