Skip to content

Commit

Permalink
Merge branch 'refs/heads/main' into B914
Browse files Browse the repository at this point in the history
  • Loading branch information
FMX committed Apr 7, 2024
2 parents bc4fce8 + 186899f commit 53555a1
Show file tree
Hide file tree
Showing 126 changed files with 3,837 additions and 873 deletions.
3 changes: 3 additions & 0 deletions .gitattributes
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,6 @@ assets/img/* export-ignore
*.scala text eol=lf
*.xml text eol=lf
*.py text eol=lf
common/src/test/resources/ssl/generate_certs.sh text
common/src/test/resources/ssl/* -text
worker/src/test/resources/ssl/* -text
10 changes: 0 additions & 10 deletions DISCLAIMER

This file was deleted.

8 changes: 8 additions & 0 deletions LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -212,9 +212,16 @@ Apache License 2.0
Apache Spark
./client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
./client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
./common/src/main/java/org/apache/celeborn/common/network/protocol/EncryptedMessageWithHeader.java
./common/src/main/java/org/apache/celeborn/common/network/protocol/SslMessageEncoder.java
./common/src/main/java/org/apache/celeborn/common/network/ssl/ReloadingX509TrustManager.java
./common/src/main/java/org/apache/celeborn/common/network/util/NettyLogger.java
./common/src/main/java/org/apache/celeborn/common/unsafe/Platform.java
./common/src/main/java/org/apache/celeborn/common/util/JavaUtils.java
./common/src/main/scala/org/apache/celeborn/common/util/SignalUtils.scala
./common/src/test/java/org/apache/celeborn/common/network/protocol/EncryptedMessageWithHeaderSuiteJ.java
./common/src/test/java/org/apache/celeborn/common/network/ssl/ReloadingX509TrustManagerSuiteJ.java
./common/src/test/java/org/apache/celeborn/common/network/ssl/SslSampleConfigs.java
./worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/DB.java
./worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/DBIterator.java
./worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/LevelDB.java
Expand All @@ -226,6 +233,7 @@ Apache Spark
./worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/LevelDBIterator.java
./worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/RocksDB.java
./worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/RocksDBProvider.java
./worker/src/main/scala/org/apache/celeborn/service/deploy/worker/profiler/JVMProfiler.scala

Apache Kyuubi
./common/src/main/java/org/apache/celeborn/reflect/DynClasses.java
Expand Down
25 changes: 25 additions & 0 deletions LICENSE-binary
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,11 @@ io.netty:netty-transport-native-unix-common
io.netty:netty-transport-rxtx
io.netty:netty-transport-sctp
io.netty:netty-transport-udt
io.swagger.core.v3:swagger-annotations
io.swagger.core.v3:swagger-core
io.swagger.core.v3:swagger-integration
io.swagger.core.v3:swagger-jaxrs2
io.swagger.core.v3:swagger-models
org.apache.commons:commons-crypto
org.apache.commons:commons-lang3
org.apache.hadoop:hadoop-client-api
Expand All @@ -267,6 +272,13 @@ org.apache.ratis:ratis-server
org.apache.ratis:ratis-server-api
org.apache.ratis:ratis-shell
org.apache.ratis:ratis-thirdparty-misc
org.eclipse.jetty:jetty-http
org.eclipse.jetty:jetty-io
org.eclipse.jetty:jetty-security
org.eclipse.jetty:jetty-server
org.eclipse.jetty:jetty-servlet
org.eclipse.jetty:jetty-util-ajax
org.eclipse.jetty:jetty-util
org.javassist:javassist
org.reflections:reflections
org.roaringbitmap:RoaringBitmap
Expand All @@ -275,6 +287,7 @@ org.rocksdb:rocksdbjni
org.scala-lang:scala-library
org.scala-lang:scala-reflect
org.slf4j:jcl-over-slf4j
org.webjars:swagger-ui
org.xerial.snappy:snappy-java
org.yaml:snakeyaml

Expand Down Expand Up @@ -308,3 +321,15 @@ org.slf4j:slf4j-api
------------
See licenses/LICENSE-javassist.txt for detail.
org.javassist:javassist

Eclipse Public License (EPL) 2.0
--------------------------------
jakarta.annotation:jakarta.annotation-api
jakarta.servlet:jakarta.servlet-api
jakarta.ws.rs:jakarta.ws.rs-api
org.glassfish.hk2:hk2-api
org.glassfish.hk2:hk2-locator
org.glassfish.hk2:hk2-utils
org.glassfish.hk2.external:aopalliance-repackaged
org.glassfish.hk2.external:jakarta.inject
org.glassfish.hk2:osgi-resource-locator
2 changes: 1 addition & 1 deletion NOTICE
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@

Apache Celeborn (Incubating)
Apache Celeborn
Copyright 2022-2024 The Apache Software Foundation.

This product includes software developed at
Expand Down
2 changes: 1 addition & 1 deletion NOTICE-binary
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@

Apache Celeborn (Incubating)
Apache Celeborn
Copyright 2022-2024 The Apache Software Foundation.

This product includes software developed at
Expand Down
15 changes: 12 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Apache Celeborn (Incubating)
# Apache Celeborn

[![Celeborn CI](https://github.com/apache/incubator-celeborn/actions/workflows/maven.yml/badge.svg)](https://github.com/apache/incubator-celeborn/actions/workflows/maven.yml)
[![Celeborn CI](https://github.com/apache/celeborn/actions/workflows/maven.yml/badge.svg)](https://github.com/apache/celeborn/actions/workflows/maven.yml)
Celeborn (/ˈkeləbɔ:n/) is dedicated to improving the efficiency and elasticity of
different map-reduce engines and provides an elastic, high-efficient
management service for intermediate data including shuffle data, spilled data, result data, etc. Currently, Celeborn is focusing on shuffle data.
Expand Down Expand Up @@ -79,6 +79,15 @@ To compile the client for Spark 2.4 with Scala 2.12, please use the following co
./build/make-distribution.sh -DskipTests -Pspark-2.4 -Dscala.version=${scala.version} -Dscala.binary.version=2.12
```

To compile for Spark 3.5 with Java21, please use the following command
```shell
./build/make-distribution.sh -Pspark-3.5 -Pjdk-21
```
```shell
./build/make-distribution.sh --sbt-enabled -Pspark-3.5 -Pjdk-21
```


### Package Details
Build procedure will create a compressed package.

Expand Down Expand Up @@ -370,7 +379,7 @@ Contact us through the following mailing list.

### Report Issues or Submit Pull Request

If you meet any questions, feel free to file a 🔗[Jira Ticket](https://issues.apache.org/jira/projects/CELEBORN/issues) or connect us and fix it by submitting a 🔗[Pull Request](https://github.com/apache/incubator-celeborn/pulls).
If you meet any questions, feel free to file a 🔗[Jira Ticket](https://issues.apache.org/jira/projects/CELEBORN/issues) or connect us and fix it by submitting a 🔗[Pull Request](https://github.com/apache/celeborn/pulls).

| IM | Contact Info |
|:---------|:------------------------------------------------------------------------------------------------------------------------------------------|
Expand Down
3 changes: 2 additions & 1 deletion build/make-distribution.sh
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@ if [ "$SBT_ENABLED" == "true" ]; then
if [ "$RELEASE" == "true" ]; then
sbt_build_client -Pspark-2.4
sbt_build_client -Pspark-3.4
sbt_build_client -Pspark-3.5
sbt_build_client -Pflink-1.14
sbt_build_client -Pflink-1.15
sbt_build_client -Pflink-1.17
Expand Down Expand Up @@ -338,6 +339,7 @@ else
build_service
build_spark_client -Pspark-2.4
build_spark_client -Pspark-3.4
build_spark_client -Pspark-3.5
build_flink_client -Pflink-1.14
build_flink_client -Pflink-1.15
build_flink_client -Pflink-1.17
Expand Down Expand Up @@ -396,7 +398,6 @@ cp "$PROJECT_DIR/docker/Dockerfile" "$DIST_DIR/docker"
cp -r "$PROJECT_DIR/charts" "$DIST_DIR"

# Copy license files
cp "$PROJECT_DIR/DISCLAIMER" "$DIST_DIR/DISCLAIMER"
if [[ -f $"$PROJECT_DIR/LICENSE-binary" ]]; then
cp "$PROJECT_DIR/LICENSE-binary" "$DIST_DIR/LICENSE"
cp -r "$PROJECT_DIR/licenses-binary" "$DIST_DIR/licenses"
Expand Down
4 changes: 2 additions & 2 deletions build/release/release.sh
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ fi

RELEASE_TAG="v${RELEASE_VERSION}-rc${RELEASE_RC_NO}"

SVN_STAGING_REPO="https://dist.apache.org/repos/dist/dev/incubator/celeborn"
SVN_RELEASE_REPO="https://dist.apache.org/repos/dist/release/incubator/celeborn"
SVN_STAGING_REPO="https://dist.apache.org/repos/dist/dev/celeborn"
SVN_RELEASE_REPO="https://dist.apache.org/repos/dist/release/celeborn"

RELEASE_DIR="${PROJECT_DIR}/tmp"
SVN_STAGING_DIR="${PROJECT_DIR}/tmp/svn-dev"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,6 @@ public abstract class AbstractRemoteShuffleInputGateFactory {
/** Number of max concurrent reading channels. */
protected final int numConcurrentReading;

/** Codec used for compression / decompression. */
protected static final String compressionCodec = "LZ4";

/** Network buffer size. */
protected final int networkBufferSize;

Expand All @@ -66,18 +63,8 @@ public abstract class AbstractRemoteShuffleInputGateFactory {
public AbstractRemoteShuffleInputGateFactory(
CelebornConf conf, NetworkBufferPool networkBufferPool, int networkBufferSize) {
this.celebornConf = conf;
long configuredMemorySize = celebornConf.clientFlinkMemoryPerInputGate();
long minConfiguredMemorySize = celebornConf.clientFlinkMemoryPerInputGateMin();
if (configuredMemorySize < minConfiguredMemorySize) {
throw new IllegalArgumentException(
String.format(
"Insufficient network memory per input gate, please increase %s to at " + "least %s.",
CelebornConf.CLIENT_MEMORY_PER_INPUT_GATE().key(),
celebornConf.clientFlinkMemoryPerInputGateMin()));
}

this.numBuffersPerGate = Utils.checkedDownCast(configuredMemorySize / networkBufferSize);
this.supportFloatingBuffers = celebornConf.clientFlinkInputGateSupportFloatingBuffer();
this.numBuffersPerGate =
Utils.checkedDownCast(celebornConf.clientFlinkMemoryPerInputGate() / networkBufferSize);
if (numBuffersPerGate < MIN_BUFFERS_PER_GATE) {
throw new IllegalArgumentException(
String.format(
Expand All @@ -86,7 +73,7 @@ public AbstractRemoteShuffleInputGateFactory(
CelebornConf.CLIENT_MEMORY_PER_INPUT_GATE().key(),
networkBufferSize * MIN_BUFFERS_PER_GATE));
}

this.supportFloatingBuffers = celebornConf.clientFlinkInputGateSupportFloatingBuffer();
this.networkBufferSize = networkBufferSize;
this.numConcurrentReading = celebornConf.clientFlinkNumConcurrentReading();
this.networkBufferPool = networkBufferPool;
Expand All @@ -104,7 +91,7 @@ public IndexedInputGate create(
SupplierWithException<BufferPool, IOException> bufferPoolFactory =
createBufferPoolFactory(networkBufferPool, numBuffersPerGate, supportFloatingBuffers);
BufferDecompressor bufferDecompressor =
new BufferDecompressor(networkBufferSize, compressionCodec);
new BufferDecompressor(networkBufferSize, celebornConf.shuffleCompressionCodec().name());

return createInputGate(owningTaskName, gateIndex, igdd, bufferPoolFactory, bufferDecompressor);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,18 +71,9 @@ public AbstractRemoteShuffleResultPartitionFactory(
ResultPartitionManager partitionManager,
BufferPoolFactory bufferPoolFactory,
int networkBufferSize) {
long configuredMemorySize = celebornConf.clientFlinkMemoryPerResultPartition();
long minConfiguredMemorySize = celebornConf.clientFlinkMemoryPerResultPartitionMin();
if (configuredMemorySize < minConfiguredMemorySize) {
throw new IllegalArgumentException(
String.format(
"Insufficient network memory per result partition, please increase %s "
+ "to at least %s.",
CelebornConf.CLIENT_MEMORY_PER_RESULT_PARTITION().key(), minConfiguredMemorySize));
}

this.numBuffersPerPartition = Utils.checkedDownCast(configuredMemorySize / networkBufferSize);
this.supportFloatingBuffers = celebornConf.clientFlinkResultPartitionSupportFloatingBuffer();
this.numBuffersPerPartition =
Utils.checkedDownCast(
celebornConf.clientFlinkMemoryPerResultPartition() / networkBufferSize);
if (numBuffersPerPartition < MIN_BUFFERS_PER_PARTITION) {
throw new IllegalArgumentException(
String.format(
Expand All @@ -91,11 +82,10 @@ public AbstractRemoteShuffleResultPartitionFactory(
CelebornConf.CLIENT_MEMORY_PER_RESULT_PARTITION().key(),
networkBufferSize * MIN_BUFFERS_PER_PARTITION));
}

this.supportFloatingBuffers = celebornConf.clientFlinkResultPartitionSupportFloatingBuffer();
this.partitionManager = partitionManager;
this.bufferPoolFactory = bufferPoolFactory;
this.networkBufferSize = networkBufferSize;

this.compressionCodec = celebornConf.shuffleCompressionCodec().name();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.celeborn.plugin.flink.buffer;

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

Expand Down Expand Up @@ -64,4 +65,9 @@ public ManagedBuffer release() {
public Object convertToNetty() {
return buf.duplicate().retain();
}

@Override
public Object convertToNettyForSsl() throws IOException {
return buf.duplicate().retain();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,13 @@ public class PluginSideConfSuiteJ {
public void testCoalesce() {
Configuration flinkConf = new Configuration();
CelebornConf celebornConf = FlinkUtils.toCelebornConf(flinkConf);
Assert.assertEquals(8 * 1024 * 1024, celebornConf.clientFlinkMemoryPerResultPartitionMin());
Assert.assertEquals(8 * 1024 * 1024, celebornConf.clientFlinkMemoryPerInputGateMin());
Assert.assertEquals(Integer.MAX_VALUE, celebornConf.clientFlinkNumConcurrentReading());
Assert.assertEquals(64 * 1024 * 1024, celebornConf.clientFlinkMemoryPerResultPartition());
Assert.assertEquals(32 * 1024 * 1024, celebornConf.clientFlinkMemoryPerInputGate());

Assert.assertTrue(celebornConf.clientFlinkDataCompressionEnabled());
Assert.assertEquals("LZ4", celebornConf.shuffleCompressionCodec().name());

flinkConf.setString("remote-shuffle.job.min.memory-per-partition", "16m");
flinkConf.setString("remote-shuffle.job.min.memory-per-gate", "17m");
flinkConf.setString("remote-shuffle.job.concurrent-readings-per-gate", "12323");
flinkConf.setString("remote-shuffle.job.memory-per-partition", "1888m");
flinkConf.setString("remote-shuffle.job.memory-per-gate", "176m");
Expand All @@ -48,8 +44,6 @@ public void testCoalesce() {
flinkConf.setString("remote-shuffle.job.compression.codec", "ZSTD");

celebornConf = FlinkUtils.toCelebornConf(flinkConf);
Assert.assertEquals(16 * 1024 * 1024, celebornConf.clientFlinkMemoryPerResultPartitionMin());
Assert.assertEquals(17 * 1024 * 1024, celebornConf.clientFlinkMemoryPerInputGateMin());
Assert.assertEquals(12323, celebornConf.clientFlinkNumConcurrentReading());
Assert.assertEquals(1888 * 1024 * 1024, celebornConf.clientFlinkMemoryPerResultPartition());
Assert.assertEquals(176 * 1024 * 1024, celebornConf.clientFlinkMemoryPerInputGate());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@

Apache Celeborn (Incubating)
Apache Celeborn
Copyright 2022-2024 The Apache Software Foundation.

This product includes software developed at
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@

Apache Celeborn (Incubating)
Apache Celeborn
Copyright 2022-2024 The Apache Software Foundation.

This product includes software developed at
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@

Apache Celeborn (Incubating)
Apache Celeborn
Copyright 2022-2024 The Apache Software Foundation.

This product includes software developed at
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@

Apache Celeborn (Incubating)
Apache Celeborn
Copyright 2022-2024 The Apache Software Foundation.

This product includes software developed at
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@

Apache Celeborn (Incubating)
Apache Celeborn
Copyright 2022-2024 The Apache Software Foundation.

This product includes software developed at
Expand Down
2 changes: 1 addition & 1 deletion client-mr/mr-shaded/src/main/resources/META-INF/NOTICE
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@

Apache Celeborn (Incubating)
Apache Celeborn
Copyright 2022-2024 The Apache Software Foundation.

This product includes software developed at
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@

Apache Celeborn (Incubating)
Apache Celeborn
Copyright 2022-2024 The Apache Software Foundation.

This product includes software developed at
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@

Apache Celeborn (Incubating)
Apache Celeborn
Copyright 2022-2024 The Apache Software Foundation.

This product includes software developed at
Expand Down
11 changes: 11 additions & 0 deletions common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,17 @@
<artifactId>log4j-1.2-api</artifactId>
<scope>test</scope>
</dependency>
<!-- for SSL support -->
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcprov-jdk18on</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcpkix-jdk18on</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,10 @@ private boolean shouldRetry(@Nullable RpcEndpointRef oldRef, Throwable e) {
// 'CelebornException: Exception thrown in awaitResult'
if (e.getCause() instanceof MasterNotLeaderException) {
MasterNotLeaderException exception = (MasterNotLeaderException) e.getCause();
String leaderAddr = exception.getSuggestedLeaderAddress();
String leaderAddr =
isWorker
? exception.getSuggestedInternalLeaderAddress()
: exception.getSuggestedLeaderAddress();
if (!leaderAddr.equals(MasterNotLeaderException.LEADER_NOT_PRESENTED)) {
setRpcEndpointRef(leaderAddr);
} else {
Expand Down
Loading

0 comments on commit 53555a1

Please sign in to comment.