[CELEBORN-1469] Support writing shuffle data to OSS (S3 only)
### What changes were proposed in this pull request?

Add support for writing shuffle data to object storage (S3 only for now), as stated in the title. HDFS-specific code paths are generalized to a DFS abstraction keyed by `StorageInfo.Type`, and an optional `hadoop-aws` build profile bundles the S3 filesystem dependencies.

### Why are the changes needed?

Celeborn currently cannot write shuffle data directly to Amazon S3. This is a limitation when migrating on-premises clusters to AWS and using S3 as the sink for shuffle data.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Closes #2579 from zhaohehuhu/dev-0619.

Authored-by: zhaohehuhu <luoyedeyi@163.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
zhaohehuhu authored and FMX committed Jul 24, 2024
1 parent 98c9ba3 commit 7a596bb
Showing 40 changed files with 685 additions and 243 deletions.
2 changes: 2 additions & 0 deletions LICENSE-binary
@@ -263,6 +263,7 @@ org.apache.commons:commons-crypto
org.apache.commons:commons-lang3
org.apache.hadoop:hadoop-client-api
org.apache.hadoop:hadoop-client-runtime
org.apache.hadoop:hadoop-aws
org.apache.ibatis:mybatis
org.apache.logging.log4j:log4j-1.2-api
org.apache.logging.log4j:log4j-api
@@ -307,6 +308,7 @@ org.slf4j:jcl-over-slf4j
org.webjars:swagger-ui
org.xerial.snappy:snappy-java
org.yaml:snakeyaml
com.amazonaws:aws-java-sdk-bundle

------------------------------------------------------------------------------------
This product bundles various third-party components under other open source licenses.
4 changes: 4 additions & 0 deletions NOTICE-binary
@@ -202,3 +202,7 @@ Copyright (c) 2022 Luke Hutchison

mimepool
Copyright (c) 2018, 2022 Oracle and/or its affiliates.


aws-java-sdk
Copyright 2010-2024 Amazon.com, Inc. or its affiliates.
10 changes: 9 additions & 1 deletion build/make-distribution.sh
@@ -27,6 +27,7 @@ RELEASE="false"
MVN="$PROJECT_DIR/build/mvn"
SBT="$PROJECT_DIR/build/sbt"
SBT_ENABLED="false"
HADOOP_AWS_ENABLED="false"

function exit_with_usage {
echo "make-distribution.sh - tool for making binary distributions of Celeborn"
@@ -62,6 +63,11 @@ while (( "$#" )); do
echo "Error: $1 is not supported"
exit_with_usage
;;
-P*)
if [[ "$1" == *"hadoop-aws"* ]]; then
HADOOP_AWS_ENABLED="true"
fi
;;
-*)
break
;;
@@ -256,7 +262,9 @@ function sbt_build_service {

echo "Celeborn $VERSION$GITREVSTRING" > "$DIST_DIR/RELEASE"
echo "Build flags: $@" >> "$DIST_DIR/RELEASE"

if [[ "$HADOOP_AWS_ENABLED" == "true" ]]; then
export SBT_MAVEN_PROFILES="hadoop-aws"
fi
BUILD_COMMAND=("$SBT" clean package)

# Actually build the jar
18 changes: 10 additions & 8 deletions client/src/main/java/org/apache/celeborn/client/ShuffleClient.java
@@ -19,6 +19,7 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

@@ -34,6 +35,7 @@
import org.apache.celeborn.common.network.client.TransportClientFactory;
import org.apache.celeborn.common.protocol.PartitionLocation;
import org.apache.celeborn.common.protocol.PbStreamHandler;
import org.apache.celeborn.common.protocol.StorageInfo;
import org.apache.celeborn.common.rpc.RpcEndpointRef;
import org.apache.celeborn.common.util.CelebornHadoopUtils;
import org.apache.celeborn.common.util.ExceptionMaker;
@@ -47,15 +49,15 @@ public abstract class ShuffleClient {
private static Logger logger = LoggerFactory.getLogger(ShuffleClient.class);
private static volatile ShuffleClient _instance;
private static volatile boolean initialized = false;
private static volatile FileSystem hdfsFs;
private static volatile Map<StorageInfo.Type, FileSystem> hadoopFs;
private static LongAdder totalReadCounter = new LongAdder();
private static LongAdder localShuffleReadCounter = new LongAdder();

// for testing
public static void reset() {
_instance = null;
initialized = false;
hdfsFs = null;
hadoopFs = null;
}

protected ShuffleClient() {}
@@ -101,19 +103,19 @@ public static ShuffleClient get(
return _instance;
}

public static FileSystem getHdfsFs(CelebornConf conf) {
if (null == hdfsFs) {
public static Map<StorageInfo.Type, FileSystem> getHadoopFs(CelebornConf conf) {
if (null == hadoopFs) {
synchronized (ShuffleClient.class) {
if (null == hdfsFs) {
if (null == hadoopFs) {
try {
hdfsFs = CelebornHadoopUtils.getHadoopFS(conf);
hadoopFs = CelebornHadoopUtils.getHadoopFS(conf);
} catch (Exception e) {
logger.error("Celeborn initialize HDFS failed.", e);
logger.error("Celeborn initialize DFS failed.", e);
}
}
}
}
return hdfsFs;
return hadoopFs;
}

public static void incrementLocalReadCounter() {
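The `getHadoopFs` change above replaces the single HDFS handle with a map keyed by storage type. A minimal caller sketch, not part of this commit (the `CelebornConf` package name is assumed where the diff does not show it, and the fallback to the HDFS entry mirrors how `DfsPartitionReader` picks its handle below):

```java
import java.util.Map;

import org.apache.hadoop.fs.FileSystem;

import org.apache.celeborn.client.ShuffleClient;
import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.common.protocol.StorageInfo;

public class HadoopFsLookupSketch {
  // Resolve the FileSystem registered for the requested storage type,
  // falling back to the HDFS entry when no dedicated one exists.
  public static FileSystem fsFor(CelebornConf conf, StorageInfo.Type type) {
    Map<StorageInfo.Type, FileSystem> fileSystems = ShuffleClient.getHadoopFs(conf);
    if (fileSystems == null) {
      // getHadoopFs logs and returns null when DFS initialization fails.
      throw new IllegalStateException("DFS initialization failed, see ShuffleClient logs");
    }
    FileSystem fs = fileSystems.get(type);
    return fs != null ? fs : fileSystems.get(StorageInfo.Type.HDFS);
  }
}
```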
@@ -30,6 +30,7 @@
import io.netty.buffer.Unpooled;
import io.netty.util.ReferenceCounted;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,6 +45,7 @@
import org.apache.celeborn.common.protocol.PbBufferStreamEnd;
import org.apache.celeborn.common.protocol.PbOpenStream;
import org.apache.celeborn.common.protocol.PbStreamHandler;
import org.apache.celeborn.common.protocol.StorageInfo;
import org.apache.celeborn.common.protocol.StreamType;
import org.apache.celeborn.common.util.ShuffleBlockInfoUtils;
import org.apache.celeborn.common.util.ThreadUtils;
@@ -60,14 +62,15 @@ public class DfsPartitionReader implements PartitionReader {
private volatile boolean closed = false;
private ExecutorService fetchThread;
private boolean fetchThreadStarted;
private FSDataInputStream hdfsInputStream;
private FSDataInputStream dfsInputStream;
private int numChunks = 0;
private int returnedChunks = 0;
private int currentChunkIndex = 0;
private final List<Long> chunkOffsets = new ArrayList<>();
private TransportClient client;
private PbStreamHandler streamHandler;
private MetricsCallback metricsCallback;
private FileSystem hadoopFs;

public DfsPartitionReader(
CelebornConf conf,
@@ -85,6 +88,12 @@ public DfsPartitionReader(

this.metricsCallback = metricsCallback;
this.location = location;
if (location.getStorageInfo() != null
&& location.getStorageInfo().getType() == StorageInfo.Type.S3) {
this.hadoopFs = ShuffleClient.getHadoopFs(conf).get(StorageInfo.Type.S3);
} else {
this.hadoopFs = ShuffleClient.getHadoopFs(conf).get(StorageInfo.Type.HDFS);
}

if (endMapIndex != Integer.MAX_VALUE) {
long fetchTimeoutMs = conf.clientFetchTimeoutMs();
@@ -105,18 +114,17 @@
// Parse this message to ensure sort is done.
} catch (IOException | InterruptedException e) {
throw new IOException(
"read shuffle file from HDFS failed, filePath: "
"read shuffle file from DFS failed, filePath: "
+ location.getStorageInfo().getFilePath(),
e);
}
hdfsInputStream =
ShuffleClient.getHdfsFs(conf)
.open(new Path(Utils.getSortedFilePath(location.getStorageInfo().getFilePath())));

dfsInputStream =
hadoopFs.open(new Path(Utils.getSortedFilePath(location.getStorageInfo().getFilePath())));
chunkOffsets.addAll(
getChunkOffsetsFromSortedIndex(conf, location, startMapIndex, endMapIndex));
} else {
hdfsInputStream =
ShuffleClient.getHdfsFs(conf).open(new Path(location.getStorageInfo().getFilePath()));
dfsInputStream = hadoopFs.open(new Path(location.getStorageInfo().getFilePath()));
chunkOffsets.addAll(getChunkOffsetsFromUnsortedIndex(conf, location));
}
logger.debug(
@@ -138,8 +146,7 @@ private List<Long> getChunkOffsetsFromUnsortedIndex(CelebornConf conf, Partition
throws IOException {
List<Long> offsets;
try (FSDataInputStream indexInputStream =
ShuffleClient.getHdfsFs(conf)
.open(new Path(Utils.getIndexFilePath(location.getStorageInfo().getFilePath())))) {
hadoopFs.open(new Path(Utils.getIndexFilePath(location.getStorageInfo().getFilePath())))) {
offsets = new ArrayList<>();
int offsetCount = indexInputStream.readInt();
for (int i = 0; i < offsetCount; i++) {
@@ -154,10 +161,9 @@ private List<Long> getChunkOffsetsFromSortedIndex(
throws IOException {
String indexPath = Utils.getIndexFilePath(location.getStorageInfo().getFilePath());
List<Long> offsets;
try (FSDataInputStream indexInputStream =
ShuffleClient.getHdfsFs(conf).open(new Path(indexPath))) {
try (FSDataInputStream indexInputStream = hadoopFs.open(new Path(indexPath))) {
logger.debug("read sorted index {}", indexPath);
long indexSize = ShuffleClient.getHdfsFs(conf).getFileStatus(new Path(indexPath)).getLen();
long indexSize = hadoopFs.getFileStatus(new Path(indexPath)).getLen();
// Index size won't be large, so it's safe to do the conversion.
byte[] indexBuffer = new byte[(int) indexSize];
indexInputStream.readFully(0L, indexBuffer);
@@ -196,24 +202,22 @@ public ByteBuf next() throws IOException, InterruptedException {
logger.debug("read {} offset {} length {}", currentChunkIndex, offset, length);
byte[] buffer = new byte[(int) length];
try {
hdfsInputStream.readFully(offset, buffer);
dfsInputStream.readFully(offset, buffer);
} catch (IOException e) {
logger.warn(
"read HDFS {} failed will retry, error detail {}",
"read DFS {} failed will retry, error detail {}",
location.getStorageInfo().getFilePath(),
e);
try {
hdfsInputStream.close();
hdfsInputStream =
ShuffleClient.getHdfsFs(conf)
.open(
new Path(
Utils.getSortedFilePath(
location.getStorageInfo().getFilePath())));
hdfsInputStream.readFully(offset, buffer);
dfsInputStream.close();
dfsInputStream =
hadoopFs.open(
new Path(
Utils.getSortedFilePath(location.getStorageInfo().getFilePath())));
dfsInputStream.readFully(offset, buffer);
} catch (IOException ex) {
logger.warn(
"retry read HDFS {} failed, error detail {} ",
"retry read DFS {} failed, error detail {} ",
location.getStorageInfo().getFilePath(),
e);
exception.set(ex);
@@ -261,9 +265,9 @@ public void close() {
fetchThread.shutdownNow();
}
try {
hdfsInputStream.close();
dfsInputStream.close();
} catch (IOException e) {
logger.warn("close HDFS input stream failed.", e);
logger.warn("close DFS input stream failed.", e);
}
if (results.size() > 0) {
results.forEach(ReferenceCounted::release);
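`DfsPartitionReader` now reads through whichever `FileSystem` matches the partition's storage type and, when a positioned read fails, reopens the stream once and retries. A condensed sketch of that reopen-and-retry pattern using plain Hadoop APIs (the method and class names here are illustrative, not the exact code above):

```java
import java.io.IOException;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class RetryingDfsReadSketch {
  // Read `length` bytes at `offset` from `file`; on the first IOException, reopen the
  // stream and retry once, similar to DfsPartitionReader.next() above.
  public static byte[] readChunk(FileSystem fs, Path file, long offset, int length)
      throws IOException {
    byte[] buffer = new byte[length];
    FSDataInputStream in = fs.open(file);
    try {
      try {
        in.readFully(offset, buffer);
      } catch (IOException first) {
        // Object-store streams can drop the connection; a fresh stream often recovers.
        in.close();
        in = fs.open(file);
        in.readFully(offset, buffer);
      }
    } finally {
      in.close();
    }
    return buffer;
  }
}
```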
20 changes: 20 additions & 0 deletions common/pom.xml
@@ -204,5 +204,25 @@
</dependency>
</dependencies>
</profile>
<profile>
<id>hadoop-aws</id>
<activation>
<property>
<name>hadoop-aws-deps</name>
</property>
</activation>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-aws</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-bundle</artifactId>
<version>${aws.version}</version>
</dependency>
</dependencies>
</profile>
</profiles>
</project>
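The optional `hadoop-aws` profile pulls in the S3A connector and the bundled AWS SDK, which is what lets Hadoop resolve `s3a://` URIs at runtime. A hedged smoke-test sketch (the bucket name is a placeholder; credentials come from the default AWS provider chain or Hadoop configuration, not from this commit):

```java
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class S3AClasspathSmokeTest {
  public static void main(String[] args) throws Exception {
    // With hadoop-aws and aws-java-sdk-bundle on the classpath,
    // the s3a scheme maps to the S3A FileSystem implementation.
    Configuration hadoopConf = new Configuration();
    FileSystem fs = FileSystem.get(URI.create("s3a://my-bucket/"), hadoopConf);
    System.out.println(fs.exists(new Path("s3a://my-bucket/celeborn/")));
  }
}
```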
@@ -96,41 +96,41 @@ public String getIndexPath() {
return Utils.getIndexFilePath(filePath);
}

public Path getHdfsPath() {
public Path getDfsPath() {
return new Path(filePath);
}

public Path getHdfsIndexPath() {
public Path getDfsIndexPath() {
return new Path(Utils.getIndexFilePath(filePath));
}

public Path getHdfsSortedPath() {
public Path getDfsSortedPath() {
return new Path(Utils.getSortedFilePath(filePath));
}

public Path getHdfsWriterSuccessPath() {
public Path getDfsWriterSuccessPath() {
return new Path(Utils.getWriteSuccessFilePath(filePath));
}

public Path getHdfsPeerWriterSuccessPath() {
public Path getDfsPeerWriterSuccessPath() {
return new Path(Utils.getWriteSuccessFilePath(Utils.getPeerPath(filePath)));
}

public void deleteAllFiles(FileSystem hdfsFs) {
if (isHdfs()) {
public void deleteAllFiles(FileSystem dfsFs) {
if (isDFS()) {
try {
hdfsFs.delete(getHdfsPath(), false);
hdfsFs.delete(getHdfsWriterSuccessPath(), false);
hdfsFs.delete(getHdfsIndexPath(), false);
hdfsFs.delete(getHdfsSortedPath(), false);
dfsFs.delete(getDfsPath(), false);
dfsFs.delete(getDfsWriterSuccessPath(), false);
dfsFs.delete(getDfsIndexPath(), false);
dfsFs.delete(getDfsSortedPath(), false);
} catch (Exception e) {
// ignore delete exceptions because some other workers might be deleting the directory
logger.debug(
"delete HDFS file {},{},{},{} failed {}",
getHdfsPath(),
getHdfsWriterSuccessPath(),
getHdfsIndexPath(),
getHdfsSortedPath(),
"delete DFS file {},{},{},{} failed {}",
getDfsPath(),
getDfsWriterSuccessPath(),
getDfsIndexPath(),
getDfsSortedPath(),
e);
}
} else {
@@ -151,4 +151,12 @@ public void setMountPoint(String mountPoint) {
public boolean isHdfs() {
return Utils.isHdfsPath(filePath);
}

public boolean isS3() {
return Utils.isS3Path(filePath);
}

public boolean isDFS() {
return Utils.isS3Path(filePath) || Utils.isHdfsPath(filePath);
}
}
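The renamed `getDfs*` helpers plus the new `isS3()`/`isDFS()` checks let callers route cleanup to whichever `FileSystem` backs the file. A hypothetical dispatch sketch (the `FileInfoLike` interface below only stands in for the file-info class patched above; it is not an API in this commit):

```java
import java.util.Map;

import org.apache.hadoop.fs.FileSystem;

import org.apache.celeborn.common.protocol.StorageInfo;

public class DfsCleanupSketch {
  // Stand-in for the patched file-info class; only methods visible in this diff are declared.
  interface FileInfoLike {
    boolean isDFS();
    boolean isS3();
    void deleteAllFiles(FileSystem fs);
  }

  // Choose the FileSystem that matches the file's storage backend before deleting its files.
  public static void cleanup(Map<StorageInfo.Type, FileSystem> hadoopFs, FileInfoLike fileInfo) {
    if (fileInfo.isDFS()) {
      StorageInfo.Type type = fileInfo.isS3() ? StorageInfo.Type.S3 : StorageInfo.Type.HDFS;
      fileInfo.deleteAllFiles(hadoopFs.get(type));
    }
  }
}
```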