Skip to content

Commit

Permalink
[Feature][Core] Add event notify for all connector (apache#7501)
Browse files Browse the repository at this point in the history
* [feature]add event notify

* [feature]add event notify

* [fixbug]fix some problem

* [feature]fix some problem

* [feature]fix some problem
  • Loading branch information
zhangshenghang authored Aug 28, 2024
1 parent 51c8e1a commit d71337b
Show file tree
Hide file tree
Showing 19 changed files with 118 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,22 +64,26 @@ public String getPluginName() {
public SinkWriter<SeaTunnelRow, MultiTableCommitInfo, MultiTableState> createWriter(
SinkWriter.Context context) throws IOException {
Map<SinkIdentifier, SinkWriter<SeaTunnelRow, ?, ?>> writers = new HashMap<>();
Map<SinkIdentifier, SinkWriter.Context> sinkWritersContext = new HashMap<>();
for (int i = 0; i < replicaNum; i++) {
for (String tableIdentifier : sinks.keySet()) {
SeaTunnelSink sink = sinks.get(tableIdentifier);
int index = context.getIndexOfSubtask() * replicaNum + i;
writers.put(
SinkIdentifier.of(tableIdentifier, index),
sink.createWriter(new SinkContextProxy(index, context)));
sinkWritersContext.put(SinkIdentifier.of(tableIdentifier, index), context);
}
}
return new MultiTableSinkWriter(writers, replicaNum);
return new MultiTableSinkWriter(writers, replicaNum, sinkWritersContext);
}

@Override
public SinkWriter<SeaTunnelRow, MultiTableCommitInfo, MultiTableState> restoreWriter(
SinkWriter.Context context, List<MultiTableState> states) throws IOException {
Map<SinkIdentifier, SinkWriter<SeaTunnelRow, ?, ?>> writers = new HashMap<>();
Map<SinkIdentifier, SinkWriter.Context> sinkWritersContext = new HashMap<>();

for (int i = 0; i < replicaNum; i++) {
for (String tableIdentifier : sinks.keySet()) {
SeaTunnelSink sink = sinks.get(tableIdentifier);
Expand All @@ -102,9 +106,10 @@ public SinkWriter<SeaTunnelRow, MultiTableCommitInfo, MultiTableState> restoreWr
sinkIdentifier,
sink.restoreWriter(new SinkContextProxy(index, context), state));
}
sinkWritersContext.put(SinkIdentifier.of(tableIdentifier, index), context);
}
}
return new MultiTableSinkWriter(writers, replicaNum);
return new MultiTableSinkWriter(writers, replicaNum, sinkWritersContext);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.seatunnel.api.sink.MultiTableResourceManager;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
import org.apache.seatunnel.api.sink.event.WriterCloseEvent;
import org.apache.seatunnel.api.table.event.SchemaChangeEvent;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;

Expand All @@ -45,6 +46,7 @@ public class MultiTableSinkWriter
implements SinkWriter<SeaTunnelRow, MultiTableCommitInfo, MultiTableState> {

private final Map<SinkIdentifier, SinkWriter<SeaTunnelRow, ?, ?>> sinkWriters;
private final Map<SinkIdentifier, SinkWriter.Context> sinkWritersContext;
private final Map<String, Optional<Integer>> sinkPrimaryKeys = new HashMap<>();
private final List<Map<SinkIdentifier, SinkWriter<SeaTunnelRow, ?, ?>>> sinkWritersWithIndex;
private final List<MultiTableWriterRunnable> runnable = new ArrayList<>();
Expand All @@ -55,8 +57,11 @@ public class MultiTableSinkWriter
private volatile boolean submitted = false;

public MultiTableSinkWriter(
Map<SinkIdentifier, SinkWriter<SeaTunnelRow, ?, ?>> sinkWriters, int queueSize) {
Map<SinkIdentifier, SinkWriter<SeaTunnelRow, ?, ?>> sinkWriters,
int queueSize,
Map<SinkIdentifier, SinkWriter.Context> sinkWritersContext) {
this.sinkWriters = sinkWriters;
this.sinkWritersContext = sinkWritersContext;
AtomicInteger cnt = new AtomicInteger(0);
executorService =
Executors.newFixedThreadPool(
Expand Down Expand Up @@ -84,6 +89,7 @@ public MultiTableSinkWriter(
entry.getKey().getTableIdentifier(), entry.getValue());
sinkIdentifierMap.put(entry.getKey(), entry.getValue());
});

sinkWritersWithIndex.add(sinkIdentifierMap);
blockingQueues.add(queue);
MultiTableWriterRunnable r = new MultiTableWriterRunnable(tableIdWriterMap, queue);
Expand Down Expand Up @@ -267,26 +273,34 @@ public void abortPrepare() {

@Override
public void close() throws IOException {
Throwable firstE = null;
// The variables used in lambda expressions should be final or valid final, so they are
// modified to arrays
final Throwable[] firstE = {null};
try {
checkQueueRemain();
} catch (Exception e) {
firstE = e;
firstE[0] = e;
}
executorService.shutdownNow();
for (int i = 0; i < sinkWritersWithIndex.size(); i++) {
synchronized (runnable.get(i)) {
for (SinkWriter<SeaTunnelRow, ?, ?> sinkWriter :
sinkWritersWithIndex.get(i).values()) {
try {
sinkWriter.close();
} catch (Throwable e) {
if (firstE == null) {
firstE = e;
}
log.error("close error", e);
}
}
Map<SinkIdentifier, SinkWriter<SeaTunnelRow, ?, ?>> sinkIdentifierSinkWriterMap =
sinkWritersWithIndex.get(i);
sinkIdentifierSinkWriterMap.forEach(
(identifier, sinkWriter) -> {
try {
sinkWriter.close();
sinkWritersContext
.get(identifier)
.getEventListener()
.onEvent(new WriterCloseEvent());
} catch (Throwable e) {
if (firstE[0] == null) {
firstE[0] = e;
}
log.error("close error", e);
}
});
}
}
try {
Expand All @@ -296,8 +310,8 @@ public void close() throws IOException {
} catch (Throwable e) {
log.error("close resourceManager error", e);
}
if (firstE != null) {
throw new RuntimeException(firstE);
if (firstE[0] != null) {
throw new RuntimeException(firstE[0]);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
import org.apache.seatunnel.api.sink.event.WriterCloseEvent;
import org.apache.seatunnel.api.table.event.SchemaChangeEvent;
import org.apache.seatunnel.api.table.event.handler.DataTypeChangeEventDispatcher;
import org.apache.seatunnel.api.table.event.handler.DataTypeChangeEventHandler;
Expand Down Expand Up @@ -99,9 +98,7 @@ public void write(SeaTunnelRow element) {
}

@Override
public void close() {
context.getEventListener().onEvent(new WriterCloseEvent());
}
public void close() {}

private String fieldsInfo(SeaTunnelRowType seaTunnelRowType) {
String[] fieldsInfo = new String[seaTunnelRowType.getTotalFields()];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.event.ReaderCloseEvent;
import org.apache.seatunnel.api.source.event.ReaderOpenEvent;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.fake.config.FakeConfig;
import org.apache.seatunnel.connectors.seatunnel.fake.config.MultipleTableFakeSourceConfig;
Expand Down Expand Up @@ -73,14 +71,10 @@ public FakeSourceReader(
}

@Override
public void open() {
context.getEventListener().onEvent(new ReaderOpenEvent());
}
public void open() {}

@Override
public void close() {
context.getEventListener().onEvent(new ReaderCloseEvent());
}
public void close() {}

@Override
@SuppressWarnings("MagicNumber")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
package org.apache.seatunnel.connectors.seatunnel.fake.source;

import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.api.source.event.EnumeratorCloseEvent;
import org.apache.seatunnel.api.source.event.EnumeratorOpenEvent;
import org.apache.seatunnel.connectors.seatunnel.fake.config.FakeConfig;
import org.apache.seatunnel.connectors.seatunnel.fake.config.MultipleTableFakeSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.fake.state.FakeSourceState;
Expand Down Expand Up @@ -58,9 +56,7 @@ public FakeSourceSplitEnumerator(
}

@Override
public void open() {
enumeratorContext.getEventListener().onEvent(new EnumeratorOpenEvent());
}
public void open() {}

@Override
public void run() throws Exception {
Expand All @@ -69,9 +65,7 @@ public void run() throws Exception {
}

@Override
public void close() throws IOException {
enumeratorContext.getEventListener().onEvent(new EnumeratorCloseEvent());
}
public void close() throws IOException {}

@Override
public void addSplitsBack(List<FakeSourceSplit> splits, int subtaskId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public void testEventReport() throws IOException, InterruptedException {
arrayNode.elements().forEachRemaining(jsonNode -> events.add(jsonNode));
}
}
Assertions.assertEquals(8, events.size());
Assertions.assertEquals(10, events.size());
Set<String> eventTypes =
events.stream().map(e -> e.get("eventType").asText()).collect(Collectors.toSet());
Assertions.assertTrue(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.apache.seatunnel.api.source.SourceEvent;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.api.source.event.EnumeratorCloseEvent;
import org.apache.seatunnel.api.source.event.EnumeratorOpenEvent;
import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
import org.apache.seatunnel.engine.core.job.ConnectorJarIdentifier;
import org.apache.seatunnel.engine.server.checkpoint.ActionStateKey;
Expand Down Expand Up @@ -121,6 +123,7 @@ public void close() throws IOException {
super.close();
if (enumerator != null) {
enumerator.close();
enumeratorContext.getEventListener().onEvent(new EnumeratorCloseEvent());
}
progress.done();
}
Expand Down Expand Up @@ -309,6 +312,7 @@ private void stateProcess() throws Exception {
if (startCalled && readerRegisterComplete) {
currState = STARTING;
enumerator.open();
enumeratorContext.getEventListener().onEvent(new EnumeratorOpenEvent());
} else {
Thread.sleep(100);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.seatunnel.api.sink.SinkCommitter;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.sink.SupportResourceShare;
import org.apache.seatunnel.api.sink.event.WriterCloseEvent;
import org.apache.seatunnel.api.sink.multitablesink.MultiTableSink;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.event.SchemaChangeEvent;
Expand Down Expand Up @@ -69,6 +70,7 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends Serializable, AggregatedCo

private final SinkAction<T, StateT, CommitInfoT, AggregatedCommitInfoT> sinkAction;
private SinkWriter<T, CommitInfoT, StateT> writer;
private SinkWriter.Context writerContext;

private transient Optional<Serializer<CommitInfoT>> commitInfoSerializer;
private transient Optional<Serializer<StateT>> writerStateSerializer;
Expand Down Expand Up @@ -150,6 +152,7 @@ private Address getCommitterTaskAddress() throws ExecutionException, Interrupted
public void close() throws IOException {
super.close();
writer.close();
writerContext.getEventListener().onEvent(new WriterCloseEvent());
try {
if (resourceManager != null) {
resourceManager.close();
Expand Down Expand Up @@ -283,19 +286,11 @@ public void restoreState(List<ActionSubtaskState> actionStateList) throws Except
.deserialize(bytes)))
.collect(Collectors.toList());
}
this.writerContext = new SinkWriterContext(indexID, metricsContext, eventListener);
if (states.isEmpty()) {
this.writer =
sinkAction
.getSink()
.createWriter(
new SinkWriterContext(indexID, metricsContext, eventListener));
this.writer = sinkAction.getSink().createWriter(writerContext);
} else {
this.writer =
sinkAction
.getSink()
.restoreWriter(
new SinkWriterContext(indexID, metricsContext, eventListener),
states);
this.writer = sinkAction.getSink().restoreWriter(writerContext, states);
}
if (this.writer instanceof SupportResourceShare) {
resourceManager =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.apache.seatunnel.api.source.SourceEvent;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.source.event.ReaderCloseEvent;
import org.apache.seatunnel.api.source.event.ReaderOpenEvent;
import org.apache.seatunnel.api.table.type.Record;
import org.apache.seatunnel.engine.core.checkpoint.CheckpointType;
import org.apache.seatunnel.engine.core.checkpoint.InternalCheckpointListener;
Expand Down Expand Up @@ -83,6 +85,7 @@ public class SourceFlowLifeCycle<T, SplitT extends SourceSplit> extends ActionFl

private final MetricsContext metricsContext;
private final EventListener eventListener;
private SourceReader.Context context;

private final AtomicReference<SchemaChangePhase> schemaChangePhase = new AtomicReference<>();

Expand Down Expand Up @@ -111,21 +114,20 @@ public void setCollector(SeaTunnelSourceCollector<T> collector) {
@Override
public void init() throws Exception {
this.splitSerializer = sourceAction.getSource().getSplitSerializer();
this.reader =
sourceAction
.getSource()
.createReader(
new SourceReaderContext(
indexID,
sourceAction.getSource().getBoundedness(),
this,
metricsContext,
eventListener));
this.context =
new SourceReaderContext(
indexID,
sourceAction.getSource().getBoundedness(),
this,
metricsContext,
eventListener);
this.reader = sourceAction.getSource().createReader(context);
this.enumeratorTaskAddress = getEnumeratorTaskAddress();
}

@Override
public void open() throws Exception {
context.getEventListener().onEvent(new ReaderOpenEvent());
reader.open();
register();
}
Expand All @@ -140,6 +142,7 @@ private Address getEnumeratorTaskAddress() throws ExecutionException, Interrupte

@Override
public void close() throws IOException {
context.getEventListener().onEvent(new ReaderCloseEvent());
reader.close();
super.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.api.source.event.EnumeratorCloseEvent;
import org.apache.seatunnel.api.source.event.EnumeratorOpenEvent;
import org.apache.seatunnel.api.source.event.ReaderCloseEvent;
import org.apache.seatunnel.api.source.event.ReaderOpenEvent;
import org.apache.seatunnel.translation.util.ThreadPoolExecutorFactory;

import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -136,6 +140,7 @@ public void open() throws Exception {
ThreadPoolExecutorFactory.createScheduledThreadPoolExecutor(
parallelism, "parallel-split-enumerator-executor");
splitEnumerator.open();
coordinatedEnumeratorContext.getEventListener().onEvent(new EnumeratorOpenEvent());
restoredSplitStateMap.forEach(
(subtaskId, splits) -> {
splitEnumerator.addSplitsBack(splits, subtaskId);
Expand All @@ -147,6 +152,10 @@ public void open() throws Exception {
entry -> {
try {
entry.getValue().open();
readerContextMap
.get(entry.getKey())
.getEventListener()
.onEvent(new ReaderOpenEvent());
splitEnumerator.registerReader(entry.getKey());
} catch (Exception e) {
throw new RuntimeException(e);
Expand Down Expand Up @@ -203,6 +212,7 @@ public void close() throws IOException {
for (Map.Entry<Integer, SourceReader<T, SplitT>> entry : readerMap.entrySet()) {
readerRunningMap.get(entry.getKey()).set(false);
entry.getValue().close();
readerContextMap.get(entry.getKey()).getEventListener().onEvent(new ReaderCloseEvent());
}

if (executorService != null) {
Expand All @@ -211,6 +221,7 @@ public void close() throws IOException {

try (SourceSplitEnumerator<SplitT, StateT> closed = splitEnumerator) {
// just close the resources
coordinatedEnumeratorContext.getEventListener().onEvent(new EnumeratorCloseEvent());
}
}

Expand Down
Loading

0 comments on commit d71337b

Please sign in to comment.