Skip to content

Commit

Permalink
add e2e test
Browse files Browse the repository at this point in the history
  • Loading branch information
sunxiaojian committed Nov 15, 2023
1 parent d601035 commit 4667774
Show file tree
Hide file tree
Showing 8 changed files with 760 additions and 142 deletions.
7 changes: 7 additions & 0 deletions paimon-flink/paimon-flink-cdc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,13 @@ under the License.
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>rabbitmq</artifactId>
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>mongodb</artifactId>
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,22 @@

import org.apache.paimon.flink.action.cdc.MessageQueueSchemaUtils;
import org.apache.paimon.flink.action.cdc.format.DataFormat;
import org.apache.paimon.utils.StringUtils;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Delivery;
import com.rabbitmq.client.GetResponse;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static org.apache.paimon.utils.Preconditions.checkArgument;
Expand All @@ -48,6 +48,14 @@ public class RabbitmqActionUtils {
.stringType()
.noDefaultValue()
.withDescription("Defines the format identifier for encoding value data.");
static final ConfigOption<String> EXCHANGE =
ConfigOptions.key("exchange").stringType().noDefaultValue().withDescription(".");

static final ConfigOption<String> EXCHANGE_TYPE =
ConfigOptions.key("exchange.type").stringType().noDefaultValue().withDescription(".");

static final ConfigOption<String> ROUTING_KEY =
ConfigOptions.key("routing.key").stringType().noDefaultValue().withDescription(".");

static final ConfigOption<String> QUEUE_NAME =
ConfigOptions.key("queue.name")
Expand Down Expand Up @@ -148,11 +156,32 @@ public class RabbitmqActionUtils {
.withDescription(
"Retrieve the message delivery timeout used in the queueing consumer. If not specified explicitly, the default value of 30000 milliseconds will be returned.");

static RMQSource<String> buildRabbitmqSource(Configuration rabbitmqConfig) {
static RabbitmqSource buildRabbitmqSource(Configuration rabbitmqConfig) {
validateRabbitmqConfig(rabbitmqConfig);
String queueName = rabbitmqConfig.get(QUEUE_NAME);
String exchange = null;
if (rabbitmqConfig.contains(EXCHANGE)) {
exchange = rabbitmqConfig.get(EXCHANGE);
}

BuiltinExchangeType exchangeType = null;
if (rabbitmqConfig.contains(EXCHANGE_TYPE)) {
exchangeType = BuiltinExchangeType.valueOf(rabbitmqConfig.get(EXCHANGE_TYPE));
}

String routingKey = null;
if (rabbitmqConfig.contains(ROUTING_KEY)) {
routingKey = rabbitmqConfig.get(ROUTING_KEY);
}

RMQConnectionConfig connectionConfig = setOptionConfig(rabbitmqConfig);
return new RMQSource<>(connectionConfig, queueName, new SimpleStringSchema());
return new RabbitmqSource(
connectionConfig,
exchange,
exchangeType,
routingKey,
queueName,
new SimpleStringSchema());
}

private static RMQConnectionConfig setOptionConfig(Configuration rabbitmqConfig) {
Expand Down Expand Up @@ -229,32 +258,44 @@ private static void validateRabbitmqConfig(Configuration rabbitmqConfig) {
static MessageQueueSchemaUtils.ConsumerWrapper createRabbitmqConsumer(
Configuration rabbitmqConfig, String queueName) {
RMQConnectionConfig connectionConfig = setOptionConfig(rabbitmqConfig);

try {
Connection connection = connectionConfig.getConnectionFactory().newConnection();
Channel channel = setupChannel(connectionConfig, connection);
Channel channel = setupChannel(connection);
if (channel == null) {
throw new RuntimeException("None of RabbitMQ channels are available");
}
setupQueue(channel, queueName);
QueueingConsumer consumer = new QueueingConsumer(connection, channel);
channel.basicConsume(queueName, true, consumer);
return new RabbitmqConsumerWrapper(consumer);
setupQueue(channel, rabbitmqConfig, queueName);
return new RabbitmqConsumerWrapper(connection, channel);
} catch (Exception e) {
throw new RuntimeException(e);
}
}

private static void setupQueue(Channel channel, String queueName) throws IOException {
private static void setupQueue(Channel channel, Configuration rabbitmqConfig, String queueName)
throws IOException {
;
channel.queueDeclare(queueName, true, false, false, null);
String exchange = null;
if (rabbitmqConfig.contains(EXCHANGE)) {
exchange = rabbitmqConfig.get(EXCHANGE);
}
BuiltinExchangeType exchangeType = null;
if (rabbitmqConfig.contains(EXCHANGE_TYPE)) {
exchangeType = BuiltinExchangeType.valueOf(rabbitmqConfig.get(EXCHANGE_TYPE));
}
String routingKey = "";
if (rabbitmqConfig.contains(ROUTING_KEY)) {
routingKey = rabbitmqConfig.get(ROUTING_KEY);
}
if (!StringUtils.isBlank(exchange) && exchangeType != null) {
channel.exchangeDeclare(exchange, BuiltinExchangeType.FANOUT);
channel.queueBind(queueName, exchange, routingKey);
}
}

private static Channel setupChannel(
RMQConnectionConfig rmqConnectionConfig, Connection connection) throws Exception {
Channel chan = connection.createChannel();
if (rmqConnectionConfig.getPrefetchCount().isPresent()) {
chan.basicQos(rmqConnectionConfig.getPrefetchCount().get(), true);
}
return chan;
private static Channel setupChannel(Connection connection) throws Exception {
return connection.createChannel();
}

static DataFormat getDataFormat(Configuration rabbitmqConfig) {
Expand All @@ -264,28 +305,31 @@ static DataFormat getDataFormat(Configuration rabbitmqConfig) {
private static class RabbitmqConsumerWrapper
implements MessageQueueSchemaUtils.ConsumerWrapper {

private final QueueingConsumer consumer;
private final Channel channel;
private final Connection connection;

RabbitmqConsumerWrapper(QueueingConsumer consumer) {
this.consumer = consumer;
RabbitmqConsumerWrapper(Connection connection, Channel channel) {
this.channel = channel;
this.connection = connection;
}

@Override
public List<String> getRecords(String topic, int pollTimeOutMills) {
public List<String> getRecords(String queue, int pollTimeOutMills) {
try {
Delivery delivery = consumer.nextDelivery(pollTimeOutMills, TimeUnit.MILLISECONDS);
return delivery == null
GetResponse response = channel.basicGet(queue, false);
return response == null
? Collections.emptyList()
: Collections.singletonList(
new String(delivery.getBody(), StandardCharsets.UTF_8));
} catch (InterruptedException e) {
new String(response.getBody(), StandardCharsets.UTF_8));
} catch (IOException e) {
throw new RuntimeException(e);
}
}

@Override
public void close() throws IOException, TimeoutException {
consumer.close();
channel.close();
connection.close();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.action.cdc.rabbitmq;

import org.apache.paimon.utils.StringUtils;

import com.rabbitmq.client.BuiltinExchangeType;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;

import java.io.IOException;

public class RabbitmqSource extends RMQSource<String> {
private String exchange;
private BuiltinExchangeType exchangeType;
private String routingKey;

public RabbitmqSource(
RMQConnectionConfig rmqConnectionConfig,
String exchange,
BuiltinExchangeType exchangeType,
String routingKey,
String queueName,
DeserializationSchema<String> deserializationSchema) {
super(rmqConnectionConfig, queueName, deserializationSchema);
this.exchange = exchange;
this.exchangeType = exchangeType;
this.routingKey = routingKey;
}

@Override
protected void setupQueue() throws IOException {
super.setupQueue();
if (!StringUtils.isBlank(exchange) && exchangeType != null) {
channel.exchangeDeclare(exchange, exchangeType);
channel.queueBind(
queueName, exchange, StringUtils.isBlank(routingKey) ? "" : routingKey);
}
}
}
Loading

0 comments on commit 4667774

Please sign in to comment.