From d71337b0e9931db66076be5fcda49db00a31486a Mon Sep 17 00:00:00 2001 From: Jast Date: Wed, 28 Aug 2024 10:52:32 +0800 Subject: [PATCH] [Feature][Core] Add event notify for all connector (#7501) * [feature]add event notify * [feature]add event notify * [fixbug]fix some problem * [feature]fix some problem * [feature]fix some problem --- .../sink/multitablesink/MultiTableSink.java | 9 +++- .../multitablesink/MultiTableSinkWriter.java | 46 ++++++++++++------- .../console/sink/ConsoleSinkWriter.java | 5 +- .../fake/source/FakeSourceReader.java | 10 +--- .../source/FakeSourceSplitEnumerator.java | 10 +--- .../FakeSourceToConsoleWithEventReportIT.java | 2 +- .../task/SourceSplitEnumeratorTask.java | 4 ++ .../server/task/flow/SinkFlowLifeCycle.java | 17 +++---- .../server/task/flow/SourceFlowLifeCycle.java | 23 ++++++---- .../translation/source/CoordinatedSource.java | 11 +++++ .../translation/source/ParallelSource.java | 8 ++++ .../translation/flink/sink/FlinkSink.java | 7 +-- .../flink/sink/FlinkSinkWriter.java | 8 +++- .../flink/source/FlinkSourceEnumerator.java | 6 +++ .../flink/source/FlinkSourceReader.java | 4 ++ .../spark/sink/writer/SparkDataWriter.java | 7 ++- .../sink/writer/SparkDataWriterFactory.java | 2 +- .../sink/write/SeaTunnelSparkDataWriter.java | 7 ++- .../SeaTunnelSparkDataWriterFactory.java | 2 +- 19 files changed, 118 insertions(+), 70 deletions(-) diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSink.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSink.java index 923ecff8b88..3f7f7fa9c6a 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSink.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSink.java @@ -64,6 +64,7 @@ public String getPluginName() { public SinkWriter createWriter( SinkWriter.Context context) throws IOException { Map> writers = new HashMap<>(); + Map sinkWritersContext = new HashMap<>(); for (int i = 0; i < replicaNum; i++) { for (String tableIdentifier : sinks.keySet()) { SeaTunnelSink sink = sinks.get(tableIdentifier); @@ -71,15 +72,18 @@ public SinkWriter createWri writers.put( SinkIdentifier.of(tableIdentifier, index), sink.createWriter(new SinkContextProxy(index, context))); + sinkWritersContext.put(SinkIdentifier.of(tableIdentifier, index), context); } } - return new MultiTableSinkWriter(writers, replicaNum); + return new MultiTableSinkWriter(writers, replicaNum, sinkWritersContext); } @Override public SinkWriter restoreWriter( SinkWriter.Context context, List states) throws IOException { Map> writers = new HashMap<>(); + Map sinkWritersContext = new HashMap<>(); + for (int i = 0; i < replicaNum; i++) { for (String tableIdentifier : sinks.keySet()) { SeaTunnelSink sink = sinks.get(tableIdentifier); @@ -102,9 +106,10 @@ public SinkWriter restoreWr sinkIdentifier, sink.restoreWriter(new SinkContextProxy(index, context), state)); } + sinkWritersContext.put(SinkIdentifier.of(tableIdentifier, index), context); } } - return new MultiTableSinkWriter(writers, replicaNum); + return new MultiTableSinkWriter(writers, replicaNum, sinkWritersContext); } @Override diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkWriter.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkWriter.java index 3c73435fafb..38234e220c5 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkWriter.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkWriter.java @@ -20,6 +20,7 @@ import org.apache.seatunnel.api.sink.MultiTableResourceManager; import org.apache.seatunnel.api.sink.SinkWriter; import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter; +import org.apache.seatunnel.api.sink.event.WriterCloseEvent; import org.apache.seatunnel.api.table.event.SchemaChangeEvent; import org.apache.seatunnel.api.table.type.SeaTunnelRow; @@ -45,6 +46,7 @@ public class MultiTableSinkWriter implements SinkWriter { private final Map> sinkWriters; + private final Map sinkWritersContext; private final Map> sinkPrimaryKeys = new HashMap<>(); private final List>> sinkWritersWithIndex; private final List runnable = new ArrayList<>(); @@ -55,8 +57,11 @@ public class MultiTableSinkWriter private volatile boolean submitted = false; public MultiTableSinkWriter( - Map> sinkWriters, int queueSize) { + Map> sinkWriters, + int queueSize, + Map sinkWritersContext) { this.sinkWriters = sinkWriters; + this.sinkWritersContext = sinkWritersContext; AtomicInteger cnt = new AtomicInteger(0); executorService = Executors.newFixedThreadPool( @@ -84,6 +89,7 @@ public MultiTableSinkWriter( entry.getKey().getTableIdentifier(), entry.getValue()); sinkIdentifierMap.put(entry.getKey(), entry.getValue()); }); + sinkWritersWithIndex.add(sinkIdentifierMap); blockingQueues.add(queue); MultiTableWriterRunnable r = new MultiTableWriterRunnable(tableIdWriterMap, queue); @@ -267,26 +273,34 @@ public void abortPrepare() { @Override public void close() throws IOException { - Throwable firstE = null; + // The variables used in lambda expressions should be final or valid final, so they are + // modified to arrays + final Throwable[] firstE = {null}; try { checkQueueRemain(); } catch (Exception e) { - firstE = e; + firstE[0] = e; } executorService.shutdownNow(); for (int i = 0; i < sinkWritersWithIndex.size(); i++) { synchronized (runnable.get(i)) { - for (SinkWriter sinkWriter : - sinkWritersWithIndex.get(i).values()) { - try { - sinkWriter.close(); - } catch (Throwable e) { - if (firstE == null) { - firstE = e; - } - log.error("close error", e); - } - } + Map> sinkIdentifierSinkWriterMap = + sinkWritersWithIndex.get(i); + sinkIdentifierSinkWriterMap.forEach( + (identifier, sinkWriter) -> { + try { + sinkWriter.close(); + sinkWritersContext + .get(identifier) + .getEventListener() + .onEvent(new WriterCloseEvent()); + } catch (Throwable e) { + if (firstE[0] == null) { + firstE[0] = e; + } + log.error("close error", e); + } + }); } } try { @@ -296,8 +310,8 @@ public void close() throws IOException { } catch (Throwable e) { log.error("close resourceManager error", e); } - if (firstE != null) { - throw new RuntimeException(firstE); + if (firstE[0] != null) { + throw new RuntimeException(firstE[0]); } } diff --git a/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java b/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java index 4c9e6f47605..d83e8b5c96b 100644 --- a/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java +++ b/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java @@ -19,7 +19,6 @@ import org.apache.seatunnel.api.sink.SinkWriter; import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter; -import org.apache.seatunnel.api.sink.event.WriterCloseEvent; import org.apache.seatunnel.api.table.event.SchemaChangeEvent; import org.apache.seatunnel.api.table.event.handler.DataTypeChangeEventDispatcher; import org.apache.seatunnel.api.table.event.handler.DataTypeChangeEventHandler; @@ -99,9 +98,7 @@ public void write(SeaTunnelRow element) { } @Override - public void close() { - context.getEventListener().onEvent(new WriterCloseEvent()); - } + public void close() {} private String fieldsInfo(SeaTunnelRowType seaTunnelRowType) { String[] fieldsInfo = new String[seaTunnelRowType.getTotalFields()]; diff --git a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java index 95758cb971e..063ece63d2e 100644 --- a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java +++ b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java @@ -20,8 +20,6 @@ import org.apache.seatunnel.api.source.Boundedness; import org.apache.seatunnel.api.source.Collector; import org.apache.seatunnel.api.source.SourceReader; -import org.apache.seatunnel.api.source.event.ReaderCloseEvent; -import org.apache.seatunnel.api.source.event.ReaderOpenEvent; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.connectors.seatunnel.fake.config.FakeConfig; import org.apache.seatunnel.connectors.seatunnel.fake.config.MultipleTableFakeSourceConfig; @@ -73,14 +71,10 @@ public FakeSourceReader( } @Override - public void open() { - context.getEventListener().onEvent(new ReaderOpenEvent()); - } + public void open() {} @Override - public void close() { - context.getEventListener().onEvent(new ReaderCloseEvent()); - } + public void close() {} @Override @SuppressWarnings("MagicNumber") diff --git a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplitEnumerator.java index ecd6d509149..311e8183766 100644 --- a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplitEnumerator.java +++ b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplitEnumerator.java @@ -18,8 +18,6 @@ package org.apache.seatunnel.connectors.seatunnel.fake.source; import org.apache.seatunnel.api.source.SourceSplitEnumerator; -import org.apache.seatunnel.api.source.event.EnumeratorCloseEvent; -import org.apache.seatunnel.api.source.event.EnumeratorOpenEvent; import org.apache.seatunnel.connectors.seatunnel.fake.config.FakeConfig; import org.apache.seatunnel.connectors.seatunnel.fake.config.MultipleTableFakeSourceConfig; import org.apache.seatunnel.connectors.seatunnel.fake.state.FakeSourceState; @@ -58,9 +56,7 @@ public FakeSourceSplitEnumerator( } @Override - public void open() { - enumeratorContext.getEventListener().onEvent(new EnumeratorOpenEvent()); - } + public void open() {} @Override public void run() throws Exception { @@ -69,9 +65,7 @@ public void run() throws Exception { } @Override - public void close() throws IOException { - enumeratorContext.getEventListener().onEvent(new EnumeratorCloseEvent()); - } + public void close() throws IOException {} @Override public void addSplitsBack(List splits, int subtaskId) { diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-console-seatunnel-e2e/src/test/java/org/apache/seatunnel/engine/e2e/console/FakeSourceToConsoleWithEventReportIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-console-seatunnel-e2e/src/test/java/org/apache/seatunnel/engine/e2e/console/FakeSourceToConsoleWithEventReportIT.java index 8389cb3c058..8e45bbf9de5 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-console-seatunnel-e2e/src/test/java/org/apache/seatunnel/engine/e2e/console/FakeSourceToConsoleWithEventReportIT.java +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-console-seatunnel-e2e/src/test/java/org/apache/seatunnel/engine/e2e/console/FakeSourceToConsoleWithEventReportIT.java @@ -109,7 +109,7 @@ public void testEventReport() throws IOException, InterruptedException { arrayNode.elements().forEachRemaining(jsonNode -> events.add(jsonNode)); } } - Assertions.assertEquals(8, events.size()); + Assertions.assertEquals(10, events.size()); Set eventTypes = events.stream().map(e -> e.get("eventType").asText()).collect(Collectors.toSet()); Assertions.assertTrue( diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java index d1fc333ade2..8004068ce68 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java @@ -22,6 +22,8 @@ import org.apache.seatunnel.api.source.SourceEvent; import org.apache.seatunnel.api.source.SourceSplit; import org.apache.seatunnel.api.source.SourceSplitEnumerator; +import org.apache.seatunnel.api.source.event.EnumeratorCloseEvent; +import org.apache.seatunnel.api.source.event.EnumeratorOpenEvent; import org.apache.seatunnel.engine.core.dag.actions.SourceAction; import org.apache.seatunnel.engine.core.job.ConnectorJarIdentifier; import org.apache.seatunnel.engine.server.checkpoint.ActionStateKey; @@ -121,6 +123,7 @@ public void close() throws IOException { super.close(); if (enumerator != null) { enumerator.close(); + enumeratorContext.getEventListener().onEvent(new EnumeratorCloseEvent()); } progress.done(); } @@ -309,6 +312,7 @@ private void stateProcess() throws Exception { if (startCalled && readerRegisterComplete) { currState = STARTING; enumerator.open(); + enumeratorContext.getEventListener().onEvent(new EnumeratorOpenEvent()); } else { Thread.sleep(100); } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java index cacaa75aaef..3234560fe4b 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java @@ -24,6 +24,7 @@ import org.apache.seatunnel.api.sink.SinkCommitter; import org.apache.seatunnel.api.sink.SinkWriter; import org.apache.seatunnel.api.sink.SupportResourceShare; +import org.apache.seatunnel.api.sink.event.WriterCloseEvent; import org.apache.seatunnel.api.sink.multitablesink.MultiTableSink; import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.api.table.event.SchemaChangeEvent; @@ -69,6 +70,7 @@ public class SinkFlowLifeCycle sinkAction; private SinkWriter writer; + private SinkWriter.Context writerContext; private transient Optional> commitInfoSerializer; private transient Optional> writerStateSerializer; @@ -150,6 +152,7 @@ private Address getCommitterTaskAddress() throws ExecutionException, Interrupted public void close() throws IOException { super.close(); writer.close(); + writerContext.getEventListener().onEvent(new WriterCloseEvent()); try { if (resourceManager != null) { resourceManager.close(); @@ -283,19 +286,11 @@ public void restoreState(List actionStateList) throws Except .deserialize(bytes))) .collect(Collectors.toList()); } + this.writerContext = new SinkWriterContext(indexID, metricsContext, eventListener); if (states.isEmpty()) { - this.writer = - sinkAction - .getSink() - .createWriter( - new SinkWriterContext(indexID, metricsContext, eventListener)); + this.writer = sinkAction.getSink().createWriter(writerContext); } else { - this.writer = - sinkAction - .getSink() - .restoreWriter( - new SinkWriterContext(indexID, metricsContext, eventListener), - states); + this.writer = sinkAction.getSink().restoreWriter(writerContext, states); } if (this.writer instanceof SupportResourceShare) { resourceManager = diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java index ca137b3e069..6c596da0c33 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java @@ -23,6 +23,8 @@ import org.apache.seatunnel.api.source.SourceEvent; import org.apache.seatunnel.api.source.SourceReader; import org.apache.seatunnel.api.source.SourceSplit; +import org.apache.seatunnel.api.source.event.ReaderCloseEvent; +import org.apache.seatunnel.api.source.event.ReaderOpenEvent; import org.apache.seatunnel.api.table.type.Record; import org.apache.seatunnel.engine.core.checkpoint.CheckpointType; import org.apache.seatunnel.engine.core.checkpoint.InternalCheckpointListener; @@ -83,6 +85,7 @@ public class SourceFlowLifeCycle extends ActionFl private final MetricsContext metricsContext; private final EventListener eventListener; + private SourceReader.Context context; private final AtomicReference schemaChangePhase = new AtomicReference<>(); @@ -111,21 +114,20 @@ public void setCollector(SeaTunnelSourceCollector collector) { @Override public void init() throws Exception { this.splitSerializer = sourceAction.getSource().getSplitSerializer(); - this.reader = - sourceAction - .getSource() - .createReader( - new SourceReaderContext( - indexID, - sourceAction.getSource().getBoundedness(), - this, - metricsContext, - eventListener)); + this.context = + new SourceReaderContext( + indexID, + sourceAction.getSource().getBoundedness(), + this, + metricsContext, + eventListener); + this.reader = sourceAction.getSource().createReader(context); this.enumeratorTaskAddress = getEnumeratorTaskAddress(); } @Override public void open() throws Exception { + context.getEventListener().onEvent(new ReaderOpenEvent()); reader.open(); register(); } @@ -140,6 +142,7 @@ private Address getEnumeratorTaskAddress() throws ExecutionException, Interrupte @Override public void close() throws IOException { + context.getEventListener().onEvent(new ReaderCloseEvent()); reader.close(); super.close(); } diff --git a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java index 11b240dd993..4e5d864369f 100644 --- a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java +++ b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java @@ -24,6 +24,10 @@ import org.apache.seatunnel.api.source.SourceReader; import org.apache.seatunnel.api.source.SourceSplit; import org.apache.seatunnel.api.source.SourceSplitEnumerator; +import org.apache.seatunnel.api.source.event.EnumeratorCloseEvent; +import org.apache.seatunnel.api.source.event.EnumeratorOpenEvent; +import org.apache.seatunnel.api.source.event.ReaderCloseEvent; +import org.apache.seatunnel.api.source.event.ReaderOpenEvent; import org.apache.seatunnel.translation.util.ThreadPoolExecutorFactory; import lombok.extern.slf4j.Slf4j; @@ -136,6 +140,7 @@ public void open() throws Exception { ThreadPoolExecutorFactory.createScheduledThreadPoolExecutor( parallelism, "parallel-split-enumerator-executor"); splitEnumerator.open(); + coordinatedEnumeratorContext.getEventListener().onEvent(new EnumeratorOpenEvent()); restoredSplitStateMap.forEach( (subtaskId, splits) -> { splitEnumerator.addSplitsBack(splits, subtaskId); @@ -147,6 +152,10 @@ public void open() throws Exception { entry -> { try { entry.getValue().open(); + readerContextMap + .get(entry.getKey()) + .getEventListener() + .onEvent(new ReaderOpenEvent()); splitEnumerator.registerReader(entry.getKey()); } catch (Exception e) { throw new RuntimeException(e); @@ -203,6 +212,7 @@ public void close() throws IOException { for (Map.Entry> entry : readerMap.entrySet()) { readerRunningMap.get(entry.getKey()).set(false); entry.getValue().close(); + readerContextMap.get(entry.getKey()).getEventListener().onEvent(new ReaderCloseEvent()); } if (executorService != null) { @@ -211,6 +221,7 @@ public void close() throws IOException { try (SourceSplitEnumerator closed = splitEnumerator) { // just close the resources + coordinatedEnumeratorContext.getEventListener().onEvent(new EnumeratorCloseEvent()); } } diff --git a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java index 4cc1bfd1418..ed794a5b6cb 100644 --- a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java +++ b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java @@ -23,6 +23,10 @@ import org.apache.seatunnel.api.source.SourceReader; import org.apache.seatunnel.api.source.SourceSplit; import org.apache.seatunnel.api.source.SourceSplitEnumerator; +import org.apache.seatunnel.api.source.event.EnumeratorCloseEvent; +import org.apache.seatunnel.api.source.event.EnumeratorOpenEvent; +import org.apache.seatunnel.api.source.event.ReaderCloseEvent; +import org.apache.seatunnel.api.source.event.ReaderOpenEvent; import org.apache.seatunnel.translation.util.ThreadPoolExecutorFactory; import org.slf4j.Logger; @@ -115,7 +119,9 @@ public void open() throws Exception { splitEnumerator.addSplitsBack(restoredSplitState, subtaskId); } reader.open(); + readerContext.getEventListener().onEvent(new ReaderOpenEvent()); parallelEnumeratorContext.register(); + parallelEnumeratorContext.getEventListener().onEvent(new EnumeratorOpenEvent()); splitEnumerator.registerReader(subtaskId); } @@ -170,6 +176,8 @@ public void close() throws IOException { if (reader != null) { LOG.debug("Close the data reader for the Apache SeaTunnel source."); reader.close(); + readerContext.getEventListener().onEvent(new ReaderCloseEvent()); + parallelEnumeratorContext.getEventListener().onEvent(new EnumeratorCloseEvent()); } } diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java index 4a720e347b2..2ebbcba4f91 100644 --- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java +++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java @@ -66,10 +66,7 @@ public SinkWriter, FlinkWriterState> if (states == null || states.isEmpty()) { return new FlinkSinkWriter<>( - sink.createWriter(stContext), - 1, - catalogTable.getSeaTunnelRowType(), - stContext.getMetricsContext()); + sink.createWriter(stContext), 1, catalogTable.getSeaTunnelRowType(), stContext); } else { List restoredState = states.stream().map(FlinkWriterState::getState).collect(Collectors.toList()); @@ -77,7 +74,7 @@ public SinkWriter, FlinkWriterState> sink.restoreWriter(stContext, restoredState), states.get(0).getCheckpointId() + 1, catalogTable.getSeaTunnelRowType(), - stContext.getMetricsContext()); + stContext); } } diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java index 725bf606f93..8de831aee17 100644 --- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java +++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java @@ -23,6 +23,7 @@ import org.apache.seatunnel.api.common.metrics.MetricsContext; import org.apache.seatunnel.api.sink.MultiTableResourceManager; import org.apache.seatunnel.api.sink.SupportResourceShare; +import org.apache.seatunnel.api.sink.event.WriterCloseEvent; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; @@ -53,6 +54,8 @@ public class FlinkSinkWriter private final org.apache.seatunnel.api.sink.SinkWriter sinkWriter; + private final org.apache.seatunnel.api.sink.SinkWriter.Context context; + private final Counter sinkWriteCount; private final Counter sinkWriteBytes; @@ -67,9 +70,11 @@ public class FlinkSinkWriter org.apache.seatunnel.api.sink.SinkWriter sinkWriter, long checkpointId, SeaTunnelDataType dataType, - MetricsContext metricsContext) { + org.apache.seatunnel.api.sink.SinkWriter.Context context) { + this.context = context; this.sinkWriter = sinkWriter; this.checkpointId = checkpointId; + MetricsContext metricsContext = context.getMetricsContext(); this.sinkWriteCount = metricsContext.counter(MetricNames.SINK_WRITE_COUNT); this.sinkWriteBytes = metricsContext.counter(MetricNames.SINK_WRITE_BYTES); this.sinkWriterQPS = metricsContext.meter(MetricNames.SINK_WRITE_QPS); @@ -118,6 +123,7 @@ public List> snapshotState() throws IOException { @Override public void close() throws Exception { sinkWriter.close(); + context.getEventListener().onEvent(new WriterCloseEvent()); try { if (resourceManager != null) { resourceManager.close(); diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceEnumerator.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceEnumerator.java index e457d69f27c..7d8052bfd18 100644 --- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceEnumerator.java +++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceEnumerator.java @@ -19,6 +19,8 @@ import org.apache.seatunnel.api.source.SourceSplit; import org.apache.seatunnel.api.source.SourceSplitEnumerator; +import org.apache.seatunnel.api.source.event.EnumeratorCloseEvent; +import org.apache.seatunnel.api.source.event.EnumeratorOpenEvent; import org.apache.flink.api.connector.source.SourceEvent; import org.apache.flink.api.connector.source.SplitEnumerator; @@ -49,6 +51,7 @@ public class FlinkSourceEnumerator private final SplitEnumeratorContext> enumeratorContext; + private final SourceSplitEnumerator.Context context; private final int parallelism; private final Object lock = new Object(); @@ -62,12 +65,14 @@ public FlinkSourceEnumerator( SplitEnumeratorContext> enumContext) { this.sourceSplitEnumerator = enumerator; this.enumeratorContext = enumContext; + this.context = new FlinkSourceSplitEnumeratorContext<>(enumeratorContext); this.parallelism = enumeratorContext.currentParallelism(); } @Override public void start() { sourceSplitEnumerator.open(); + context.getEventListener().onEvent(new EnumeratorOpenEvent()); } @Override @@ -106,6 +111,7 @@ public EnumStateT snapshotState(long checkpointId) throws Exception { @Override public void close() throws IOException { sourceSplitEnumerator.close(); + context.getEventListener().onEvent(new EnumeratorCloseEvent()); } @Override diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceReader.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceReader.java index c2f9cde5005..fb1dc85174e 100644 --- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceReader.java +++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceReader.java @@ -20,6 +20,8 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config; import org.apache.seatunnel.api.source.SourceSplit; +import org.apache.seatunnel.api.source.event.ReaderCloseEvent; +import org.apache.seatunnel.api.source.event.ReaderOpenEvent; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.flink.api.connector.source.ReaderOutput; @@ -66,6 +68,7 @@ public FlinkSourceReader( public void start() { try { sourceReader.open(); + context.getEventListener().onEvent(new ReaderOpenEvent()); } catch (Exception e) { throw new RuntimeException(e); } @@ -121,6 +124,7 @@ public void handleSourceEvents(SourceEvent sourceEvent) { @Override public void close() throws Exception { sourceReader.close(); + context.getEventListener().onEvent(new ReaderCloseEvent()); } @Override diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/writer/SparkDataWriter.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/writer/SparkDataWriter.java index 434b1ef9799..a9eac500629 100644 --- a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/writer/SparkDataWriter.java +++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/writer/SparkDataWriter.java @@ -21,6 +21,7 @@ import org.apache.seatunnel.api.sink.SinkCommitter; import org.apache.seatunnel.api.sink.SinkWriter; import org.apache.seatunnel.api.sink.SupportResourceShare; +import org.apache.seatunnel.api.sink.event.WriterCloseEvent; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.translation.spark.execution.MultiTableManager; @@ -47,16 +48,19 @@ public class SparkDataWriter implements DataWriter sinkWriter, @Nullable SinkCommitter sinkCommitter, MultiTableManager multiTableManager, - long epochId) { + long epochId, + org.apache.seatunnel.api.sink.SinkWriter.Context context) { this.sinkWriter = sinkWriter; this.sinkCommitter = sinkCommitter; this.epochId = epochId == 0 ? 1 : epochId; this.multiTableManager = multiTableManager; + this.context = context; initResourceManger(); } @@ -97,6 +101,7 @@ public WriterCommitMessage commit() throws IOException { new SparkWriterCommitMessage<>(latestCommitInfoT); cleanCommitInfo(); sinkWriter.close(); + context.getEventListener().onEvent(new WriterCloseEvent()); try { if (resourceManager != null) { resourceManager.close(); diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/writer/SparkDataWriterFactory.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/writer/SparkDataWriterFactory.java index 3a646f3aca2..b684654103a 100644 --- a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/writer/SparkDataWriterFactory.java +++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/writer/SparkDataWriterFactory.java @@ -63,6 +63,6 @@ public DataWriter createDataWriter(int partitionId, long taskId, lo throw new RuntimeException("Failed to create SinkCommitter.", e); } return new SparkDataWriter<>( - writer, committer, new MultiTableManager(catalogTables), epochId); + writer, committer, new MultiTableManager(catalogTables), epochId, context); } } diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/write/SeaTunnelSparkDataWriter.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/write/SeaTunnelSparkDataWriter.java index 59f931e38f1..c2c24aa9147 100644 --- a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/write/SeaTunnelSparkDataWriter.java +++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/write/SeaTunnelSparkDataWriter.java @@ -21,6 +21,7 @@ import org.apache.seatunnel.api.sink.SinkCommitter; import org.apache.seatunnel.api.sink.SinkWriter; import org.apache.seatunnel.api.sink.SupportResourceShare; +import org.apache.seatunnel.api.sink.event.WriterCloseEvent; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.translation.spark.execution.MultiTableManager; @@ -47,16 +48,19 @@ public class SeaTunnelSparkDataWriter implements DataWriter protected volatile MultiTableResourceManager resourceManager; private final MultiTableManager multiTableManager; + private final SinkWriter.Context context; public SeaTunnelSparkDataWriter( SinkWriter sinkWriter, @Nullable SinkCommitter sinkCommitter, MultiTableManager multiTableManager, - long epochId) { + long epochId, + SinkWriter.Context context) { this.sinkWriter = sinkWriter; this.sinkCommitter = sinkCommitter; this.multiTableManager = multiTableManager; this.epochId = epochId == 0 ? 1 : epochId; + this.context = context; initResourceManger(); } @@ -89,6 +93,7 @@ public WriterCommitMessage commit() throws IOException { new SeaTunnelSparkWriterCommitMessage<>(latestCommitInfoT); cleanCommitInfo(); sinkWriter.close(); + context.getEventListener().onEvent(new WriterCloseEvent()); try { if (resourceManager != null) { resourceManager.close(); diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/write/SeaTunnelSparkDataWriterFactory.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/write/SeaTunnelSparkDataWriterFactory.java index b83787cac1e..255a9cd339f 100644 --- a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/write/SeaTunnelSparkDataWriterFactory.java +++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/write/SeaTunnelSparkDataWriterFactory.java @@ -64,7 +64,7 @@ public DataWriter createWriter(int partitionId, long taskId) { throw new RuntimeException("Failed to create SinkCommitter.", e); } return new SeaTunnelSparkDataWriter<>( - writer, committer, new MultiTableManager(catalogTables), 0); + writer, committer, new MultiTableManager(catalogTables), 0, context); } @Override