diff --git a/storage/inkless/src/main/java/io/aiven/inkless/consume/FileFetchJob.java b/storage/inkless/src/main/java/io/aiven/inkless/consume/FileFetchJob.java index 3be049fb3a..5f189ffbc0 100644 --- a/storage/inkless/src/main/java/io/aiven/inkless/consume/FileFetchJob.java +++ b/storage/inkless/src/main/java/io/aiven/inkless/consume/FileFetchJob.java @@ -71,6 +71,19 @@ public FileExtent call() throws Exception { private FileExtent doWork() throws IOException, StorageBackendException { final ByteBuffer byteBuffer = objectFetcher.readToByteBuffer(objectFetcher.fetch(key, range)); + if (range != null && !range.empty()) { + // Why this exists: + // We read from a stream/channel until EOF; EOF can happen before the requested range is fully delivered + // due to network/transient backend issues. + // SDKs often throw on transport failures, but not all cases surface as exceptions, so we validate length. + final int expectedBytes = range.bufferSize(); + final int actualBytes = byteBuffer.remaining(); + if (actualBytes != expectedBytes) { + throw new StorageBackendException( + "Short read for " + key + " range " + range + ": expected " + expectedBytes + " bytes, got " + actualBytes + ); + } + } return createFileExtent(key, range, byteBuffer); } diff --git a/storage/inkless/src/test/java/io/aiven/inkless/consume/FileFetchJobTest.java b/storage/inkless/src/test/java/io/aiven/inkless/consume/FileFetchJobTest.java index a7cda9ecc5..ea10b36734 100644 --- a/storage/inkless/src/test/java/io/aiven/inkless/consume/FileFetchJobTest.java +++ b/storage/inkless/src/test/java/io/aiven/inkless/consume/FileFetchJobTest.java @@ -39,6 +39,7 @@ import io.aiven.inkless.common.PlainObjectKey; import io.aiven.inkless.generated.FileExtent; import io.aiven.inkless.storage_backend.common.ObjectFetcher; +import io.aiven.inkless.storage_backend.common.StorageBackendException; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -79,6 +80,20 @@ public void testFetch() throws Exception { assertThat(actualFile).isEqualTo(expectedFile); } + @Test + public void testShortReadThrowsStorageBackendException() throws Exception { + final int expectedSize = 10; + final ByteRange range = new ByteRange(0, expectedSize); + final FileFetchJob job = new FileFetchJob(time, fetcher, objectA, range, durationMs -> { }); + + final ReadableByteChannel channel = mock(ReadableByteChannel.class); + when(fetcher.fetch(objectA, range)).thenReturn(channel); + // Return fewer bytes than requested (simulates short read / truncated stream). + when(fetcher.readToByteBuffer(channel)).thenReturn(ByteBuffer.allocate(expectedSize - 1)); + + assertThrows(StorageBackendException.class, job::call); + } + private List createCacheAlignedFileExtents(int fileSize, int blockSize) { byte[] array = new byte[fileSize]; for (int i = 0; i < fileSize; i++) {