Skip to content

Commit 2489f64

Browse files
authored
[Improve][Connector] Add multi-table sink option check (apache#7360)
* [Improve][Connector] Add multi-table sink option check * fix
1 parent fa34ac9 commit 2489f64

File tree

16 files changed

+58
-12
lines changed

16 files changed

+58
-12
lines changed

seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommonOptions.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,5 +28,5 @@ public class SinkCommonOptions {
2828
Options.key("multi_table_sink_replica")
2929
.intType()
3030
.defaultValue(1)
31-
.withDescription("The replica number of multi table sink");
31+
.withDescription("The replica number of multi table sink writer");
3232
}

seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSinkFactory.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.seatunnel.connectors.seatunnel.assertion.sink;
1919

2020
import org.apache.seatunnel.api.configuration.util.OptionRule;
21+
import org.apache.seatunnel.api.sink.SinkCommonOptions;
2122
import org.apache.seatunnel.api.table.connector.TableSink;
2223
import org.apache.seatunnel.api.table.factory.Factory;
2324
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
@@ -37,7 +38,10 @@ public String factoryIdentifier() {
3738

3839
@Override
3940
public OptionRule optionRule() {
40-
return OptionRule.builder().required(RULES).build();
41+
return OptionRule.builder()
42+
.required(RULES)
43+
.optional(SinkCommonOptions.MULTI_TABLE_SINK_REPLICA)
44+
.build();
4145
}
4246

4347
@Override

seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkFactory.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.seatunnel.api.configuration.Options;
2222
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
2323
import org.apache.seatunnel.api.configuration.util.OptionRule;
24+
import org.apache.seatunnel.api.sink.SinkCommonOptions;
2425
import org.apache.seatunnel.api.table.connector.TableSink;
2526
import org.apache.seatunnel.api.table.factory.Factory;
2627
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
@@ -52,7 +53,10 @@ public String factoryIdentifier() {
5253

5354
@Override
5455
public OptionRule optionRule() {
55-
return OptionRule.builder().optional(LOG_PRINT_DATA, LOG_PRINT_DELAY).build();
56+
return OptionRule.builder()
57+
.optional(
58+
LOG_PRINT_DATA, LOG_PRINT_DELAY, SinkCommonOptions.MULTI_TABLE_SINK_REPLICA)
59+
.build();
5660
}
5761

5862
@Override

seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/sink/DruidSinkFactory.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
2222
import org.apache.seatunnel.api.configuration.util.OptionRule;
23+
import org.apache.seatunnel.api.sink.SinkCommonOptions;
2324
import org.apache.seatunnel.api.table.catalog.CatalogTable;
2425
import org.apache.seatunnel.api.table.connector.TableSink;
2526
import org.apache.seatunnel.api.table.factory.Factory;
@@ -40,7 +41,10 @@ public String factoryIdentifier() {
4041

4142
@Override
4243
public OptionRule optionRule() {
43-
return OptionRule.builder().required(COORDINATOR_URL, DATASOURCE).build();
44+
return OptionRule.builder()
45+
.required(COORDINATOR_URL, DATASOURCE)
46+
.optional(SinkCommonOptions.MULTI_TABLE_SINK_REPLICA)
47+
.build();
4448
}
4549

4650
@Override

seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkFactory.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
2121
import org.apache.seatunnel.api.configuration.util.OptionRule;
22+
import org.apache.seatunnel.api.sink.SinkCommonOptions;
2223
import org.apache.seatunnel.api.table.catalog.CatalogTable;
2324
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
2425
import org.apache.seatunnel.api.table.connector.TableSink;
@@ -69,7 +70,8 @@ public OptionRule optionRule() {
6970
TLS_KEY_STORE_PATH,
7071
TLS_KEY_STORE_PASSWORD,
7172
TLS_TRUST_STORE_PATH,
72-
TLS_TRUST_STORE_PASSWORD)
73+
TLS_TRUST_STORE_PASSWORD,
74+
SinkCommonOptions.MULTI_TABLE_SINK_REPLICA)
7375
.build();
7476
}
7577

seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSinkFactory.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
2121
import org.apache.seatunnel.api.configuration.util.OptionRule;
22+
import org.apache.seatunnel.api.sink.SinkCommonOptions;
2223
import org.apache.seatunnel.api.table.catalog.CatalogTable;
2324
import org.apache.seatunnel.api.table.connector.TableSink;
2425
import org.apache.seatunnel.api.table.factory.Factory;
@@ -48,6 +49,7 @@ public OptionRule optionRule() {
4849
.optional(BaseSinkConfig.FILE_FORMAT_TYPE)
4950
.optional(BaseSinkConfig.SCHEMA_SAVE_MODE)
5051
.optional(BaseSinkConfig.DATA_SAVE_MODE)
52+
.optional(SinkCommonOptions.MULTI_TABLE_SINK_REPLICA)
5153
.conditional(
5254
BaseSinkConfig.FILE_FORMAT_TYPE,
5355
FileFormat.TEXT,

seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
2121
import org.apache.seatunnel.api.configuration.util.OptionRule;
22+
import org.apache.seatunnel.api.sink.SinkCommonOptions;
2223
import org.apache.seatunnel.api.table.catalog.CatalogTable;
2324
import org.apache.seatunnel.api.table.connector.TableSink;
2425
import org.apache.seatunnel.api.table.factory.Factory;
@@ -102,6 +103,7 @@ public OptionRule optionRule() {
102103
.optional(BaseSinkConfig.DATE_FORMAT)
103104
.optional(BaseSinkConfig.DATETIME_FORMAT)
104105
.optional(BaseSinkConfig.TIME_FORMAT)
106+
.optional(SinkCommonOptions.MULTI_TABLE_SINK_REPLICA)
105107
.build();
106108
}
107109
}

seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSinkFactory.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
2121
import org.apache.seatunnel.api.configuration.util.OptionRule;
22+
import org.apache.seatunnel.api.sink.SinkCommonOptions;
2223
import org.apache.seatunnel.api.table.catalog.CatalogTable;
2324
import org.apache.seatunnel.api.table.connector.TableSink;
2425
import org.apache.seatunnel.api.table.factory.Factory;
@@ -103,6 +104,7 @@ public OptionRule optionRule() {
103104
.optional(BaseSinkConfig.DATETIME_FORMAT)
104105
.optional(BaseSinkConfig.TIME_FORMAT)
105106
.optional(BaseSinkConfig.TMP_PATH)
107+
.optional(SinkCommonOptions.MULTI_TABLE_SINK_REPLICA)
106108
.build();
107109
}
108110

seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkFactory.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.seatunnel.connectors.seatunnel.http.sink;
1919

2020
import org.apache.seatunnel.api.configuration.util.OptionRule;
21+
import org.apache.seatunnel.api.sink.SinkCommonOptions;
2122
import org.apache.seatunnel.api.table.catalog.CatalogTable;
2223
import org.apache.seatunnel.api.table.connector.TableSink;
2324
import org.apache.seatunnel.api.table.factory.Factory;
@@ -49,6 +50,7 @@ public OptionRule optionRule() {
4950
.optional(HttpConfig.RETRY)
5051
.optional(HttpConfig.RETRY_BACKOFF_MULTIPLIER_MS)
5152
.optional(HttpConfig.RETRY_BACKOFF_MAX_MS)
53+
.optional(SinkCommonOptions.MULTI_TABLE_SINK_REPLICA)
5254
.build();
5355
}
5456
}

seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/HudiSinkFactory.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.seatunnel.connectors.seatunnel.hudi.sink;
2020

2121
import org.apache.seatunnel.api.configuration.util.OptionRule;
22+
import org.apache.seatunnel.api.sink.SinkCommonOptions;
2223
import org.apache.seatunnel.api.table.catalog.CatalogTable;
2324
import org.apache.seatunnel.api.table.connector.TableSink;
2425
import org.apache.seatunnel.api.table.factory.Factory;
@@ -61,7 +62,8 @@ public OptionRule optionRule() {
6162
INSERT_SHUFFLE_PARALLELISM,
6263
UPSERT_SHUFFLE_PARALLELISM,
6364
MIN_COMMITS_TO_KEEP,
64-
MAX_COMMITS_TO_KEEP)
65+
MAX_COMMITS_TO_KEEP,
66+
SinkCommonOptions.MULTI_TABLE_SINK_REPLICA)
6567
.build();
6668
}
6769

seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSinkFactory.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
2121
import org.apache.seatunnel.api.configuration.util.OptionRule;
22+
import org.apache.seatunnel.api.sink.SinkCommonOptions;
2223
import org.apache.seatunnel.api.table.catalog.CatalogTable;
2324
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
2425
import org.apache.seatunnel.api.table.connector.TableSink;
@@ -57,7 +58,8 @@ public OptionRule optionRule() {
5758
SinkConfig.TABLE_DEFAULT_PARTITION_KEYS,
5859
SinkConfig.TABLE_UPSERT_MODE_ENABLED_PROP,
5960
SinkConfig.TABLE_SCHEMA_EVOLUTION_ENABLED_PROP,
60-
SinkConfig.TABLES_DEFAULT_COMMIT_BRANCH)
61+
SinkConfig.TABLES_DEFAULT_COMMIT_BRANCH,
62+
SinkCommonOptions.MULTI_TABLE_SINK_REPLICA)
6163
.build();
6264
}
6365

seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkFactory.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
2121
import org.apache.seatunnel.api.configuration.util.OptionRule;
22+
import org.apache.seatunnel.api.sink.SinkCommonOptions;
2223
import org.apache.seatunnel.api.table.catalog.CatalogTable;
2324
import org.apache.seatunnel.api.table.connector.TableSink;
2425
import org.apache.seatunnel.api.table.factory.Factory;
@@ -65,7 +66,8 @@ public OptionRule optionRule() {
6566
KEY_TIME,
6667
BATCH_SIZE,
6768
MAX_RETRIES,
68-
RETRY_BACKOFF_MULTIPLIER_MS)
69+
RETRY_BACKOFF_MULTIPLIER_MS,
70+
SinkCommonOptions.MULTI_TABLE_SINK_REPLICA)
6971
.build();
7072
}
7173

seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSinkFactory.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
2121
import org.apache.seatunnel.api.configuration.util.OptionRule;
22+
import org.apache.seatunnel.api.sink.SinkCommonOptions;
2223
import org.apache.seatunnel.api.table.catalog.CatalogTable;
2324
import org.apache.seatunnel.api.table.connector.TableSink;
2425
import org.apache.seatunnel.api.table.factory.Factory;
@@ -56,6 +57,7 @@ public OptionRule optionRule() {
5657
.optional(KuduSinkConfig.IGNORE_DUPLICATE)
5758
.optional(KuduSinkConfig.ENABLE_KERBEROS)
5859
.optional(KuduSinkConfig.KERBEROS_KRB5_CONF)
60+
.optional(SinkCommonOptions.MULTI_TABLE_SINK_REPLICA)
5961
.conditional(
6062
KuduSinkConfig.FLUSH_MODE,
6163
Arrays.asList(AUTO_FLUSH_BACKGROUND.name(), MANUAL_FLUSH.name()),

seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkFactory.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
2121
import org.apache.seatunnel.api.configuration.util.OptionRule;
22+
import org.apache.seatunnel.api.sink.SinkCommonOptions;
2223
import org.apache.seatunnel.api.table.catalog.CatalogTable;
2324
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
2425
import org.apache.seatunnel.api.table.connector.TableSink;
@@ -54,7 +55,8 @@ public OptionRule optionRule() {
5455
PaimonSinkConfig.DATA_SAVE_MODE,
5556
PaimonSinkConfig.PRIMARY_KEYS,
5657
PaimonSinkConfig.PARTITION_KEYS,
57-
PaimonSinkConfig.WRITE_PROPS)
58+
PaimonSinkConfig.WRITE_PROPS,
59+
SinkCommonOptions.MULTI_TABLE_SINK_REPLICA)
5860
.conditional(
5961
PaimonConfig.CATALOG_TYPE, PaimonCatalogEnum.HIVE, PaimonConfig.CATALOG_URI)
6062
.build();

seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkFactory.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.seatunnel.connectors.seatunnel.redis.sink;
1919

2020
import org.apache.seatunnel.api.configuration.util.OptionRule;
21+
import org.apache.seatunnel.api.sink.SinkCommonOptions;
2122
import org.apache.seatunnel.api.table.catalog.CatalogTable;
2223
import org.apache.seatunnel.api.table.connector.TableSink;
2324
import org.apache.seatunnel.api.table.factory.Factory;
@@ -51,7 +52,8 @@ public OptionRule optionRule() {
5152
RedisConfig.USER,
5253
RedisConfig.KEY_PATTERN,
5354
RedisConfig.FORMAT,
54-
RedisConfig.EXPIRE)
55+
RedisConfig.EXPIRE,
56+
SinkCommonOptions.MULTI_TABLE_SINK_REPLICA)
5557
.conditional(RedisConfig.MODE, RedisConfig.RedisMode.CLUSTER, RedisConfig.NODES)
5658
.build();
5759
}

seatunnel-dist/src/test/java/org/apache/seatunnel/api/connector/ConnectorSpecificationCheckTest.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@
1717

1818
package org.apache.seatunnel.api.connector;
1919

20+
import org.apache.seatunnel.api.configuration.util.OptionRule;
2021
import org.apache.seatunnel.api.sink.SeaTunnelSink;
22+
import org.apache.seatunnel.api.sink.SinkCommonOptions;
2123
import org.apache.seatunnel.api.sink.SinkWriter;
2224
import org.apache.seatunnel.api.sink.SupportMultiTableSink;
2325
import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
@@ -152,16 +154,26 @@ public void testAllConnectorImplementFactoryWithUpToDateMethod() throws ClassNot
152154
log.info(
153155
"Check sink connector {} successfully", factory.getClass().getSimpleName());
154156

155-
checkSupportMultiTableSink(sinkClass);
157+
checkSupportMultiTableSink(factory, sinkClass);
156158
}
157159
}
158160
}
159161

160-
private void checkSupportMultiTableSink(Class<? extends SeaTunnelSink> sinkClass) {
162+
private void checkSupportMultiTableSink(
163+
TableSinkFactory sinkFactory, Class<? extends SeaTunnelSink> sinkClass) {
161164
if (!SupportMultiTableSink.class.isAssignableFrom(sinkClass)) {
162165
return;
163166
}
164167

168+
OptionRule sinkOptionRule = sinkFactory.optionRule();
169+
Assertions.assertTrue(
170+
sinkOptionRule
171+
.getOptionalOptions()
172+
.contains(SinkCommonOptions.MULTI_TABLE_SINK_REPLICA),
173+
"Please add `SinkCommonOptions.MULTI_TABLE_SINK_REPLICA` optional into the `optionRule` method optional of `"
174+
+ sinkFactory.getClass().getSimpleName()
175+
+ "`");
176+
165177
// Validate the `createWriter` method return type
166178
Optional<Method> createWriter =
167179
ReflectionUtils.getDeclaredMethod(

0 commit comments

Comments
 (0)