From 7a596bbed1f94ef0ad983b51b9f4ebe41053075e Mon Sep 17 00:00:00 2001 From: zhaohehuhu Date: Wed, 24 Jul 2024 11:59:15 +0800 Subject: [PATCH] [CELEBORN-1469] Support writing shuffle data to OSS(S3 only) ### What changes were proposed in this pull request? Support writing shuffle data to S3 through the Hadoop S3A filesystem, adding S3 as a new storage type alongside HDFS. ### Why are the changes needed? Celeborn currently cannot sink shuffle data directly to Amazon S3, which is a limitation when moving on-premises clusters to AWS and using S3 as the sink for shuffle data. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Closes #2579 from zhaohehuhu/dev-0619. Authored-by: zhaohehuhu Signed-off-by: mingji --- LICENSE-binary | 2 + NOTICE-binary | 4 + build/make-distribution.sh | 10 +- .../apache/celeborn/client/ShuffleClient.java | 18 +-- .../client/read/DfsPartitionReader.java | 54 ++++---- common/pom.xml | 20 +++ .../celeborn/common/meta/DiskFileInfo.java | 40 +++--- .../celeborn/common/protocol/StorageInfo.java | 21 ++- common/src/main/proto/TransportMessages.proto | 2 +- .../apache/celeborn/common/CelebornConf.scala | 82 ++++++++++- .../celeborn/common/meta/WorkerInfo.scala | 62 +++++---- .../protocol/message/ControlMessages.scala | 10 +- .../common/util/CelebornHadoopUtils.scala | 36 ++++- .../apache/celeborn/common/util/Utils.scala | 7 +- dev/deps/dependencies-server | 2 + docs/configuration/client.md | 4 + docs/configuration/master.md | 5 + docs/configuration/worker.md | 6 + master/pom.xml | 20 +++ .../service/deploy/master/SlotsAllocator.java | 20 ++- .../clustermeta/AbstractMetaManager.java | 8 +- .../service/deploy/master/Master.scala | 49 ++++--- .../src/main/openapi3/worker_rest_v1.yaml | 1 + pom.xml | 20 ++- project/CelebornBuild.scala | 12 +- .../server/common/http/api/v1/ApiUtils.scala | 2 + .../deploy/worker/memory/MemoryManager.java | 2 +- .../storage/MapPartitionDataWriter.java | 45 +++--- .../worker/storage/PartitionDataWriter.java | 45 +++--- .../worker/storage/PartitionFilesSorter.java | 102 ++++++++------ .../storage/ReducePartitionDataWriter.java | 13 +- .../service/deploy/worker/Controller.scala | 2 +- .../service/deploy/worker/FetchHandler.scala | 6 + .../deploy/worker/PushDataHandler.scala | 5 +- .../service/deploy/worker/Worker.scala | 2 +- .../deploy/worker/storage/FlushTask.scala | 35 ++++- .../deploy/worker/storage/Flusher.scala | 19 +++ .../worker/storage/StorageManager.scala | 131 ++++++++++++++---- .../deploy/worker/storage/StoragePolicy.scala | 2 +- ...MemoryReducePartitionDataWriterSuiteJ.java | 2 +- 40 files changed, 685 insertions(+), 243 deletions(-) diff --git a/LICENSE-binary index 221f632dd8..eabd3db4a2 100644 --- a/LICENSE-binary +++ b/LICENSE-binary @@ -263,6 +263,7 @@ org.apache.commons:commons-crypto org.apache.commons:commons-lang3 org.apache.hadoop:hadoop-client-api org.apache.hadoop:hadoop-client-runtime +org.apache.hadoop:hadoop-aws org.apache.ibatis:mybatis org.apache.logging.log4j:log4j-1.2-api org.apache.logging.log4j:log4j-api @@ -307,6 +308,7 @@ org.slf4j:jcl-over-slf4j org.webjars:swagger-ui org.xerial.snappy:snappy-java org.yaml:snakeyaml +com.amazonaws:aws-java-sdk-bundle ------------------------------------------------------------------------------------ This product bundles various third-party components under other open source licenses.
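As a quick reference, here is a minimal sketch of the settings this patch introduces. All values (bucket, endpoint, credentials) are illustrative placeholders, and the chained calls assume `CelebornConf.set(key, value)` returns the conf in the usual SparkConf-style builder fashion:

```scala
import org.apache.celeborn.common.CelebornConf

// Hypothetical deployment settings; celeborn.storage.s3.dir must be an
// s3a:// URI, otherwise CelebornConf.s3Dir logs an error and disables S3.
val conf = new CelebornConf()
  .set("celeborn.storage.availableTypes", "S3")
  .set("celeborn.storage.s3.dir", "s3a://my-bucket/celeborn")
  .set("celeborn.storage.s3.endpoint", "s3.us-east-1.amazonaws.com") // placeholder endpoint
  .set("celeborn.storage.s3.access.key", "<access-key>")             // placeholder credential
  .set("celeborn.storage.s3.secret.key", "<secret-key>")             // placeholder credential

// hasS3Storage requires both S3 in availableTypes and s3.dir to be set.
assert(conf.hasS3Storage)
```

Note that `CelebornHadoopUtils` (below) rejects an S3 setup that omits the access key, secret key, or endpoint, so all four `celeborn.storage.s3.*` keys travel together.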
diff --git a/NOTICE-binary index e4f0de0afb..9942e14406 100644 --- a/NOTICE-binary +++ b/NOTICE-binary @@ -202,3 +202,7 @@ Copyright (c) 2022 Luke Hutchison mimepool Copyright (c) 2018, 2022 Oracle and/or its affiliates. + + +aws-java-sdk +Copyright 2010-2024 Amazon.com, Inc. or its affiliates. \ No newline at end of file diff --git a/build/make-distribution.sh index e5e39ec6ef..1f093bd129 100755 --- a/build/make-distribution.sh +++ b/build/make-distribution.sh @@ -27,6 +27,7 @@ RELEASE="false" MVN="$PROJECT_DIR/build/mvn" SBT="$PROJECT_DIR/build/sbt" SBT_ENABLED="false" +HADOOP_AWS_ENABLED="false" function exit_with_usage { echo "make-distribution.sh - tool for making binary distributions of Celeborn" @@ -62,6 +63,11 @@ while (( "$#" )); do echo "Error: $1 is not supported" exit_with_usage ;; + -P*) + if [[ "$1" == *"hadoop-aws"* ]]; then + HADOOP_AWS_ENABLED="true" + fi + ;; -*) break ;; @@ -256,7 +262,9 @@ function sbt_build_service { echo "Celeborn $VERSION$GITREVSTRING" > "$DIST_DIR/RELEASE" echo "Build flags: $@" >> "$DIST_DIR/RELEASE" - + if [[ "$HADOOP_AWS_ENABLED" == "true" ]]; then + export SBT_MAVEN_PROFILES="hadoop-aws" + fi BUILD_COMMAND=("$SBT" clean package) # Actually build the jar diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java index 0ea484deb8..0738368ff6 100644 --- a/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java +++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java @@ -19,6 +19,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.LongAdder; @@ -34,6 +35,7 @@ import org.apache.celeborn.common.network.client.TransportClientFactory; import org.apache.celeborn.common.protocol.PartitionLocation; import org.apache.celeborn.common.protocol.PbStreamHandler; +import org.apache.celeborn.common.protocol.StorageInfo; import org.apache.celeborn.common.rpc.RpcEndpointRef; import org.apache.celeborn.common.util.CelebornHadoopUtils; import org.apache.celeborn.common.util.ExceptionMaker; @@ -47,7 +49,7 @@ public abstract class ShuffleClient { private static Logger logger = LoggerFactory.getLogger(ShuffleClient.class); private static volatile ShuffleClient _instance; private static volatile boolean initialized = false; - private static volatile FileSystem hdfsFs; + private static volatile Map<StorageInfo.Type, FileSystem> hadoopFs; private static LongAdder totalReadCounter = new LongAdder(); private static LongAdder localShuffleReadCounter = new LongAdder(); @@ -55,7 +57,7 @@ public abstract class ShuffleClient { public static void reset() { _instance = null; initialized = false; - hdfsFs = null; + hadoopFs = null; } protected ShuffleClient() {} @@ -101,19 +103,19 @@ public static ShuffleClient get( return _instance; } - public static FileSystem getHdfsFs(CelebornConf conf) { - if (null == hdfsFs) { + public static Map<StorageInfo.Type, FileSystem> getHadoopFs(CelebornConf conf) { + if (null == hadoopFs) { synchronized (ShuffleClient.class) { - if (null == hdfsFs) { + if (null == hadoopFs) { try { - hdfsFs = CelebornHadoopUtils.getHadoopFS(conf); + hadoopFs = CelebornHadoopUtils.getHadoopFS(conf); } catch (Exception e) { - logger.error("Celeborn initialize HDFS failed.", e); + logger.error("Celeborn initialize DFS failed.", e); } } } } - return hdfsFs; + return hadoopFs; } public static void incrementLocalReadCounter() {
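Because `getHadoopFs` now hands back a per-storage-type map rather than a single HDFS handle, callers pick a `FileSystem` by the location's storage type. A sketch of the selection logic that the `DfsPartitionReader` change below applies (the helper itself is hypothetical, not part of the patch):

```scala
import org.apache.hadoop.fs.FileSystem
import org.apache.celeborn.client.ShuffleClient
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.protocol.{PartitionLocation, StorageInfo}

// Resolve the FileSystem for a partition location: use the S3 filesystem only
// when the location explicitly carries StorageInfo.Type.S3, otherwise fall
// back to HDFS, mirroring the constructor logic in DfsPartitionReader.
def resolveFs(conf: CelebornConf, location: PartitionLocation): FileSystem = {
  val tpe =
    if (location.getStorageInfo != null &&
      location.getStorageInfo.getType == StorageInfo.Type.S3) StorageInfo.Type.S3
    else StorageInfo.Type.HDFS
  ShuffleClient.getHadoopFs(conf).get(tpe)
}
```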
diff --git a/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java b/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java index 712bd82e4f..b69cf580fa 100644 --- a/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java +++ b/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java @@ -30,6 +30,7 @@ import io.netty.buffer.Unpooled; import io.netty.util.ReferenceCounted; import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,6 +45,7 @@ import org.apache.celeborn.common.protocol.PbBufferStreamEnd; import org.apache.celeborn.common.protocol.PbOpenStream; import org.apache.celeborn.common.protocol.PbStreamHandler; +import org.apache.celeborn.common.protocol.StorageInfo; import org.apache.celeborn.common.protocol.StreamType; import org.apache.celeborn.common.util.ShuffleBlockInfoUtils; import org.apache.celeborn.common.util.ThreadUtils; @@ -60,7 +62,7 @@ public class DfsPartitionReader implements PartitionReader { private volatile boolean closed = false; private ExecutorService fetchThread; private boolean fetchThreadStarted; - private FSDataInputStream hdfsInputStream; + private FSDataInputStream dfsInputStream; private int numChunks = 0; private int returnedChunks = 0; private int currentChunkIndex = 0; @@ -68,6 +70,7 @@ public class DfsPartitionReader implements PartitionReader { private TransportClient client; private PbStreamHandler streamHandler; private MetricsCallback metricsCallback; + private FileSystem hadoopFs; public DfsPartitionReader( CelebornConf conf, @@ -85,6 +88,12 @@ public DfsPartitionReader( this.metricsCallback = metricsCallback; this.location = location; + if (location.getStorageInfo() != null + && location.getStorageInfo().getType() == StorageInfo.Type.S3) { + this.hadoopFs = ShuffleClient.getHadoopFs(conf).get(StorageInfo.Type.S3); + } else { + this.hadoopFs = ShuffleClient.getHadoopFs(conf).get(StorageInfo.Type.HDFS); + } if (endMapIndex != Integer.MAX_VALUE) { long fetchTimeoutMs = conf.clientFetchTimeoutMs(); @@ -105,18 +114,17 @@ public DfsPartitionReader( // Parse this message to ensure sort is done.
} catch (IOException | InterruptedException e) { throw new IOException( - "read shuffle file from HDFS failed, filePath: " + "read shuffle file from DFS failed, filePath: " + location.getStorageInfo().getFilePath(), e); } - hdfsInputStream = - ShuffleClient.getHdfsFs(conf) - .open(new Path(Utils.getSortedFilePath(location.getStorageInfo().getFilePath()))); + + dfsInputStream = + hadoopFs.open(new Path(Utils.getSortedFilePath(location.getStorageInfo().getFilePath()))); chunkOffsets.addAll( getChunkOffsetsFromSortedIndex(conf, location, startMapIndex, endMapIndex)); } else { - hdfsInputStream = - ShuffleClient.getHdfsFs(conf).open(new Path(location.getStorageInfo().getFilePath())); + dfsInputStream = hadoopFs.open(new Path(location.getStorageInfo().getFilePath())); chunkOffsets.addAll(getChunkOffsetsFromUnsortedIndex(conf, location)); } logger.debug( @@ -138,8 +146,7 @@ private List<Long> getChunkOffsetsFromUnsortedIndex(CelebornConf conf, Partition throws IOException { List<Long> offsets; try (FSDataInputStream indexInputStream = - ShuffleClient.getHdfsFs(conf) - .open(new Path(Utils.getIndexFilePath(location.getStorageInfo().getFilePath())))) { + hadoopFs.open(new Path(Utils.getIndexFilePath(location.getStorageInfo().getFilePath())))) { offsets = new ArrayList<>(); int offsetCount = indexInputStream.readInt(); for (int i = 0; i < offsetCount; i++) { @@ -154,10 +161,9 @@ private List<Long> getChunkOffsetsFromSortedIndex( throws IOException { String indexPath = Utils.getIndexFilePath(location.getStorageInfo().getFilePath()); List<Long> offsets; - try (FSDataInputStream indexInputStream = - ShuffleClient.getHdfsFs(conf).open(new Path(indexPath))) { + try (FSDataInputStream indexInputStream = hadoopFs.open(new Path(indexPath))) { logger.debug("read sorted index {}", indexPath); - long indexSize = ShuffleClient.getHdfsFs(conf).getFileStatus(new Path(indexPath)).getLen(); + long indexSize = hadoopFs.getFileStatus(new Path(indexPath)).getLen(); // Index size won't be large, so it's safe to do the conversion.
byte[] indexBuffer = new byte[(int) indexSize]; indexInputStream.readFully(0L, indexBuffer); @@ -196,24 +202,22 @@ public ByteBuf next() throws IOException, InterruptedException { logger.debug("read {} offset {} length {}", currentChunkIndex, offset, length); byte[] buffer = new byte[(int) length]; try { - hdfsInputStream.readFully(offset, buffer); + dfsInputStream.readFully(offset, buffer); } catch (IOException e) { logger.warn( - "read HDFS {} failed will retry, error detail {}", + "read DFS {} failed will retry, error detail {}", location.getStorageInfo().getFilePath(), e); try { - hdfsInputStream.close(); - hdfsInputStream = - ShuffleClient.getHdfsFs(conf) - .open( - new Path( - Utils.getSortedFilePath( - location.getStorageInfo().getFilePath()))); - hdfsInputStream.readFully(offset, buffer); + dfsInputStream.close(); + dfsInputStream = + hadoopFs.open( + new Path( + Utils.getSortedFilePath(location.getStorageInfo().getFilePath()))); + dfsInputStream.readFully(offset, buffer); } catch (IOException ex) { logger.warn( - "retry read HDFS {} failed, error detail {} ", + "retry read DFS {} failed, error detail {} ", location.getStorageInfo().getFilePath(), e); exception.set(ex); @@ -261,9 +265,9 @@ public void close() { fetchThread.shutdownNow(); } try { - hdfsInputStream.close(); + dfsInputStream.close(); } catch (IOException e) { - logger.warn("close HDFS input stream failed.", e); + logger.warn("close DFS input stream failed.", e); } if (results.size() > 0) { results.forEach(ReferenceCounted::release); diff --git a/common/pom.xml index 29baba769b..9db43a17b1 100644 --- a/common/pom.xml +++ b/common/pom.xml @@ -204,5 +204,25 @@ + <profile> + <id>hadoop-aws</id> + <activation> + <property> + <name>hadoop-aws-deps</name> + </property> + </activation> + <dependencies> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-aws</artifactId> + <version>${hadoop.version}</version> + </dependency> + <dependency> + <groupId>com.amazonaws</groupId> + <artifactId>aws-java-sdk-bundle</artifactId> + <version>${aws.version}</version> + </dependency> + </dependencies> + </profile> diff --git a/common/src/main/java/org/apache/celeborn/common/meta/DiskFileInfo.java index ce99192c6e..51978ac2e9 100644 --- a/common/src/main/java/org/apache/celeborn/common/meta/DiskFileInfo.java +++ b/common/src/main/java/org/apache/celeborn/common/meta/DiskFileInfo.java @@ -96,41 +96,41 @@ public String getIndexPath() { return Utils.getIndexFilePath(filePath); } - public Path getHdfsPath() { + public Path getDfsPath() { return new Path(filePath); } - public Path getHdfsIndexPath() { + public Path getDfsIndexPath() { return new Path(Utils.getIndexFilePath(filePath)); } - public Path getHdfsSortedPath() { + public Path getDfsSortedPath() { return new Path(Utils.getSortedFilePath(filePath)); } - public Path getHdfsWriterSuccessPath() { + public Path getDfsWriterSuccessPath() { return new Path(Utils.getWriteSuccessFilePath(filePath)); } - public Path getHdfsPeerWriterSuccessPath() { + public Path getDfsPeerWriterSuccessPath() { return new Path(Utils.getWriteSuccessFilePath(Utils.getPeerPath(filePath))); } - public void deleteAllFiles(FileSystem hdfsFs) { - if (isHdfs()) { + public void deleteAllFiles(FileSystem dfsFs) { + if (isDFS()) { try { - hdfsFs.delete(getHdfsPath(), false); - hdfsFs.delete(getHdfsWriterSuccessPath(), false); - hdfsFs.delete(getHdfsIndexPath(), false); - hdfsFs.delete(getHdfsSortedPath(), false); + dfsFs.delete(getDfsPath(), false); + dfsFs.delete(getDfsWriterSuccessPath(), false); + dfsFs.delete(getDfsIndexPath(), false); + dfsFs.delete(getDfsSortedPath(), false); } catch (Exception e) { // ignore delete exceptions because some other workers might be deleting the directory logger.debug( -
"delete HDFS file {},{},{},{} failed {}", - getHdfsPath(), - getHdfsWriterSuccessPath(), - getHdfsIndexPath(), - getHdfsSortedPath(), + "delete DFS file {},{},{},{} failed {}", + getDfsPath(), + getDfsWriterSuccessPath(), + getDfsIndexPath(), + getDfsSortedPath(), e); } } else { @@ -151,4 +151,12 @@ public void setMountPoint(String mountPoint) { public boolean isHdfs() { return Utils.isHdfsPath(filePath); } + + public boolean isS3() { + return Utils.isS3Path(filePath); + } + + public boolean isDFS() { + return Utils.isS3Path(filePath) || Utils.isHdfsPath(filePath); + } } diff --git a/common/src/main/java/org/apache/celeborn/common/protocol/StorageInfo.java b/common/src/main/java/org/apache/celeborn/common/protocol/StorageInfo.java index 621edb774f..28cb652565 100644 --- a/common/src/main/java/org/apache/celeborn/common/protocol/StorageInfo.java +++ b/common/src/main/java/org/apache/celeborn/common/protocol/StorageInfo.java @@ -26,7 +26,9 @@ public enum Type { HDD(1), SSD(2), HDFS(3), - OSS(4); + OSS(4), + S3(5); + private final int value; Type(int value) { @@ -54,6 +56,7 @@ public int getValue() { public static final int LOCAL_DISK_MASK = 0b10; public static final int HDFS_MASK = 0b100; public static final int OSS_MASK = 0b1000; + public static final int S3_MASK = 0b10000; public static final int ALL_TYPES_AVAILABLE_MASK = 0; // Default storage Type is MEMORY. @@ -162,15 +165,28 @@ public boolean HDFSOnly() { return StorageInfo.HDFSOnly(availableStorageTypes); } + public static boolean S3Only(int availableStorageTypes) { + return availableStorageTypes == S3_MASK; + } + public static boolean OSSAvailable(int availableStorageTypes) { return availableStorageTypes == ALL_TYPES_AVAILABLE_MASK || (availableStorageTypes & OSS_MASK) > 0; } + public static boolean S3Available(int availableStorageTypes) { + return availableStorageTypes == ALL_TYPES_AVAILABLE_MASK + || (availableStorageTypes & S3_MASK) > 0; + } + public boolean OSSAvailable() { return StorageInfo.OSSAvailable(availableStorageTypes); } + public boolean S3Available() { + return StorageInfo.S3Available(availableStorageTypes); + } + @Override public boolean equals(Object o) { if (this == o) return true; @@ -232,6 +248,9 @@ public static int getAvailableTypes(List types) { case OSS: ava = ava | OSS_MASK; break; + case S3: + ava = ava | S3_MASK; + break; } } return ava; diff --git a/common/src/main/proto/TransportMessages.proto b/common/src/main/proto/TransportMessages.proto index f8de50486a..2b8432c44c 100644 --- a/common/src/main/proto/TransportMessages.proto +++ b/common/src/main/proto/TransportMessages.proto @@ -70,7 +70,7 @@ enum MessageType { PARTITION_SPLIT = 47; REGISTER_MAP_PARTITION_TASK = 48; HEARTBEAT_FROM_APPLICATION_RESPONSE = 49; - CHECK_FOR_HDFS_EXPIRED_DIRS_TIMEOUT = 50; + CHECK_FOR_DFS_EXPIRED_DIRS_TIMEOUT = 50; OPEN_STREAM = 51; STREAM_HANDLER = 52; CHECK_WORKERS_AVAILABLE = 53; diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala index 9f05c96bf8..ff21a311f6 100644 --- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala +++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala @@ -26,6 +26,7 @@ import scala.collection.mutable import scala.concurrent.duration._ import scala.util.Try +import org.apache.celeborn.common.CelebornConf.{MASTER_INTERNAL_ENDPOINTS, S3_ACCESS_KEY, S3_DIR, S3_SECRET_KEY} import org.apache.celeborn.common.authentication.AnonymousAuthenticationProviderImpl import 
diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala index 9f05c96bf8..ff21a311f6 100644 --- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala +++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala @@ -26,6 +26,7 @@ import scala.collection.mutable import scala.concurrent.duration._ import scala.util.Try +import org.apache.celeborn.common.CelebornConf.{MASTER_INTERNAL_ENDPOINTS, S3_ACCESS_KEY, S3_DIR, S3_SECRET_KEY} import org.apache.celeborn.common.authentication.AnonymousAuthenticationProviderImpl import org.apache.celeborn.common.identity.{DefaultIdentityProvider, IdentityProvider} import org.apache.celeborn.common.internal.Logging @@ -641,6 +642,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se } def hasHDFSStorage: Boolean = get(ACTIVE_STORAGE_TYPES).contains(StorageInfo.Type.HDFS.name()) && get(HDFS_DIR).isDefined + def hasS3Storage: Boolean = + get(ACTIVE_STORAGE_TYPES).contains(StorageInfo.Type.S3.name()) && get(S3_DIR).isDefined def masterSlotAssignLoadAwareDiskGroupNum: Int = get(MASTER_SLOT_ASSIGN_LOADAWARE_DISKGROUP_NUM) def masterSlotAssignLoadAwareDiskGroupGradient: Double = get(MASTER_SLOT_ASSIGN_LOADAWARE_DISKGROUP_GRADIENT) @@ -886,6 +889,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se def clientCommitFilesIgnoreExcludedWorkers: Boolean = get(CLIENT_COMMIT_IGNORE_EXCLUDED_WORKERS) def appHeartbeatTimeoutMs: Long = get(APPLICATION_HEARTBEAT_TIMEOUT) def hdfsExpireDirsTimeoutMS: Long = get(HDFS_EXPIRE_DIRS_TIMEOUT) + def dfsExpireDirsTimeoutMS: Long = get(DFS_EXPIRE_DIRS_TIMEOUT) def appHeartbeatIntervalMs: Long = get(APPLICATION_HEARTBEAT_INTERVAL) def applicationUnregisterEnabled: Boolean = get(APPLICATION_UNREGISTER_ENABLED) @@ -1093,7 +1097,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se (dir, maxCapacity, flushThread, diskType) } }.getOrElse { - if (!hasHDFSStorage) { + if (!hasHDFSStorage && !hasS3Storage) { val prefix = workerStorageBaseDirPrefix val number = workerStorageBaseDirNumber (1 to number).map { i => @@ -1108,6 +1112,24 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se def partitionSplitMinimumSize: Long = get(WORKER_PARTITION_SPLIT_MIN_SIZE) def partitionSplitMaximumSize: Long = get(WORKER_PARTITION_SPLIT_MAX_SIZE) + def s3AccessKey: String = get(S3_ACCESS_KEY).getOrElse("") + + def s3SecretKey: String = get(S3_SECRET_KEY).getOrElse("") + + def s3Endpoint: String = get(S3_ENDPOINT).getOrElse("") + + def s3Dir: String = { + get(S3_DIR).map { + s3Dir => + if (!Utils.isS3Path(s3Dir)) { + log.error(s"${S3_DIR.key} configuration is wrong $s3Dir. 
Disabling S3 support.") + "" + } else { + s3Dir + } + }.getOrElse("") + } + def hdfsDir: String = { get(HDFS_DIR).map { hdfsDir => @@ -1177,10 +1199,12 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se // ////////////////////////////////////////////////////// def workerFlusherBufferSize: Long = get(WORKER_FLUSHER_BUFFER_SIZE) def workerHdfsFlusherBufferSize: Long = get(WORKER_HDFS_FLUSHER_BUFFER_SIZE) + def workerS3FlusherBufferSize: Long = get(WORKER_S3_FLUSHER_BUFFER_SIZE) def workerWriterCloseTimeoutMs: Long = get(WORKER_WRITER_CLOSE_TIMEOUT) def workerHddFlusherThreads: Int = get(WORKER_FLUSHER_HDD_THREADS) def workerSsdFlusherThreads: Int = get(WORKER_FLUSHER_SSD_THREADS) def workerHdfsFlusherThreads: Int = get(WORKER_FLUSHER_HDFS_THREADS) + def workerS3FlusherThreads: Int = get(WORKER_FLUSHER_S3_THREADS) def workerCreateWriterMaxAttempts: Int = get(WORKER_WRITER_CREATE_MAX_ATTEMPTS) // ////////////////////////////////////////////////////// @@ -2134,6 +2158,14 @@ object CelebornConf extends Logging { .timeConf(TimeUnit.MILLISECONDS) .createWithDefaultString("1h") + val DFS_EXPIRE_DIRS_TIMEOUT: ConfigEntry[Long] = + buildConf("celeborn.master.dfs.expireDirs.timeout") + .categories("master") + .version("0.6.0") + .doc("The timeout for expired dirs to be deleted on S3 or HDFS.") + .timeConf(TimeUnit.MILLISECONDS) + .createWithDefaultString("1h") + val WORKER_HEARTBEAT_TIMEOUT: ConfigEntry[Long] = buildConf("celeborn.master.heartbeat.worker.timeout") .withAlternative("celeborn.worker.heartbeat.timeout") @@ -2801,6 +2833,38 @@ object CelebornConf extends Logging { .stringConf .createOptional + val S3_DIR: OptionalConfigEntry[String] = + buildConf("celeborn.storage.s3.dir") + .categories("worker", "master", "client") + .version("0.6.0") + .doc("S3 base directory for Celeborn to store shuffle data.") + .stringConf + .createOptional + + val S3_SECRET_KEY: OptionalConfigEntry[String] = + buildConf("celeborn.storage.s3.secret.key") + .categories("worker", "master", "client") + .version("0.6.0") + .doc("S3 secret key for Celeborn to store shuffle data.") + .stringConf + .createOptional + + val S3_ACCESS_KEY: OptionalConfigEntry[String] = + buildConf("celeborn.storage.s3.access.key") + .categories("worker", "master", "client") + .version("0.6.0") + .doc("S3 access key for Celeborn to store shuffle data.") + .stringConf + .createOptional + + val S3_ENDPOINT: OptionalConfigEntry[String] = + buildConf("celeborn.storage.s3.endpoint") + .categories("worker", "master", "client") + .version("0.6.0") + .doc("S3 endpoint for Celeborn to store shuffle data.") + .stringConf + .createOptional + val WORKER_DISK_RESERVE_SIZE: ConfigEntry[Long] = buildConf("celeborn.worker.storage.disk.reserve.size") .withAlternative("celeborn.worker.disk.reserve.size") @@ -3219,6 +3283,14 @@ object CelebornConf extends Logging { .bytesConf(ByteUnit.BYTE) .createWithDefaultString("4m") + val WORKER_S3_FLUSHER_BUFFER_SIZE: ConfigEntry[Long] = + buildConf("celeborn.worker.flusher.s3.buffer.size") + .categories("worker") + .version("0.6.0") + .doc("Size of buffer used by an S3 flusher.") + .bytesConf(ByteUnit.BYTE) + .createWithDefaultString("4m") + val WORKER_WRITER_CLOSE_TIMEOUT: ConfigEntry[Long] = buildConf("celeborn.worker.writer.close.timeout") .categories("worker") @@ -3259,6 +3331,14 @@ object CelebornConf extends Logging { .intConf .createWithDefault(8) + val WORKER_FLUSHER_S3_THREADS: ConfigEntry[Int] = + buildConf("celeborn.worker.flusher.s3.threads") + .categories("worker") + 
.doc("Flusher's thread count used for write data to S3.") + .version("0.6.0") + .intConf + .createWithDefault(8) + val WORKER_FLUSHER_SHUTDOWN_TIMEOUT: ConfigEntry[Long] = buildConf("celeborn.worker.flusher.shutdownTimeout") .categories("worker") diff --git a/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala b/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala index a0eedc3f9d..bc801b7c80 100644 --- a/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala +++ b/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala @@ -46,7 +46,8 @@ class WorkerInfo( var lastHeartbeat: Long = 0 var workerStatus = WorkerStatus.normalWorkerStatus() val diskInfos = - if (_diskInfos != null) JavaUtils.newConcurrentHashMap[String, DiskInfo](_diskInfos) else null + if (_diskInfos != null) JavaUtils.newConcurrentHashMap[String, DiskInfo](_diskInfos) + else null val userResourceConsumption = if (_userResourceConsumption != null) JavaUtils.newConcurrentHashMap[UserIdentifier, ResourceConsumption](_userResourceConsumption) @@ -198,40 +199,41 @@ class WorkerInfo( def updateThenGetDiskInfos( newDiskInfos: java.util.Map[String, DiskInfo], - estimatedPartitionSize: Option[Long] = None): util.Map[String, DiskInfo] = this.synchronized { - import scala.collection.JavaConverters._ - for (newDisk <- newDiskInfos.values().asScala) { - val mountPoint: String = newDisk.mountPoint - val curDisk = diskInfos.get(mountPoint) - if (curDisk != null) { - curDisk.actualUsableSpace = newDisk.actualUsableSpace - curDisk.totalSpace = newDisk.totalSpace - // Update master's diskinfo activeslots to worker's value - curDisk.activeSlots = newDisk.activeSlots - curDisk.avgFlushTime = newDisk.avgFlushTime - curDisk.avgFetchTime = newDisk.avgFetchTime - if (estimatedPartitionSize.nonEmpty && curDisk.storageType != StorageInfo.Type.HDFS) { - curDisk.maxSlots = curDisk.actualUsableSpace / estimatedPartitionSize.get - } - curDisk.setStatus(newDisk.status) - } else { - if (estimatedPartitionSize.nonEmpty && newDisk.storageType != StorageInfo.Type.HDFS) { - newDisk.maxSlots = newDisk.actualUsableSpace / estimatedPartitionSize.get + estimatedPartitionSize: Option[Long] = None): util.Map[String, DiskInfo] = + this.synchronized { + import scala.collection.JavaConverters._ + for (newDisk <- newDiskInfos.values().asScala) { + val mountPoint: String = newDisk.mountPoint + val curDisk = diskInfos.get(mountPoint) + if (curDisk != null) { + curDisk.actualUsableSpace = newDisk.actualUsableSpace + curDisk.totalSpace = newDisk.totalSpace + // Update master's diskinfo activeslots to worker's value + curDisk.activeSlots = newDisk.activeSlots + curDisk.avgFlushTime = newDisk.avgFlushTime + curDisk.avgFetchTime = newDisk.avgFetchTime + if (estimatedPartitionSize.nonEmpty && curDisk.storageType != StorageInfo.Type.HDFS && curDisk.storageType != StorageInfo.Type.S3) { + curDisk.maxSlots = curDisk.actualUsableSpace / estimatedPartitionSize.get + } + curDisk.setStatus(newDisk.status) + } else { + if (estimatedPartitionSize.nonEmpty && newDisk.storageType != StorageInfo.Type.HDFS && newDisk.storageType != StorageInfo.Type.S3) { + newDisk.maxSlots = newDisk.actualUsableSpace / estimatedPartitionSize.get + } + diskInfos.put(mountPoint, newDisk) } - diskInfos.put(mountPoint, newDisk) } - } - val nonExistsMountPoints: java.util.Set[String] = new util.HashSet[String] - nonExistsMountPoints.addAll(diskInfos.keySet) - nonExistsMountPoints.removeAll(newDiskInfos.keySet) - if (!nonExistsMountPoints.isEmpty) 
{ - for (nonExistsMountPoint <- nonExistsMountPoints.asScala) { - diskInfos.remove(nonExistsMountPoint) + val nonExistsMountPoints: java.util.Set[String] = new util.HashSet[String] + nonExistsMountPoints.addAll(diskInfos.keySet) + nonExistsMountPoints.removeAll(newDiskInfos.keySet) + if (!nonExistsMountPoints.isEmpty) { + for (nonExistsMountPoint <- nonExistsMountPoints.asScala) { + diskInfos.remove(nonExistsMountPoint) + } } + JavaUtils.newConcurrentHashMap[String, DiskInfo](diskInfos) } - JavaUtils.newConcurrentHashMap[String, DiskInfo](diskInfos) - } def updateThenGetUserResourceConsumption(resourceConsumptions: util.Map[ UserIdentifier, diff --git a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala index 94d4a22a85..c94744bb43 100644 --- a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala +++ b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala @@ -68,7 +68,7 @@ object ControlMessages extends Logging { case object CheckForWorkerUnavailableInfoTimeout extends Message - case object CheckForHDFSExpiredDirsTimeout extends Message + case object CheckForDFSExpiredDirsTimeout extends Message case object RemoveExpiredShuffle extends Message @@ -509,8 +509,8 @@ object ControlMessages extends Logging { case CheckForApplicationTimeOut => new TransportMessage(MessageType.CHECK_APPLICATION_TIMEOUT, null) - case CheckForHDFSExpiredDirsTimeout => - new TransportMessage(MessageType.CHECK_FOR_HDFS_EXPIRED_DIRS_TIMEOUT, null) + case CheckForDFSExpiredDirsTimeout => + new TransportMessage(MessageType.CHECK_FOR_DFS_EXPIRED_DIRS_TIMEOUT, null) case RemoveExpiredShuffle => new TransportMessage(MessageType.REMOVE_EXPIRED_SHUFFLE, null) @@ -1263,8 +1263,8 @@ object ControlMessages extends Logging { case CHECK_APPLICATION_TIMEOUT_VALUE => CheckForApplicationTimeOut - case CHECK_FOR_HDFS_EXPIRED_DIRS_TIMEOUT_VALUE => - CheckForHDFSExpiredDirsTimeout + case CHECK_FOR_DFS_EXPIRED_DIRS_TIMEOUT_VALUE => + CheckForDFSExpiredDirsTimeout case WORKER_LOST_VALUE => PbWorkerLost.parseFrom(message.getPayload) diff --git a/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala b/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala index ae07187e7f..cf15709a76 100644 --- a/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala +++ b/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala @@ -20,6 +20,8 @@ package org.apache.celeborn.common.util import java.io.{File, IOException} import java.util.concurrent.atomic.AtomicBoolean +import scala.collection.JavaConverters._ + import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.security.UserGroupInformation @@ -27,6 +29,7 @@ import org.apache.hadoop.security.UserGroupInformation import org.apache.celeborn.common.CelebornConf import org.apache.celeborn.common.exception.CelebornException import org.apache.celeborn.common.internal.Logging +import org.apache.celeborn.common.protocol.StorageInfo object CelebornHadoopUtils extends Logging { private var logPrinted = new AtomicBoolean(false) @@ -46,6 +49,20 @@ object CelebornHadoopUtils extends Logging { "prefix 'celeborn.hadoop.', e.g. 
'celeborn.hadoop.dfs.replication=3'") } } + + if (conf.s3Dir.nonEmpty) { + if (conf.s3AccessKey.isEmpty || conf.s3SecretKey.isEmpty || conf.s3Endpoint.isEmpty) { + throw new CelebornException( + "S3 storage is enabled but s3AccessKey, s3SecretKey, or s3Endpoint is not set") + } + hadoopConf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") + hadoopConf.set( + "fs.s3a.aws.credentials.provider", + "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider") + hadoopConf.set("fs.s3a.access.key", conf.s3AccessKey) + hadoopConf.set("fs.s3a.secret.key", conf.s3SecretKey) + hadoopConf.set("fs.s3a.endpoint", conf.s3Endpoint) + } appendSparkHadoopConfigs(conf, hadoopConf) hadoopConf } @@ -57,22 +74,31 @@ } } - def getHadoopFS(conf: CelebornConf): FileSystem = { + def getHadoopFS(conf: CelebornConf): java.util.Map[StorageInfo.Type, FileSystem] = { val hadoopConf = newConfiguration(conf) initKerberos(conf, hadoopConf) - new Path(conf.hdfsDir).getFileSystem(hadoopConf) + val hadoopFs = new java.util.HashMap[StorageInfo.Type, FileSystem]() + if (conf.hasHDFSStorage) { + val hdfsDir = new Path(conf.hdfsDir) + hadoopFs.put(StorageInfo.Type.HDFS, hdfsDir.getFileSystem(hadoopConf)) + } + if (conf.hasS3Storage) { + val s3Dir = new Path(conf.s3Dir) + hadoopFs.put(StorageInfo.Type.S3, s3Dir.getFileSystem(hadoopConf)) + } + hadoopFs } - def deleteHDFSPathOrLogError(hadoopFs: FileSystem, path: Path, recursive: Boolean): Unit = { + def deleteDFSPathOrLogError(hadoopFs: FileSystem, path: Path, recursive: Boolean): Unit = { try { val startTime = System.currentTimeMillis() hadoopFs.delete(path, recursive) logInfo( - s"Delete HDFS ${path}(recursive=$recursive) costs " + + s"Delete DFS ${path}(recursive=$recursive) costs " + Utils.msDurationToString(System.currentTimeMillis() - startTime)) } catch { case e: IOException => - logError(s"Failed to delete HDFS ${path}(recursive=$recursive) due to: ", e) + logError(s"Failed to delete DFS ${path}(recursive=$recursive) due to: ", e) } } diff --git a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala index 494c77ab9f..d390561d7e 100644 --- a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala +++ b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala @@ -1119,7 +1119,8 @@ object Utils extends Logging { val SORTED_SUFFIX = ".sorted" val INDEX_SUFFIX = ".index" val SUFFIX_HDFS_WRITE_SUCCESS = ".success" - val COMPATIBLE_HDFS_REGEX = "^[a-zA-Z0-9]+://.*" + val COMPATIBLE_HDFS_REGEX = "^(?!s3[a]?://)[a-zA-Z0-9]+://.*" + val S3_REGEX = "^s3[a]?://([a-z0-9][a-z0-9-]{1,61}[a-z0-9])(/.*)?$" val UNKNOWN_APP_SHUFFLE_ID = -1 @@ -1127,6 +1128,10 @@ path.matches(COMPATIBLE_HDFS_REGEX) } + def isS3Path(path: String): Boolean = { + path.matches(S3_REGEX) + } + def getSortedFilePath(path: String): String = { path + SORTED_SUFFIX }
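The two regexes above partition scheme-qualified paths: `s3://`/`s3a://` URIs classify as S3, any other `scheme://` URI remains HDFS-compatible, and bare local paths match neither. A sketch of the resulting behavior of `Utils.isS3Path`/`isHdfsPath` (the assertions assume the negative lookahead covers both `s3://` and `s3a://`, matching `S3_REGEX` as written above):

```scala
// Copies of the patterns from Utils, for illustration only.
val S3_REGEX = "^s3[a]?://([a-z0-9][a-z0-9-]{1,61}[a-z0-9])(/.*)?$"
val COMPATIBLE_HDFS_REGEX = "^(?!s3[a]?://)[a-zA-Z0-9]+://.*"

def isS3Path(p: String): Boolean = p.matches(S3_REGEX)
def isHdfsPath(p: String): Boolean = p.matches(COMPATIBLE_HDFS_REGEX)

assert(isS3Path("s3a://my-bucket/celeborn"))
assert(!isHdfsPath("s3a://my-bucket/celeborn")) // excluded by the lookahead
assert(isHdfsPath("hdfs://nn:9000/celeborn"))
assert(!isS3Path("/mnt/disk1/celeborn") && !isHdfsPath("/mnt/disk1/celeborn"))
```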
diff --git a/dev/deps/dependencies-server index 4e305005e9..521e9872d5 100644 --- a/dev/deps/dependencies-server +++ b/dev/deps/dependencies-server @@ -19,6 +19,7 @@ HikariCP/4.0.3//HikariCP-4.0.3.jar RoaringBitmap/1.0.6//RoaringBitmap-1.0.6.jar aopalliance-repackaged/2.6.1//aopalliance-repackaged-2.6.1.jar ap-loader-all/3.0-8//ap-loader-all-3.0-8.jar +aws-java-sdk-bundle/1.12.367//aws-java-sdk-bundle-1.12.367.jar classgraph/4.8.138//classgraph-4.8.138.jar commons-cli/1.5.0//commons-cli-1.5.0.jar commons-crypto/1.0.0//commons-crypto-1.0.0.jar @@ -27,6 +28,7 @@ commons-lang3/3.13.0//commons-lang3-3.13.0.jar commons-logging/1.1.3//commons-logging-1.1.3.jar failureaccess/1.0.2//failureaccess-1.0.2.jar guava/33.1.0-jre//guava-33.1.0-jre.jar +hadoop-aws/3.3.6//hadoop-aws-3.3.6.jar hadoop-client-api/3.3.6//hadoop-client-api-3.3.6.jar hadoop-client-runtime/3.3.6//hadoop-client-runtime-3.3.6.jar hk2-api/2.6.1//hk2-api-2.6.1.jar diff --git a/docs/configuration/client.md index 947a79db69..eda8edce21 100644 --- a/docs/configuration/client.md +++ b/docs/configuration/client.md @@ -122,4 +122,8 @@ license: | | celeborn.quota.identity.user-specific.userName | default | false | User name if celeborn.quota.identity.provider is org.apache.celeborn.common.identity.DefaultIdentityProvider. | 0.3.0 | | | celeborn.storage.availableTypes | HDD | false | Enabled storages. Available options: MEMORY,HDD,SSD,HDFS. Note: HDD and SSD would be treated as identical. | 0.3.0 | celeborn.storage.activeTypes | | celeborn.storage.hdfs.dir | <undefined> | false | HDFS base directory for Celeborn to store shuffle data. | 0.2.0 | | +| celeborn.storage.s3.access.key | <undefined> | false | S3 access key for Celeborn to store shuffle data. | 0.6.0 | | +| celeborn.storage.s3.dir | <undefined> | false | S3 base directory for Celeborn to store shuffle data. | 0.6.0 | | +| celeborn.storage.s3.endpoint | <undefined> | false | S3 endpoint for Celeborn to store shuffle data. | 0.6.0 | | +| celeborn.storage.s3.secret.key | <undefined> | false | S3 secret key for Celeborn to store shuffle data. | 0.6.0 | | diff --git a/docs/configuration/master.md index 5c5bd8ac2a..11d2e09f3b 100644 --- a/docs/configuration/master.md +++ b/docs/configuration/master.md @@ -34,6 +34,7 @@ license: | | celeborn.dynamicConfig.store.fs.path | <undefined> | false | The path of dynamic config file for fs store backend. The file format should be yaml. The default path is `${CELEBORN_CONF_DIR}/dynamicConfig.yaml`. | 0.5.0 | | | celeborn.internal.port.enabled | false | false | Whether to create a internal port on Masters/Workers for inter-Masters/Workers communication. This is beneficial when SASL authentication is enforced for all interactions between clients and Celeborn Services, but the services can exchange messages without being subject to SASL authentication. | 0.5.0 | | | celeborn.logConf.enabled | false | false | When `true`, log the CelebornConf for debugging purposes. | 0.5.0 | | +| celeborn.master.dfs.expireDirs.timeout | 1h | false | The timeout for expired dirs to be deleted on S3 or HDFS. | 0.6.0 | | | celeborn.master.estimatedPartitionSize.initialSize | 64mb | false | Initial partition size for estimation, it will change according to runtime stats. | 0.3.0 | celeborn.shuffle.initialEstimatedPartitionSize | | celeborn.master.estimatedPartitionSize.maxSize | <undefined> | false | Max partition size for estimation. Default value should be celeborn.worker.shuffle.partitionSplit.max * 2. | 0.4.1 | | | celeborn.master.estimatedPartitionSize.minSize | 8mb | false | Ignore partition size smaller than this configuration of partition size for estimation. | 0.3.0 | celeborn.shuffle.minPartitionSizeToEstimate | @@ -74,4 +75,8 @@ license: | | celeborn.storage.hdfs.dir | <undefined> | false | HDFS base directory for Celeborn to store shuffle data. | 0.2.0 | | | celeborn.storage.hdfs.kerberos.keytab | <undefined> | false | Kerberos keytab file path for HDFS storage connection. 
| 0.3.2 | | | celeborn.storage.hdfs.kerberos.principal | <undefined> | false | Kerberos principal for HDFS storage connection. | 0.3.2 | | +| celeborn.storage.s3.access.key | <undefined> | false | S3 access key for Celeborn to store shuffle data. | 0.6.0 | | +| celeborn.storage.s3.dir | <undefined> | false | S3 base directory for Celeborn to store shuffle data. | 0.6.0 | | +| celeborn.storage.s3.endpoint | <undefined> | false | S3 endpoint for Celeborn to store shuffle data. | 0.6.0 | | +| celeborn.storage.s3.secret.key | <undefined> | false | S3 secret key for Celeborn to store shuffle data. | 0.6.0 | | diff --git a/docs/configuration/worker.md index 2b6ec3df82..ed3d15454e 100644 --- a/docs/configuration/worker.md +++ b/docs/configuration/worker.md @@ -44,6 +44,10 @@ license: | | celeborn.storage.hdfs.dir | <undefined> | false | HDFS base directory for Celeborn to store shuffle data. | 0.2.0 | | | celeborn.storage.hdfs.kerberos.keytab | <undefined> | false | Kerberos keytab file path for HDFS storage connection. | 0.3.2 | | | celeborn.storage.hdfs.kerberos.principal | <undefined> | false | Kerberos principal for HDFS storage connection. | 0.3.2 | | +| celeborn.storage.s3.access.key | <undefined> | false | S3 access key for Celeborn to store shuffle data. | 0.6.0 | | +| celeborn.storage.s3.dir | <undefined> | false | S3 base directory for Celeborn to store shuffle data. | 0.6.0 | | +| celeborn.storage.s3.endpoint | <undefined> | false | S3 endpoint for Celeborn to store shuffle data. | 0.6.0 | | +| celeborn.storage.s3.secret.key | <undefined> | false | S3 secret key for Celeborn to store shuffle data. | 0.6.0 | | | celeborn.worker.activeConnection.max | <undefined> | false | If the number of active connections on a worker exceeds this configuration value, the worker will be marked as high-load in the heartbeat report, and the master will not include that node in the response of RequestSlots. | 0.3.1 | | | celeborn.worker.applicationRegistry.cache.size | 10000 | false | Cache size of the application registry on Workers. | 0.5.0 | | | celeborn.worker.bufferStream.threadsPerMountpoint | 8 | false | Threads count for read buffer per mount point. | 0.3.0 | | @@ -74,6 +78,8 @@ license: | | celeborn.worker.flusher.hdd.threads | 1 | false | Flusher's thread count per disk used for write data to HDD disks. | 0.2.0 | | | celeborn.worker.flusher.hdfs.buffer.size | 4m | false | Size of buffer used by a HDFS flusher. | 0.3.0 | | | celeborn.worker.flusher.hdfs.threads | 8 | false | Flusher's thread count used for write data to HDFS. | 0.2.0 | | +| celeborn.worker.flusher.s3.buffer.size | 4m | false | Size of buffer used by an S3 flusher. | 0.6.0 | | +| celeborn.worker.flusher.s3.threads | 8 | false | Flusher's thread count used for writing data to S3. | 0.6.0 | | | celeborn.worker.flusher.shutdownTimeout | 3s | false | Timeout for a flusher to shutdown. | 0.2.0 | | | celeborn.worker.flusher.ssd.threads | 16 | false | Flusher's thread count per disk used for write data to SSD disks. | 0.2.0 | | | celeborn.worker.flusher.threads | 16 | false | Flusher's thread count per disk for unknown-type disks. | 0.2.0 | |
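The worker-side S3 flusher settings mirror the existing HDD/SSD/HDFS knobs. A sketch of how a worker-side component might choose the per-storage flusher thread count from these accessors (the match shape is illustrative; the actual wiring lives in StorageManager/Flusher):

```scala
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.protocol.StorageInfo

// Illustrative only: pick flusher threads per storage type using the accessor
// this patch adds (workerS3FlusherThreads) alongside the existing ones.
def flusherThreads(conf: CelebornConf, storageType: StorageInfo.Type): Int =
  storageType match {
    case StorageInfo.Type.S3   => conf.workerS3FlusherThreads   // default 8
    case StorageInfo.Type.HDFS => conf.workerHdfsFlusherThreads // default 8
    case StorageInfo.Type.SSD  => conf.workerSsdFlusherThreads  // default 16 per disk
    case _                     => conf.workerHddFlusherThreads  // default 1 per disk
  }
```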
diff --git a/master/pom.xml index abe0614739..a2573e277a 100644 --- a/master/pom.xml +++ b/master/pom.xml @@ -158,6 +158,26 @@ + <profile> + <id>hadoop-aws</id> + <activation> + <property> + <name>hadoop-aws-deps</name> + </property> + </activation> + <dependencies> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-aws</artifactId> + <version>${hadoop.version}</version> + </dependency> + <dependency> + <groupId>com.amazonaws</groupId> + <artifactId>aws-java-sdk-bundle</artifactId> + <version>${aws.version}</version> + </dependency> + </dependencies> + </profile> <profile> <id>hadoop-2</id> diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java index ced728b2f0..eda5a3a2aa 100644 --- a/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java +++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java @@ -73,7 +73,8 @@ static class UsableDiskInfo { for (Map.Entry<String, DiskInfo> diskInfoEntry : worker.diskInfos().entrySet()) { if (diskInfoEntry.getValue().status().equals(DiskStatus.HEALTHY)) { if (StorageInfo.localDiskAvailable(availableStorageTypes) - && diskInfoEntry.getValue().storageType() != StorageInfo.Type.HDFS) { + && diskInfoEntry.getValue().storageType() != StorageInfo.Type.HDFS + && diskInfoEntry.getValue().storageType() != StorageInfo.Type.S3) { usableDisks.add( new UsableDiskInfo( diskInfoEntry.getValue(), diskInfoEntry.getValue().availableSlots())); @@ -82,6 +83,11 @@ static class UsableDiskInfo { usableDisks.add( new UsableDiskInfo( diskInfoEntry.getValue(), diskInfoEntry.getValue().availableSlots())); + } else if (StorageInfo.S3Available(availableStorageTypes) + && diskInfoEntry.getValue().storageType() == StorageInfo.Type.S3) { + usableDisks.add( + new UsableDiskInfo( + diskInfoEntry.getValue(), diskInfoEntry.getValue().availableSlots())); } } } @@ -123,6 +129,10 @@ static class UsableDiskInfo { return offerSlotsRoundRobin( workers, partitionIds, shouldReplicate, shouldRackAware, availableStorageTypes); } + if (StorageInfo.S3Only(availableStorageTypes)) { + return offerSlotsRoundRobin( + workers, partitionIds, shouldReplicate, shouldRackAware, availableStorageTypes); + } List<DiskInfo> usableDisks = new ArrayList<>(); Map<DiskInfo, WorkerInfo> diskToWorkerMap = new HashMap<>(); @@ -141,7 +151,8 @@ static class UsableDiskInfo { ? 
Option.empty() : Option.apply(diskReserveRatio.get())) && diskInfo.status().equals(DiskStatus.HEALTHY) - && diskInfo.storageType() != StorageInfo.Type.HDFS) { + && diskInfo.storageType() != StorageInfo.Type.HDFS + && diskInfo.storageType() != StorageInfo.Type.S3) { usableDisks.add(diskInfo); } })); @@ -198,6 +209,8 @@ private static StorageInfo getStorageInfo( DiskInfo selectedDiskInfo = usableDiskInfos.get(diskIndex).diskInfo; if (selectedDiskInfo.storageType() == StorageInfo.Type.HDFS) { storageInfo = new StorageInfo("", StorageInfo.Type.HDFS, availableStorageTypes); + } else if (selectedDiskInfo.storageType() == StorageInfo.Type.S3) { + storageInfo = new StorageInfo("", StorageInfo.Type.S3, availableStorageTypes); } else { storageInfo = new StorageInfo( @@ -211,6 +224,7 @@ private static StorageInfo getStorageInfo( DiskInfo[] diskInfos = selectedWorker.diskInfos().values().stream() .filter(p -> p.storageType() != StorageInfo.Type.HDFS) + .filter(p -> p.storageType() != StorageInfo.Type.S3) .collect(Collectors.toList()) .toArray(new DiskInfo[0]); storageInfo = @@ -219,6 +233,8 @@ private static StorageInfo getStorageInfo( diskInfos[diskIndex].storageType(), availableStorageTypes); workerDiskIndex.put(selectedWorker, (diskIndex + 1) % diskInfos.length); + } else if (StorageInfo.S3Available(availableStorageTypes)) { + storageInfo = new StorageInfo("", StorageInfo.Type.S3, availableStorageTypes); } else { storageInfo = new StorageInfo("", StorageInfo.Type.HDFS, availableStorageTypes); } diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java index f3ab34b671..371e891627 100644 --- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java +++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java @@ -212,10 +212,14 @@ public void updateWorkerHeartbeatMeta( long healthyDiskNum = disks.values().stream().filter(s -> s.status().equals(DiskStatus.HEALTHY)).count(); if (!excludedWorkers.contains(worker) - && (((disks.isEmpty() || healthyDiskNum <= 0) && !conf.hasHDFSStorage()) || highWorkload)) { + && (((disks.isEmpty() || healthyDiskNum <= 0) + && (!conf.hasHDFSStorage()) + && (!conf.hasS3Storage())) + || highWorkload)) { LOG.debug("Worker: {} num total slots is 0, add to excluded list", worker); excludedWorkers.add(worker); - } else if ((availableSlots.get() > 0 || conf.hasHDFSStorage()) && !highWorkload) { + } else if ((availableSlots.get() > 0 || conf.hasHDFSStorage() || conf.hasS3Storage()) + && !highWorkload) { // only unblack if numSlots larger than 0 excludedWorkers.remove(worker); } diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala index 2ce5fc83fd..1e99956e96 100644 --- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala +++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala @@ -165,6 +165,7 @@ private[celeborn] class Master( private var checkForApplicationTimeOutTask: ScheduledFuture[_] = _ private var checkForUnavailableWorkerTimeOutTask: ScheduledFuture[_] = _ private var checkForHDFSRemnantDirsTimeOutTask: ScheduledFuture[_] = _ + private var checkForS3RemnantDirsTimeOutTask: ScheduledFuture[_] = _ private val nonEagerHandler = 
ThreadUtils.newDaemonCachedThreadPool("master-noneager-handler", 64) // Config constants @@ -172,8 +173,9 @@ private[celeborn] class Master( private val appHeartbeatTimeoutMs = conf.appHeartbeatTimeoutMs private val workerUnavailableInfoExpireTimeoutMs = conf.workerUnavailableInfoExpireTimeout - private val hdfsExpireDirsTimeoutMS = conf.hdfsExpireDirsTimeoutMS + private val dfsExpireDirsTimeoutMS = conf.dfsExpireDirsTimeoutMS private val hasHDFSStorage = conf.hasHDFSStorage + private val hasS3Storage = conf.hasS3Storage private val quotaManager = new QuotaManager(conf, configService) private val masterResourceConsumptionInterval = conf.masterResourceConsumptionInterval @@ -211,7 +213,7 @@ private[celeborn] class Master( TimeUnit.MILLISECONDS) private val slotsAssignPolicy = conf.masterSlotAssignPolicy - private var hadoopFs: FileSystem = _ + private var hadoopFs: util.Map[StorageInfo.Type, FileSystem] = _ masterSource.addGauge(MasterSource.REGISTERED_SHUFFLE_COUNT) { () => statusSystem.registeredShuffle.size } @@ -319,15 +321,15 @@ private[celeborn] class Master( workerUnavailableInfoExpireTimeoutMs / 2, TimeUnit.MILLISECONDS) - if (hasHDFSStorage) { - checkForHDFSRemnantDirsTimeOutTask = forwardMessageThread.scheduleWithFixedDelay( + if (hasHDFSStorage || hasS3Storage) { + checkForS3RemnantDirsTimeOutTask = forwardMessageThread.scheduleWithFixedDelay( new Runnable { override def run(): Unit = Utils.tryLogNonFatalError { - self.send(CheckForHDFSExpiredDirsTimeout) + self.send(CheckForDFSExpiredDirsTimeout) } }, - hdfsExpireDirsTimeoutMS, - hdfsExpireDirsTimeoutMS, + dfsExpireDirsTimeoutMS, + dfsExpireDirsTimeoutMS, TimeUnit.MILLISECONDS) } @@ -350,6 +352,9 @@ private[celeborn] class Master( if (checkForHDFSRemnantDirsTimeOutTask != null) { checkForHDFSRemnantDirsTimeOutTask.cancel(true) } + if (checkForS3RemnantDirsTimeOutTask != null) { + checkForS3RemnantDirsTimeOutTask.cancel(true) + } forwardMessageThread.shutdownNow() rackResolver.stop() if (authEnabled) { @@ -380,8 +385,8 @@ private[celeborn] class Master( executeWithLeaderChecker(null, timeoutWorkerUnavailableInfos()) case CheckForApplicationTimeOut => executeWithLeaderChecker(null, timeoutDeadApplications()) - case CheckForHDFSExpiredDirsTimeout => - executeWithLeaderChecker(null, checkAndCleanExpiredAppDirsOnHDFS()) + case CheckForDFSExpiredDirsTimeout => + executeWithLeaderChecker(null, checkAndCleanExpiredAppDirsOnDFS()) case pb: PbWorkerLost => val host = pb.getHost val rpcPort = pb.getRpcPort @@ -984,38 +989,44 @@ private[celeborn] class Master( workersAssignedToApp.remove(appId) statusSystem.handleAppLost(appId, requestId) logInfo(s"Removed application $appId") - if (hasHDFSStorage) { - checkAndCleanExpiredAppDirsOnHDFS(appId) + if (hasHDFSStorage || hasS3Storage) { + checkAndCleanExpiredAppDirsOnDFS(appId) } context.reply(ApplicationLostResponse(StatusCode.SUCCESS)) } }) } - private def checkAndCleanExpiredAppDirsOnHDFS(expiredDir: String = ""): Unit = { + private def checkAndCleanExpiredAppDirsOnDFS(expiredDir: String = ""): Unit = { if (hadoopFs == null) { try { hadoopFs = CelebornHadoopUtils.getHadoopFS(conf) } catch { case e: Exception => - logError("Celeborn initialize HDFS failed.", e) + logError("Celeborn initialize DFS failed.", e) throw e } } - val hdfsWorkPath = new Path(conf.hdfsDir, conf.workerWorkingDir) - if (hadoopFs.exists(hdfsWorkPath)) { + if (hasHDFSStorage) processDir(conf.hdfsDir, expiredDir) + if (hasS3Storage) processDir(conf.s3Dir, expiredDir) + } + + private def processDir(dfsDir: String, 
expiredDir: String): Unit = { + val dfsWorkPath = new Path(dfsDir, conf.workerWorkingDir) + hadoopFs.asScala.map(_._2).filter(_.exists(dfsWorkPath)).foreach { fs => if (expiredDir.nonEmpty) { - val dirToDelete = new Path(hdfsWorkPath, expiredDir) + val dirToDelete = new Path(dfsWorkPath, expiredDir) // delete specific app dir on application lost - CelebornHadoopUtils.deleteHDFSPathOrLogError(hadoopFs, dirToDelete, true) + CelebornHadoopUtils.deleteDFSPathOrLogError(fs, dirToDelete, true) } else { - val iter = hadoopFs.listStatusIterator(hdfsWorkPath) + val iter = fs.listStatusIterator(dfsWorkPath) while (iter.hasNext && isMasterActive == 1) { val fileStatus = iter.next() if (!statusSystem.appHeartbeatTime.containsKey(fileStatus.getPath.getName)) { - CelebornHadoopUtils.deleteHDFSPathOrLogError(hadoopFs, fileStatus.getPath, true) + CelebornHadoopUtils.deleteDFSPathOrLogError(fs, fileStatus.getPath, true) } } + } } } diff --git a/openapi/openapi-client/src/main/openapi3/worker_rest_v1.yaml b/openapi/openapi-client/src/main/openapi3/worker_rest_v1.yaml index e854f0a2cf..3db792a54f 100644 --- a/openapi/openapi-client/src/main/openapi3/worker_rest_v1.yaml +++ b/openapi/openapi-client/src/main/openapi3/worker_rest_v1.yaml @@ -541,6 +541,7 @@ components: - SSD - HDFS - OSS + - S3 mapIdBitMap: type: string description: The map id bitmap hint. diff --git a/pom.xml b/pom.xml index 85b136aae3..ee31504f75 100644 --- a/pom.xml +++ b/pom.xml @@ -71,7 +71,6 @@ 3.3.6 -