Commit

null check for uncompressedSize in ArchiveBatchSliceCollection (#127)
* null check for uncompressedSize in ArchiveBatchSliceCollection

* add a NULL value for uncompressed size in tests; check in metadataTest that the NULL value becomes -1 in the results
eemhu authored Nov 26, 2024
1 parent 4429cbc commit 7d324c7
Showing 3 changed files with 34 additions and 21 deletions.
8 changes: 7 additions & 1 deletion ArchiveBatchSliceCollection.java
@@ -80,6 +80,12 @@ public ArchiveBatchSliceCollection processRange(Offset start, Offset end) {
         );
 
         for (Record r : result) {
+            // uncompressed size can be null
+            long uncompressedSize = -1L;
+            if (r.get(10) != null) {
+                uncompressedSize = r.get(10, Long.class);
+            }
+
             this
                     .add(new BatchSlice(new ArchiveS3ObjectMetadata(r.get(0, String.class), // id
                             r.get(6, String.class), // bucket
@@ -89,7 +95,7 @@ public ArchiveBatchSliceCollection processRange(Offset start, Offset end) {
                             r.get(3, String.class), // host
                             r.get(8, Long.class), // logtime
                             r.get(9, Long.class), // compressedSize
-                            r.get(10, Long.class) // uncompressedSize
+                            uncompressedSize // uncompressedSize
                     )));
         }
         return this;
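Why the guard is needed: r.get(10, Long.class) returns null when the database column is NULL, and passing that into a constructor parameter declared as a primitive long (as ArchiveS3ObjectMetadata presumably declares it, given this fix) would throw a NullPointerException on unboxing. Below is a minimal stand-alone sketch of the hazard and of the sentinel pattern used above; consume is a hypothetical stand-in for the metadata constructor.

public class SentinelExample {

    // Hypothetical consumer that, like ArchiveS3ObjectMetadata presumably,
    // takes a primitive long and therefore cannot accept null.
    static void consume(long uncompressedSize) {
        System.out.println("uncompressedSize = " + uncompressedSize);
    }

    public static void main(String[] args) {
        Long fromDatabase = null; // a NULL column value after conversion

        // Unboxing null directly would throw NullPointerException:
        // consume(fromDatabase);

        // The commit's pattern: substitute a -1 sentinel for NULL.
        long uncompressedSize = -1L;
        if (fromDatabase != null) {
            uncompressedSize = fromDatabase;
        }
        consume(uncompressedSize); // prints -1
    }
}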
39 changes: 21 additions & 18 deletions src/test/java/com/teragrep/pth_06/InstantiationTest.java
@@ -75,11 +75,9 @@
 import java.sql.Date;
 import java.util.*;
 import java.util.concurrent.TimeoutException;
-import java.util.stream.Collectors;
 import java.util.zip.GZIPOutputStream;
 
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.*;
 
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
 public class InstantiationTest {
@@ -188,11 +186,12 @@ else if (streamingQuery.lastProgress().sources().length != 0) {
                 }
             }
         }
-        Assertions.assertEquals(expectedRows, rowCount);
+        assertEquals(expectedRows, rowCount);
     }
 
     @Test
     public void metadataTest() throws StreamingQueryException, TimeoutException {
+        Map<String, String> partitionToUncompressedMapping = new HashMap<>();
         // please notice that JAVA_HOME=/usr/lib/jvm/java-1.8.0 mvn clean test -Pdev is required
         Dataset<Row> df = spark
                 .readStream()
@@ -225,28 +224,32 @@ public void metadataTest() throws StreamingQueryException, TimeoutException {
             .load();
 
         StreamingQuery sq = df.writeStream().foreachBatch((ds, i) -> {
-            ds.show(false);
-            List<String> rawCol = ds
-                    .select("_raw")
+            ds
+                    .select("partition", "_raw")
                     .collectAsList()
                     .stream()
-                    .map(r -> r.getAs(0).toString())
-                    .collect(Collectors.toList());
-            assertFalse(rawCol.isEmpty());
-            for (String c : rawCol) {
-                assertFalse(c.isEmpty());
-                JsonObject jo = new Gson().fromJson(c, JsonObject.class);
-                assertTrue(jo.has("compressed"));
-                assertTrue(jo.has("uncompressed"));
-            }
+                    .forEach(r -> partitionToUncompressedMapping.put(r.getAs(0), r.getAs(1)));
         }).start();
         sq.processAllAvailable();
        sq.stop();
         sq.awaitTermination();
 
+        int loops = 0;
+        for (Map.Entry<String, String> entry : partitionToUncompressedMapping.entrySet()) {
+            assertFalse(entry.getValue().isEmpty());
+            JsonObject jo = new Gson().fromJson(entry.getValue(), JsonObject.class);
+            assertTrue(jo.has("compressed"));
+            assertTrue(jo.has("uncompressed"));
+            loops++;
+        }
+        Assertions.assertEquals(33, loops);
+        //partition=19181 has NULL for uncompressed size
+        assertEquals(
+                -1L, new Gson().fromJson(partitionToUncompressedMapping.get("19181"), JsonObject.class).get("uncompressed").getAsLong()
+        );
     }
 
     private boolean isArchiveDone(StreamingQuery outQ) {
-        Boolean archiveDone = true;
+        boolean archiveDone = true;
         for (int i = 0; i < outQ.lastProgress().sources().length; i++) {
             String startOffset = outQ.lastProgress().sources()[i].startOffset();
             String endOffset = outQ.lastProgress().sources()[i].endOffset();
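The reworked test collects a partition-to-_raw mapping inside foreachBatch and runs its assertions only after the stream has finished, so a NULL uncompressed size can be pinned to a specific partition (19181) and checked against the -1 sentinel. (The isArchiveDone change from Boolean to boolean simply drops an unnecessary boxed type.) Below is a minimal stand-alone illustration of the Gson check; the sample payload is hypothetical, not taken from the test data.

import com.google.gson.Gson;
import com.google.gson.JsonObject;

public class UncompressedFieldCheck {

    public static void main(String[] args) {
        // Hypothetical _raw payload for a partition whose database row had
        // NULL for uncompressed size; the source maps that NULL to -1.
        String raw = "{\"compressed\": 28306039, \"uncompressed\": -1}";

        JsonObject jo = new Gson().fromJson(raw, JsonObject.class);

        // Both fields are present, and NULL surfaces as the -1 sentinel.
        System.out.println(jo.has("compressed"));                // true
        System.out.println(jo.get("uncompressed").getAsLong()); // -1
    }
}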
8 changes: 6 additions & 2 deletions src/test/java/com/teragrep/pth_06/planner/MockDBData.java
@@ -69,7 +69,7 @@ public MockDBData() {
                 generateResult(
                         "19181", "f17_v2", "log:f17_v2:0", "sc-99-99-14-40", "f17_v2", "2010-01-08",
                         "hundred-year", "2010/01-08/sc-99-99-14-40/f17_v2/f17_v2.logGLOB-2010010801.log.gz",
-                        "1262905200", "28306039", "283060390"
+                        "1262905200", "28306039", null
                 )
         );
         virtualDatabaseMap
@@ -423,7 +423,11 @@ public MockResult[] execute(MockExecuteContext ctx) throws SQLException {
             newRecord.set(StreamDBClient.SliceTable.path, path);
             newRecord.set(StreamDBClient.SliceTable.logtime, Long.valueOf(logtime));
             newRecord.set(StreamDBClient.SliceTable.filesize, ULong.valueOf(filesize));
-            newRecord.set(StreamDBClient.SliceTable.uncompressedFilesize, ULong.valueOf(uncompressedFilesize));
+            newRecord
+                    .set(
+                            StreamDBClient.SliceTable.uncompressedFilesize,
+                            uncompressedFilesize != null ? ULong.valueOf(uncompressedFilesize) : null
+                    );
 
             result.add(newRecord);
         } // else empty set
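The ternary guard matters because ULong.valueOf parses its String argument and would throw on null rather than yield a SQL NULL (an assumption consistent with the guard added here), so the null has to be propagated explicitly. A minimal sketch of the same conversion in isolation; the helper name is illustrative.

import org.jooq.types.ULong;

public class NullableULongConversion {

    // Illustrative helper: convert a nullable String column value to ULong,
    // keeping null as null instead of letting ULong.valueOf throw on it.
    static ULong toULongOrNull(String value) {
        return value != null ? ULong.valueOf(value) : null;
    }

    public static void main(String[] args) {
        System.out.println(toULongOrNull("283060390")); // 283060390
        System.out.println(toULongOrNull(null));        // null
    }
}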
