Hdfs datasource tasker implementation #91

Open
wants to merge 62 commits into base: main

Commits (62)
a569977
Migration of the branch to github, InstantiationTest passed successfu…
Tiihott Sep 11, 2024
f82ed6c
Spotless
Tiihott Sep 13, 2024
212a6a0
Implemented incrementAndGetLatestOffset() method to HdfsQuery interfa…
Tiihott Sep 13, 2024
2677f4a
Implemented Kerberized hdfs access for HdfsDBClient. Implemented requ…
Tiihott Sep 16, 2024
d2337ab
Added avro dependencies and schema.
Tiihott Sep 17, 2024
b2ee255
Updated mock hdfs files with files built using the updated schema.
Tiihott Sep 17, 2024
e90e57c
Implemented preliminary code for tasker component. S3-HDFS cutoff fil…
Tiihott Sep 17, 2024
66b420a
Implemented preliminary S3-HDFS cutoff filtering.
Tiihott Sep 18, 2024
b4d58bb
Fixed error in InternalRow formatting.
Tiihott Sep 18, 2024
d8b07d7
Implemented HdfsMicroBatchInputPartitionReaderTest.java. Improved Moc…
Tiihott Sep 18, 2024
ae3b651
Added missing .close() call and fixed error in commenting.
Tiihott Sep 18, 2024
03b1ed0
Added more tests to HdfsMicroBatchInputPartitionReaderTest.java.
Tiihott Sep 19, 2024
4118414
Implemented AvroReaderTest.java. Moved hdfs tasker tests to their own…
Tiihott Sep 19, 2024
3b3c703
Removed obsolete test code from MockHDFS.
Tiihott Sep 19, 2024
baf2dde
Refactored HdfsRecordConverter class to HdfsRecordConversionImpl clas…
Tiihott Sep 19, 2024
388aaba
Implemented FileSystemFactoryImpl and FileSystemFactory interface.
Tiihott Sep 19, 2024
f8047d0
Moved FileSystem initialization logic from constructors of HdfsDBClie…
Tiihott Sep 19, 2024
15a9849
Refactored class attributes into local variables.
Tiihott Sep 19, 2024
3aa980f
Renamed AvroReader to AvroReadImpl. Implemented AvroRead interface.
Tiihott Sep 20, 2024
7c5ee1b
Added logging
Tiihott Sep 20, 2024
f8d85d4
Added new test to HdfsMicroBatchInputPartitionReaderTest
Tiihott Sep 20, 2024
0952390
Implemented FileSystemFactoryTest.java
Tiihott Sep 20, 2024
c8073b0
Implemented InstantationKafkaDisabledTest.java
Tiihott Sep 20, 2024
77ba9e0
Improved comments
Tiihott Sep 20, 2024
d217b08
Removed obsolete comments.
Tiihott Oct 14, 2024
bae6969
Converted private static literal mapType to a local variable.
Tiihott Oct 15, 2024
b6b8741
Renamed create() method to fileSystem() in FileSystemFactory interface.
Tiihott Oct 15, 2024
b7e5508
Renamed HdfsTopicPartitionOffsetMetadata.java to HdfsFileMetadata.java.
Tiihott Oct 15, 2024
f4bfe07
Refactored UseHdfsHostname naming usage to useHdfsHostname.
Tiihott Oct 15, 2024
a42b2d1
Cleaned up comments.
Tiihott Oct 16, 2024
4bc1ecd
Refactored HdfsDBClient constructor into secondary and primary constr…
Tiihott Oct 16, 2024
4c007bb
Refactored PathFilter interface usage in HdfsDBClient by implementing…
Tiihott Oct 17, 2024
f34678b
Renamed get() to record() in AvroRead interface.
Tiihott Oct 17, 2024
6c91b0d
Renamed get() to row() in HdfsRecordConversion interface.
Tiihott Oct 17, 2024
ce46083
Refactored primary constructor to only initialize all the encapsulate…
Tiihott Oct 17, 2024
30a4220
Refactoring AvroReadImpl.java, HdfsMicroBatchInputPartitionReader.jav…
Tiihott Oct 21, 2024
1a1063c
Refactored ArchiveMicroStreamReader.java, DatasourceOffset.java and S…
Tiihott Oct 22, 2024
72e3f77
Refactored ArchiveMicroStreamReader.java, HdfsOffset.java and KafkaOf…
Tiihott Oct 22, 2024
4c34f54
Replaced KafkaOffset and HdfsOffset null references with stubs.
Tiihott Oct 23, 2024
95ec8db
Removed serializable map conversion logic away from HdfsOffset constr…
Tiihott Oct 23, 2024
a5268f0
Implemented stub to replace null usage for HdfsQuery.
Tiihott Oct 23, 2024
e1379c5
Implemented StubBatchSliceCollection.java to allow initialization of …
Tiihott Oct 23, 2024
3c01d45
Refactored AvroRead and its implementation to restore hdfs time based…
Tiihott Oct 24, 2024
b389223
Combined 3 for-loops into one.
Tiihott Oct 24, 2024
efe3f8b
Fixed unintentional capital letter usage in config key.
Tiihott Oct 25, 2024
a83ba84
Moved TopicFilter initialization to secondary constructor.
Tiihott Oct 25, 2024
05304eb
Initialize topicsRegexString with all topics instead of empty string.
Tiihott Oct 25, 2024
968ab85
Implemented clearing of syslogRecordBuffer list as part of next() ins…
Tiihott Oct 25, 2024
56b3ef2
Refactored StubBatchSliceCollection to throw exception when it is pro…
Tiihott Oct 25, 2024
d0cef90
Refactored HdfsOffset constructors and added hdfs offset serde test.
Tiihott Oct 28, 2024
26e3987
Removed serializable map conversion logic away from KafkaOffset const…
Tiihott Oct 28, 2024
8d9d937
Replaced returning of stub with exception throw.
Tiihott Oct 28, 2024
3194415
Checkstyle plugin implementation and Checkstyle code cleanup. WIP
Tiihott Oct 30, 2024
b0461c1
Checkstyle plugin implementation and Checkstyle code cleanup, WIP. Ad…
Tiihott Nov 1, 2024
9060b6b
Checkstyle plugin implementation and Checkstyle code cleanup, WIP. Ad…
Tiihott Nov 4, 2024
f8c9c7a
Checkstyle plugin implementation and Checkstyle code cleanup, WIP. Re…
Tiihott Nov 4, 2024
efafc2e
Checkstyle plugin implementation complete. Implemented serializable M…
Tiihott Nov 5, 2024
932c39b
Refactored hashCode() methods to use Objects.hash().
Tiihott Nov 7, 2024
45c4a70
Renamed OffsetInterface to Offset. Spotless.
Tiihott Nov 8, 2024
abe5823
Replaced RuntimeException with more descriptive UncheckedIOException.
Tiihott Nov 8, 2024
d83af23
Rebase. Fixed leftover conflicts from rebase.
Tiihott Nov 8, 2024
d7550fa
Rebase Checkstyle code cleanup.
Tiihott Nov 11, 2024
304 changes: 304 additions & 0 deletions pom.xml

Large diffs are not rendered by default.

15 changes: 15 additions & 0 deletions src/main/avro/KafkaRecord.avsc
@@ -0,0 +1,15 @@
{"namespace": "com.teragrep.pth_06.avro",
"type": "record",
"name": "SyslogRecord",
"fields": [
{"name": "timestamp", "type": "long"},
{"name": "directory", "type": "string"},
{"name": "stream", "type": "string"},
{"name": "host", "type": "string"},
{"name": "input", "type": "string"},
{"name": "partition", "type": "string"},
{"name": "offset", "type": "long"},
{"name": "origin", "type": "string"},
{"name": "payload", "type": "string"}
]
}
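
Not part of the diff: the sketch below shows how files written with this SyslogRecord schema could be read back through Avro's generic API, roughly the job the PR's AvroRead/AvroReadImpl presumably performs. The file path and the choice of printed fields are illustrative assumptions.

import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;

import java.io.File;
import java.io.IOException;

public class SyslogRecordDump {

    public static void main(String[] args) throws IOException {
        // Path to a locally available avro file; illustrative only.
        File avroFile = new File(args[0]);
        // The writer schema is embedded in the file, so no reader schema is supplied here.
        GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
        try (DataFileReader<GenericRecord> fileReader = new DataFileReader<>(avroFile, datumReader)) {
            while (fileReader.hasNext()) {
                GenericRecord record = fileReader.next();
                System.out.println(record.get("partition") + "/" + record.get("offset") + " " + record.get("payload"));
            }
        }
    }
}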
150 changes: 97 additions & 53 deletions src/main/java/com/teragrep/pth_06/ArchiveMicroStreamReader.java
@@ -46,12 +46,15 @@
package com.teragrep.pth_06;

import com.google.common.annotations.VisibleForTesting;
import com.google.gson.JsonArray;
import com.teragrep.pth_06.config.Config;
import com.teragrep.pth_06.planner.*;
import com.teragrep.pth_06.planner.offset.DatasourceOffset;
import com.teragrep.pth_06.planner.offset.HdfsOffset;
import com.teragrep.pth_06.planner.offset.KafkaOffset;
import com.teragrep.pth_06.scheduler.*;
import com.teragrep.pth_06.task.ArchiveMicroBatchInputPartition;
import com.teragrep.pth_06.task.HdfsMicroBatchInputPartition;
import com.teragrep.pth_06.task.TeragrepPartitionReaderFactory;
import com.teragrep.pth_06.task.KafkaMicroBatchInputPartition;
import org.apache.spark.sql.connector.read.InputPartition;
@@ -87,6 +90,8 @@ public final class ArchiveMicroStreamReader implements MicroBatchStream {
private final Config config;
private final ArchiveQuery aq;
private final KafkaQuery kq;
private final HdfsQuery hq;
private final JsonArray hdfsOffsets;

/**
* Constructor for ArchiveMicroStreamReader
@@ -98,6 +103,15 @@ public final class ArchiveMicroStreamReader implements MicroBatchStream {

this.config = config;

if (config.isHdfsEnabled) {
this.hq = new HdfsQueryProcessor(config);
hdfsOffsets = hq.hdfsOffsetMapToJSON();
}
else {
this.hq = new HdfsQueryProcessor();
hdfsOffsets = new JsonArray();
}

if (config.isArchiveEnabled) {
this.aq = new ArchiveQueryProcessor(config);
}
@@ -106,7 +120,13 @@ public final class ArchiveMicroStreamReader implements MicroBatchStream {
}

if (config.isKafkaEnabled) {
this.kq = new KafkaQueryProcessor(config);
if (config.isHdfsEnabled) {
this.kq = new KafkaQueryProcessor(config);
this.kq.seekToHdfsOffsets(hdfsOffsets);
}
else {
this.kq = new KafkaQueryProcessor(config);
}
}
else {
this.kq = null;
@@ -119,10 +139,18 @@ public final class ArchiveMicroStreamReader implements MicroBatchStream {
* Used for testing.
*/
@VisibleForTesting
ArchiveMicroStreamReader(ArchiveQuery aq, KafkaQuery kq, Config config) {
ArchiveMicroStreamReader(HdfsQuery hq, ArchiveQuery aq, KafkaQuery kq, Config config) {
this.config = config;
this.aq = aq;
this.kq = kq;
this.aq = aq; // Uses its own hardcoded query string defined in MockTeragrepDatasource.
this.kq = kq; // Skips using query string (and thus topic filtering) altogether.
this.hq = hq; // Uses the query string from config for topic filtering.
if (!this.hq.isStub() && this.kq != null) {
hdfsOffsets = this.hq.hdfsOffsetMapToJSON();
this.kq.seekToHdfsOffsets(hdfsOffsets);
}
else {
hdfsOffsets = new JsonArray();
}

LOGGER.debug("@VisibleForTesting MicroBatchReader> initialized");
}
@@ -137,25 +165,37 @@ public final class ArchiveMicroStreamReader implements MicroBatchStream {
public Offset initialOffset() {
// archive only: subtract 3600s (1 hour) from earliest to return first row (start exclusive)
DatasourceOffset rv;
if (this.config.isArchiveEnabled && !this.config.isKafkaEnabled) {
// only archive
rv = new DatasourceOffset(new LongOffset(this.aq.getInitialOffset() - 3600L));
HdfsOffset hdfsOffset = new HdfsOffset(); // stub
LongOffset longOffset = null; // Refactor null usage
KafkaOffset kafkaOffset = new KafkaOffset(); // stub

if (this.config.isHdfsEnabled) {
hdfsOffset = new HdfsOffset(this.hq.getBeginningOffsets().getOffsetMap());
hdfsOffset.serialize();
}
else if (!this.config.isArchiveEnabled && this.config.isKafkaEnabled) {
// only kafka
rv = new DatasourceOffset(new KafkaOffset(this.kq.getBeginningOffsets(null)));
if (this.config.isArchiveEnabled) {
longOffset = new LongOffset(this.aq.getInitialOffset() - 3600L);
}
else if (this.config.isArchiveEnabled) {
// both
rv = new DatasourceOffset(
new LongOffset(this.aq.getInitialOffset() - 3600L),
new KafkaOffset(this.kq.getBeginningOffsets(null))
);
if (this.config.isKafkaEnabled) {
if (this.config.isHdfsEnabled) {
if (hdfsOffsets.size() > 0) {
kafkaOffset = new KafkaOffset(this.kq.getConsumerPositions(hdfsOffsets));
kafkaOffset.serialize();
}
else {
kafkaOffset = new KafkaOffset(this.kq.getBeginningOffsets(null));
kafkaOffset.serialize();
}
}
else {
kafkaOffset = new KafkaOffset(this.kq.getBeginningOffsets(null));
kafkaOffset.serialize();
}
}
else {
// neither
throw new IllegalStateException("no datasources enabled, can't get initial offset");
if (hdfsOffset.isStub() && longOffset == null && kafkaOffset.isStub()) {
throw new IllegalStateException("no datasources enabled, can't get latest offset");
}
rv = new DatasourceOffset(hdfsOffset, longOffset, kafkaOffset);
LOGGER.debug("offset[initial]= {}", rv);
return rv;
}
@@ -172,6 +212,9 @@ public void commit(Offset offset) {
if (this.config.isArchiveEnabled) {
this.aq.commit(((DatasourceOffset) offset).getArchiveOffset().offset());
}
if (this.config.isHdfsEnabled) {
this.hq.commit(((DatasourceOffset) offset).getHdfsOffset());
}
}

/** {@inheritDoc} */
@@ -188,26 +231,25 @@ public void stop() {
@Override
public Offset latestOffset() {
DatasourceOffset rv;
if (this.config.isArchiveEnabled && !this.config.isKafkaEnabled) {
// only archive
rv = new DatasourceOffset(new LongOffset(this.aq.incrementAndGetLatestOffset()));
HdfsOffset hdfsOffset = new HdfsOffset();
LongOffset longOffset = null;
KafkaOffset kafkaOffset = new KafkaOffset();

if (this.config.isHdfsEnabled) {
hdfsOffset = new HdfsOffset(this.hq.incrementAndGetLatestOffset().getOffsetMap());
hdfsOffset.serialize();
}
else if (!this.config.isArchiveEnabled && this.config.isKafkaEnabled) {
// only kafka
rv = new DatasourceOffset(new KafkaOffset(this.kq.getInitialEndOffsets()));
if (this.config.isArchiveEnabled) {
longOffset = new LongOffset(this.aq.incrementAndGetLatestOffset());
}
else if (this.config.isArchiveEnabled) {
// both
rv = new DatasourceOffset(
new LongOffset(this.aq.incrementAndGetLatestOffset()),
new KafkaOffset(this.kq.getInitialEndOffsets())
);
if (this.config.isKafkaEnabled) {
kafkaOffset = new KafkaOffset(this.kq.getInitialEndOffsets());
kafkaOffset.serialize();
}
else {
// neither
if (hdfsOffset.isStub() && longOffset == null && kafkaOffset.isStub()) {
throw new IllegalStateException("no datasources enabled, can't get latest offset");
}

rv = new DatasourceOffset(hdfsOffset, longOffset, kafkaOffset);
LOGGER.debug("offset[latest]= {}", rv);
return rv;
}
@@ -223,18 +265,34 @@ else if (this.config.isArchiveEnabled) {
public InputPartition[] planInputPartitions(Offset start, Offset end) {
List<InputPartition> inputPartitions = new LinkedList<>();

Batch currentBatch = new Batch(config, aq, kq).processRange(start, end);
Batch currentBatch = new Batch(config, hq, aq, kq).processRange(start, end);

for (LinkedList<BatchSlice> taskObjectList : currentBatch) {

// archive tasks
LinkedList<ArchiveS3ObjectMetadata> archiveTaskList = new LinkedList<>();
// HDFS tasks
LinkedList<HdfsFileMetadata> hdfsTaskList = new LinkedList<>();
for (BatchSlice batchSlice : taskObjectList) {
if (batchSlice.type.equals(BatchSlice.Type.ARCHIVE)) {
archiveTaskList.add(batchSlice.archiveS3ObjectMetadata);
}
if (batchSlice.type.equals(BatchSlice.Type.HDFS)) {
hdfsTaskList.add(batchSlice.hdfsFileMetadata);
}
if (batchSlice.type.equals(BatchSlice.Type.KAFKA)) {
inputPartitions
.add(
new KafkaMicroBatchInputPartition(
config.kafkaConfig.executorOpts,
batchSlice.kafkaTopicPartitionOffsetMetadata.topicPartition,
batchSlice.kafkaTopicPartitionOffsetMetadata.startOffset,
batchSlice.kafkaTopicPartitionOffsetMetadata.endOffset,
config.kafkaConfig.executorConfig,
config.kafkaConfig.skipNonRFC5424Records
)
);
}
}

if (!archiveTaskList.isEmpty()) {
inputPartitions
.add(
Expand All @@ -251,22 +309,8 @@ public InputPartition[] planInputPartitions(Offset start, Offset end) {
)
);
}

// kafka tasks
for (BatchSlice batchSlice : taskObjectList) {
if (batchSlice.type.equals(BatchSlice.Type.KAFKA)) {
inputPartitions
.add(
new KafkaMicroBatchInputPartition(
config.kafkaConfig.executorOpts,
batchSlice.kafkaTopicPartitionOffsetMetadata.topicPartition,
batchSlice.kafkaTopicPartitionOffsetMetadata.startOffset,
batchSlice.kafkaTopicPartitionOffsetMetadata.endOffset,
config.kafkaConfig.executorConfig,
config.kafkaConfig.skipNonRFC5424Records
)
);
}
if (!hdfsTaskList.isEmpty()) {
inputPartitions.add(new HdfsMicroBatchInputPartition(config.hdfsConfig, hdfsTaskList));
}
}

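Not shown in the PR excerpt itself: a minimal sketch of the call order that Spark's micro-batch engine drives against a MicroBatchStream such as ArchiveMicroStreamReader above. The helper class and its name are illustrative assumptions, not part of pth_06.

import org.apache.spark.sql.connector.read.InputPartition;
import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
import org.apache.spark.sql.connector.read.streaming.Offset;

public final class MicroBatchLifecycleSketch {

    public static void drive(MicroBatchStream stream) {
        // 1. Resolve the composed starting point (HdfsOffset + LongOffset + KafkaOffset above).
        Offset start = stream.initialOffset();
        // 2. Ask the enabled datasources how far the next micro-batch may reach.
        Offset end = stream.latestOffset();
        // 3. Plan HDFS, archive and Kafka slices for the executors to read.
        InputPartition[] partitions = stream.planInputPartitions(start, end);
        // 4. Executors consume the partitions via the reader factory (omitted here).
        // 5. Acknowledge the processed range per datasource, then shut down.
        stream.commit(end);
        stream.stop();
    }
}
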
56 changes: 56 additions & 0 deletions src/main/java/com/teragrep/pth_06/FileSystemFactory.java
@@ -0,0 +1,56 @@
/*
* Teragrep Archive Datasource (pth_06)
* Copyright (C) 2021-2024 Suomen Kanuuna Oy
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*
* Additional permission under GNU Affero General Public License version 3
* section 7
*
* If you modify this Program, or any covered work, by linking or combining it
* with other code, such other code is not for that reason alone subject to any
* of the requirements of the GNU Affero GPL version 3 as long as this Program
* is the same Program as licensed from Suomen Kanuuna Oy without any additional
* modifications.
*
* Supplemented terms under GNU Affero General Public License version 3
* section 7
*
* Origin of the software must be attributed to Suomen Kanuuna Oy. Any modified
* versions must be marked as "Modified version of" The Program.
*
* Names of the licensors and authors may not be used for publicity purposes.
*
* No rights are granted for use of trade names, trademarks, or service marks
* which are in The Program if any.
*
* Licensee must indemnify licensors and authors for any liability that these
* contractual assumptions impose on licensors and authors.
*
* To the extent this program is licensed as part of the Commercial versions of
* Teragrep, the applicable Commercial License may apply to this file if you as
* a licensee so wish it.
*/
package com.teragrep.pth_06;

import org.apache.hadoop.fs.FileSystem;

import java.io.IOException;

public interface FileSystemFactory {

public abstract FileSystem fileSystem(boolean initializeUGI) throws IOException;

}
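
The PR also adds a FileSystemFactoryImpl (per the commit list), but it is not included in this excerpt. Below is a hypothetical sketch of one way the fileSystem(boolean) contract could be satisfied with Hadoop's FileSystem.get(); the class name, constructor parameters and Kerberos handling are assumptions, not the PR's actual implementation.

package com.teragrep.pth_06;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.security.UserGroupInformation;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

public final class ExampleFileSystemFactory implements FileSystemFactory {

    private final String hdfsUri; // e.g. "hdfs://namenode:8020", illustrative
    private final Configuration hadoopConf; // possibly carrying Kerberos settings

    public ExampleFileSystemFactory(String hdfsUri, Configuration hadoopConf) {
        this.hdfsUri = hdfsUri;
        this.hadoopConf = hadoopConf;
    }

    @Override
    public FileSystem fileSystem(boolean initializeUGI) throws IOException {
        if (initializeUGI) {
            // Register the (possibly Kerberized) configuration with Hadoop's login machinery.
            UserGroupInformation.setConfiguration(hadoopConf);
        }
        try {
            return FileSystem.get(new URI(hdfsUri), hadoopConf);
        }
        catch (URISyntaxException e) {
            throw new IOException("invalid HDFS uri: " + hdfsUri, e);
        }
    }
}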