Hdfs datasource tasker implementation #91

Open · wants to merge 62 commits into base: main
Conversation


@Tiihott Tiihott commented Sep 20, 2024

HDFS datasource feature to allow querying near-latest data from HDFS. The latest records that are not yet present in HDFS are queried from Kafka, while older data is queried from S3 as usual.
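
The cutoff behavior described above could be sketched roughly like this (a simplified stand-alone illustration; the class and field names are assumptions, not the actual planner/scheduler code):

```java
import java.time.Instant;

// Hypothetical sketch of time-based routing between S3, HDFS and Kafka.
// Names and thresholds are illustrative, not the real implementation.
final class DatasourceRouter {
    private final Instant hdfsCutoff;   // records before this are only in S3
    private final Instant kafkaCutoff;  // records after this are only in Kafka

    DatasourceRouter(Instant hdfsCutoff, Instant kafkaCutoff) {
        this.hdfsCutoff = hdfsCutoff;
        this.kafkaCutoff = kafkaCutoff;
    }

    // Decide which datasource serves a record with the given timestamp.
    String sourceFor(Instant recordTime) {
        if (recordTime.isBefore(hdfsCutoff)) {
            return "S3";
        }
        if (recordTime.isAfter(kafkaCutoff)) {
            return "Kafka";
        }
        return "HDFS";
    }
}
```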

The included planner and scheduler components were already reviewed outside GitHub.

Includes:

  • Migration of the existing HDFS datasource dev branch to GitHub, which included the planner and scheduler component implementations.
  • New tasker component implementation, which reads records from Avro files stored in HDFS.
  • Time-based record inclusion for handling the cutoff between S3 and HDFS.
  • Kerberized HDFS access configurations.
  • New HdfsQueryProcessor.incrementAndGetLatestOffset() method for stepping through the offsets incrementally to split the data into appropriately sized batches.
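
The incremental batching idea behind incrementAndGetLatestOffset() can be sketched as follows (a simplified stand-alone illustration; the fields and exact stepping logic are assumptions, not the real HdfsQueryProcessor):

```java
// Hypothetical sketch of incremental offset batching; names are assumptions.
final class OffsetBatcher {
    private final long latestOffset; // highest offset available in HDFS
    private final long batchSize;    // maximum records per batch
    private long currentOffset;      // offset reached so far

    OffsetBatcher(long startOffset, long latestOffset, long batchSize) {
        this.currentOffset = startOffset;
        this.latestOffset = latestOffset;
        this.batchSize = batchSize;
    }

    // Advance by at most batchSize and return the new end offset,
    // capping at the latest offset available in HDFS.
    long incrementAndGetLatestOffset() {
        currentOffset = Math.min(currentOffset + batchSize, latestOffset);
        return currentOffset;
    }
}
```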

Missing:

  • Additional filtering of HDFS records based on queryXML, which could not be implemented in the existing planner and scheduler components.

@Tiihott Tiihott self-assigned this Sep 20, 2024
@Tiihott Tiihott requested a review from eemhu September 23, 2024 08:18
@Tiihott Tiihott requested a review from eemhu October 24, 2024 09:51
    serializedKafkaOffset = new HashMap<>(offset.size());
    for (Map.Entry<TopicPartition, Long> entry : offset.entrySet()) {
        serializedKafkaOffset.put(entry.getKey().toString(), entry.getValue()); // offset
    }
    this.stub = stub;
}

public KafkaOffset(String s) {
    Gson gson = new Gson();
Contributor commented:
could this be moved up and use primary constructor with this?

Author replied:

Map<TopicPartition, Long> is not serializable, which is why it is converted to a serializable Map<String, Long> in the constructor. And because this logic lives inside the constructors, the KafkaOffset(String s) constructor used by serialization testing can't simply be refactored into a secondary constructor.
The code must be refactored so that the conversion to a serializable map is done before the object is initialized; that way the constructor can be refactored into a secondary constructor.
Solved the identical issue for HdfsOffset in commit: b928695
Solved in commit: cc07931
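
A rough sketch of the refactoring discussed above: hoist the conversion into a static helper so construction only ever sees an already-serializable map. The stub key type and all names here are placeholders, not the real Kafka TopicPartition or the actual KafkaOffset implementation:

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Minimal stand-in for Kafka's TopicPartition, defined here only so the
// sketch is self-contained.
final class TopicPartitionStub {
    final String topic;
    final int partition;

    TopicPartitionStub(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    @Override
    public String toString() {
        return topic + "-" + partition;
    }
}

final class KafkaOffsetSketch implements Serializable {
    private final Map<String, Long> serializedKafkaOffset;

    // Primary constructor only ever receives an already-serializable map,
    // so other constructors (e.g. one parsing a JSON String) can delegate
    // to it via this(...).
    KafkaOffsetSketch(Map<String, Long> serializedKafkaOffset) {
        this.serializedKafkaOffset = serializedKafkaOffset;
    }

    // Conversion happens before the object is initialized. A static factory
    // is used because a Map<TopicPartitionStub, Long> constructor would
    // clash with the Map<String, Long> one after type erasure.
    static KafkaOffsetSketch of(Map<TopicPartitionStub, Long> offset) {
        Map<String, Long> serialized = new HashMap<>(offset.size());
        for (Map.Entry<TopicPartitionStub, Long> entry : offset.entrySet()) {
            serialized.put(entry.getKey().toString(), entry.getValue());
        }
        return new KafkaOffsetSketch(serialized);
    }

    Map<String, Long> serializedOffsets() {
        return serializedKafkaOffset;
    }
}
```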

Contributor commented:
Could this potentially use a decorator object or similar?

@Tiihott Tiihott commented Nov 5, 2024:

The class extends the abstract class Offset and implements the Serializable interface. AFAIK, because the class is serializable it can't encapsulate the Map<TopicPartition, Long>, and the same goes for decorators.

It would be possible to use the transient keyword with the Map<TopicPartition, Long>, which makes serialization ignore that field. That way Map<TopicPartition, Long> could be used as an input parameter for the constructor, and a new method could be added for converting the Map<TopicPartition, Long> into a Map<String, Long> before serialization is done. At least the code duplication for converting the map to a serializable format would be solved that way.
Implemented the serializable Map conversion in HdfsOffset and KafkaOffset in commit 014cc64
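
The transient-field idea could look roughly like this (illustrative names only, not the actual HdfsOffset/KafkaOffset code; Object keys stand in for the non-serializable TopicPartition):

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: the non-serializable map is marked transient so
// Java serialization skips it, and a conversion method produces the
// serializable form on demand.
final class OffsetSketch implements Serializable {
    // Skipped by serialization; keys are not serializable.
    private final transient Map<Object, Long> offset;

    OffsetSketch(Map<Object, Long> offset) {
        this.offset = offset;
    }

    // Shared conversion method; both HdfsOffset and KafkaOffset could
    // reuse this to avoid duplicating the map-conversion logic.
    Map<String, Long> asSerializableMap() {
        Map<String, Long> result = new HashMap<>(offset.size());
        for (Map.Entry<Object, Long> entry : offset.entrySet()) {
            result.put(entry.getKey().toString(), entry.getValue());
        }
        return result;
    }
}
```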

…a and HdfsRecordConversionImpl.java to make all objects immutable. Time based inclusion refactoring is WIP.
…erializedDatasourceOffset.java to reduce the amount of if-statements.
…fset.java to remove null usages. Fixed error in initialOffset() method.
…empty BatchSliceCollection. Replaced initialization of BatchSliceCollection as null with StubBatchSliceCollection.
…tead of separate clear() method. Removed clear() from AvroRead interface.
…ructor. Refactored KafkaOffset constructors.
…ded missing hashCode() overrides and other minor fixes.
…factored Exceptions to conform to Checkstyle standards.
…ap conversion to HdfsOffset and KafkaOffset, solving code duplication of the conversion. Implemented additional OffsetInterface.java for HdfsOffset and KafkaOffset.
@Tiihott Tiihott commented Nov 11, 2024:

Rebased again to current main branch. Applied Checkstyle cleanup to the new files from rebase.
