diff --git a/src/main/1.3/scala/com/holdenkarau/spark/testing/StreamingSuiteBase.scala b/src/main/1.3/scala/com/holdenkarau/spark/testing/StreamingSuiteBase.scala index c36732ec..4141ffd5 100644 --- a/src/main/1.3/scala/com/holdenkarau/spark/testing/StreamingSuiteBase.scala +++ b/src/main/1.3/scala/com/holdenkarau/spark/testing/StreamingSuiteBase.scala @@ -106,6 +106,24 @@ trait StreamingSuiteBase extends FunSuite with BeforeAndAfterAll with Logging assert(output(i) === expected(i)) } + // Wrappers with ordered = false + def testOperation[U: ClassTag, V: ClassTag]( + input: Seq[Seq[U]], + operation: DStream[U] => DStream[V], + expectedOutput: Seq[Seq[V]] + ) (implicit equality: Equality[V]): Unit = { + testOperation(input, operation, expectedOutput, false) + } + + def testOperation[U: ClassTag, V: ClassTag, W: ClassTag]( + input1: Seq[Seq[U]], + input2: Seq[Seq[V]], + operation: (DStream[U], DStream[V]) => DStream[W], + expectedOutput: Seq[Seq[W]] + ) (implicit equality: Equality[W]): Unit = { + testOperation(input1, input2, operation, expectedOutput, false) + } + /** * Test unary DStream operation with a list of inputs, with number of * batches to run same as the number of input values. @@ -122,7 +140,7 @@ trait StreamingSuiteBase extends FunSuite with BeforeAndAfterAll with Logging input: Seq[Seq[U]], operation: DStream[U] => DStream[V], expectedOutput: Seq[Seq[V]], - ordered: Boolean = false + ordered: Boolean ) (implicit equality: Equality[V]): Unit = { val numBatches = input.size @@ -150,7 +168,7 @@ trait StreamingSuiteBase extends FunSuite with BeforeAndAfterAll with Logging input2: Seq[Seq[V]], operation: (DStream[U], DStream[V]) => DStream[W], expectedOutput: Seq[Seq[W]], - ordered: Boolean = false + ordered: Boolean ) (implicit equality: Equality[W]): Unit = { assert(input1.length === input2.length, "Length of the input lists are not equal")