diff --git a/paimon-flink/paimon-flink-cdc/pom.xml b/paimon-flink/paimon-flink-cdc/pom.xml index 3941bd441dae..4d4d5dc74ab5 100644 --- a/paimon-flink/paimon-flink-cdc/pom.xml +++ b/paimon-flink/paimon-flink-cdc/pom.xml @@ -42,6 +42,7 @@ under the License. 2.8.0 1.18.3 4.0.0-1.17 + 3.0.1-1.17 @@ -96,6 +97,13 @@ under the License. provided + + org.apache.flink + flink-connector-rabbitmq + ${flink.connector.rabbitmq.version} + provided + + org.apache.flink flink-connector-kafka @@ -110,7 +118,6 @@ under the License. provided - com.ververica flink-connector-mongodb-cdc diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/MessageQueueSyncTableActionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/MessageQueueSyncTableActionBase.java index 854131eb34e3..c39b9d3a8675 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/MessageQueueSyncTableActionBase.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/MessageQueueSyncTableActionBase.java @@ -35,6 +35,8 @@ import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.connector.source.Source; import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -137,7 +139,7 @@ public MessageQueueSyncTableActionBase withTypeMapping(TypeMapping typeMapping) return this; } - protected abstract Source buildSource(); + protected abstract Object buildSource(); protected abstract String topic(); @@ -149,7 +151,7 @@ public MessageQueueSyncTableActionBase withTypeMapping(TypeMapping typeMapping) @Override public void build() throws Exception { - Source source = buildSource(); + Object source = buildSource(); catalog.createDatabase(database, true); boolean caseSensitive = catalog.caseSensitive(); @@ -210,13 +212,7 @@ public void build() throws Exception { CdcSinkBuilder sinkBuilder = new CdcSinkBuilder() - .withInput( - env.fromSource( - source, - WatermarkStrategy.noWatermarks(), - sourceName()) - .flatMap(recordParser) - .name("Parse")) + .withInput(fromSource(source).flatMap(recordParser).name("Parse")) .withParserFactory(parserFactory) .withTable(fileStoreTable) .withIdentifier(identifier) @@ -228,6 +224,17 @@ public void build() throws Exception { sinkBuilder.build(); } + private DataStreamSource fromSource(Object s) { + if (s instanceof Source) { + return env.fromSource( + (Source) s, WatermarkStrategy.noWatermarks(), sourceName()); + } + if (s instanceof SourceFunction) { + return env.addSource((SourceFunction) s, sourceName()); + } + throw new UnsupportedOperationException("Unsupported source type"); + } + private Schema retrieveSchema() throws Exception { String topic = topic(); try (MessageQueueSchemaUtils.ConsumerWrapper consumer = consumer(topic)) { diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/rabbitmq/QueueingConsumer.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/rabbitmq/QueueingConsumer.java new file mode 100644 index 000000000000..868e43cf7d39 --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/rabbitmq/QueueingConsumer.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.action.cdc.rabbitmq; + +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConsumerCancelledException; +import com.rabbitmq.client.DefaultConsumer; +import com.rabbitmq.client.Delivery; +import com.rabbitmq.client.Envelope; +import com.rabbitmq.client.ShutdownSignalException; +import com.rabbitmq.utility.Utility; + +import java.io.IOException; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +public class QueueingConsumer extends DefaultConsumer { + + private final Connection connection; + private final Channel channel; + private final BlockingQueue queue; + private volatile ShutdownSignalException shutdown; + private volatile ConsumerCancelledException cancelled; + private static final Delivery POISON = new Delivery(null, null, null); + + public QueueingConsumer(Connection connection, Channel channel) { + this(connection, channel, Integer.MAX_VALUE); + } + + public QueueingConsumer(Connection connection, Channel channel, int capacity) { + super(channel); + this.connection = connection; + this.channel = channel; + this.queue = new LinkedBlockingQueue(capacity); + } + + private void checkShutdown() { + if (this.shutdown != null) { + throw Utility.fixStackTrace(this.shutdown); + } + } + + private Delivery handle(Delivery delivery) { + if (delivery == POISON + || delivery == null && (this.shutdown != null || this.cancelled != null)) { + if (delivery == POISON) { + this.queue.add(POISON); + if (this.shutdown == null && this.cancelled == null) { + throw new IllegalStateException( + "POISON in queue, but null shutdown and null cancelled. This should never happen, please report as a BUG"); + } + } + + if (null != this.shutdown) { + throw Utility.fixStackTrace(this.shutdown); + } + + if (null != this.cancelled) { + throw Utility.fixStackTrace(this.cancelled); + } + } + + return delivery; + } + + public Delivery nextDelivery(long timeout, TimeUnit unit) + throws InterruptedException, ShutdownSignalException, ConsumerCancelledException { + return this.handle(this.queue.poll(timeout, unit)); + } + + public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) { + this.shutdown = sig; + this.queue.add(POISON); + } + + public void handleCancel(String consumerTag) { + this.cancelled = new ConsumerCancelledException(); + this.queue.add(POISON); + } + + public void handleDelivery( + String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) { + this.checkShutdown(); + this.queue.add(new Delivery(envelope, properties, body)); + } + + public void close() throws IOException, TimeoutException { + if (channel != null) { + channel.close(); + } + if (connection != null) { + connection.close(); + } + } +} diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/rabbitmq/RabbitmqActionUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/rabbitmq/RabbitmqActionUtils.java new file mode 100644 index 000000000000..9a0bbdfa9416 --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/rabbitmq/RabbitmqActionUtils.java @@ -0,0 +1,291 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.action.cdc.rabbitmq; + +import org.apache.paimon.flink.action.cdc.MessageQueueSchemaUtils; +import org.apache.paimon.flink.action.cdc.format.DataFormat; + +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.Delivery; +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.connectors.rabbitmq.RMQSource; +import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.apache.paimon.utils.Preconditions.checkArgument; + +/** Utils for rabbitmq synchronization. */ +public class RabbitmqActionUtils { + + static final ConfigOption VALUE_FORMAT = + ConfigOptions.key("value.format") + .stringType() + .noDefaultValue() + .withDescription("Defines the format identifier for encoding value data."); + + static final ConfigOption QUEUE_NAME = + ConfigOptions.key("queue.name") + .stringType() + .noDefaultValue() + .withDescription("The queue to receive messages from."); + + static final ConfigOption RABBITMQ_HOST = + ConfigOptions.key("host") + .stringType() + .noDefaultValue() + .withDescription("The default host to use for connections."); + + static final ConfigOption RABBITMQ_VIRTUAL_HOST = + ConfigOptions.key("virtual.host") + .stringType() + .noDefaultValue() + .withDescription("The virtual host to use when connecting to the broker."); + + static final ConfigOption RABBITMQ_PORT = + ConfigOptions.key("port") + .intType() + .noDefaultValue() + .withDescription("The default port to use for connections."); + + static final ConfigOption RABBITMQ_USERNAME = + ConfigOptions.key("username") + .stringType() + .noDefaultValue() + .withDescription("The AMQP user name to use when connecting to the broker."); + + static final ConfigOption RABBITMQ_PASSWORD = + ConfigOptions.key("password") + .stringType() + .noDefaultValue() + .withDescription("The password to use when connecting to the broker."); + + static final ConfigOption RABBITMQ_NETWORK_RECOVERY_INTERVAL = + ConfigOptions.key("network.recovery.interval") + .intType() + .defaultValue(5000) + .withDescription( + "How long will automatic recovery wait before attempting to reconnect, in ms."); + + static final ConfigOption RABBITMQ_AUTOMATIC_RECOVERY = + ConfigOptions.key("automatic.recovery") + .booleanType() + .noDefaultValue() + .withDescription( + "Enables or disables automatic connection recovery. if true, enables connection recovery."); + + static final ConfigOption RABBITMQ_TOPOLOGY_RECOVERY = + ConfigOptions.key("topology.recovery") + .booleanType() + .defaultValue(true) + .withDescription( + "Enables or disables topology recovery. if true, enables topology recovery."); + + static final ConfigOption RABBITMQ_CONNECTION_TIMEOUT = + ConfigOptions.key("connection.timeout") + .intType() + .noDefaultValue() + .withDescription( + "Connection establishment timeout in milliseconds; zero for infinite."); + + static final ConfigOption RABBITMQ_REQUESTED_CHANNEL_MAX = + ConfigOptions.key("requested.channel.max") + .intType() + .noDefaultValue() + .withDescription( + "The initially requested maximum channel number; zero for unlimited."); + + static final ConfigOption RABBITMQ_REQUESTED_FRAME_MAX = + ConfigOptions.key("requested.frame.max") + .intType() + .noDefaultValue() + .withDescription( + "The initially requested maximum frame size, in octets; zero for unlimited."); + + static final ConfigOption RABBITMQ_REQUESTED_HEARTBEAT = + ConfigOptions.key("requested.heartbeat") + .intType() + .noDefaultValue() + .withDescription( + "The initially requested heartbeat interval, in seconds; zero for none."); + + static final ConfigOption RABBITMQ_PREFETCH_COUNT = + ConfigOptions.key("prefetch.count") + .intType() + .noDefaultValue() + .withDescription( + "The max number of messages to receive without acknowledgement.."); + + static final ConfigOption RABBITMQ_DELIVERY_TIMEOUT = + ConfigOptions.key("delivery.timeout") + .longType() + .defaultValue(30000L) + .withDescription( + "Retrieve the message delivery timeout used in the queueing consumer. If not specified explicitly, the default value of 30000 milliseconds will be returned."); + + static RMQSource buildRabbitmqSource(Configuration rabbitmqConfig) { + validateRabbitmqConfig(rabbitmqConfig); + String queueName = rabbitmqConfig.get(QUEUE_NAME); + RMQConnectionConfig connectionConfig = setOptionConfig(rabbitmqConfig); + return new RMQSource<>(connectionConfig, queueName, new SimpleStringSchema()); + } + + private static RMQConnectionConfig setOptionConfig(Configuration rabbitmqConfig) { + RMQConnectionConfig.Builder builder = + new RMQConnectionConfig.Builder() + .setHost(rabbitmqConfig.get(RABBITMQ_HOST)) + .setPort(rabbitmqConfig.get(RABBITMQ_PORT)) + .setUserName(rabbitmqConfig.get(RABBITMQ_USERNAME)) + .setPassword(rabbitmqConfig.get(RABBITMQ_PASSWORD)) + .setVirtualHost(rabbitmqConfig.get(RABBITMQ_VIRTUAL_HOST)); + + if (rabbitmqConfig.contains(RABBITMQ_NETWORK_RECOVERY_INTERVAL)) { + builder.setNetworkRecoveryInterval( + rabbitmqConfig.get(RABBITMQ_NETWORK_RECOVERY_INTERVAL)); + } + if (rabbitmqConfig.contains(RABBITMQ_AUTOMATIC_RECOVERY)) { + builder.setAutomaticRecovery(rabbitmqConfig.get(RABBITMQ_AUTOMATIC_RECOVERY)); + } + if (rabbitmqConfig.contains(RABBITMQ_TOPOLOGY_RECOVERY)) { + builder.setTopologyRecoveryEnabled(rabbitmqConfig.get(RABBITMQ_TOPOLOGY_RECOVERY)); + } + if (rabbitmqConfig.contains(RABBITMQ_CONNECTION_TIMEOUT)) { + builder.setConnectionTimeout(rabbitmqConfig.get(RABBITMQ_CONNECTION_TIMEOUT)); + } + if (rabbitmqConfig.contains(RABBITMQ_REQUESTED_CHANNEL_MAX)) { + builder.setRequestedChannelMax(rabbitmqConfig.get(RABBITMQ_REQUESTED_CHANNEL_MAX)); + } + if (rabbitmqConfig.contains(RABBITMQ_REQUESTED_FRAME_MAX)) { + builder.setRequestedFrameMax(rabbitmqConfig.get(RABBITMQ_REQUESTED_FRAME_MAX)); + } + if (rabbitmqConfig.contains(RABBITMQ_REQUESTED_HEARTBEAT)) { + builder.setRequestedHeartbeat(rabbitmqConfig.get(RABBITMQ_REQUESTED_HEARTBEAT)); + } + if (rabbitmqConfig.contains(RABBITMQ_PREFETCH_COUNT)) { + builder.setPrefetchCount(rabbitmqConfig.get(RABBITMQ_PREFETCH_COUNT)); + } + if (rabbitmqConfig.contains(RABBITMQ_DELIVERY_TIMEOUT)) { + builder.setDeliveryTimeout(rabbitmqConfig.get(RABBITMQ_DELIVERY_TIMEOUT)); + } + return builder.build(); + } + + private static void validateRabbitmqConfig(Configuration rabbitmqConfig) { + checkArgument( + rabbitmqConfig.contains(VALUE_FORMAT), + String.format("rabbitmq-conf [%s] must be specified.", VALUE_FORMAT.key())); + + checkArgument( + rabbitmqConfig.contains(RABBITMQ_HOST), + String.format("rabbitmq-conf [%s] must be specified.", RABBITMQ_HOST.key())); + + checkArgument( + rabbitmqConfig.contains(RABBITMQ_PORT), + String.format("rabbitmq-conf [%s] must be specified.", RABBITMQ_PORT.key())); + + checkArgument( + rabbitmqConfig.contains(RABBITMQ_USERNAME), + String.format("rabbitmq-conf [%s] must be specified.", RABBITMQ_USERNAME.key())); + + checkArgument( + rabbitmqConfig.contains(RABBITMQ_PASSWORD), + String.format("rabbitmq-conf [%s] must be specified.", RABBITMQ_PASSWORD.key())); + + checkArgument( + rabbitmqConfig.contains(RABBITMQ_VIRTUAL_HOST), + String.format( + "rabbitmq-conf [%s] must be specified.", RABBITMQ_VIRTUAL_HOST.key())); + + checkArgument( + rabbitmqConfig.contains(QUEUE_NAME), + String.format("rabbitmq-conf [%s] must be specified.", QUEUE_NAME.key())); + } + + static MessageQueueSchemaUtils.ConsumerWrapper createRabbitmqConsumer( + Configuration rabbitmqConfig, String queueName) { + RMQConnectionConfig connectionConfig = setOptionConfig(rabbitmqConfig); + try { + Connection connection = connectionConfig.getConnectionFactory().newConnection(); + Channel channel = setupChannel(connectionConfig, connection); + if (channel == null) { + throw new RuntimeException("None of RabbitMQ channels are available"); + } + setupQueue(channel, queueName); + QueueingConsumer consumer = new QueueingConsumer(connection, channel); + channel.basicConsume(queueName, true, consumer); + return new RabbitmqConsumerWrapper(consumer); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private static void setupQueue(Channel channel, String queueName) throws IOException { + channel.queueDeclare(queueName, true, false, false, null); + } + + private static Channel setupChannel( + RMQConnectionConfig rmqConnectionConfig, Connection connection) throws Exception { + Channel chan = connection.createChannel(); + if (rmqConnectionConfig.getPrefetchCount().isPresent()) { + chan.basicQos(rmqConnectionConfig.getPrefetchCount().get(), true); + } + return chan; + } + + static DataFormat getDataFormat(Configuration rabbitmqConfig) { + return DataFormat.fromConfigString(rabbitmqConfig.get(VALUE_FORMAT)); + } + + private static class RabbitmqConsumerWrapper + implements MessageQueueSchemaUtils.ConsumerWrapper { + + private final QueueingConsumer consumer; + + RabbitmqConsumerWrapper(QueueingConsumer consumer) { + this.consumer = consumer; + } + + @Override + public List getRecords(String topic, int pollTimeOutMills) { + try { + Delivery delivery = consumer.nextDelivery(pollTimeOutMills, TimeUnit.MILLISECONDS); + return delivery == null + ? Collections.emptyList() + : Collections.singletonList( + new String(delivery.getBody(), StandardCharsets.UTF_8)); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + @Override + public void close() throws IOException, TimeoutException { + consumer.close(); + } + } +} diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/rabbitmq/RabbitmqSyncTableAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/rabbitmq/RabbitmqSyncTableAction.java new file mode 100644 index 000000000000..364c2683763b --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/rabbitmq/RabbitmqSyncTableAction.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.action.cdc.rabbitmq; + +import org.apache.paimon.flink.action.cdc.MessageQueueSchemaUtils; +import org.apache.paimon.flink.action.cdc.MessageQueueSyncTableActionBase; +import org.apache.paimon.flink.action.cdc.format.DataFormat; + +import org.apache.flink.streaming.api.functions.source.SourceFunction; + +import java.util.Map; + +/** Synchronize table from Rabbitmq. */ +public class RabbitmqSyncTableAction extends MessageQueueSyncTableActionBase { + + public RabbitmqSyncTableAction( + String warehouse, + String database, + String table, + Map catalogConfig, + Map mqConfig) { + super(warehouse, database, table, catalogConfig, mqConfig); + } + + @Override + protected SourceFunction buildSource() { + return RabbitmqActionUtils.buildRabbitmqSource(mqConfig); + } + + @Override + protected String topic() { + return mqConfig.get(RabbitmqActionUtils.QUEUE_NAME); + } + + @Override + protected MessageQueueSchemaUtils.ConsumerWrapper consumer(String topic) { + return RabbitmqActionUtils.createRabbitmqConsumer(mqConfig, topic); + } + + @Override + protected DataFormat getDataFormat() { + return RabbitmqActionUtils.getDataFormat(mqConfig); + } + + @Override + protected String sourceName() { + return "Rabbitmq Source"; + } + + @Override + protected String jobName() { + return String.format("Rabbitmq-Paimon Table Sync: %s.%s", database, table); + } +} diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/rabbitmq/RabbitmqSyncTableActionFactory.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/rabbitmq/RabbitmqSyncTableActionFactory.java new file mode 100644 index 000000000000..b9a6e910a519 --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/rabbitmq/RabbitmqSyncTableActionFactory.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.action.cdc.rabbitmq; + +import org.apache.paimon.flink.action.Action; +import org.apache.paimon.flink.action.ActionFactory; +import org.apache.paimon.flink.action.cdc.TypeMapping; + +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.utils.MultipleParameterTool; + +import java.util.ArrayList; +import java.util.Optional; + +/** Factory to create {@link RabbitmqSyncTableAction}. */ +public class RabbitmqSyncTableActionFactory implements ActionFactory { + + public static final String IDENTIFIER = "rabbitmq-sync-table"; + + @Override + public String identifier() { + return IDENTIFIER; + } + + @Override + public Optional create(MultipleParameterTool params) { + Tuple3 tablePath = getTablePath(params); + checkRequiredArgument(params, "rabbitmq-conf"); + + RabbitmqSyncTableAction action = + new RabbitmqSyncTableAction( + tablePath.f0, + tablePath.f1, + tablePath.f2, + optionalConfigMap(params, "catalog-conf"), + optionalConfigMap(params, "rabbitmq-conf")); + action.withTableConfig(optionalConfigMap(params, "table-conf")); + + if (params.has("partition-keys")) { + action.withPartitionKeys(params.get("partition-keys").split(",")); + } + + if (params.has("primary-keys")) { + action.withPrimaryKeys(params.get("primary-keys").split(",")); + } + + if (params.has("computed-column")) { + action.withComputedColumnArgs( + new ArrayList<>(params.getMultiParameter("computed-column"))); + } + + if (params.has("type-mapping")) { + String[] options = params.get("type-mapping").split(","); + action.withTypeMapping(TypeMapping.parse(options)); + } + + return Optional.of(action); + } + + @Override + public void printHelp() { + System.out.println( + "Action \"rabbitmq-sync-table\" creates a streaming job " + + "with a Flink rabbitmq CDC source and a Paimon table sink to consume CDC events."); + System.out.println(); + } +}