From b8f275d888cfcbe4aa036b2d0021d7117599812a Mon Sep 17 00:00:00 2001 From: sychen Date: Tue, 20 Aug 2024 12:29:53 +0800 Subject: [PATCH] [CELEBORN-1567] Support throw FetchFailedException when Data corruption detected ### What changes were proposed in this pull request? ### Why are the changes needed? https://github.com/apache/celeborn/pull/2655#pullrequestreview-2213124224 ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? GA Closes #2691 from cxzl25/CELEBORN-1567. Authored-by: sychen Signed-off-by: Shaoyun Chen --- .../celeborn/client/read/CelebornInputStream.java | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java b/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java index 588fbdceef..bd0164cd6b 100644 --- a/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java +++ b/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java @@ -27,8 +27,10 @@ import scala.Tuple2; +import com.github.luben.zstd.ZstdException; import com.google.common.util.concurrent.Uninterruptibles; import io.netty.buffer.ByteBuf; +import net.jpountz.lz4.LZ4Exception; import org.roaringbitmap.RoaringBitmap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -665,7 +667,7 @@ private boolean fillBuffer() throws IOException { } return hasData; - } catch (IOException e) { + } catch (LZ4Exception | ZstdException | IOException e) { logger.error( "Failed to fill buffer from chunk. AppShuffleId {}, shuffleId {}, partitionId {}, location {}", appShuffleId, @@ -673,7 +675,12 @@ private boolean fillBuffer() throws IOException { partitionId, currentReader.getLocation(), e); - IOException ioe = e; + IOException ioe; + if (e instanceof IOException) { + ioe = (IOException) e; + } else { + ioe = new IOException(e); + } if (exceptionMaker != null) { if (shuffleClient.reportShuffleFetchFailure(appShuffleId, shuffleId)) { /* @@ -690,7 +697,7 @@ private boolean fillBuffer() throws IOException { throw ioe; } catch (Exception e) { logger.error( - "Failed to read data from chunk. AppShuffleId {}, shuffleId {}, partitionId {}, location {}", + "Failed to fill buffer from chunk. AppShuffleId {}, shuffleId {}, partitionId {}, location {}", appShuffleId, shuffleId, partitionId,