Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# CI workflow: builds the Maven project and runs unit tests on every push.
# NOTE(review): YAML nesting/indentation was lost in this extraction —
# restore proper indentation before committing this file.
name: Java CI

on: [push]

# Least-privilege token grants: write access is limited to checks and
# pull-request comments so the test reporter can publish results.
permissions:
id-token: write
contents: read
checks: write
pull-requests: write

# One run per ref; in-flight runs are cancelled except on refs containing
# 'main', so main-branch builds always run to completion.
concurrency:
group: ${{ github.ref }}
cancel-in-progress: ${{ !contains(github.ref, 'main') }}

jobs:
build-and-run-unit-tests:
# Self-hosted ARM runner pool (Actions Runner Controller), 4 cores.
runs-on: arc-4-cores-ondemand-staging-arm
timeout-minutes: 30

steps:
- name: Checkout repository
uses: actions/checkout@v4
with:
# fetch-depth 0 = full history; presumably needed by versioning or
# diff-based tooling — TODO confirm, otherwise a shallow clone is faster.
fetch-depth: 0

- name: Set up JDK 11
uses: actions/setup-java@v4
with:
java-version: '11'
distribution: 'temurin'
# Caches the local Maven repository between runs.
cache: maven

- name: Set up Maven
run: |
sudo apt-get update
sudo apt-get install -y maven
mvn --version

- name: Build and Test Maven Artifacts
# -B: batch mode, -ntp: suppress transfer-progress noise in logs.
run: mvn -B -ntp clean package

- name: Publish Unit Test Results
uses: dorny/test-reporter@v2
# Publish even when the build/test step failed (but not when cancelled).
if: success() || failure()
with:
name: Unit Test Results
path: 'target/surefire-reports/TEST-*.xml'
reporter: java-junit
# Report-only: test failures already fail the build step above.
fail-on-error: false
fail-on-empty: false
47 changes: 47 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,35 @@
</testSourceDirectories>
</configuration>
</plugin>
<!-- Surefire runs the JUnit-style tests during `mvn package`.
     The .class-suffixed include patterns match compiled classes rather
     than source files, so Scala-compiled *Test / *Suite classes are
     picked up alongside any Java ones.
     NOTE(review): assumes ${maven-surefire-plugin.version} is declared
     in <properties> elsewhere in this pom — confirm. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>${maven-surefire-plugin.version}</version>
<configuration>
<includes>
<include>**/*Test.class</include>
<include>**/*Suite.class</include>
</includes>
</configuration>
</plugin>
<!-- Runs ScalaTest suites in the `test` phase and writes JUnit-style XML
     (<junitxml>.</junitxml>) into target/surefire-reports, so downstream
     report tooling that globs surefire-reports/TEST-*.xml finds the
     Scala results alongside Surefire's. -->
<plugin>
<groupId>org.scalatest</groupId>
<artifactId>scalatest-maven-plugin</artifactId>
<version>2.2.0</version>
<configuration>
<reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
<!-- "." = emit the JUnit XML directly into reportsDirectory. -->
<junitxml>.</junitxml>
<!-- Plain-text summary written next to the XML reports. -->
<filereports>TestSuite.txt</filereports>
</configuration>
<executions>
<execution>
<id>test</id>
<goals>
<goal>test</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
<resources>
<resource>
Expand Down Expand Up @@ -205,5 +234,23 @@
<artifactId>jcommander</artifactId>
<version>1.72</version>
</dependency>
<!-- Test-only dependencies for the Scala unit tests.
     NOTE(review): the _2.12 suffix assumes the project's Scala binary
     version is 2.12 — confirm it matches the scala-library version used
     elsewhere in this pom. -->
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_2.12</artifactId>
<version>3.2.15</version>
<scope>test</scope>
</dependency>
<!-- Jackson Scala module and databind pinned to the same 2.15.2 release
     to avoid the runtime version-mismatch check Jackson performs. -->
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-scala_2.12</artifactId>
<version>2.15.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.15.2</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
10 changes: 2 additions & 8 deletions src/main/scala/ai/onehouse/lakeloader/ChangeDataGenerator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -452,17 +452,11 @@ object ChangeDataGenerator {
}
}

/**
 * Builds an RDD containing every Long in the half-open interval
 * [start, end), split across exactly `targetParallelism` partitions.
 *
 * Delegates to `SparkContext.range`, which distributes the interval
 * evenly across the requested number of slices (when the range size is
 * not divisible by the slice count, earlier partitions may hold one
 * fewer element — see the partition-size assertions in UnitTestSuite).
 *
 * Visibility is `private[lakeloader]` (rather than `private`) so the
 * unit tests in the same package can exercise it directly.
 *
 * @param spark             active SparkSession providing the SparkContext
 * @param targetParallelism number of partitions (slices) to create
 * @param start             inclusive lower bound of the generated range
 * @param end               exclusive upper bound of the generated range
 * @return an RDD[Long] covering `start until end`
 */
private[lakeloader] def genParallelRDD(
    spark: SparkSession,
    targetParallelism: Int,
    start: Long,
    end: Long): RDD[Long] = {
  spark.sparkContext.range(start, end, numSlices = targetParallelism)
}
}
132 changes: 132 additions & 0 deletions src/test/scala/ai/onehouse/lakeloader/UnitTestSuite.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package ai.onehouse.lakeloader

import org.apache.spark.sql.SparkSession
import org.scalatest.BeforeAndAfterAll
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.matchers.should.Matchers

/**
 * Unit tests for [[ChangeDataGenerator.genParallelRDD]], executed on an
 * in-process local Spark session.
 */
class UnitTestSuite extends AnyFunSuite with Matchers with BeforeAndAfterAll {

  // Shared local SparkSession; created in beforeAll, stopped in afterAll.
  var spark: SparkSession = _

  override def beforeAll(): Unit = {
    spark = SparkSession
      .builder()
      .appName("UnitTestSuite")
      .master("local[*]")
      .config("spark.sql.shuffle.partitions", "4")
      .config("spark.default.parallelism", "4")
      .getOrCreate()
  }

  override def afterAll(): Unit = {
    if (spark != null) {
      spark.stop()
    }
  }

  /**
   * Shared verification for one genParallelRDD invocation: the RDD must
   * contain exactly the values [start, end) with no duplicates, split
   * into exactly `targetParallelism` non-empty partitions.
   *
   * @return the per-partition element counts, so callers can make
   *         additional assertions about the distribution
   */
  private def verifyRange(start: Long, end: Long, targetParallelism: Int): Array[Int] = {
    val rdd = ChangeDataGenerator.genParallelRDD(spark, targetParallelism, start, end)
    val collected = rdd.collect().sorted

    collected.length shouldBe (end - start)
    collected shouldBe (start until end).toArray
    collected.distinct.length shouldBe collected.length

    rdd.getNumPartitions shouldBe targetParallelism
    val partitionSizes = rdd.mapPartitions(iter => Iterator(iter.size)).collect()
    partitionSizes.length shouldBe targetParallelism
    partitionSizes.foreach(_ should be > 0)
    partitionSizes
  }

  test("genParallelRDD should generate expected number of elements for range 0-100") {
    verifyRange(0L, 100L, targetParallelism = 4)
  }

  test("genParallelRDD should work with small ranges") {
    verifyRange(0L, 10L, targetParallelism = 2)
  }

  test("genParallelRDD should work with large ranges") {
    verifyRange(0L, 10000L, targetParallelism = 8)
  }

  test("genParallelRDD should generate exactly targetParallelism partitions") {
    verifyRange(0L, 100L, targetParallelism = 5)
  }

  test("genParallelRDD should handle non-divisible end values correctly") {
    verifyRange(0L, 97L, targetParallelism = 7)
  }

  test("genParallelRDD should work with non-zero start and non-divisible range") {
    // 97 elements across 7 slices: SparkContext.range gives the first
    // partition 13 elements and the remaining six partitions 14 each.
    val partitionSizes = verifyRange(50L, 147L, targetParallelism = 7)
    partitionSizes shouldBe Array(13, 14, 14, 14, 14, 14, 14)
  }
}