From 02ca8b498ca422b0bdb07980b2d2e4e65a3382e0 Mon Sep 17 00:00:00 2001 From: Fan Lin Date: Fri, 16 Sep 2022 05:16:29 +0000 Subject: [PATCH] support regex replacement for sink table naming --- .../connect/jdbc/sink/JdbcDbWriter.java | 4 +- .../connect/jdbc/util/NameUtils.java | 45 +++++++++++++++++++ .../connect/jdbc/util/NameUtilsTest.java | 25 +++++++++++ 3 files changed, 73 insertions(+), 1 deletion(-) create mode 100644 src/main/java/io/confluent/connect/jdbc/util/NameUtils.java create mode 100644 src/test/java/io/confluent/connect/jdbc/util/NameUtilsTest.java diff --git a/src/main/java/io/confluent/connect/jdbc/sink/JdbcDbWriter.java b/src/main/java/io/confluent/connect/jdbc/sink/JdbcDbWriter.java index 123bc2519..7750ee1a4 100644 --- a/src/main/java/io/confluent/connect/jdbc/sink/JdbcDbWriter.java +++ b/src/main/java/io/confluent/connect/jdbc/sink/JdbcDbWriter.java @@ -30,6 +30,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static io.confluent.connect.jdbc.util.NameUtils.toTableName; + public class JdbcDbWriter { private static final Logger log = LoggerFactory.getLogger(JdbcDbWriter.class); @@ -97,7 +99,7 @@ void closeQuietly() { } TableId destinationTable(String topic) { - final String tableName = config.tableNameFormat.replace("${topic}", topic); + final String tableName = toTableName(config.tableNameFormat, topic); if (tableName.isEmpty()) { throw new ConnectException(String.format( "Destination table name for topic '%s' is empty using the format string '%s'", diff --git a/src/main/java/io/confluent/connect/jdbc/util/NameUtils.java b/src/main/java/io/confluent/connect/jdbc/util/NameUtils.java new file mode 100644 index 000000000..890848f68 --- /dev/null +++ b/src/main/java/io/confluent/connect/jdbc/util/NameUtils.java @@ -0,0 +1,45 @@ +/* + * Copyright 2018 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.connect.jdbc.util; + +public class NameUtils { + + private static final String TOPIC_PREFIX = "${topic"; + private static final String TOPIC_POSTFIX = "}"; + + public static String toTableName(String rawName, String topic) { + int topicStartPos = rawName.indexOf(TOPIC_PREFIX); + int topicEndPos = rawName.lastIndexOf(TOPIC_POSTFIX); + if (topicStartPos >= 0) { + if (topicEndPos == topicStartPos + TOPIC_PREFIX.length()) { + return rawName.replace(TOPIC_PREFIX + TOPIC_POSTFIX, topic); + } else if (topicEndPos > topicStartPos + TOPIC_PREFIX.length()) { + int subStartPos = topicStartPos + TOPIC_PREFIX.length() + 1; + String splitter = rawName.substring(subStartPos - 1, subStartPos); + int subMidPos = rawName.indexOf(splitter, subStartPos); + int subEndPos = rawName.indexOf(splitter, subMidPos + 1); + if (subMidPos > 0 && subEndPos > 0) { + return rawName.substring(0, topicStartPos) + + topic.replaceAll(rawName.substring(subStartPos, subMidPos), + rawName.substring(subMidPos + 1, subEndPos)) + + rawName.substring(topicEndPos + 1); + } + } + } + return rawName; + } + +} diff --git a/src/test/java/io/confluent/connect/jdbc/util/NameUtilsTest.java b/src/test/java/io/confluent/connect/jdbc/util/NameUtilsTest.java new file mode 100644 index 000000000..658f41547 --- /dev/null +++ b/src/test/java/io/confluent/connect/jdbc/util/NameUtilsTest.java @@ -0,0 +1,25 @@ +package io.confluent.connect.jdbc.util; + +import org.junit.Test; + +import static org.junit.Assert.*; + +public class NameUtilsTest { + + @Test + public void testToTableName() { + // No replacement + assertEquals("kafka_connect", + NameUtils.toTableName("kafka_connect", "anything")); + // Replace topic name + assertEquals("kafka_connect", + NameUtils.toTableName("${topic}", "kafka_connect")); + assertEquals("pre_kafka_connect_post", + NameUtils.toTableName("pre_${topic}_post", "kafka_connect")); + // Replace topic name with regex + assertEquals("pre_kafka_connect_post", + NameUtils.toTableName("${topic/^.+\\.ao_/pre_/}_post", "io.confluent.connect.jdbc.ao_kafka_connect")); + assertEquals("pre_kafka_connect_post", + NameUtils.toTableName("pre_${topic#^.+\\.ao_##}_post", "io.confluent.connect.jdbc.ao_kafka_connect")); + } +}