From c44f08df56ce62ae6881bcd04185c3d631e33d7f Mon Sep 17 00:00:00 2001 From: Yongkyun Lee Date: Mon, 17 Nov 2025 13:37:51 -0800 Subject: [PATCH 1/4] Fix genParallelRDD and add UTs --- pom.xml | 47 +++++++ .../lakeloader/ChangeDataGenerator.scala | 10 +- .../onehouse/lakeloader/UnitTestSuite.scala | 132 ++++++++++++++++++ 3 files changed, 181 insertions(+), 8 deletions(-) create mode 100644 src/test/scala/ai/onehouse/lakeloader/UnitTestSuite.scala diff --git a/pom.xml b/pom.xml index 4463f7d..17c5142 100644 --- a/pom.xml +++ b/pom.xml @@ -134,6 +134,35 @@ + + org.apache.maven.plugins + maven-surefire-plugin + ${maven-surefire-plugin.version} + + + **/*Test.class + **/*Suite.class + + + + + org.scalatest + scalatest-maven-plugin + 2.2.0 + + ${project.build.directory}/surefire-reports + . + TestSuite.txt + + + + test + + test + + + + @@ -205,5 +234,23 @@ jcommander 1.72 + + org.scalatest + scalatest_2.12 + 3.2.15 + test + + + com.fasterxml.jackson.module + jackson-module-scala_2.12 + 2.15.2 + test + + + com.fasterxml.jackson.core + jackson-databind + 2.15.2 + test + diff --git a/src/main/scala/ai/onehouse/lakeloader/ChangeDataGenerator.scala b/src/main/scala/ai/onehouse/lakeloader/ChangeDataGenerator.scala index f4a9341..b951551 100644 --- a/src/main/scala/ai/onehouse/lakeloader/ChangeDataGenerator.scala +++ b/src/main/scala/ai/onehouse/lakeloader/ChangeDataGenerator.scala @@ -451,17 +451,11 @@ object ChangeDataGenerator { } } - private def genParallelRDD( + private[lakeloader] def genParallelRDD( spark: SparkSession, targetParallelism: Int, start: Long, end: Long): RDD[Long] = { - val partitionSize = (end - start) / targetParallelism - spark.sparkContext - .parallelize(0 to targetParallelism, targetParallelism) - .mapPartitions { it => - val partitionStart = it.next() * partitionSize - (partitionStart to partitionStart + partitionSize).iterator - } + spark.sparkContext.range(start, end, numSlices = targetParallelism) } } diff --git a/src/test/scala/ai/onehouse/lakeloader/UnitTestSuite.scala b/src/test/scala/ai/onehouse/lakeloader/UnitTestSuite.scala new file mode 100644 index 0000000..ea4b017 --- /dev/null +++ b/src/test/scala/ai/onehouse/lakeloader/UnitTestSuite.scala @@ -0,0 +1,132 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ai.onehouse.lakeloader + +import org.apache.spark.sql.SparkSession +import org.scalatest.BeforeAndAfterAll +import org.scalatest.funsuite.AnyFunSuite +import org.scalatest.matchers.should.Matchers + +class UnitTestSuite extends AnyFunSuite with Matchers with BeforeAndAfterAll { + + var spark: SparkSession = _ + + override def beforeAll(): Unit = { + spark = SparkSession + .builder() + .appName("UnitTestSuite") + .master("local[*]") + .config("spark.sql.shuffle.partitions", "4") + .config("spark.default.parallelism", "4") + .getOrCreate() + } + + override def afterAll(): Unit = { + if (spark != null) { + spark.stop() + } + } + + test("genParallelRDD should generate expected number of elements for range 0-100") { + val end = 100L + val targetParallelism = 4 + val rdd = ChangeDataGenerator.genParallelRDD(spark, targetParallelism, 0, end) + val collected = rdd.collect() + + collected.length shouldBe end + collected.distinct.length shouldBe collected.length + collected.sorted shouldBe (0L until end).toArray + + // Each partition should have data + val partitionSizes = rdd.mapPartitions(iter => Iterator(iter.size)).collect() + partitionSizes.foreach(_ should be > 0) + } + + test("genParallelRDD should work with small ranges") { + val end = 10L + val targetParallelism = 2 + val rdd = ChangeDataGenerator.genParallelRDD(spark, targetParallelism, 0, end) + val collected = rdd.collect() + + collected.length shouldBe end + rdd.getNumPartitions shouldBe targetParallelism + collected.distinct.length shouldBe collected.length + collected.sorted shouldBe (0L until end).toArray + + // Each partition should have data + val partitionSizes = rdd.mapPartitions(iter => Iterator(iter.size)).collect() + partitionSizes.foreach(_ should be > 0) + } + + test("genParallelRDD should work with large ranges") { + val end = 10000L + val targetParallelism = 8 + val rdd = ChangeDataGenerator.genParallelRDD(spark, targetParallelism, 0, end) + val collected = rdd.collect() + + collected.length shouldBe end + rdd.getNumPartitions shouldBe targetParallelism + collected.distinct.length shouldBe collected.length + collected.sorted shouldBe (0L until end).toArray + + // Each partition should have data + val partitionSizes = rdd.mapPartitions(iter => Iterator(iter.size)).collect() + partitionSizes.foreach(_ should be > 0) + } + + test("genParallelRDD should generate exactly targetParallelism partitions") { + val targetParallelism = 5 + val end = 100L + val rdd = ChangeDataGenerator.genParallelRDD(spark, targetParallelism, 0, end) + + // Should have exactly targetParallelism partitions + rdd.getNumPartitions shouldBe targetParallelism + } + + test("genParallelRDD should handle non-divisible end values correctly") { + val end = 97L + val targetParallelism = 7 + val rdd = ChangeDataGenerator.genParallelRDD(spark, targetParallelism, 0, end) + val collected = rdd.collect().sorted + + collected.length shouldBe end + collected shouldBe (0L until end).toArray + collected.distinct.length shouldBe collected.length + val partitionSizes = rdd.mapPartitions(iter => Iterator(iter.size)).collect() + partitionSizes.foreach(_ should be > 0) + } + + test("genParallelRDD should work with non-zero start and non-divisible range") { + val start = 50L + val end = 147L // 97 elements total + val targetParallelism = 7 + val rdd = ChangeDataGenerator.genParallelRDD(spark, targetParallelism, start, end) + val collected = rdd.collect().sorted + + // Verify correctness of collected + val expectedCount = end - start + collected.length shouldBe expectedCount + collected shouldBe (start until end).toArray + collected.distinct.length shouldBe collected.length + + // Check partition size distribution + // First partition should have 13 elements and last 6 partitions should have 14 elements + rdd.getNumPartitions shouldBe targetParallelism + val partitionSizes = rdd.mapPartitions(iter => Iterator(iter.size)).collect() + partitionSizes.length shouldBe targetParallelism + val expectedSizes = Array(13, 14, 14, 14, 14, 14, 14) + partitionSizes shouldBe expectedSizes + } +} From 53fd3a6e8d7fe40abb14a95539e2c31b0f246aa4 Mon Sep 17 00:00:00 2001 From: Yongkyun Lee Date: Mon, 17 Nov 2025 14:03:50 -0800 Subject: [PATCH 2/4] Add github CI --- .github/workflows/ci.yaml | 53 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 53 insertions(+) create mode 100644 .github/workflows/ci.yaml diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml new file mode 100644 index 0000000..4259fa0 --- /dev/null +++ b/.github/workflows/ci.yaml @@ -0,0 +1,53 @@ +name: Java CI + +on: [push] + +permissions: + id-token: write + contents: read + checks: write + pull-requests: write + +concurrency: + group: ${{ github.ref }} + cancel-in-progress: ${{ !contains(github.ref, 'main') }} + +jobs: + build-and-run-unit-tests: + runs-on: arc-4-cores-ondemand-staging-arm + timeout-minutes: 30 + + steps: + - name: Checkout repository + uses: actions/checkout@v4 + with: + fetch-depth: 0 + + - name: Set up JDK 11 + uses: actions/setup-java@v4 + with: + java-version: '11' + distribution: 'temurin' + cache: maven + + - name: Set up Maven + run: | + sudo apt-get update + sudo apt-get install -y maven + mvn --version + + - name: Scalafmt Check + run: mvn scalafmt:check + + - name: Build and Test Maven Artifacts + run: mvn -B -ntp clean package + + - name: Publish Unit Test Results + uses: dorny/test-reporter@v1 + if: success() || failure() + with: + name: Unit Test Results + path: 'target/surefire-reports/TEST-*.xml' + reporter: java-junit + fail-on-error: false + fail-on-empty: false From ffe0d9eb2a0ff35180d19eea74578aab0cf8d0ba Mon Sep 17 00:00:00 2001 From: Yongkyun Lee Date: Mon, 17 Nov 2025 16:48:20 -0800 Subject: [PATCH 3/4] Update CI --- .github/workflows/ci.yaml | 3 --- 1 file changed, 3 deletions(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 4259fa0..83fa4cd 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -36,9 +36,6 @@ jobs: sudo apt-get install -y maven mvn --version - - name: Scalafmt Check - run: mvn scalafmt:check - - name: Build and Test Maven Artifacts run: mvn -B -ntp clean package From 510738edb4013c3aaf8e3cf063ab9b6fd689b599 Mon Sep 17 00:00:00 2001 From: Yongkyun Lee Date: Mon, 17 Nov 2025 17:16:55 -0800 Subject: [PATCH 4/4] Bump test reporter version --- .github/workflows/ci.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 83fa4cd..55629e8 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -40,7 +40,7 @@ jobs: run: mvn -B -ntp clean package - name: Publish Unit Test Results - uses: dorny/test-reporter@v1 + uses: dorny/test-reporter@v2 if: success() || failure() with: name: Unit Test Results