diff --git a/paimon-flink/paimon-flink-cdc/pom.xml b/paimon-flink/paimon-flink-cdc/pom.xml
index 4d4d5dc74ab5..3afe2b82c09b 100644
--- a/paimon-flink/paimon-flink-cdc/pom.xml
+++ b/paimon-flink/paimon-flink-cdc/pom.xml
@@ -290,6 +290,13 @@ under the License.
             <scope>test</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>rabbitmq</artifactId>
+            <version>${testcontainers.version}</version>
+            <scope>test</scope>
+        </dependency>
+
         <dependency>
             <groupId>org.testcontainers</groupId>
             <artifactId>mongodb</artifactId>
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/rabbitmq/QueueingConsumer.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/rabbitmq/QueueingConsumer.java
deleted file mode 100644
index 868e43cf7d39..000000000000
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/rabbitmq/QueueingConsumer.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.flink.action.cdc.rabbitmq;
-
-import com.rabbitmq.client.AMQP;
-import com.rabbitmq.client.Channel;
-import com.rabbitmq.client.Connection;
-import com.rabbitmq.client.ConsumerCancelledException;
-import com.rabbitmq.client.DefaultConsumer;
-import com.rabbitmq.client.Delivery;
-import com.rabbitmq.client.Envelope;
-import com.rabbitmq.client.ShutdownSignalException;
-import com.rabbitmq.utility.Utility;
-
-import java.io.IOException;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-public class QueueingConsumer extends DefaultConsumer {
-
-    private final Connection connection;
-    private final Channel channel;
-    private final BlockingQueue<Delivery> queue;
-    private volatile ShutdownSignalException shutdown;
-    private volatile ConsumerCancelledException cancelled;
-    private static final Delivery POISON = new Delivery(null, null, null);
-
-    public QueueingConsumer(Connection connection, Channel channel) {
-        this(connection, channel, Integer.MAX_VALUE);
-    }
-
-    public QueueingConsumer(Connection connection, Channel channel, int capacity) {
-        super(channel);
-        this.connection = connection;
-        this.channel = channel;
-        this.queue = new LinkedBlockingQueue(capacity);
-    }
-
-    private void checkShutdown() {
-        if (this.shutdown != null) {
-            throw Utility.fixStackTrace(this.shutdown);
-        }
-    }
-
-    private Delivery handle(Delivery delivery) {
-        if (delivery == POISON
-                || delivery == null && (this.shutdown != null || this.cancelled != null)) {
-            if (delivery == POISON) {
-                this.queue.add(POISON);
-                if (this.shutdown == null && this.cancelled == null) {
-                    throw new IllegalStateException(
-                            "POISON in queue, but null shutdown and null cancelled. This should never happen, please report as a BUG");
-                }
-            }
-
-            if (null != this.shutdown) {
-                throw Utility.fixStackTrace(this.shutdown);
-            }
-
-            if (null != this.cancelled) {
-                throw Utility.fixStackTrace(this.cancelled);
-            }
-        }
-
-        return delivery;
-    }
-
-    public Delivery nextDelivery(long timeout, TimeUnit unit)
-            throws InterruptedException, ShutdownSignalException, ConsumerCancelledException {
-        return this.handle(this.queue.poll(timeout, unit));
-    }
-
-    public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
-        this.shutdown = sig;
-        this.queue.add(POISON);
-    }
-
-    public void handleCancel(String consumerTag) {
-        this.cancelled = new ConsumerCancelledException();
-        this.queue.add(POISON);
-    }
-
-    public void handleDelivery(
-            String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
-        this.checkShutdown();
-        this.queue.add(new Delivery(envelope, properties, body));
-    }
-
-    public void close() throws IOException, TimeoutException {
-        if (channel != null) {
-            channel.close();
-        }
-        if (connection != null) {
-            connection.close();
-        }
-    }
-}
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/rabbitmq/RabbitmqActionUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/rabbitmq/RabbitmqActionUtils.java
index 9a0bbdfa9416..a0abe8a50c5f 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/rabbitmq/RabbitmqActionUtils.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/rabbitmq/RabbitmqActionUtils.java
@@ -20,22 +20,22 @@
 
 import org.apache.paimon.flink.action.cdc.MessageQueueSchemaUtils;
 import org.apache.paimon.flink.action.cdc.format.DataFormat;
+import org.apache.paimon.utils.StringUtils;
 
+import com.rabbitmq.client.BuiltinExchangeType;
 import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
-import com.rabbitmq.client.Delivery;
+import com.rabbitmq.client.GetResponse;
 import org.apache.flink.api.common.serialization.SimpleStringSchema;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
 import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
 
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -48,6 +48,14 @@ public class RabbitmqActionUtils {
                     .stringType()
                     .noDefaultValue()
                     .withDescription("Defines the format identifier for encoding value data.");
+    static final ConfigOption<String> EXCHANGE =
+            ConfigOptions.key("exchange").stringType().noDefaultValue().withDescription(".");
+
+    static final ConfigOption<String> EXCHANGE_TYPE =
+            ConfigOptions.key("exchange.type").stringType().noDefaultValue().withDescription(".");
+
+    static final ConfigOption<String> ROUTING_KEY =
+            ConfigOptions.key("routing.key").stringType().noDefaultValue().withDescription(".");
 
     static final ConfigOption<String> QUEUE_NAME =
             ConfigOptions.key("queue.name")
@@ -148,11 +156,32 @@ public class RabbitmqActionUtils {
                     .withDescription(
                             "Retrieve the message delivery timeout used in the queueing consumer. If not specified explicitly, the default value of 30000 milliseconds will be returned.");
 
-    static RMQSource<String> buildRabbitmqSource(Configuration rabbitmqConfig) {
+    static RabbitmqSource buildRabbitmqSource(Configuration rabbitmqConfig) {
         validateRabbitmqConfig(rabbitmqConfig);
         String queueName = rabbitmqConfig.get(QUEUE_NAME);
+        String exchange = null;
+        if (rabbitmqConfig.contains(EXCHANGE)) {
+            exchange = rabbitmqConfig.get(EXCHANGE);
+        }
+
+        BuiltinExchangeType exchangeType = null;
+        if (rabbitmqConfig.contains(EXCHANGE_TYPE)) {
+            exchangeType = BuiltinExchangeType.valueOf(rabbitmqConfig.get(EXCHANGE_TYPE));
+        }
+
+        String routingKey = null;
+        if (rabbitmqConfig.contains(ROUTING_KEY)) {
+            routingKey = rabbitmqConfig.get(ROUTING_KEY);
+        }
+
         RMQConnectionConfig connectionConfig = setOptionConfig(rabbitmqConfig);
-        return new RMQSource<>(connectionConfig, queueName, new SimpleStringSchema());
+        return new RabbitmqSource(
+                connectionConfig,
+                exchange,
+                exchangeType,
+                routingKey,
+                queueName,
+                new SimpleStringSchema());
     }
 
     private static RMQConnectionConfig setOptionConfig(Configuration rabbitmqConfig) {
@@ -229,32 +258,44 @@ private static void validateRabbitmqConfig(Configuration rabbitmqConfig) {
     static MessageQueueSchemaUtils.ConsumerWrapper createRabbitmqConsumer(
             Configuration rabbitmqConfig, String queueName) {
         RMQConnectionConfig connectionConfig = setOptionConfig(rabbitmqConfig);
+
         try {
             Connection connection = connectionConfig.getConnectionFactory().newConnection();
-            Channel channel = setupChannel(connectionConfig, connection);
+            Channel channel = setupChannel(connection);
             if (channel == null) {
                 throw new RuntimeException("None of RabbitMQ channels are available");
             }
-            setupQueue(channel, queueName);
-            QueueingConsumer consumer = new QueueingConsumer(connection, channel);
-            channel.basicConsume(queueName, true, consumer);
-            return new RabbitmqConsumerWrapper(consumer);
+            setupQueue(channel, rabbitmqConfig, queueName);
+            return new RabbitmqConsumerWrapper(connection, channel);
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
     }
 
-    private static void setupQueue(Channel channel, String queueName) throws IOException {
+    private static void setupQueue(Channel channel, Configuration rabbitmqConfig, String queueName)
+            throws IOException {
+        ;
         channel.queueDeclare(queueName, true, false, false, null);
+        String exchange = null;
+        if (rabbitmqConfig.contains(EXCHANGE)) {
+            exchange = rabbitmqConfig.get(EXCHANGE);
+        }
+        BuiltinExchangeType exchangeType = null;
+        if (rabbitmqConfig.contains(EXCHANGE_TYPE)) {
+            exchangeType = BuiltinExchangeType.valueOf(rabbitmqConfig.get(EXCHANGE_TYPE));
+        }
+        String routingKey = "";
+        if (rabbitmqConfig.contains(ROUTING_KEY)) {
+            routingKey = rabbitmqConfig.get(ROUTING_KEY);
+        }
+        if (!StringUtils.isBlank(exchange) && exchangeType != null) {
+            channel.exchangeDeclare(exchange, BuiltinExchangeType.FANOUT);
+            channel.queueBind(queueName, exchange, routingKey);
+        }
     }
 
-    private static Channel setupChannel(
-            RMQConnectionConfig rmqConnectionConfig, Connection connection) throws Exception {
-        Channel chan = connection.createChannel();
-        if (rmqConnectionConfig.getPrefetchCount().isPresent()) {
-            chan.basicQos(rmqConnectionConfig.getPrefetchCount().get(), true);
-        }
-        return chan;
+    private static Channel setupChannel(Connection connection) throws Exception {
+        return connection.createChannel();
     }
 
     static DataFormat getDataFormat(Configuration rabbitmqConfig) {
@@ -264,28 +305,31 @@ static DataFormat getDataFormat(Configuration rabbitmqConfig) {
     private static class RabbitmqConsumerWrapper
             implements MessageQueueSchemaUtils.ConsumerWrapper {
 
-        private final QueueingConsumer consumer;
+        private final Channel channel;
+        private final Connection connection;
 
-        RabbitmqConsumerWrapper(QueueingConsumer consumer) {
-            this.consumer = consumer;
+        RabbitmqConsumerWrapper(Connection connection, Channel channel) {
+            this.channel = channel;
+            this.connection = connection;
         }
 
         @Override
-        public List<String> getRecords(String topic, int pollTimeOutMills) {
+        public List<String> getRecords(String queue, int pollTimeOutMills) {
             try {
-                Delivery delivery = consumer.nextDelivery(pollTimeOutMills, TimeUnit.MILLISECONDS);
-                return delivery == null
+                GetResponse response = channel.basicGet(queue, false);
+                return response == null
                         ? Collections.emptyList()
                         : Collections.singletonList(
-                                new String(delivery.getBody(), StandardCharsets.UTF_8));
-            } catch (InterruptedException e) {
+                                new String(response.getBody(), StandardCharsets.UTF_8));
+            } catch (IOException e) {
                 throw new RuntimeException(e);
             }
         }
 
         @Override
         public void close() throws IOException, TimeoutException {
-            consumer.close();
+            channel.close();
+            connection.close();
         }
     }
 }
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/rabbitmq/RabbitmqSource.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/rabbitmq/RabbitmqSource.java
new file mode 100644
index 000000000000..0dbdefdead9b
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/rabbitmq/RabbitmqSource.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.rabbitmq;
+
+import org.apache.paimon.utils.StringUtils;
+
+import com.rabbitmq.client.BuiltinExchangeType;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
+import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
+
+import java.io.IOException;
+
+public class RabbitmqSource extends RMQSource<String> {
+    private String exchange;
+    private BuiltinExchangeType exchangeType;
+    private String routingKey;
+
+    public RabbitmqSource(
+            RMQConnectionConfig rmqConnectionConfig,
+            String exchange,
+            BuiltinExchangeType exchangeType,
+            String routingKey,
+            String queueName,
+            DeserializationSchema<String> deserializationSchema) {
+        super(rmqConnectionConfig, queueName, deserializationSchema);
+        this.exchange = exchange;
+        this.exchangeType = exchangeType;
+        this.routingKey = routingKey;
+    }
+
+    @Override
+    protected void setupQueue() throws IOException {
+        super.setupQueue();
+        if (!StringUtils.isBlank(exchange) && exchangeType != null) {
+            channel.exchangeDeclare(exchange, exchangeType);
+            channel.queueBind(
+                    queueName, exchange, StringUtils.isBlank(routingKey) ? "" : routingKey);
+        }
+    }
+}
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/rabbitmq/RabbitmqActionITCaseBase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/rabbitmq/RabbitmqActionITCaseBase.java
new file mode 100644
index 000000000000..98d2005ad450
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/rabbitmq/RabbitmqActionITCaseBase.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.rabbitmq;
+
+import org.apache.paimon.flink.action.cdc.CdcActionITCaseBase;
+import org.apache.paimon.utils.StringUtils;
+
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import com.rabbitmq.client.BuiltinExchangeType;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.client.MessageProperties;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.api.java.utils.MultipleParameterTool;
+import org.junit.ClassRule;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.RabbitMQContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.utility.DockerImageName;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeoutException;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Base test class for Rabbitmq synchronization. */
+public class RabbitmqActionITCaseBase extends CdcActionITCaseBase {
+    private static final Logger LOG = LoggerFactory.getLogger(RabbitmqActionITCaseBase.class);
+    protected final String exchange = "schema_exchange";
+    private static final Slf4jLogConsumer LOG_CONSUMER = new Slf4jLogConsumer(LOG);
+    private static final SimpleStringSchema SCHEMA = new SimpleStringSchema();
+    public static final String RABBITMQ = "rabbitmq:3.9.8-management-alpine";
+    private static final int RABBITMQ_PORT = 5672;
+    private final ObjectMapper objectMapper = new ObjectMapper();
+
+    private List<String> queues = new ArrayList<>();
+
+    @ClassRule
+    private static final RabbitMQContainer RMQ_CONTAINER =
+            new RabbitMQContainer(DockerImageName.parse(RABBITMQ))
+                    .withExposedPorts(RABBITMQ_PORT)
+                    .withLogConsumer(LOG_CONSUMER);
+
+    private Connection connection;
+    private Channel channel;
+
+    @BeforeEach
+    public void setup() throws Exception {
+        RMQ_CONTAINER.start();
+        connection = getRMQConnection();
+        channel = connection.createChannel();
+    }
+
+    private static Connection getRMQConnection() throws IOException, TimeoutException {
+        ConnectionFactory factory = new ConnectionFactory();
+        factory.setHost(RMQ_CONTAINER.getHost());
+        factory.setVirtualHost("/");
+        factory.setPort(RMQ_CONTAINER.getAmqpPort());
+        factory.setUsername(RMQ_CONTAINER.getAdminUsername());
+        factory.setPassword(RMQ_CONTAINER.getAdminPassword());
+        return factory.newConnection();
+    }
+
+    @AfterEach
+    public void after() throws Exception {
+        super.after();
+        // Delete topics for avoid reusing topics of pulsar cluster
+        deleteTopics();
+        channel.close();
+        connection.close();
+        RMQ_CONTAINER.close();
+    }
+
+    protected Map<String, String> getBasicRabbitmqConfig() {
+        Map<String, String> config = new HashMap<>();
+        config.put(RabbitmqActionUtils.RABBITMQ_USERNAME.key(), RMQ_CONTAINER.getAdminUsername());
+        config.put(RabbitmqActionUtils.RABBITMQ_PASSWORD.key(), RMQ_CONTAINER.getAdminPassword());
+        config.put(RabbitmqActionUtils.RABBITMQ_VIRTUAL_HOST.key(), "/");
+        config.put(RabbitmqActionUtils.RABBITMQ_HOST.key(), RMQ_CONTAINER.getHost());
+        config.put(RabbitmqActionUtils.RABBITMQ_PORT.key(), RMQ_CONTAINER.getAmqpPort().toString());
+        return config;
+    }
+
+    protected void createQueue(String exchange, String queueName) {
+        try {
+            channel.exchangeDeclare(exchange, BuiltinExchangeType.FANOUT);
+            channel.queueDeclare(queueName, true, false, false, null);
+            channel.queueBind(queueName, exchange, "");
+            queues.add(queueName);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private void deleteTopics() throws Exception {
+        for (String queue : queues) {
+            channel.queueDelete(queue);
+        }
+        queues = new ArrayList<>();
+    }
+
+    protected List<String> getMessages(String resource) throws IOException {
+        URL url = RabbitmqActionITCaseBase.class.getClassLoader().getResource(resource);
+        assertThat(url).isNotNull();
+        java.nio.file.Path path = new File(url.getFile()).toPath();
+        List<String> lines = Files.readAllLines(path);
+        List<String> messages = new ArrayList<>();
+        for (String line : lines) {
+            try {
+                objectMapper.readTree(line);
+                if (!StringUtils.isEmpty(line)) {
+                    messages.add(line);
+                }
+            } catch (Exception e) {
+                // ignore
+            }
+        }
+
+        return messages;
+    }
+
+    /**
+     * Send a list messages to Rabbitmq.
+     *
+     * @param queueName The name of the queue.
+     * @param messages The records need to be sent.
+     */
+    public static void sendMessages(String exchange, String queueName, List<String> messages)
+            throws IOException, TimeoutException {
+        try (Connection rmqConnection = getRMQConnection();
+                Channel channel = rmqConnection.createChannel()) {
+            for (String msg : messages) {
+                channel.basicPublish(
+                        exchange,
+                        queueName,
+                        MessageProperties.PERSISTENT_TEXT_PLAIN,
+                        SCHEMA.serialize(msg));
+            }
+        }
+    }
+
+    protected RabbitmqSyncTableActionBuilder syncTableActionBuilder(
+            Map<String, String> rabbitmqConfig) {
+        return new RabbitmqSyncTableActionBuilder(rabbitmqConfig);
+    }
+
+    /** Builder to build {@link RabbitmqSyncTableAction} from action arguments. */
+    protected class RabbitmqSyncTableActionBuilder
+            extends SyncTableActionBuilder<RabbitmqSyncTableAction> {
+
+        public RabbitmqSyncTableActionBuilder(Map<String, String> rabbitmqConfig) {
+            super(rabbitmqConfig);
+        }
+
+        public RabbitmqSyncTableAction build() {
+            List<String> args =
+                    new ArrayList<>(
+                            Arrays.asList(
+                                    "--warehouse",
+                                    warehouse,
+                                    "--database",
+                                    database,
+                                    "--table",
+                                    tableName));
+
+            args.addAll(mapToArgs("--rabbitmq-conf", sourceConfig));
+            args.addAll(mapToArgs("--catalog-conf", catalogConfig));
+            args.addAll(mapToArgs("--table-conf", tableConfig));
+
+            args.addAll(listToArgs("--partition-keys", partitionKeys));
+            args.addAll(listToArgs("--primary-keys", primaryKeys));
+            args.addAll(listToArgs("--type-mapping", typeMappingModes));
+
+            args.addAll(listToMultiArgs("--computed-column", computedColumnArgs));
+
+            MultipleParameterTool params =
+                    MultipleParameterTool.fromArgs(args.toArray(args.toArray(new String[0])));
+            return (RabbitmqSyncTableAction)
+                    new RabbitmqSyncTableActionFactory()
+                            .create(params)
+                            .orElseThrow(RuntimeException::new);
+        }
+    }
+}
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/rabbitmq/RabbitmqCanalSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/rabbitmq/RabbitmqCanalSyncTableActionITCase.java
new file mode 100644
index 000000000000..8bc44ef3b781
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/rabbitmq/RabbitmqCanalSyncTableActionITCase.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.rabbitmq;
+
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import com.rabbitmq.client.BuiltinExchangeType;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.paimon.flink.action.cdc.rabbitmq.RabbitmqActionUtils.EXCHANGE;
+import static org.apache.paimon.flink.action.cdc.rabbitmq.RabbitmqActionUtils.EXCHANGE_TYPE;
+import static org.apache.paimon.flink.action.cdc.rabbitmq.RabbitmqActionUtils.QUEUE_NAME;
+import static org.apache.paimon.flink.action.cdc.rabbitmq.RabbitmqActionUtils.VALUE_FORMAT;
+
+/** IT cases for {@link RabbitmqSyncTableAction}. */
+public class RabbitmqCanalSyncTableActionITCase extends RabbitmqActionITCaseBase {
+    @Test
+    @Timeout(300)
+    public void testSchemaEvolution() throws Exception {
+        runSingleTableSchemaEvolution("schemaevolution");
+    }
+
+    private void runSingleTableSchemaEvolution(String sourceDir) throws Exception {
+        final String queue = "schema_evolution";
+        createQueue(exchange, queue);
+        // ---------- Write the Canal json into Rabbitmq -------------------
+        sendMessages(
+                exchange,
+                queue,
+                getMessages(String.format("kafka/canal/table/%s/canal-data-1.txt", sourceDir)));
+
+        Map<String, String> rabbitmqConfig = getBasicRabbitmqConfig();
+        rabbitmqConfig.put(QUEUE_NAME.key(), queue);
+        rabbitmqConfig.put(EXCHANGE.key(), exchange);
+        rabbitmqConfig.put(EXCHANGE_TYPE.key(), BuiltinExchangeType.FANOUT.name());
+        rabbitmqConfig.put(VALUE_FORMAT.key(), "canal-json");
+
+        RabbitmqSyncTableAction action =
+                syncTableActionBuilder(rabbitmqConfig)
+                        .withPartitionKeys("pt")
+                        .withPrimaryKeys("pt", "_id")
+                        .withTableConfig(getBasicTableConfig())
+                        .build();
+        runActionWithDefaultEnv(action);
+
+        testSchemaEvolutionImpl(queue, sourceDir);
+    }
+
+    private void testSchemaEvolutionImpl(String queue, String sourceDir) throws Exception {
+        FileStoreTable table = getFileStoreTable(tableName);
+
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.INT().notNull(),
+                            DataTypes.INT().notNull(),
+                            DataTypes.VARCHAR(10)
+                        },
+                        new String[] {"pt", "_id", "v1"});
+        List<String> primaryKeys = Arrays.asList("pt", "_id");
+        List<String> expected = Arrays.asList("+I[1, 1, one]", "+I[1, 2, two]", "+I[2, 4, four]");
+        waitForResult(expected, table, rowType, primaryKeys);
+
+        sendMessages(
+                exchange,
+                queue,
+                getMessages(String.format("kafka/canal/table/%s/canal-data-2.txt", sourceDir)));
+
+        rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.INT().notNull(),
+                            DataTypes.INT().notNull(),
+                            DataTypes.VARCHAR(10),
+                            DataTypes.INT()
+                        },
+                        new String[] {"pt", "_id", "v1", "v2"});
+        expected =
+                Arrays.asList(
+                        "+I[1, 1, one, NULL]",
+                        "+I[1, 2, second, NULL]",
+                        "+I[2, 3, three, 30]",
+                        "+I[2, 4, four, NULL]",
+                        "+I[1, 5, five, 50]",
+                        "+I[1, 6, six, 60]");
+        waitForResult(expected, table, rowType, primaryKeys);
+
+        sendMessages(
+                exchange,
+                queue,
+                getMessages(String.format("kafka/canal/table/%s/canal-data-3.txt", sourceDir)));
+
+        rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.INT().notNull(),
+                            DataTypes.INT().notNull(),
+                            DataTypes.VARCHAR(10),
+                            DataTypes.BIGINT()
+                        },
+                        new String[] {"pt", "_id", "v1", "v2"});
+        expected =
+                Arrays.asList(
+                        "+I[1, 1, one, NULL]",
+                        "+I[1, 2, second, NULL]",
+                        "+I[2, 3, three, 30000000000]",
+                        "+I[2, 4, four, NULL]",
+                        "+I[1, 6, six, 60]",
+                        "+I[2, 7, seven, 70000000000]",
+                        "+I[2, 8, eight, 80000000000]");
+        waitForResult(expected, table, rowType, primaryKeys);
+
+        sendMessages(
+                exchange,
+                queue,
+                getMessages(String.format("kafka/canal/table/%s/canal-data-4.txt", sourceDir)));
+
+        rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.INT().notNull(),
+                            DataTypes.INT().notNull(),
+                            DataTypes.VARCHAR(20),
+                            DataTypes.BIGINT(),
+                            DataTypes.DECIMAL(8, 3),
+                            DataTypes.VARBINARY(10),
+                            DataTypes.FLOAT()
+                        },
+                        new String[] {"pt", "_id", "v1", "v2", "v3", "v4", "v5"});
+        expected =
+                Arrays.asList(
+                        "+I[1, 1, one, NULL, NULL, NULL, NULL]",
+                        "+I[1, 2, second, NULL, NULL, NULL, NULL]",
+                        "+I[2, 3, three, 30000000000, NULL, NULL, NULL]",
+                        "+I[2, 4, four, NULL, NULL, NULL, NULL]",
+                        "+I[1, 6, six, 60, NULL, NULL, NULL]",
+                        "+I[2, 7, seven, 70000000000, NULL, NULL, NULL]",
+                        "+I[2, 8, very long string, 80000000000, NULL, NULL, NULL]",
+                        "+I[1, 9, nine, 90000000000, 99999.999, [110, 105, 110, 101, 46, 98, 105, 110], 9.9]");
+        waitForResult(expected, table, rowType, primaryKeys);
+
+        sendMessages(
+                exchange,
+                queue,
+                getMessages(String.format("kafka/canal/table/%s/canal-data-5.txt", sourceDir)));
+
+        rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.INT().notNull(),
+                            DataTypes.INT().notNull(),
+                            DataTypes.VARCHAR(20),
+                            DataTypes.BIGINT(),
+                            DataTypes.DECIMAL(8, 3),
+                            DataTypes.VARBINARY(20),
+                            DataTypes.DOUBLE()
+                        },
+                        new String[] {"pt", "_id", "v1", "v2", "v3", "v4", "v5"});
+        expected =
+                Arrays.asList(
+                        "+I[1, 1, one, NULL, NULL, NULL, NULL]",
+                        "+I[1, 2, second, NULL, NULL, NULL, NULL]",
+                        "+I[2, 3, three, 30000000000, NULL, NULL, NULL]",
+                        "+I[2, 4, four, NULL, NULL, [102, 111, 117, 114, 46, 98, 105, 110, 46, 108, 111, 110, 103], 4.00000000004]",
+                        "+I[1, 6, six, 60, NULL, NULL, NULL]",
+                        "+I[2, 7, seven, 70000000000, NULL, NULL, NULL]",
+                        "+I[2, 8, very long string, 80000000000, NULL, NULL, NULL]",
+                        "+I[1, 9, nine, 90000000000, 99999.999, [110, 105, 110, 101, 46, 98, 105, 110, 46, 108, 111, 110, 103], 9.00000000009]");
+        waitForResult(expected, table, rowType, primaryKeys);
+    }
+}
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/rabbitmq/RabbitmqDebeziumSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/rabbitmq/RabbitmqDebeziumSyncTableActionITCase.java
new file mode 100644
index 000000000000..34c760117bae
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/rabbitmq/RabbitmqDebeziumSyncTableActionITCase.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.rabbitmq;
+
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import com.rabbitmq.client.BuiltinExchangeType;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.paimon.flink.action.cdc.rabbitmq.RabbitmqActionUtils.EXCHANGE;
+import static org.apache.paimon.flink.action.cdc.rabbitmq.RabbitmqActionUtils.EXCHANGE_TYPE;
+import static org.apache.paimon.flink.action.cdc.rabbitmq.RabbitmqActionUtils.QUEUE_NAME;
+
+/** IT cases for {@link RabbitmqDebeziumSyncTableActionITCase}. */
+public class RabbitmqDebeziumSyncTableActionITCase extends RabbitmqActionITCaseBase {
+
+    @Test
+    @Timeout(120)
+    public void testSchemaEvolution() throws Exception {
+        runSingleTableSchemaEvolution("schemaevolution");
+    }
+
+    private void runSingleTableSchemaEvolution(String sourceDir) throws Exception {
+        final String queue = "schema_evolution";
+        createQueue(exchange, queue);
+        // ---------- Write the debezium json into rabbitmq -------------------
+        sendMessages(
+                exchange,
+                queue,
+                getMessages(
+                        String.format("kafka/debezium/table/%s/debezium-data-1.txt", sourceDir)));
+
+        Map<String, String> rabbitmqConfig = getBasicRabbitmqConfig();
+        rabbitmqConfig.put(QUEUE_NAME.key(), queue);
+        rabbitmqConfig.put(EXCHANGE.key(), exchange);
+        rabbitmqConfig.put(EXCHANGE_TYPE.key(), BuiltinExchangeType.FANOUT.name());
+        rabbitmqConfig.put(RabbitmqActionUtils.VALUE_FORMAT.key(), "debezium-json");
+
+        RabbitmqSyncTableAction action =
+                syncTableActionBuilder(rabbitmqConfig)
+                        .withPrimaryKeys("id")
+                        .withTableConfig(getBasicTableConfig())
+                        .build();
+        runActionWithDefaultEnv(action);
+
+        testSchemaEvolutionImpl(queue, sourceDir);
+    }
+
+    private void testSchemaEvolutionImpl(String topic, String sourceDir) throws Exception {
+        FileStoreTable table = getFileStoreTable(tableName);
+
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.STRING().notNull(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING()
+                        },
+                        new String[] {"id", "name", "description", "weight"});
+        List<String> primaryKeys = Collections.singletonList("id");
+        List<String> expected =
+                Arrays.asList(
+                        "+I[101, scooter, Small 2-wheel scooter, 3.14]",
+                        "+I[102, car battery, 12V car battery, 8.1]");
+        waitForResult(expected, table, rowType, primaryKeys);
+
+        // ---------- Write the debezium json into rabbitmq -------------------
+        sendMessages(
+                exchange,
+                topic,
+                getMessages(
+                        String.format("kafka/debezium/table/%s/debezium-data-2.txt", sourceDir)));
+
+        rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.STRING().notNull(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING()
+                        },
+                        new String[] {"id", "name", "description", "weight", "age"});
+        expected =
+                Arrays.asList(
+                        "+I[101, scooter, Small 2-wheel scooter, 3.14, NULL]",
+                        "+I[102, car battery, 12V car battery, 8.1, NULL]",
+                        "+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8, 18]",
+                        "+I[104, hammer, 12oz carpenter's hammer, 0.75, 24]");
+        waitForResult(expected, table, rowType, primaryKeys);
+
+        sendMessages(
+                exchange,
+                topic,
+                getMessages(
+                        String.format("kafka/debezium/table/%s/debezium-data-3.txt", sourceDir)));
+
+        rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.STRING().notNull(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING()
+                        },
+                        new String[] {"id", "name", "description", "weight", "age", "address"});
+        expected =
+                Arrays.asList(
+                        "+I[102, car battery, 12V car battery, 8.1, NULL, NULL]",
+                        "+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8, 18, NULL]",
+                        "+I[104, hammer, 12oz carpenter's hammer, 0.75, 24, NULL]",
+                        "+I[105, hammer, 14oz carpenter's hammer, 0.875, NULL, Beijing]",
+                        "+I[107, rocks, box of assorted rocks, 5.3, NULL, NULL]");
+        waitForResult(expected, table, rowType, primaryKeys);
+    }
+}
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/rabbitmq/RabbitmqSchemaITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/rabbitmq/RabbitmqSchemaITCase.java
new file mode 100644
index 000000000000..baab11fe0060
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/rabbitmq/RabbitmqSchemaITCase.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.rabbitmq;
+
+import org.apache.paimon.flink.action.cdc.MessageQueueSchemaUtils;
+import org.apache.paimon.flink.action.cdc.TypeMapping;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
+
+import com.rabbitmq.client.BuiltinExchangeType;
+import org.apache.flink.configuration.Configuration;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.paimon.flink.action.cdc.rabbitmq.RabbitmqActionUtils.EXCHANGE;
+import static org.apache.paimon.flink.action.cdc.rabbitmq.RabbitmqActionUtils.EXCHANGE_TYPE;
+import static org.apache.paimon.flink.action.cdc.rabbitmq.RabbitmqActionUtils.QUEUE_NAME;
+import static org.apache.paimon.flink.action.cdc.rabbitmq.RabbitmqActionUtils.VALUE_FORMAT;
+import static org.apache.paimon.flink.action.cdc.rabbitmq.RabbitmqActionUtils.getDataFormat;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for building schema from Rabbitmq. */
+public class RabbitmqSchemaITCase extends RabbitmqActionITCaseBase {
+
+    @Test
+    @Timeout(120)
+    public void testPulsarSchema() throws Exception {
+        String queue = "test_rabbitmq_schema";
+        createQueue(exchange, queue);
+
+        // ---------- Write the Canal json into rabbitmq -------------------
+        List<String> messages = getMessages("kafka/canal/table/schemaevolution/canal-data-1.txt");
+        sendMessages(exchange, queue, messages);
+
+        Configuration rabbitmqConfig = Configuration.fromMap(getBasicRabbitmqConfig());
+        rabbitmqConfig.set(QUEUE_NAME, queue);
+        rabbitmqConfig.set(EXCHANGE, exchange);
+        rabbitmqConfig.set(EXCHANGE_TYPE, BuiltinExchangeType.FANOUT.name());
+        rabbitmqConfig.set(VALUE_FORMAT, "canal-json");
+
+        Schema rabbitmqSchema =
+                MessageQueueSchemaUtils.getSchema(
+                        RabbitmqActionUtils.createRabbitmqConsumer(rabbitmqConfig, queue),
+                        queue,
+                        getDataFormat(rabbitmqConfig),
+                        TypeMapping.defaultMapping());
+        List<DataField> fields = new ArrayList<>();
+        fields.add(new DataField(0, "pt", DataTypes.INT()));
+        fields.add(new DataField(1, "_id", DataTypes.INT().notNull()));
+        fields.add(new DataField(2, "v1", DataTypes.VARCHAR(10)));
+        assertThat(rabbitmqSchema.fields()).isEqualTo(fields);
+    }
+}