An extension to the Apache Spark project that provides support for highly-scalable RDD joins using a Sort-Merge join algorithm.
In the standard distribution, Spark includes RDD join operators that must collect all values across all rows on both sides of the join for any given key into an in-memory buffer before they can output the joined values for that key. Because of this, you might experience scalability issues when joining large datasets, or datasets that exhibit a low-cardinality or skewed distribution of values.
Can the issue be worked around with stock Spark? Well... yes and no. Outlined below are some of the possible resolutions that one might try:
- Increase the partition count. Increasing the number of partitions won't help, since the source of the problem is the processing of a single key whose values will end up in the same partition, regardless of partition count.
- Increase the executor memory. This can solve the problem in some cases where we are able to allocate enough memory for every executor to ensure we can handle the largest set of values for any given key. There are several drawbacks to this approach, though:
  - Allocating the required amount of memory might not be possible. If the maximum physical memory available to any given executor is less than the amount we need, then we're out of luck.
  - It requires us to know, ahead of time, the maximum amount of memory any given executor will need. This usually involves a trial-and-error effort which can be time-consuming, especially when production-sized data is required to replicate the issue. Also, if the data changes shape over time, the job may suddenly begin to fail, requiring us to tweak the job's settings rather than allowing it to degrade gracefully and still succeed.
  - All executors will need to allocate enough memory for the worst case, which leads to over-allocation. When dealing with skewed data, only a few keys may require a large amount of memory to complete the join. These keys could end up being handled by any executor, so we need to ensure that all executors are configured with enough memory to satisfy the worst case, even if the majority of executors might never encounter a key that requires a significant amount of memory.
  - Sometimes we'd like to use fewer cluster resources at the expense of a longer job run time. In some cases, we don't need a job to run as fast as possible and we'd like to trade execution time for lower cluster utilization. The merge-join operators will try to buffer as many values as possible in memory before spilling to disk for better performance, but don't actually require that any values be buffered at all. If a job's executors are configured with less memory, more data may spill to disk and the job will run longer, but sometimes this is a decent trade-off as long as the job still completes within an acceptable timeframe while making more memory available for other jobs.
- Implement the solution differently to avoid the join. Sure, sometimes there may be another way to solve the problem, but you may be sacrificing developer time, performance, and/or readability to do so. There could also be cases where there isn't a viable workaround and we are forced to abandon Spark and resort to using another framework.
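To illustrate the first point, here is a minimal sketch in plain Scala (no Spark required). The helper `partitionFor` is a hypothetical stand-in that mimics the hash-modulo rule of Spark's default `HashPartitioner`; the dataset and the names are made up for illustration.

```scala
// Hypothetical helper: non-negative (hashCode mod numPartitions), which is
// the rule Spark's default HashPartitioner applies to a key.
def partitionFor(key: Any, numPartitions: Int): Int = {
  val raw = key.hashCode % numPartitions
  if (raw < 0) raw + numPartitions else raw
}

// A skewed dataset: the key "hot" carries almost all of the rows.
val keys: Seq[String] = Seq.fill(1000)("hot") ++ Seq("a", "b", "c")

// Number of rows in the most heavily loaded partition for a given partition count.
def largestPartition(numPartitions: Int): Int =
  keys.groupBy(k => partitionFor(k, numPartitions)).values.map(_.size).max

// Whatever the partition count, all 1000 "hot" rows share one partition.
val sizes: Seq[Int] = Seq(4, 64, 1024).map(largestPartition)
sizes.foreach(println)
```

Every printed size is at least 1000, because a key's partition depends only on the key itself, so all rows for that key travel together.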
This library provides complementary merge-join operators for each of the built-in join operators, but they degrade gracefully rather than fail the job if there's not enough executor memory configured to contain all of the values for any given key. It does this by using a spillable collection for the right side of the join: values are buffered in memory until the executor's memory threshold is exceeded, and only then does it start spilling to disk.
For better performance, the larger of the two RDDs being joined should be on the LEFT side so that the number of spills occurring with the right side is minimized.
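The general idea can be sketched in plain Scala as follows. This is not the library's implementation: `mergeJoinSorted` is a hypothetical function, it assumes both inputs are already sorted by key, and an ordinary `ArrayBuffer` stands in for the spillable collection. It does show why the left side is streamed while only the right side's values for the current key are held.

```scala
import scala.collection.mutable.ArrayBuffer

// Inner merge-join of two iterators that are ALREADY sorted by key (assumption).
// Only the right-side values of the current key are buffered; the ArrayBuffer
// here is a stand-in for a collection that could spill to disk.
def mergeJoinSorted[K, V, W](left: Iterator[(K, V)], right: Iterator[(K, W)])
                            (implicit ord: Ordering[K]): Iterator[(K, (V, W))] = {
  val l = left.buffered
  val r = right.buffered
  new Iterator[(K, (V, W))] {
    private var current: Iterator[(K, (V, W))] = Iterator.empty

    // Move forward until a key present on both sides is found, then join it.
    private def advance(): Unit = {
      while (!current.hasNext && l.hasNext && r.hasNext) {
        val c = ord.compare(l.head._1, r.head._1)
        if (c < 0) l.next()       // key only on the left: dropped by an inner join
        else if (c > 0) r.next()  // key only on the right: dropped as well
        else {
          val key = l.head._1
          // Buffer every right-side value for this key.
          val rightValues = ArrayBuffer.empty[W]
          while (r.hasNext && ord.equiv(r.head._1, key)) rightValues += r.next()._2
          // Stream the left-side values for this key one at a time.
          val leftValues = new Iterator[V] {
            def hasNext: Boolean = l.hasNext && ord.equiv(l.head._1, key)
            def next(): V = l.next()._2
          }
          current = leftValues.flatMap(v => rightValues.iterator.map(w => (key, (v, w))))
        }
      }
    }

    def hasNext: Boolean = { advance(); current.hasNext }
    def next(): (K, (V, W)) = { advance(); current.next() }
  }
}

val leftSide  = Iterator((1, "a"), (1, "b"), (2, "c"), (3, "d"))
val rightSide = Iterator((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))
val joined = mergeJoinSorted(leftSide, rightSide).toList
joined.foreach(println)
// (1,(a,x))
// (1,(b,x))
// (2,(c,y))
// (2,(c,z))
```

Because the left side is never buffered, putting the larger dataset there keeps the per-key buffer (and therefore any spilling) as small as possible.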
This library has different versions based on the Spark version used due to changes in the internals of Spark between these versions. It is also cross-published for Scala 2.10.
Spark Version* | Merge-Join Version |
---|---|
2.x | 2.0.1 |
1.6.x | 1.6.1 |
1.5.x | 1.5.1 |
* Currently tested up to Spark 2.2.0
Using SBT (Spark 2.x):
libraryDependencies += "com.hindog.spark" %% "spark-mergejoin" % "2.0.1"
Using Maven (Spark 2.x):
<dependency>
  <groupId>com.hindog.spark</groupId>
  <artifactId>spark-mergejoin_2.11</artifactId>
  <version>2.0.1</version>
</dependency>
Using SBT (Spark 1.6.x):
libraryDependencies += "com.hindog.spark" %% "spark-mergejoin" % "1.6.1"
Using Maven (Spark 1.6.x):
<dependency>
  <groupId>com.hindog.spark</groupId>
  <artifactId>spark-mergejoin_2.11</artifactId>
  <version>1.6.1</version>
</dependency>
This library can also be added to Spark jobs launched through `spark-shell` or `spark-submit` by using the `--packages` command line option.
For example, to include it when starting the spark shell:
$ bin/spark-shell --packages com.hindog.spark:spark-mergejoin_2.11:2.0.1
Unlike using `--jars`, using `--packages` ensures that this library and its dependencies will be added to the classpath. The `--packages` argument can also be used with `bin/spark-submit`.
import com.hindog.spark.rdd._
The table below lists each of the standard join operators available for type `RDD[(K, V)]` and their corresponding merge-join operator provided by this library. One additional requirement for the merge-join operators is that an implicit `Ordering[K]` must be in scope.
Standard Join | Merge-Join Equivalent |
---|---|
`join` | `mergeJoin` |
`leftOuterJoin` | `leftOuterMergeJoin` |
`rightOuterJoin` | `rightOuterMergeJoin` |
`fullOuterJoin` | `fullOuterMergeJoin` |
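For primitive keys, strings and tuples of those, Scala already supplies an `Ordering`. For a custom key type you have to provide one yourself. The sketch below uses a hypothetical `EventKey` class and a hypothetical `sortedKeys` helper (neither is part of this library) purely to show one way of bringing an `Ordering[K]` into implicit scope:

```scala
// Hypothetical composite key type; not part of the library.
case class EventKey(userId: Long, day: String)

// Order by userId first, then by day. Declaring it implicit makes it
// available to any method that asks for an Ordering[EventKey].
implicit val eventKeyOrdering: Ordering[EventKey] =
  Ordering.by((k: EventKey) => (k.userId, k.day))

// Stand-in for any method that, like the merge-join operators,
// requires an implicit Ordering[K].
def sortedKeys[K](keys: Seq[K])(implicit ord: Ordering[K]): Seq[K] = keys.sorted(ord)

val sorted = sortedKeys(Seq(
  EventKey(2L, "2017-01-01"),
  EventKey(1L, "2017-01-02"),
  EventKey(1L, "2017-01-01")))
sorted.foreach(println)
// EventKey(1,2017-01-01)
// EventKey(1,2017-01-02)
// EventKey(2,2017-01-01)
```

Without the implicit value in scope, the call to `sortedKeys` would fail to compile. The same kind of compile-time error is what you should expect when calling a merge-join operator on an RDD whose key type has no `Ordering`.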
For further documentation, refer to the API docs.
Here is an example demonstrating some joins using sample data:
val sc: SparkContext = { /* ... create SparkContext ... */ }
val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
val innerJoined = rdd1.mergeJoin(rdd2)
innerJoined.collect.foreach(println)
// (1,(1,x))
// (1,(2,x))
// (2,(1,y))
// (2,(1,z))
val leftJoined = rdd1.leftOuterMergeJoin(rdd2)
leftJoined.collect.foreach(println)
// (1,(1,Some(x)))
// (1,(2,Some(x)))
// (2,(1,Some(y)))
// (2,(1,Some(z)))
// (3,(1,None))
val rightJoined = rdd1.rightOuterMergeJoin(rdd2)
rightJoined.collect.foreach(println)
// (4,(None,w))
// (1,(Some(1),x))
// (1,(Some(2),x))
// (2,(Some(1),y))
// (2,(Some(1),z))
val fullJoined = rdd1.fullOuterMergeJoin(rdd2)
fullJoined.collect.foreach(println)
// (4,(None,Some(w)))
// (1,(Some(1),Some(x)))
// (1,(Some(2),Some(x)))
// (2,(Some(1),Some(y)))
// (2,(Some(1),Some(z)))
// (3,(Some(1),None))
Key | Type | Default | Description |
---|---|---|---|
`spark.mergejoin.includeSpillMetrics` | Boolean | true | Whether to add the bytes spilled to memory/disk while performing the join to each task's standard metrics. Set this to false if you don't want these bytes added to the standard task metrics. |
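As a minimal sketch, and assuming the library reads this key from the job's Spark configuration like any other `spark.*` property (the table does not say so explicitly), the setting could be disabled when building the context. The application name is a placeholder.

```scala
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("mergejoin-example") // placeholder application name
  // Assumption: the library picks this key up from the SparkConf.
  .set("spark.mergejoin.includeSpillMetrics", "false")

// The master URL is expected to be supplied by spark-submit / spark-shell.
val sc = new SparkContext(conf)
```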
The operators return an instance of `MergeJoinRDD`, which delegates internally to a specific implementation of `MergeJoin.Joiner` for each join type.
There are a few optimizations in place to try to minimize the amount of work that's being done during the join:
- In cases where we have a key on one side but not the other, we skip creation of the spillable collection and output the tuples directly according to the `MergeJoin.Joiner`'s `leftOuter`/`rightOuter` method.
- When we do have a key present on both sides, we need to accumulate the values for the right side (which is why it should ideally be the "smaller" of the two sides). To do this, we create a spillable collection for the right side that will buffer as many values as possible in memory before spilling to disk, so we only pay the penalty for spilling to disk for keys where it is necessary.
- Since the methods of the `MergeJoin.Joiner` contract (e.g. `inner`, `leftOuter`, `rightOuter`) return an `Iterator` of values for each unique key, we don't need to perform join logic on each value iteration; instead, we perform join logic once per unique key.
- In cases where there are no values to emit for a particular key, the `MergeJoin.Joiner` can return an empty `Iterator` for that key, which causes an immediate advance to the next key instead of emitting and then filtering the output rows. (Consider a naive approach that instead has the `Joiner` emit `Option[(K, (V, W))]` for every value and relies on the caller to filter out the `None`s.)
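The per-key contract described above might look roughly like the following. This is an illustrative sketch only: `KeyJoiner`, `InnerJoiner` and `LeftOuterJoiner` are hypothetical names, and the real `MergeJoin.Joiner` signatures may well differ (see the API docs). The point is that each method is invoked once per key and returns an `Iterator`, so a join type that emits nothing for a key simply returns `Iterator.empty`.

```scala
// Hypothetical per-key contract, loosely modelled on the description above.
trait KeyJoiner[K, V, W, Out] {
  // Key present on both sides; `right` is the buffered (possibly spilled) side.
  def inner(key: K, left: Iterator[V], right: Iterable[W]): Iterator[(K, Out)]
  // Key present on the left side only.
  def leftOuter(key: K, left: Iterator[V]): Iterator[(K, Out)]
  // Key present on the right side only.
  def rightOuter(key: K, right: Iterator[W]): Iterator[(K, Out)]
}

// Inner join: one-sided keys produce nothing, so no rows are built and filtered.
class InnerJoiner[K, V, W] extends KeyJoiner[K, V, W, (V, W)] {
  def inner(key: K, left: Iterator[V], right: Iterable[W]): Iterator[(K, (V, W))] =
    left.flatMap(v => right.iterator.map(w => (key, (v, w))))
  def leftOuter(key: K, left: Iterator[V]): Iterator[(K, (V, W))] = Iterator.empty
  def rightOuter(key: K, right: Iterator[W]): Iterator[(K, (V, W))] = Iterator.empty
}

// Left outer join: left-only keys are emitted directly, without any buffering.
class LeftOuterJoiner[K, V, W] extends KeyJoiner[K, V, W, (V, Option[W])] {
  def inner(key: K, left: Iterator[V], right: Iterable[W]): Iterator[(K, (V, Option[W]))] =
    left.flatMap(v => right.iterator.map(w => (key, (v, Some(w): Option[W]))))
  def leftOuter(key: K, left: Iterator[V]): Iterator[(K, (V, Option[W]))] =
    left.map(v => (key, (v, None: Option[W])))
  def rightOuter(key: K, right: Iterator[W]): Iterator[(K, (V, Option[W]))] = Iterator.empty
}

val innerJoiner = new InnerJoiner[Int, String, Char]
val matched   = innerJoiner.inner(1, Iterator("a", "b"), Seq('x')).toList
val leftOnly  = innerJoiner.leftOuter(3, Iterator("d")).toList

val leftJoiner = new LeftOuterJoiner[Int, String, Char]
val unmatched  = leftJoiner.leftOuter(3, Iterator("d")).toList
```

Here `matched` holds the two joined rows for key 1, `leftOnly` is empty (the inner join skips key 3 without emitting anything), and `unmatched` holds a single row for key 3 with `None` on the right.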