-
Notifications
You must be signed in to change notification settings - Fork 68
FLIP 27 integration
Flink 1.11 introduced new Data Source API as part of FLIP-27. Here is the API introduction documents. https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/sources.html
The Source
is a factory class to create the instances of the below concepts. It manages a reader group with a builder style constructor with user provided ReaderGroupConfig
. We can reuse the current AbstractStreamingReaderBuilder
to build such a source.
public class FlinkPravegaSource<T> implements Source<T, PravegaSplit, Checkpoint>, ResultTypeQueryable<T> {}
A Split
represents an EventStreamReader
, but with no operations and keep stateless. The splitId will be the Pravega reader Id. To keep it serializable, we will not keep the reader inside.
public class PravegaSplit implements SourceSplit, Serializable {
private int subtaskId;
private String readerGroupName;
@Override
public String splitId() {
return readerGroupName + "-" + subtaskId;
}
}
The SplitEnumerator
is a single instance on Flink jobmanager. It connects to a Pravega reader group with the pravega stream. It is the "brain" of the source to initialize the reader group when it starts, then discover and assign the subtasks.
public class PravegaSplitEnumerator implements SplitEnumerator<PravegaSplit, Checkpoint> {}
start()
method will help to initiate reader group manager, reader group and reset the group to the checkpoint position if checkpoint isn't null.
snapshotState()
moethod will deal with the data recovery, it will call readerGroup::initiateCheckpoint to get a checkpoint for recovery. This call will be handled in another thread pool instead of the split enumerator thread.
addSplitsBack(List<PravegaSplit> splits, int subtaskId)
method is called when it tries to "regionly" recover from a failure to assign unassigned splits. We will throw an intentional exception here to trigger a global recovery. This will shutdown the reader group, recreate it and recover it from the latest checkpoint.
addReader(int subtaskId)
function will check with the current parallelism and update the assignment (always one-to-one mapping).
The SourceReader
has a default recommended Flink implementation SourceReaderBase
. This one constructs with three major components, SplitReader
, SplitFetcherManager
and RecordEmitter
. With this recommended API, it allows us to just provide a SplitReader
abstraction to implement this as a whole.
public class PravegaSourceReader<T>
extends SourceReaderBase<EventRead<T>, T, PravegaSplit, PravegaSplit> {}
We can will use PravegaFetcherManager
to handle the split fetch. It is based on Flink SingleThreadFetcherManager
, each reader instance will have this fetcher which is single threaded and this fetcher will supply the split reader and assign all the splits assigned by the enumerator to it. Because Flink does not provide a method to close the split reader so far, when supply the split reader, we also need to record the splitReader and override its close method to close all the readers.
The SplitReader
is actually an instance of a EventStreamReader
. It has a fetch()
interface to read one or more EventRead<T>
from an EventStreamReader
.
public class PravegaSplitReader<T> implements SplitReader<EventRead<T>, PravegaSplit>, AutoCloseable {}
handleSplitsChanges
is the method the reader gets the assigned split. That's when we know the split(subtask) ID, so we will create the EventStreamReader
instance in this method rather than the constructor of the class.
TODO:
RecordEmitter
is actually a function to turn EventRead<T>
into T
. We should offer a default implementation for just get the event, but we can also let user to DIY, especially when they want to index with the Pravega metadata. We have similar implementation in https://github.com/pravega/flink-connectors/issues/180