Skip to content

Commit

Permalink
add new ut
Browse files Browse the repository at this point in the history
  • Loading branch information
FMX committed Apr 18, 2024
1 parent 7722de3 commit ff9bd22
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -525,10 +525,10 @@ object PbSerDeUtils {

val allLocations = (inputLocations ++ implicateLocations)
val workerIdList = new util.ArrayList[String](
allLocations.map(_.getWorker.toUniqueId()).asJava)
allLocations.map(_.getWorker.toUniqueId()).toSet.asJava)
val mountPointsList = new util.ArrayList[String](
allLocations.map(
_.getStorageInfo.getMountPoint).asJava)
_.getStorageInfo.getMountPoint).toSet.asJava)

packedLocationsBuilder.addAllWorkerIdsSet(workerIdList)
packedLocationsBuilder.addAllMountPointsSet(mountPointsList)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,17 @@ import java.io.File
import java.util

import scala.collection.JavaConverters._
import scala.util.Random

import org.apache.hadoop.shaded.org.apache.commons.lang3.RandomStringUtils

import org.apache.celeborn.CelebornFunSuite
import org.apache.celeborn.common.identity.UserIdentifier
import org.apache.celeborn.common.meta.{ApplicationMeta, DeviceInfo, DiskFileInfo, DiskInfo, FileInfo, ReduceFileMeta, WorkerEventInfo, WorkerInfo, WorkerStatus}
import org.apache.celeborn.common.protocol.{PartitionLocation, StorageInfo}
import org.apache.celeborn.common.protocol.{PartitionLocation, PbPackedWorkerResource, PbWorkerResource, StorageInfo}
import org.apache.celeborn.common.protocol.message.ControlMessages.WorkerResource
import org.apache.celeborn.common.quota.ResourceConsumption
import org.apache.celeborn.common.util.PbSerDeUtils.toPbPackedPartitionbLocationsPair

class PbSerDeUtilsTest extends CelebornFunSuite {

Expand Down Expand Up @@ -312,4 +316,93 @@ class PbSerDeUtilsTest extends CelebornFunSuite {

assert(partitionLocation3 == loc1)
}

private def testSerializationPerformance(scale: Int): Unit = {
val mountPoints = List(
"/mnt/disk1/celeborn/",
"/mnt/disk2/celeborn/",
"/mnt/disk3/celeborn/",
"/mnt/disk4/celeborn/",
"/mnt/disk5/celeborn/",
"/mnt/disk6/celeborn/",
"/mnt/disk7/celeborn/",
"/mnt/disk8/celeborn/")
val hosts = (0 to 50).map(f =>
(
s"host${f}",
Random.nextInt(65535),
Random.nextInt(65535),
Random.nextInt(65535),
Random.nextInt(65535))).toList
val (primaryLocations, replicaLocations) = (0 to scale).map(i => {
val host = hosts(Random.nextInt(50))
val mountPoint = mountPoints(Random.nextInt(8))
val primary = new PartitionLocation(
i,
0,
host._1,
host._2,
host._3,
host._4,
host._5,
PartitionLocation.Mode.PRIMARY,
null,
new StorageInfo(
StorageInfo.Type.HDD,
mountPoint,
false,
mountPoint + "/application/0/" + RandomStringUtils.randomNumeric(6),
StorageInfo.LOCAL_DISK_MASK),
null)

val rHost = hosts(Random.nextInt(50))
val rMountPoint = mountPoints(Random.nextInt(8))

val replicate = new PartitionLocation(
i,
0,
rHost._1,
rHost._2,
rHost._3,
rHost._4,
rHost._5,
PartitionLocation.Mode.REPLICA,
null,
new StorageInfo(
StorageInfo.Type.HDD,
rMountPoint,
false,
rMountPoint + "/application-xxxsdsada-1/0/" + RandomStringUtils.randomNumeric(6),
StorageInfo.LOCAL_DISK_MASK),
null)
primary.setPeer(replicate)
replicate.setPeer(primary)
(primary, replicate)
}).toList.unzip

val workerResourceSize = PbWorkerResource.newBuilder()
.addAllPrimaryPartitions(primaryLocations.map(PbSerDeUtils.toPbPartitionLocation).asJava)
.addAllReplicaPartitions(replicaLocations.map(PbSerDeUtils.toPbPartitionLocation).asJava)
.setNetworkLocation("location1")
.build().toByteArray.length

val packedWorkerResourceSize = PbPackedWorkerResource.newBuilder()
.setLocationPairs(toPbPackedPartitionbLocationsPair(
primaryLocations ++ replicaLocations))
.setNetworkLocation("location1")
.build().toByteArray.length

assert(packedWorkerResourceSize < workerResourceSize)
log.info(s"Packed size : ${packedWorkerResourceSize} unpacked size :${workerResourceSize}")
log.info(
s"Reduced size : ${(workerResourceSize - packedWorkerResourceSize) / (workerResourceSize * 1.0f) * 100} %")
}

test("serializationComparasion") {
testSerializationPerformance(100)
testSerializationPerformance(1000)
testSerializationPerformance(10000)
testSerializationPerformance(20000)
}

}

0 comments on commit ff9bd22

Please sign in to comment.