Skip to content

Commit

Permalink
Add unit test for ArrowUtils
Browse files Browse the repository at this point in the history
  • Loading branch information
ycycse committed Sep 17, 2024
1 parent 471aa71 commit 86f8333
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,10 @@ object ArrowUtils {
val vector = new BitVector(field.name, allocator)
vector.allocateNew(df.count().toInt)
vector
case StringType =>
val vector = new VarCharVector(field.name, allocator)
case _ =>
val vector: VarCharVector = new VarCharVector(field.name, allocator)
vector.allocateNew(df.count().toInt)
vector
case _ => throw new IllegalArgumentException(s"Unsupported data type: ${field.dataType}")
}
}.toList

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.linkis.engineplugin.spark.executor

import org.apache.arrow.memory.RootAllocator
import org.apache.arrow.vector.VectorSchemaRoot
import org.apache.arrow.vector.ipc.ArrowStreamReader
import org.apache.linkis.DataWorkCloudApplication
import org.apache.linkis.common.conf.DWCArgumentsParser
import org.apache.linkis.engineplugin.spark.utils.ArrowUtils
import org.apache.spark.sql.SparkSession
import org.junit.jupiter.api.{Assertions, Test}

import java.io.ByteArrayInputStream
import scala.collection.mutable

class TestArrowUtil {

  /**
   * Boots a minimal DataWorkCloud (Spring) application so the engine-conn
   * plugin context required by the Spark executor classes is initialized
   * before the test body runs.
   *
   * @param port HTTP port for the embedded server; pick a port unlikely to
   *             collide with other tests running on the same host
   */
  def initService(port: String): Unit = {
    System.setProperty("wds.linkis.server.version", "v1")
    System.setProperty(
      "wds.linkis.engineconn.plugin.default.class",
      "org.apache.linkis.engineplugin.spark.SparkEngineConnPlugin"
    )
    val map = new mutable.HashMap[String, String]()
    map.put("spring.mvc.servlet.path", "/api/rest_j/v1")
    map.put("server.port", port)
    map.put("spring.application.name", "SparkSqlExecutor")
    // Standalone test: do not talk to a Eureka registry.
    map.put("eureka.client.register-with-eureka", "false")
    map.put("eureka.client.fetch-registry", "false")
    DataWorkCloudApplication.main(DWCArgumentsParser.formatSpringOptions(map.toMap))
  }

  /**
   * Round-trip check for [[ArrowUtils.toArrow]]: serializes a small
   * DataFrame to Arrow stream bytes, reads them back with
   * [[ArrowStreamReader]], and verifies every row and the total row count.
   */
  @Test
  def testToArrow(): Unit = {
    initService("26380")
    val path = this.getClass.getResource("/").getPath
    System.setProperty("HADOOP_CONF_DIR", path)
    System.setProperty("wds.linkis.filesystem.hdfs.root.path", path)
    System.setProperty("java.io.tmpdir", path)
    val sparkSession = SparkSession
      .builder()
      .master("local[1]")
      .appName("testToArrow")
      .getOrCreate()
    val dataFrame = sparkSession
      .createDataFrame(
        Seq(("test1", 23, 552214221L), ("test2", 19, 41189877L), ("test3", 241, 1555223L))
      )
      .toDF("name", "age", "id")
    val arrowBytes = ArrowUtils.toArrow(dataFrame)

    // Read the serialized bytes back to verify the Arrow stream contents.
    val allocator = new RootAllocator(Long.MaxValue)
    val byteArrayInputStream = new ByteArrayInputStream(arrowBytes)
    val streamReader = new ArrowStreamReader(byteArrayInputStream, allocator)

    try {
      val root: VectorSchemaRoot = streamReader.getVectorSchemaRoot
      val expectedData =
        Seq(("test1", 23, 552214221L), ("test2", 19, 41189877L), ("test3", 241, 1555223L))

      var rowIndex = 0
      while (streamReader.loadNextBatch()) {
        for (i <- 0 until root.getRowCount) {
          val name = root.getVector("name").getObject(i).toString
          val age = root.getVector("age").getObject(i).asInstanceOf[Int]
          val id = root.getVector("id").getObject(i).asInstanceOf[Long]

          val (expectedName, expectedAge, expectedId) = expectedData(rowIndex)
          // JUnit 5 assertEquals takes (expected, actual) — keep that order
          // so failure messages report the values correctly.
          Assertions.assertEquals(expectedName, name)
          Assertions.assertEquals(expectedAge, age)
          Assertions.assertEquals(expectedId, id)
          rowIndex += 1
        }
      }
      // Every expected row must have been seen exactly once.
      Assertions.assertEquals(expectedData.length, rowIndex)
    } finally {
      streamReader.close()
      allocator.close()
      // Stop the local Spark context so it does not leak into other tests.
      sparkSession.stop()
    }
  }

}
4 changes: 4 additions & 0 deletions tool/dependencies/known-dependencies.txt
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,11 @@ antlr-runtime-3.5.2.jar
aopalliance-1.0.jar
aopalliance-repackaged-2.4.0-b34.jar
arrow-format-0.8.0.jar
arrow-format-2.0.0.jar
arrow-memory-0.8.0.jar
arrow-memory-core-2.0.0.jar
arrow-vector-0.8.0.jar
arrow-vector-2.0.0.jar
asm-9.3.jar
asm-analysis-9.3.jar
asm-commons-9.3.jar
Expand Down Expand Up @@ -138,6 +141,7 @@ feign-form-spring-3.8.0.jar
feign-slf4j-11.10.jar
findbugs-annotations-1.3.9-1.jar
flatbuffers-1.2.0-3f79e055.jar
flatbuffers-java-1.9.0.jar
flink-annotations-1.12.2.jar
flink-annotations-1.16.2.jar
flink-cep-1.16.2.jar
Expand Down

0 comments on commit 86f8333

Please sign in to comment.