Skip to content

Commit

Permalink
Add rabbitmq action
Browse files Browse the repository at this point in the history
  • Loading branch information
sunxiaojian committed Nov 15, 2023
1 parent b781397 commit d601035
Show file tree
Hide file tree
Showing 6 changed files with 583 additions and 10 deletions.
9 changes: 8 additions & 1 deletion paimon-flink/paimon-flink-cdc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ under the License.
<json-path.version>2.8.0</json-path.version>
<mongodb.testcontainers.version>1.18.3</mongodb.testcontainers.version>
<flink.connector.pulsar.version>4.0.0-1.17</flink.connector.pulsar.version>
<flink.connector.rabbitmq.version>3.0.1-1.17</flink.connector.rabbitmq.version>
</properties>

<dependencies>
Expand Down Expand Up @@ -96,6 +97,13 @@ under the License.
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-rabbitmq</artifactId>
<version>${flink.connector.rabbitmq.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
Expand All @@ -110,7 +118,6 @@ under the License.
<scope>provided</scope>
</dependency>


<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mongodb-cdc</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -137,7 +139,7 @@ public MessageQueueSyncTableActionBase withTypeMapping(TypeMapping typeMapping)
return this;
}

protected abstract Source<String, ?, ?> buildSource();
protected abstract Object buildSource();

protected abstract String topic();

Expand All @@ -149,7 +151,7 @@ public MessageQueueSyncTableActionBase withTypeMapping(TypeMapping typeMapping)

@Override
public void build() throws Exception {
Source<String, ?, ?> source = buildSource();
Object source = buildSource();

catalog.createDatabase(database, true);
boolean caseSensitive = catalog.caseSensitive();
Expand Down Expand Up @@ -210,13 +212,7 @@ public void build() throws Exception {

CdcSinkBuilder<RichCdcMultiplexRecord> sinkBuilder =
new CdcSinkBuilder<RichCdcMultiplexRecord>()
.withInput(
env.fromSource(
source,
WatermarkStrategy.noWatermarks(),
sourceName())
.flatMap(recordParser)
.name("Parse"))
.withInput(fromSource(source).flatMap(recordParser).name("Parse"))
.withParserFactory(parserFactory)
.withTable(fileStoreTable)
.withIdentifier(identifier)
Expand All @@ -228,6 +224,17 @@ public void build() throws Exception {
sinkBuilder.build();
}

private DataStreamSource<String> fromSource(Object s) {
if (s instanceof Source) {
return env.fromSource(
(Source<String, ?, ?>) s, WatermarkStrategy.noWatermarks(), sourceName());
}
if (s instanceof SourceFunction) {
return env.addSource((SourceFunction<String>) s, sourceName());
}
throw new UnsupportedOperationException("Unsupported source type");
}

private Schema retrieveSchema() throws Exception {
String topic = topic();
try (MessageQueueSchemaUtils.ConsumerWrapper consumer = consumer(topic)) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.action.cdc.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Delivery;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;
import com.rabbitmq.utility.Utility;

import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class QueueingConsumer extends DefaultConsumer {

private final Connection connection;
private final Channel channel;
private final BlockingQueue<Delivery> queue;
private volatile ShutdownSignalException shutdown;
private volatile ConsumerCancelledException cancelled;
private static final Delivery POISON = new Delivery(null, null, null);

public QueueingConsumer(Connection connection, Channel channel) {
this(connection, channel, Integer.MAX_VALUE);
}

public QueueingConsumer(Connection connection, Channel channel, int capacity) {
super(channel);
this.connection = connection;
this.channel = channel;
this.queue = new LinkedBlockingQueue(capacity);
}

private void checkShutdown() {
if (this.shutdown != null) {
throw Utility.fixStackTrace(this.shutdown);
}
}

private Delivery handle(Delivery delivery) {
if (delivery == POISON
|| delivery == null && (this.shutdown != null || this.cancelled != null)) {
if (delivery == POISON) {
this.queue.add(POISON);
if (this.shutdown == null && this.cancelled == null) {
throw new IllegalStateException(
"POISON in queue, but null shutdown and null cancelled. This should never happen, please report as a BUG");
}
}

if (null != this.shutdown) {
throw Utility.fixStackTrace(this.shutdown);
}

if (null != this.cancelled) {
throw Utility.fixStackTrace(this.cancelled);
}
}

return delivery;
}

public Delivery nextDelivery(long timeout, TimeUnit unit)
throws InterruptedException, ShutdownSignalException, ConsumerCancelledException {
return this.handle(this.queue.poll(timeout, unit));
}

public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
this.shutdown = sig;
this.queue.add(POISON);
}

public void handleCancel(String consumerTag) {
this.cancelled = new ConsumerCancelledException();
this.queue.add(POISON);
}

public void handleDelivery(
String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
this.checkShutdown();
this.queue.add(new Delivery(envelope, properties, body));
}

public void close() throws IOException, TimeoutException {
if (channel != null) {
channel.close();
}
if (connection != null) {
connection.close();
}
}
}
Loading

0 comments on commit d601035

Please sign in to comment.