Skip to content

Commit

Permalink
add test
Browse files Browse the repository at this point in the history
  • Loading branch information
sunxiaojian committed Nov 14, 2023
1 parent d9cac3e commit 1d401ba
Show file tree
Hide file tree
Showing 3 changed files with 392 additions and 0 deletions.
7 changes: 7 additions & 0 deletions paimon-flink/paimon-flink-cdc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,13 @@ under the License.
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>rabbitmq</artifactId>
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>mongodb</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.action.cdc.rabbitmq;

import org.apache.paimon.flink.action.cdc.CdcActionITCaseBase;
import org.apache.paimon.utils.StringUtils;

import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.RabbitMQContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.utility.DockerImageName;

import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;

import static org.assertj.core.api.Assertions.assertThat;

/** Base test class for Rabbitmq synchronization. */
public class RabbitmqActionITCaseBase extends CdcActionITCaseBase {
private static final Logger LOG = LoggerFactory.getLogger(RabbitmqActionITCaseBase.class);
private static final Slf4jLogConsumer LOG_CONSUMER = new Slf4jLogConsumer(LOG);
public static final String RABBITMQ = "rabbitmq:3.9.8-management-alpine";
private static final int RABBITMQ_PORT = 5672;
private final ObjectMapper objectMapper = new ObjectMapper();

@RegisterExtension
public static final RabbitMQContainer RMQ_CONTAINER =
new RabbitMQContainer(DockerImageName.parse(RABBITMQ))
.withExposedPorts(RABBITMQ_PORT)
.withLogConsumer(LOG_CONSUMER);

private Connection connection;
private Channel channel;

@BeforeEach
public void setup() throws Exception {
connection = getRMQConnection();
channel = connection.createChannel();
}

private static Connection getRMQConnection() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(RMQ_CONTAINER.getAdminUsername());
factory.setPassword(RMQ_CONTAINER.getAdminPassword());
factory.setVirtualHost("/");
factory.setHost(RMQ_CONTAINER.getHost());
factory.setPort(RMQ_CONTAINER.getAmqpPort());
return factory.newConnection();
}

@AfterEach
public void after() throws Exception {
super.after();
// Delete topics for avoid reusing topics of pulsar cluster
deleteTopics();
channel.close();
connection.close();
}

protected Map<String, String> getBasicRabbitmqConfig() {
Map<String, String> config = new HashMap<>();
config.put(RabbitmqActionUtils.RABBITMQ_USERNAME.key(), RMQ_CONTAINER.getAdminUsername());
config.put(RabbitmqActionUtils.RABBITMQ_PASSWORD.key(), RMQ_CONTAINER.getAdminPassword());
config.put(RabbitmqActionUtils.RABBITMQ_VIRTUAL_HOST.key(), "/");
config.put(RabbitmqActionUtils.RABBITMQ_HOST.key(), RMQ_CONTAINER.getHost());
config.put(RabbitmqActionUtils.RABBITMQ_PORT.key(), RMQ_CONTAINER.getAmqpPort().toString());
return config;
}

protected void createQueue(String queueName) {
try {
channel.queueDeclare(queueName, true, false, false, null);
channel.queuePurge(queueName);
channel.txSelect();
} catch (IOException e) {
throw new RuntimeException(e);
}
}

private void deleteTopics() throws Exception {}

protected List<String> getMessages(String resource) throws IOException {
URL url = RabbitmqActionITCaseBase.class.getClassLoader().getResource(resource);
assertThat(url).isNotNull();
java.nio.file.Path path = new File(url.getFile()).toPath();
List<String> lines = Files.readAllLines(path);
List<String> messages = new ArrayList<>();
for (String line : lines) {
try {
objectMapper.readTree(line);
if (!StringUtils.isEmpty(line)) {
messages.add(line);
}
} catch (Exception e) {
// ignore
}
}

return messages;
}

/**
* Send a list messages to Rabbitmq.
*
* @param queueName The name of the queue.
* @param messages The records need to be sent.
*/
public static void sendMessages(String queueName, List<String> messages)
throws IOException, TimeoutException {
AMQP.BasicProperties.Builder propertiesBuilder = new AMQP.BasicProperties.Builder();
try (Connection rmqConnection = getRMQConnection();
Channel channel = rmqConnection.createChannel()) {
for (String msg : messages) {
AMQP.BasicProperties properties = propertiesBuilder.correlationId(msg).build();
channel.basicPublish(
"", queueName, properties, msg.getBytes(StandardCharsets.UTF_8));
}
}
}

protected RabbitmqSyncTableActionBuilder syncTableActionBuilder(
Map<String, String> rabbitmqConfig) {
return new RabbitmqSyncTableActionBuilder(rabbitmqConfig);
}

/** Builder to build {@link RabbitmqSyncTableAction} from action arguments. */
protected class RabbitmqSyncTableActionBuilder
extends SyncTableActionBuilder<RabbitmqSyncTableAction> {

public RabbitmqSyncTableActionBuilder(Map<String, String> rabbitmqConfig) {
super(rabbitmqConfig);
}

public RabbitmqSyncTableAction build() {
List<String> args =
new ArrayList<>(
Arrays.asList(
"--warehouse",
warehouse,
"--database",
database,
"--table",
tableName));

args.addAll(mapToArgs("--rabbitmq-conf", sourceConfig));
args.addAll(mapToArgs("--catalog-conf", catalogConfig));
args.addAll(mapToArgs("--table-conf", tableConfig));

args.addAll(listToArgs("--partition-keys", partitionKeys));
args.addAll(listToArgs("--primary-keys", primaryKeys));
args.addAll(listToArgs("--type-mapping", typeMappingModes));

args.addAll(listToMultiArgs("--computed-column", computedColumnArgs));

MultipleParameterTool params =
MultipleParameterTool.fromArgs(args.toArray(args.toArray(new String[0])));
return (RabbitmqSyncTableAction)
new RabbitmqSyncTableActionFactory()
.create(params)
.orElseThrow(RuntimeException::new);
}
}
}
Loading

0 comments on commit 1d401ba

Please sign in to comment.