
Commit f00fae8

Merge branch 'main' into branch-0.1

# Conflicts:
#   client-spark/shuffle-manager-2/pom.xml
#   client-spark/shuffle-manager-3/pom.xml
#   client-spark/shuffle-manager-common/pom.xml
#   client/pom.xml
#   common/pom.xml
#   pom.xml
#   server-common/pom.xml
#   server-master/pom.xml
#   server-worker/pom.xml

2 parents 71ce1ae + ad6458b commit f00fae8

58 files changed, +193 −2522 lines changed

CONFIGURATION_GUIDE.md

Lines changed: 9 additions & 8 deletions
@@ -20,8 +20,8 @@ off-heap-memory = numDirs * queueCapacity * bufferSize + network memory
 
 For example, if an RSS worker has 10 storage directories, each directory has a queue whose capacity
 is 4096, and the buffer size is set to 256 kilobytes. The necessary off-heap memory is 10 gigabytes.
-NetWorker memory will be consumed when netty reads from a TPC channel, there will need some extra
-memory. In conclusion, RSS worker off-heap memory should be set to `(numDirs * queueCapacity * bufferSize * 1.2)`.
+Network memory will be consumed when netty reads from a TPC channel, there will need some extra
+memory. Empirically, RSS worker off-heap memory should be set to `(numDirs * queueCapacity * bufferSize * 1.2)`.
 
 ### Client-Side Configurations
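To make the sizing rule in the hunk above concrete, here is a small illustrative Scala helper (not part of the project) that reproduces the example's arithmetic: 10 directories × 4096 queue slots × 256 KiB ≈ 10 GiB of flush buffers, and about 12 GiB once the ~20% netty headroom is added.

```scala
// Illustrative sizing helper; not part of the RSS code base.
object OffHeapSizing extends App {
  def requiredOffHeapBytes(numDirs: Int, queueCapacity: Int, bufferSizeBytes: Long): Long = {
    val flushBuffers = numDirs.toLong * queueCapacity * bufferSizeBytes
    // Add roughly 20% headroom for netty network memory, per the guide's rule of thumb.
    flushBuffers * 12 / 10
  }

  val bytes = requiredOffHeapBytes(numDirs = 10, queueCapacity = 4096, bufferSizeBytes = 256 * 1024)
  println(s"recommended off-heap ≈ ${bytes / (1024.0 * 1024 * 1024)} GiB") // ~12 GiB
}
```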

@@ -39,20 +39,21 @@ memory. In conclusion, RSS worker off-heap memory should be set to `(numDirs * q
 | spark.rss.fetch.chunk.maxReqsInFlight | 3 | Amount of in-flight chunk fetch request. |
 | spark.rss.data.io.threads | 8 | Amount of thread count for task to push data. |
 | spark.rss.push.data.replicate | true | When true the RSS worker will replicate shuffle data to another RSS worker to ensure shuffle data won't be lost after the node failure. |
+| spark.rss.application.heartbeatInterval | 10s | Application heartbeat interval. |
+| spark.rss.stage.end.timeout | 240s | Time out for StageEnd. |
+| spark.rss.shuffle.writer.mode | hash | RSS support two different shuffle writers. Hash-based shuffle writer works fine when shuffle partition count is normal. Sort-based shuffle writer works fine when memory pressure is high or shuffle partition count it huge. |
 
 ### RSS Master Configurations
 
 | Item | Default | Description |
 | :---: | :---: | :--: |
 | rss.worker.timeout | 120s | |
 | rss.application.timeout | 120s | |
-| rss.stage.end.timeout | 120s | |
-| rss.shuffle.writer.mode | hash | RSS support two different shuffle writers. Hash-based shuffle writer works fine when shuffle partition count is normal. Sort-based shuffle writer works fine when memory pressure is high or shuffle partition count it huge. |
 | rss.rpc.io.clientThreads | min{64, availableCores} | |
 | rss.rpc.io.serverThreads | min{64, availableCores} | |
 | rss.master.port.maxretry | 1 | When RSS master port is occupied,we will retry for maxretry times. |
 | rss.rpc.io.numConnectionsPerPeer | 1 | Connections between hosts are reused in order to reduce connection. |
-| rss.ha.enabled | true | When true, RSS will activate raft implementation and sync shared data on master clusters. |
+| rss.ha.enabled | false | When true, RSS will activate raft implementation and sync shared data on master clusters. |
 | rss.ha.master.hosts | | Master hosts address list. |
 | rss.ha.service.id | | When this config is empty, RSS master will refuse to startup. |
 | rss.ha.nodes.{serviceId} | | Nodes list that deploy RSS master. ServiceId is `rss.ha.service.id` |
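For the client-side `spark.rss.*` options listed in this hunk, the following is a hedged Scala sketch of how a Spark application might set them via `SparkConf`; the values are illustrative placeholders, not tuning advice, and the same keys can equally be passed as `--conf` arguments to spark-submit.

```scala
import org.apache.spark.SparkConf

// Illustrative placeholders, not recommended values; tune for your workload.
object RssClientConfExample {
  val conf: SparkConf = new SparkConf()
    .set("spark.rss.push.data.replicate", "true")
    .set("spark.rss.application.heartbeatInterval", "10s")
    .set("spark.rss.stage.end.timeout", "240s")
    .set("spark.rss.shuffle.writer.mode", "hash") // or "sort" when memory pressure is high or the partition count is huge
}
```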
@@ -66,7 +67,7 @@ memory. In conclusion, RSS worker off-heap memory should be set to `(numDirs * q
 
 | Item | Default | Description |
 | :---: | :---: | :--: |
-| rss.worker.base.dirs | | Directory list to store shuffle data. For the sake of performance, there should be no more than 2 directories on the same disk partition. |
+| rss.worker.base.dirs | | Directory list to store shuffle data. For the sake of performance, there should be one directory per HDD and eight per SDD. |
 | rss.worker.flush.buffer.size | 256K | |
 | rss.worker.flush.queue.capacity | 512 | Size of buffer queue attached to each storage directory. Each flush buffer queue consumes `rss.worker.flush.buffer.size` * `rss.worker.flush.queue.capacity`(256K * 512 = 128M) off-heap memory. This config can be used to estimate RSS worker's off-heap memory demands. |
 | rss.worker.fetch.chunk.size | 8m | Max chunk size of reducer's merged shuffle data. For example, if a reducer's shuffle data is 128 M and the data will need 16 fetch chunk requests to fetch. |
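A small illustrative Scala calculation (not project code) tying together the worker defaults in this hunk: the per-directory flush-buffer memory and the number of chunk fetch requests for the 128 M reducer example.

```scala
// Illustrative arithmetic for the defaults above; not project code.
object WorkerDefaultsMath extends App {
  val bufferSize    = 256L * 1024          // rss.worker.flush.buffer.size = 256K
  val queueCapacity = 512L                 // rss.worker.flush.queue.capacity = 512
  val perDirOffHeap = bufferSize * queueCapacity        // 128 MiB of flush buffers per directory

  val chunkSize   = 8L * 1024 * 1024       // rss.worker.fetch.chunk.size = 8m
  val reducerData = 128L * 1024 * 1024     // the 128 M reducer example from the table
  val fetchRequests = reducerData / chunkSize           // 16 chunk fetch requests

  println(s"per-dir off-heap = ${perDirOffHeap / (1024 * 1024)} MiB, fetch requests = $fetchRequests")
}
```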
@@ -96,7 +97,7 @@ memory. In conclusion, RSS worker off-heap memory should be set to `(numDirs * q
 
 Assume we have a cluster described as below:
 5 RSS Workers with 20 GB off-heap memory and 10 disks.
-As we need to reserver 20% off-heap memory for netty, so we could assume 16 GB off-heap memory can be used for flush buffers.
+As we need to reserve 20% off-heap memory for netty, so we could assume 16 GB off-heap memory can be used for flush buffers.
 
 If `spark.rss.push.data.buffer.size` is 64 KB, we can have in-flight requests up to 1310720.
 If you have 8192 mapper tasks , you could set `spark.rss.push.data.maxReqsInFlight=160` to gain performance improvements.
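The following illustrative Scala snippet (not project code) reproduces the arithmetic of this example cluster: reserving 20% of each worker's 20 GB off-heap leaves 16 GB for flush buffers, giving roughly 1,310,720 cluster-wide in-flight 64 KB push requests, or about 160 per mapper when there are 8192 mappers.

```scala
// Reproduces the cluster-sizing example above; illustrative only, not project code.
object ClusterSizingMath extends App {
  val workers          = 5
  val offHeapPerWorker = 20L * 1024 * 1024 * 1024        // 20 GB off-heap per worker
  val usableForBuffers = offHeapPerWorker * 8 / 10       // reserve ~20% for netty => 16 GB
  val pushBufferSize   = 64L * 1024                      // spark.rss.push.data.buffer.size = 64 KB

  val clusterInFlight  = workers * usableForBuffers / pushBufferSize  // 1310720 in-flight requests
  val mapperTasks      = 8192
  val maxReqsInFlight  = clusterInFlight / mapperTasks                // 160

  println(s"in-flight capacity = $clusterInFlight, spark.rss.push.data.maxReqsInFlight ≈ $maxReqsInFlight")
}
```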
@@ -173,7 +174,7 @@ So we should set `rss.worker.flush.queue.capacity=6553` and each RSS worker has
 | `rss.worker.prometheus.metric.port` | 9096 | int | |
 | `rss.merge.push.data.threshold` | 1 MiB | String | |
 | `rss.driver.metaService.port` | 0 | int | |
-| `rss.worker.closeIdleConnections` | true | bool | |
+| `rss.worker.closeIdleConnections` | false | bool | |
 | `rss.ha.enabled` | false | bool | |
 | `rss.ha.master.hosts` | the value of `rss.master.host` | String | |
 | `rss.ha.service.id` | | String | |

METRICS.md

Lines changed: 2 additions & 2 deletions

@@ -57,12 +57,12 @@ scrape_configs:
 ## Implementation
 
 RSS master metric : `com/aliyun/emr/rss/service/deploy/master/MasterSource.scala`
-RSS worker metric : `com/aliyun/emr/rss/service/deploy/master/MasterSource.scala`
+RSS worker metric : `com/aliyun/emr/rss/service/deploy/worker/WorkerSource.scala`
 and `com.aliyun.emr.rss.common.metrics.source.NetWorkSource`
 
 ## Grafana Dashboard
 
-We provide a grafana dashboard for RSS [Grafana-Dashboard](assets/grafana/rss-dashboard.json). The dashboard was generated by grafana which version is 8.5.0.
+We provide a grafana dashboard for RSS [Grafana-Dashboard](assets/grafana/rss-dashboard.json). The dashboard was generated by grafana of version 8.5.0.
 Here are some snapshots:
 ![d1](assets/img/dashboard1.png)
 ![d2](assets/img/dashboard2.png)

README.md

Lines changed: 11 additions & 1 deletion

@@ -95,7 +95,7 @@ EXAMPLE: single master cluster
 rss.master.address master-host:port
 rss.metrics.system.enabled true
 rss.worker.flush.buffer.size 256k
-rss.worker.flush.queue.capacity 512
+rss.worker.flush.queue.capacity 4096
 rss.worker.base.dirs /mnt/disk1/,/mnt/disk2
 # If your hosts have disk raid or use lvm, set rss.device.monitor.enabled to false
 rss.device.monitor.enabled false
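With this example's two base directories, 256k buffers, and the new queue capacity of 4096, the per-worker flush-buffer demand can be estimated as below; this is an illustrative calculation only, not project code.

```scala
// Illustrative estimate for the example configuration above; not project code.
object ExampleConfigMath extends App {
  val dirs         = 2L                          // /mnt/disk1, /mnt/disk2
  val flushBuffers = dirs * 4096 * 256 * 1024    // 2 GiB of flush buffers
  val withHeadroom = flushBuffers * 12 / 10      // ~2.4 GiB suggested off-heap (20% netty headroom)
  println(s"flush buffers = ${flushBuffers >> 20} MiB, suggested off-heap ≈ ${withHeadroom >> 20} MiB")
}
```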
@@ -217,3 +217,13 @@ RSS have various metrics. [METRICS](METRICS.md)
 ## Contribution
 This is an active open-source project. We are always open to developers who want to use the system or contribute to it.
 See more detail in [Contributing](CONTRIBUTING.md).
+
+## NOTICE
+If you need to fully restart an RSS cluster in HA mode, you must clean ratis meta storage first because ratis meta will store expired states of the last running cluster.
+
+Here are some instructions:
+1. Stop all workers.
+2. Stop all masters.
+3. Clean all master`s ratis meta storage directory(rss.ha.storage.dir).
+4. Start all masters.
+5. Start all workers.

client-spark/shuffle-manager-2/pom.xml

Lines changed: 1 addition & 1 deletion

@@ -21,7 +21,7 @@
   <parent>
     <groupId>com.aliyun.emr</groupId>
     <artifactId>remote-shuffle-service</artifactId>
-    <version>0.1.0</version>
+    <version>0.1.1</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>

client-spark/shuffle-manager-3/pom.xml

Lines changed: 1 addition & 1 deletion

@@ -21,7 +21,7 @@
   <parent>
     <groupId>com.aliyun.emr</groupId>
     <artifactId>remote-shuffle-service</artifactId>
-    <version>0.1.0</version>
+    <version>0.1.1</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>

client-spark/shuffle-manager-common/pom.xml

Lines changed: 1 addition & 1 deletion

@@ -21,7 +21,7 @@
   <parent>
     <groupId>com.aliyun.emr</groupId>
     <artifactId>remote-shuffle-service</artifactId>
-    <version>0.1.0</version>
+    <version>0.1.1</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>

client/pom.xml

Lines changed: 1 addition & 1 deletion

@@ -23,7 +23,7 @@
   <parent>
     <groupId>com.aliyun.emr</groupId>
     <artifactId>remote-shuffle-service</artifactId>
-    <version>0.1.0</version>
+    <version>0.1.1</version>
     <relativePath>../pom.xml</relativePath>
   </parent>

client/src/main/scala/com/aliyun/emr/rss/client/write/LifecycleManager.scala

Lines changed: 14 additions & 21 deletions

@@ -84,6 +84,7 @@ class LifecycleManager(appId: String, val conf: RssConf) extends RpcEndpoint wit
   private var getBlacklist: ScheduledFuture[_] = _
 
   // Use independent app heartbeat threads to avoid being blocked by other operations.
+  private val heartbeatIntervalMs = RssConf.applicationHeatbeatIntervalMs(conf)
   private val heartbeatThread = ThreadUtils.newDaemonSingleThreadScheduledExecutor("app-heartbeat")
   private var appHeartbeat: ScheduledFuture[_] = _
   private val responseCheckerThread = ThreadUtils.
@@ -123,7 +124,7 @@ class LifecycleManager(appId: String, val conf: RssConf) extends RpcEndpoint wit
           logError("Error while send heartbeat", t)
         }
       }
-    }, 0, 30, TimeUnit.SECONDS)
+    }, 0, heartbeatIntervalMs, TimeUnit.MILLISECONDS)
   }
 
   override def onStart(): Unit = {
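The two hunks above replace the hard-coded 30-second heartbeat with the configurable `spark.rss.application.heartbeatInterval`, read through `RssConf.applicationHeatbeatIntervalMs`. Below is a minimal, self-contained Scala sketch of the same scheduling pattern, using plain `java.util.concurrent` instead of the project's `ThreadUtils` helper; the class and method names here are illustrative, not the project's API.

```scala
import java.util.concurrent.{Executors, TimeUnit}

// Illustrative sketch of a configurable heartbeat loop; not the project's actual code.
class HeartbeatDemo(heartbeatIntervalMs: Long) {
  private val scheduler = Executors.newSingleThreadScheduledExecutor()

  def start(sendHeartbeat: () => Unit): Unit = {
    scheduler.scheduleAtFixedRate(new Runnable {
      override def run(): Unit =
        try sendHeartbeat()
        catch { case t: Throwable => System.err.println(s"Error while sending heartbeat: $t") }
    }, 0, heartbeatIntervalMs, TimeUnit.MILLISECONDS)
  }

  def stop(): Unit = scheduler.shutdownNow()
}
```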
@@ -173,10 +174,10 @@ class LifecycleManager(appId: String, val conf: RssConf) extends RpcEndpoint wit
     case msg: GetBlacklist =>
       handleGetBlacklist(msg)
     case StageEnd(applicationId, shuffleId) =>
-      logInfo(s"Received StageEnd request, ${Utils.makeShuffleKey(applicationId, shuffleId)}.")
+      logDebug(s"Received StageEnd request, ${Utils.makeShuffleKey(applicationId, shuffleId)}.")
       handleStageEnd(null, applicationId, shuffleId)
     case UnregisterShuffle(applicationId, shuffleId, _) =>
-      logInfo(s"Received UnregisterShuffle request," +
+      logDebug(s"Received UnregisterShuffle request," +
        s"${Utils.makeShuffleKey(applicationId, shuffleId)}.")
       handleUnregisterShuffle(null, applicationId, shuffleId)
   }
@@ -189,19 +190,19 @@ class LifecycleManager(appId: String, val conf: RssConf) extends RpcEndpoint wit
         numPartitions)
 
     case Revive(applicationId, shuffleId, mapId, attemptId, reduceId, epoch, oldPartition, cause) =>
-      logDebug(s"Received Revive request, " +
+      logTrace(s"Received Revive request, " +
        s"$applicationId, $shuffleId, $mapId, $attemptId, ,$reduceId," +
        s" $epoch, $oldPartition, $cause.")
       handleRevive(context, applicationId, shuffleId, mapId, attemptId,
        reduceId, epoch, oldPartition, cause)
 
     case PartitionSplit(applicationId, shuffleId, reduceId, epoch, oldPartition) =>
-      logDebug(s"Received split request, " +
+      logTrace(s"Received split request, " +
        s"$applicationId, $shuffleId, $reduceId, $epoch, $oldPartition")
       handlePartitionSplitRequest(context, applicationId, shuffleId, reduceId, epoch, oldPartition)
 
     case MapperEnd(applicationId, shuffleId, mapId, attemptId, numMappers) =>
-      logDebug(s"Received MapperEnd request, " +
+      logTrace(s"Received MapperEnd request, " +
        s"${Utils.makeMapKey(applicationId, shuffleId, mapId, attemptId)}.")
       handleMapperEnd(context, applicationId, shuffleId, mapId, attemptId, numMappers)
 
@@ -211,7 +212,7 @@ class LifecycleManager(appId: String, val conf: RssConf) extends RpcEndpoint wit
       handleGetReducerFileGroup(context, shuffleId)
 
     case StageEnd(applicationId, shuffleId) =>
-      logInfo(s"Received StageEnd request, ${Utils.makeShuffleKey(applicationId, shuffleId)}.")
+      logDebug(s"Received StageEnd request, ${Utils.makeShuffleKey(applicationId, shuffleId)}.")
       handleStageEnd(context, applicationId, shuffleId)
   }
 
229230
// If do, just register and return
230231
registerShuffleRequest.synchronized {
231232
if (registerShuffleRequest.containsKey(shuffleId)) {
232-
logInfo("[handleRegisterShuffle] request for same shuffleKey exists, just register")
233+
logDebug("[handleRegisterShuffle] request for same shuffleKey exists, just register")
233234
registerShuffleRequest.get(shuffleId).add(context)
234235
return
235236
} else {
@@ -242,7 +243,6 @@ class LifecycleManager(appId: String, val conf: RssConf) extends RpcEndpoint wit
242243
.filter(_.getEpoch == 0)
243244
.toList
244245
.asJava
245-
logDebug(s"Shuffle $shuffleId already registered, just return.")
246246
if (initialLocs.size != numPartitions) {
247247
logWarning(s"Shuffle $shuffleId location size ${initialLocs.size} not equal to " +
248248
s"numPartitions: $numPartitions!")
@@ -400,15 +400,15 @@ class LifecycleManager(appId: String, val conf: RssConf) extends RpcEndpoint wit
     shuffleReviving.synchronized {
       if (shuffleReviving.containsKey(reduceId)) {
         shuffleReviving.get(reduceId).add(context)
-        logInfo(s"For $shuffleId, same partition $reduceId-$oldEpoch is reviving," +
+        logTrace(s"For $shuffleId, same partition $reduceId-$oldEpoch is reviving," +
          s"register context.")
         return
       } else {
         // check if new slot for the partition has allocated
         val latestLoc = getLatestPartition(shuffleId, reduceId, oldEpoch)
         if (latestLoc != null) {
           context.reply(ChangeLocationResponse(StatusCode.Success, latestLoc))
-          logInfo(s"New partition found, old partition $reduceId-$oldEpoch return it." +
+          logDebug(s"New partition found, old partition $reduceId-$oldEpoch return it." +
            s" shuffleId: $shuffleId $latestLoc")
           return
         }
@@ -467,7 +467,6 @@ class LifecycleManager(appId: String, val conf: RssConf) extends RpcEndpoint wit
       slaves.get(0).getPeer
     }
 
-    logDebug(s"[Update partition] success for $shuffleId $location.")
     contexts.synchronized {
       contexts.remove(reduceId)
     }.asScala.foreach(_.reply(ChangeLocationResponse(StatusCode.Success, location)))
@@ -498,14 +497,11 @@ class LifecycleManager(appId: String, val conf: RssConf) extends RpcEndpoint wit
     shuffleSplitting.synchronized {
      if (shuffleSplitting.containsKey(reduceId)) {
        shuffleSplitting.get(reduceId).add(context)
-        logDebug(s"For $shuffleId, same $reduceId-$oldEpoch is splitting, register context")
        return
      } else {
        val latestLoc = getLatestPartition(shuffleId, reduceId, oldEpoch)
        if (latestLoc != null) {
          context.reply(ChangeLocationResponse(StatusCode.Success, latestLoc))
-          logDebug(s"Split request found new partition, old partition $reduceId-$oldEpoch" +
-            s" return it. shuffleId: $shuffleId $latestLoc")
          return
        }
        val set = new util.HashSet[RpcCallContext]()
@@ -514,7 +510,7 @@ class LifecycleManager(appId: String, val conf: RssConf) extends RpcEndpoint wit
      }
     }
 
-    logDebug(s"Relocate partition for shuffle split ${Utils.makeShuffleKey(applicationId,
+    logDebug(s"Relocate partition for shuffle split ${Utils.makeShuffleKey(applicationId,
      shuffleId)}, oldPartition: $oldPartition")
 
     handleChangePartitionLocation(shuffleSplitting, applicationId, shuffleId, reduceId,
@@ -534,7 +530,7 @@ class LifecycleManager(appId: String, val conf: RssConf) extends RpcEndpoint wit
     var attempts = shuffleMapperAttempts.get(shuffleId)
     // it would happen when task with no shuffle data called MapperEnd first
     if (attempts == null) {
-      logInfo(s"[handleMapperEnd] $shuffleId not registered, create one.")
+      logDebug(s"[handleMapperEnd] $shuffleId not registered, create one.")
       attempts = new Array[Int](numMappers)
       0 until numMappers foreach (ind => attempts(ind) = -1)
       shuffleMapperAttempts.put(shuffleId, attempts)
@@ -579,7 +575,6 @@ class LifecycleManager(appId: String, val conf: RssConf) extends RpcEndpoint wit
      }
      timeout = timeout - delta
     }
-    logDebug(s"Start getting reduce file group, $shuffleId.")
 
     if (dataLostShuffleSet.contains(shuffleId)) {
       context.reply(GetReducerFileGroupResponse(StatusCode.Failed, null, null))
@@ -950,7 +945,7 @@ class LifecycleManager(appId: String, val conf: RssConf) extends RpcEndpoint wit
       // destroy success buffers
       val destroyAfterRetry = retrySlots.asScala.filterKeys(!failedAfterRetry.contains(_)).toMap
       destroyBuffersWithRetry(applicationId, shuffleId,
-        destroyAfterRetry.asInstanceOf[WorkerResource])
+        new WorkerResource(destroyAfterRetry.asJava))
     }
   }
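The hunk above stops casting a Scala map to `WorkerResource` and instead constructs one from a converted Java map. A generic Scala sketch of why such an `asInstanceOf` cast fails at runtime while an explicit `.asJava` conversion works; the types and object name here are stand-ins, not the project's classes.

```scala
import scala.collection.JavaConverters._

// Generic illustration; WorkerResource and the project's types are not used here.
object CastVsConvert {
  def main(args: Array[String]): Unit = {
    val scalaMap: Map[String, Int] = Map("a" -> 1)

    // Wrapping with .asJava yields a real java.util.Map view over the Scala map.
    val javaView: java.util.Map[String, Int] = scalaMap.asJava
    println(javaView.get("a")) // 1

    // Casting the Scala map itself to a Java map type would fail at runtime:
    // scalaMap.asInstanceOf[java.util.Map[String, Int]]  // ClassCastException
  }
}
```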

@@ -1041,7 +1036,6 @@ class LifecycleManager(appId: String, val conf: RssConf) extends RpcEndpoint wit
   }
 
   private def removeExpiredShuffle(): Unit = {
-    logInfo("Check for expired shuffle.")
     val currentTime = System.currentTimeMillis()
     val keys = unregisterShuffleTime.keys().asScala.toList
     keys.foreach { key =>
@@ -1184,7 +1178,6 @@ class LifecycleManager(appId: String, val conf: RssConf) extends RpcEndpoint wit
   }
 
   def isClusterOverload(numPartitions: Int = 0): Boolean = {
-    logInfo(s"Ask Sync Cluster Load Status")
    try {
      rssHARetryClient.askSync[GetClusterLoadStatusResponse](GetClusterLoadStatus(numPartitions),
        classOf[GetClusterLoadStatusResponse]).isOverload

common/pom.xml

Lines changed: 1 addition & 1 deletion

@@ -22,7 +22,7 @@
   <parent>
     <groupId>com.aliyun.emr</groupId>
     <artifactId>remote-shuffle-service</artifactId>
-    <version>0.1.0</version>
+    <version>0.1.1</version>
     <relativePath>../pom.xml</relativePath>
   </parent>

common/src/main/java/com/aliyun/emr/rss/common/network/client/StreamCallback.java

Lines changed: 0 additions & 40 deletions
This file was deleted.

common/src/main/java/com/aliyun/emr/rss/common/network/client/StreamCallbackWithID.java

Lines changed: 0 additions & 22 deletions
This file was deleted.
