Skip to content

Commit

Permalink
add comments
Browse files Browse the repository at this point in the history
  • Loading branch information
FMX committed Jan 9, 2025
1 parent e0cafb7 commit 3e846b4
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,58 @@ import org.apache.celeborn.common.protocol.{PbPushDataHandShake, PbRegionFinish,
import org.apache.celeborn.common.unsafe.Platform
import org.apache.celeborn.common.util.FileChannelUtils

/**
* It's the specific logic for reduce partition writer, map partition writer and map segment partition writer
*/
trait PartitionMetaHandler {

/**
* For reduce partition meta handler, this method won't be invoked.
* For map partition meta handler, this method accept 1,2,4 messages.
* For map segment partition meta handler, this method accept 1,2,3,4 messages.
* @param message This method accept protobuf messages only
* There are only four message types can be accepted.
* 1. PbPushDataHandShake
* 2. PbRegionStart
* 3. PbSegmentStart
* 4. PbRegionFinish
*/
def handleEvent(message: GeneratedMessageV3): Unit

/**
* For reduce partition meta handler, this method will add map id into roaringbitmap if rangeReadFilter is on
* For map partition meta handler, this method will get partition id from bytebuf and update the length of this partition
* For map segment partition meta handler, this method will check the segment contains this partition id
* @param bytes The bytes that contains shuffle data
*/
def beforeWrite(bytes: ByteBuf): Unit

/**
* For reduce partition meta handler, this method will do nothing
* For map partition meta handler, this method will ensure that this region is not finished
* For map segmenta partition meta handler, this method will update segment index
* @param size processed shuffle data size
*/
def afterWrite(size: Int): Unit

def afterFlush(finalFlush: Boolean, size: Int): Unit
/**
* Update file meta about file written bytes changed
* @param size the size of data that put into the data flusher
*/
def afterFlush(size: Int): Unit

/**
* For reduce partition meta handler, this method will do nothing
* For map partition meta handler, this method will clear its index buffer
*/
def beforeDestroy(): Unit

/**
* For reduce partition meta handler, this method will update file meta's chunk offsets
* For map partition meta handler, this method will flush index buffer to disk
* For map segment partition meta handler, this method will flush index buffer to disk and clear
* segment index
*/
def afterClose(): Unit
}

Expand Down Expand Up @@ -214,7 +254,7 @@ class MapPartitionMetaHandler(
isRegionFinished = regionFinished
}

override def afterFlush(finalFlush: Boolean, size: Int): Unit = {
override def afterFlush(size: Int): Unit = {
diskFileInfo.updateBytesFlushed(size)
}

Expand Down Expand Up @@ -290,7 +330,7 @@ class ReducePartitionMetaHandler(val rangeReadFilter: Boolean, val fileInfo: Fil
lazy val mapIdBitMap: Option[RoaringBitmap] =
if (rangeReadFilter) Some(new RoaringBitmap()) else None

override def afterFlush(finalFlush: Boolean, size: Int): Unit = {
override def afterFlush(size: Int): Unit = {
fileInfo.updateBytesFlushed(size)
}

Expand Down Expand Up @@ -389,7 +429,7 @@ class SegmentMapPartitionMetaHandler(diskFileInfo: DiskFileInfo, notifier: Flush
fileMeta.setSegmentGranularityVisible(true)
}

override def afterFlush(finalFlush: Boolean, size: Int): Unit = {
override def afterFlush(size: Int): Unit = {
diskFileInfo.updateBytesFlushed(size)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,31 +143,31 @@ class PartitionMetaHandlerSuite extends CelebornFunSuite with MockitoHelper {

val handler1 = new ReducePartitionMetaHandler(true, diskFileInfo)
handler1.beforeWrite(generateSparkFormatData(byteBufAllocator, 0))
handler1.afterFlush(false, 1024)
handler1.afterFlush(1024)
handler1.beforeWrite(generateSparkFormatData(byteBufAllocator, 1))
handler1.afterFlush(false, 1024)
handler1.afterFlush(1024)
handler1.beforeWrite(generateSparkFormatData(byteBufAllocator, 2))
handler1.afterFlush(false, 1024)
handler1.afterFlush(1024)
handler1.beforeWrite(generateSparkFormatData(byteBufAllocator, 3))
handler1.afterFlush(false, 1024)
handler1.afterFlush(1024)
handler1.beforeWrite(generateSparkFormatData(byteBufAllocator, 4))
handler1.afterFlush(false, 1024)
handler1.afterFlush(1024)

assert(handler1.mapIdBitMap.get.getCardinality === 5)

assert(diskFileInfo.getFileMeta.asInstanceOf[ReduceFileMeta].getNumChunks === 0)

handler1.beforeWrite(generateSparkFormatData(byteBufAllocator, 5))
handler1.afterFlush(false, 1024)
handler1.afterFlush(1024)
handler1.beforeWrite(generateSparkFormatData(byteBufAllocator, 6))
handler1.afterFlush(false, 1024)
handler1.afterFlush(1024)
handler1.beforeWrite(generateSparkFormatData(byteBufAllocator, 7))
handler1.afterFlush(false, 1024)
handler1.afterFlush(1024)

assert(diskFileInfo.getFileMeta.asInstanceOf[ReduceFileMeta].getNumChunks === 1)

handler1.beforeWrite(generateSparkFormatData(byteBufAllocator, 8))
handler1.afterFlush(false, 1024)
handler1.afterFlush(1024)
handler1.afterClose()

assert(diskFileInfo.getFileMeta.asInstanceOf[ReduceFileMeta].getNumChunks == 2)
Expand Down

0 comments on commit 3e846b4

Please sign in to comment.