Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,19 @@ public FileExtent call() throws Exception {

private FileExtent doWork() throws IOException, StorageBackendException {
final ByteBuffer byteBuffer = objectFetcher.readToByteBuffer(objectFetcher.fetch(key, range));
if (range != null && !range.empty()) {
// Why this exists:
// We read from a stream/channel until EOF; EOF can happen before the requested range is fully delivered
// due to network/transient backend issues.
// SDKs often throw on transport failures, but not all cases surface as exceptions, so we validate length.
final int expectedBytes = range.bufferSize();
final int actualBytes = byteBuffer.remaining();
if (actualBytes != expectedBytes) {
throw new StorageBackendException(
"Short read for " + key + " range " + range + ": expected " + expectedBytes + " bytes, got " + actualBytes
);
}
}
return createFileExtent(key, range, byteBuffer);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import io.aiven.inkless.common.PlainObjectKey;
import io.aiven.inkless.generated.FileExtent;
import io.aiven.inkless.storage_backend.common.ObjectFetcher;
import io.aiven.inkless.storage_backend.common.StorageBackendException;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
Expand Down Expand Up @@ -79,6 +80,20 @@ public void testFetch() throws Exception {
assertThat(actualFile).isEqualTo(expectedFile);
}

@Test
public void testShortReadThrowsStorageBackendException() throws Exception {
final int expectedSize = 10;
final ByteRange range = new ByteRange(0, expectedSize);
final FileFetchJob job = new FileFetchJob(time, fetcher, objectA, range, durationMs -> { });

final ReadableByteChannel channel = mock(ReadableByteChannel.class);
when(fetcher.fetch(objectA, range)).thenReturn(channel);
// Return fewer bytes than requested (simulates short read / truncated stream).
when(fetcher.readToByteBuffer(channel)).thenReturn(ByteBuffer.allocate(expectedSize - 1));

assertThrows(StorageBackendException.class, job::call);
}

private List<FileExtent> createCacheAlignedFileExtents(int fileSize, int blockSize) {
byte[] array = new byte[fileSize];
for (int i = 0; i < fileSize; i++) {
Expand Down
Loading