Add retry with SpillableHostBuffer to GPU scans (#12233)
Contributes to #8890.

This PR adds retry support with `SpillableHostBuffer` to GPU scans when reading data from files into host memory, to help resolve CPU OOMs in GPU scans reported by some customer queries. These host memory allocations previously ran without the protection of the retry framework.

The OOM error looks like the following:
```
Caused by: com.nvidia.spark.rapids.jni.CpuRetryOOM: Could not complete allocation after 1000 retries
        at com.nvidia.spark.rapids.HostAlloc.alloc(HostAlloc.scala:244)
        at com.nvidia.spark.rapids.HostAlloc.allocate(HostAlloc.scala:250)
        at ai.rapids.cudf.HostMemoryBuffer.allocate(HostMemoryBuffer.java:138)
        at ai.rapids.cudf.HostMemoryBuffer.allocate(HostMemoryBuffer.java:149)
        at com.nvidia.spark.rapids.ParquetPartitionReaderBase.$anonfun$readPartFile$1(GpuParquetScan.scala:1938)
        at com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:30)
        at com.nvidia.spark.rapids.ParquetPartitionReaderBase.readPartFile(GpuParquetScan.scala:1936)
```
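
At a high level, the fix wraps each of these host allocations in the retry framework and hands the result downstream as a spillable buffer. The following is a minimal sketch of that pattern, using the `withRetryNoSplit`, `SpillableHostBuffer`, and `SpillPriorities` APIs that appear in the diff; it assumes the code lives in the plugin's `com.nvidia.spark.rapids` package, and `estimatedSize` / `writeBlocksTo` are hypothetical placeholders for the reader-specific size estimate and file-read logic:

```scala
import ai.rapids.cudf.HostMemoryBuffer
import com.nvidia.spark.rapids.Arm.closeOnExcept
import com.nvidia.spark.rapids.RmmRapidsRetryIterator.withRetryNoSplit

// Allocate the host buffer under the retry framework, so a CpuRetryOOM can be
// handled by rolling back and retrying instead of failing the task outright.
val hostBuf = withRetryNoSplit[HostMemoryBuffer] {
  HostMemoryBuffer.allocate(estimatedSize) // estimatedSize: placeholder estimate
}

// Wrap the filled buffer as a SpillableHostBuffer so it can be spilled under
// host memory pressure while the task waits for the GPU.
val spillable = closeOnExcept(hostBuf) { hmb =>
  val bytesWritten = writeBlocksTo(hmb) // placeholder for the actual file read
  SpillableHostBuffer(hmb, bytesWritten, SpillPriorities.ACTIVE_BATCHING_PRIORITY)
}

// When the data is actually needed (in the readers this happens inside another
// withRetryNoSplit block), materialize only the data range [0, length).
val dataBuf = spillable.getDataHostBuffer()
```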

This change is covered by existing tests.

---------

Signed-off-by: Firestarman <firestarmanllc@gmail.com>
firestarman authored Feb 28, 2025
1 parent 9c9146c commit d380b18
Showing 6 changed files with 130 additions and 84 deletions.
@@ -30,6 +30,7 @@ import ai.rapids.cudf.{HostMemoryBuffer, NvtxColor, NvtxRange, Table}
import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource}
import com.nvidia.spark.rapids.GpuMetric.{BUFFER_TIME, FILTER_TIME}
import com.nvidia.spark.rapids.RapidsPluginImplicits.{AutoCloseableArray, AutoCloseableProducingSeq}
import com.nvidia.spark.rapids.RmmRapidsRetryIterator.withRetryNoSplit
import org.apache.commons.io.IOUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
@@ -53,7 +54,7 @@ import org.apache.spark.util.SerializableConfiguration
* This contains a single HostMemoryBuffer along with other metadata needed
* for combining the buffers before sending to GPU.
*/
case class SingleHMBAndMeta(hmbs: Array[HostMemoryBuffer], bytes: Long, numRows: Long,
case class SingleHMBAndMeta(hmbs: Array[SpillableHostBuffer], bytes: Long, numRows: Long,
blockMeta: Seq[DataBlockBase])

object SingleHMBAndMeta {
@@ -1007,16 +1008,14 @@ abstract class MultiFileCoalescingPartitionReaderBase(
} else {
val dataBuffer = readPartFiles(currentChunkMeta.currentChunk,
currentChunkMeta.clippedSchema)
if (dataBuffer.getLength == 0) {
if (dataBuffer.length == 0) {
dataBuffer.close()
CachedGpuBatchIterator(EmptyTableReader, colTypes)
} else {
startNewBufferRetry
RmmRapidsRetryIterator.withRetryNoSplit(dataBuffer) { _ =>
// We don't want to actually close the host buffer until we know that we don't
// want to retry more, so offset the close for now.
dataBuffer.incRefCount()
val tableReader = readBufferToTablesAndClose(dataBuffer, dataBuffer.getLength,
val dataBuf = dataBuffer.getDataHostBuffer()
val tableReader = readBufferToTablesAndClose(dataBuf, dataBuf.getLength,
currentChunkMeta.clippedSchema, currentChunkMeta.readSchema,
currentChunkMeta.extraInfo)
CachedGpuBatchIterator(tableReader, colTypes)
@@ -1042,7 +1041,7 @@ abstract class MultiFileCoalescingPartitionReaderBase(
*/
private def readPartFiles(
blocks: Seq[(Path, DataBlockBase)],
clippedSchema: SchemaBase): HostMemoryBuffer = {
clippedSchema: SchemaBase): SpillableHostBuffer = {

withResource(new NvtxWithMetrics("Buffer file split", NvtxColor.YELLOW,
metrics("bufferTime"))) { _ =>
@@ -1057,8 +1056,11 @@
// First, estimate the output file size for the initial allocating.
// the estimated size should be >= size of HEAD + Blocks + FOOTER
val initTotalSize = calculateEstimatedBlocksOutputSize(batchContext)
val initBuf = withRetryNoSplit[HostMemoryBuffer] {
HostMemoryBuffer.allocate(initTotalSize)
}
val (buffer, bufferSize, footerOffset, outBlocks) =
closeOnExcept(HostMemoryBuffer.allocate(initTotalSize)) { hmb =>
closeOnExcept(initBuf) { hmb =>
// Second, write header
var offset = writeFileHeader(hmb, batchContext)

@@ -1103,7 +1105,10 @@ abstract class MultiFileCoalescingPartitionReaderBase(
buf = withResource(buffer) { _ =>
withResource(new HostMemoryInputStream(buffer, footerOffset)) { in =>
// realloc memory and copy
closeOnExcept(HostMemoryBuffer.allocate(bufferSize)) { newhmb =>
val newBuf = withRetryNoSplit[HostMemoryBuffer] {
HostMemoryBuffer.allocate(bufferSize)
}
closeOnExcept(newBuf) { newhmb =>
withResource(new HostMemoryOutputStream(newhmb)) { out =>
IOUtils.copy(in, out)
}
@@ -1133,9 +1138,9 @@ abstract class MultiFileCoalescingPartitionReaderBase(
}
logDebug(s"$getFileFormatShortName Coalescing reading estimates the initTotalSize:" +
s" $initTotalSize, and the true size: $finalBufferSize")
withResource(finalBuffer) { _ =>
finalBuffer.slice(0, finalBufferSize)
}
SpillableHostBuffer(finalBuffer,
finalBufferSize,
SpillPriorities.ACTIVE_BATCHING_PRIORITY)
}
}

57 changes: 33 additions & 24 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala
@@ -36,6 +36,7 @@ import ai.rapids.cudf._
import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource}
import com.nvidia.spark.rapids.GpuMetric._
import com.nvidia.spark.rapids.RapidsPluginImplicits._
import com.nvidia.spark.rapids.RmmRapidsRetryIterator.withRetryNoSplit
import com.nvidia.spark.rapids.SchemaUtils._
import com.nvidia.spark.rapids.filecache.FileCache
import com.nvidia.spark.rapids.jni.CastStrings
@@ -1063,17 +1064,21 @@ trait OrcPartitionReaderBase extends OrcCommonFunctions with Logging
* @return HostMemoryBuffer and its data size
*/
protected def readPartFile(ctx: OrcPartitionReaderContext, stripes: Seq[OrcOutputStripe]):
(HostMemoryBuffer, Long) = {
(SpillableHostBuffer, Long) = {
withResource(new NvtxRange("Buffer file split", NvtxColor.YELLOW)) { _ =>
if (stripes.isEmpty) {
return (null, 0L)
}

val hostBufferSize = estimateOutputSize(ctx, stripes)
closeOnExcept(HostMemoryBuffer.allocate(hostBufferSize)) { hmb =>
val hostBuffer = withRetryNoSplit[HostMemoryBuffer] {
HostMemoryBuffer.allocate(hostBufferSize)
}
closeOnExcept(hostBuffer) { hmb =>
withResource(new HostMemoryOutputStream(hmb)) { out =>
writeOrcOutputFile(ctx, out, stripes)
(hmb, out.getPos)
(SpillableHostBuffer(hmb, out.getPos, SpillPriorities.ACTIVE_BATCHING_PRIORITY),
out.getPos)
}
}
}
@@ -1227,19 +1232,17 @@ class GpuOrcPartitionReader(
dataBuffer.close()
CachedGpuBatchIterator(EmptyTableReader, colTypes)
} else {
// about to start using the GPU
GpuSemaphore.acquireIfNecessary(TaskContext.get())

val (parseOpts, tableSchema) = getORCOptionsAndSchema(ctx.updatedReadSchema,
ctx.requestedMapping, readDataSchema)
RmmRapidsRetryIterator.withRetryNoSplit(dataBuffer) { _ =>
// Inc the ref count because MakeOrcTableProducer will try to close the dataBuffer
// which we don't want until we know that the retry is done with it.
dataBuffer.incRefCount()

val (parseOpts, tableSchema) = getORCOptionsAndSchema(ctx.updatedReadSchema,
ctx.requestedMapping, readDataSchema)
// MakeOrcTableProducer will try to close the dataBuf
val dataBuf = dataBuffer.getDataHostBuffer()
// Duplicate request is ok and start to use the GPU just after the host
// buffer is ready to not block CPU things.
GpuSemaphore.acquireIfNecessary(TaskContext.get())
val producer = MakeOrcTableProducer(useChunkedReader,
maxChunkedReaderMemoryUsageSizeBytes, conf, targetBatchSizeBytes, parseOpts,
dataBuffer, 0, dataSize, metrics, isCaseSensitive, readDataSchema,
dataBuf, 0, dataSize, metrics, isCaseSensitive, readDataSchema,
tableSchema, Array(partFile), debugDumpPrefix, debugDumpAlways)
CachedGpuBatchIterator(producer, colTypes)
}
@@ -2239,7 +2242,7 @@ class MultiFileCloudOrcPartitionReader(
}

private def readBufferToBatches(
hostBuffer: HostMemoryBuffer,
hostBuffer: SpillableHostBuffer,
bufferSize: Long,
memFileSchema: TypeDescription,
requestedMapping: Option[Array[Int]],
@@ -2251,16 +2254,15 @@ class MultiFileCloudOrcPartitionReader(
}
val colTypes = readDataSchema.fields.map(f => f.dataType)

// about to start using the GPU
GpuSemaphore.acquireIfNecessary(TaskContext.get())

RmmRapidsRetryIterator.withRetryNoSplit(hostBuffer) { _ =>
// The MakeParquetTableProducer will close the input buffer, and that would be bad
// because we don't want to close it until we know that we are done with it
hostBuffer.incRefCount()
// The MakeOrcTableProducer will close the input buffer
val dataBuf = hostBuffer.getDataHostBuffer()
// Duplicate request is ok, and start to use the GPU just after the host
// buffer is ready to not block CPU things.
GpuSemaphore.acquireIfNecessary(TaskContext.get())
val producer = MakeOrcTableProducer(useChunkedReader,
maxChunkedReaderMemoryUsageSizeBytes, conf, targetBatchSizeBytes, parseOpts,
hostBuffer, 0, bufferSize, metrics, isCaseSensitive, readDataSchema,
dataBuf, 0, bufferSize, metrics, isCaseSensitive, readDataSchema,
tableSchema, files, debugDumpPrefix, debugDumpAlways)
val batchIter = CachedGpuBatchIterator(producer, colTypes)

@@ -2317,8 +2319,13 @@ class MultiFileCloudOrcPartitionReader(
val dataCopyAmount = buf.blockMeta.map(_.getBlockSize).sum
if (dataCopyAmount > 0 && buf.hmbs.nonEmpty) {
require(buf.hmbs.length == 1)
combinedBuf.copyFromHostBuffer(
offset, buf.hmbs.head, OrcTools.ORC_MAGIC.length, dataCopyAmount)
val headBuf = withRetryNoSplit[HostMemoryBuffer] {
buf.hmbs.head.getDataHostBuffer()
}
withResource(headBuf) { _ =>
combinedBuf.copyFromHostBuffer(
offset, headBuf, OrcTools.ORC_MAGIC.length, dataCopyAmount)
}
}
// update the offset for each stripe
var stripeOffset = offset
@@ -2364,7 +2371,9 @@ class MultiFileCloudOrcPartitionReader(

// e: Create the new meta for the combined buffer
val numRows = combinedMeta.allPartValues.map(_._1).sum
val combinedRet = SingleHMBAndMeta(Array(maybeNewBuf), outStream.getPos, numRows,
val finalBuf = SpillableHostBuffer(maybeNewBuf, maybeNewBuf.getLength,
SpillPriorities.ACTIVE_BATCHING_PRIORITY)
val combinedRet = SingleHMBAndMeta(Array(finalBuf), outStream.getPos, numRows,
blockMetas)
val newHmbWithMeta = metaToUse.copy(
memBuffersAndSizes = Array(combinedRet),
@@ -37,6 +37,7 @@ import com.nvidia.spark.rapids.GpuMetric._
import com.nvidia.spark.rapids.ParquetPartitionReader.{CopyRange, LocalCopy, PARQUET_MAGIC}
import com.nvidia.spark.rapids.RapidsConf.ParquetFooterReaderType
import com.nvidia.spark.rapids.RapidsPluginImplicits._
import com.nvidia.spark.rapids.RmmRapidsRetryIterator.withRetryNoSplit
import com.nvidia.spark.rapids.filecache.FileCache
import com.nvidia.spark.rapids.jni.{DateTimeRebase, ParquetFooter}
import com.nvidia.spark.rapids.shims.{ColumnDefaultValuesShims, GpuParquetCrypto, GpuTypeShims, ParquetLegacyNanoAsLongShims, ParquetSchemaClipShims, ParquetStringPredShims, ShimFilePartitionReaderFactory, SparkShimImpl}
@@ -1904,10 +1905,13 @@ trait ParquetPartitionReaderBase extends Logging with ScanWithMetrics
protected def readPartFile(
blocks: Seq[BlockMetaData],
clippedSchema: MessageType,
filePath: Path): (HostMemoryBuffer, Seq[BlockMetaData]) = {
filePath: Path): (SpillableHostBuffer, Seq[BlockMetaData]) = {
withResource(new NvtxRange("Parquet buffer file split", NvtxColor.YELLOW)) { _ =>
val estTotalSize = calculateParquetOutputSize(blocks, clippedSchema, false)
closeOnExcept(HostMemoryBuffer.allocate(estTotalSize)) { hmb =>
val outHostBuf = withRetryNoSplit[HostMemoryBuffer] {
HostMemoryBuffer.allocate(estTotalSize)
}
closeOnExcept(outHostBuf) { hmb =>
val out = new HostMemoryOutputStream(hmb)
out.write(ParquetPartitionReader.PARQUET_MAGIC)
val outputBlocks = if (compressCfg.decompressAnyCpu) {
@@ -1924,10 +1928,8 @@ trait ParquetPartitionReaderBase extends Logging with ScanWithMetrics
throw new QueryExecutionException(s"Calculated buffer size $estTotalSize is to " +
s"small, actual written: ${out.getPos}")
}
val outputBuffer = withResource(hmb) { _ =>
hmb.slice(0, out.getPos)
}
(outputBuffer, outputBlocks)
(SpillableHostBuffer(hmb, out.getPos, SpillPriorities.ACTIVE_BATCHING_PRIORITY),
outputBlocks)
}
}
}
@@ -2407,7 +2409,8 @@ class MultiFileCloudParquetPartitionReader(
}
val sliceOffset = if (buffers.isEmpty) 0 else PARQUET_MAGIC.size
require(hmbInfo.hmbs.length == 1)
buffers += hmbInfo.hmbs.head.slice(sliceOffset, bytesToSlice)
buffers += SpillableHostBuffer.sliceWithRetry(hmbInfo.hmbs.head, sliceOffset,
bytesToSlice)
}
val outputBlocks = computeBlockMetaData(hmbInfo.blockMeta, offset)
allOutputBlocks ++= outputBlocks
@@ -2432,7 +2435,9 @@ class MultiFileCloudParquetPartitionReader(
footerOut.write(ParquetPartitionReader.PARQUET_MAGIC)
offset += footerOut.getPos
}
val newHmbBufferInfo = SingleHMBAndMeta(buffers.toArray, offset,
val spHostBufs = buffers.map(b =>
SpillableHostBuffer(b, b.getLength, SpillPriorities.ACTIVE_BATCHING_PRIORITY))
val newHmbBufferInfo = SingleHMBAndMeta(spHostBufs.toArray, offset,
combinedMeta.allPartValues.map(_._1).sum, Seq.empty)
val newHmbMeta = HostMemoryBuffersWithMetaData(
metaToUse.partitionedFile,
@@ -2657,7 +2662,7 @@ class MultiFileCloudParquetPartitionReader(
val (dataBuffer, blockMeta) =
readPartFile(blocksToRead, fileBlockMeta.schema, filePath)
val numRows = blocksToRead.map(_.getRowCount).sum.toInt
hostBuffers += SingleHMBAndMeta(Array(dataBuffer), dataBuffer.getLength,
hostBuffers += SingleHMBAndMeta(Array(dataBuffer), dataBuffer.length,
numRows, blockMeta)
}
val bytesRead = fileSystemBytesRead() - startingBytesRead
@@ -2769,27 +2774,26 @@ class MultiFileCloudParquetPartitionReader(
clippedSchema: MessageType,
readDataSchema: StructType,
partedFile: PartitionedFile,
hostBuffers: Array[HostMemoryBuffer],
hostBuffers: Array[SpillableHostBuffer],
allPartValues: Option[Array[(Long, InternalRow)]]): Iterator[ColumnarBatch] = {

val parseOpts = closeOnExcept(hostBuffers) { _ =>
getParquetOptions(readDataSchema, clippedSchema, useFieldId)
}
val colTypes = readDataSchema.fields.map(f => f.dataType)

// about to start using the GPU
GpuSemaphore.acquireIfNecessary(TaskContext.get())

withResource(hostBuffers) { _ =>
RmmRapidsRetryIterator.withRetryNoSplit {
// The MakeParquetTableProducer will close the input buffers, and that would be bad
// because we don't want to close them until we know that we are done with them.
hostBuffers.foreach(_.incRefCount())
// The MakeParquetTableProducer will close the input buffers
val hostBufs = hostBuffers.safeMap(_.getDataHostBuffer())
// Duplicate request is ok, and start to use the GPU just after the host
// buffer is ready to not block CPU things.
GpuSemaphore.acquireIfNecessary(TaskContext.get())
val tableReader = MakeParquetTableProducer(useChunkedReader,
maxChunkedReaderMemoryUsageSizeBytes,
conf, targetBatchSizeBytes,
parseOpts,
hostBuffers, metrics,
hostBufs, metrics,
dateRebaseMode, timestampRebaseMode, hasInt96Timestamps,
isSchemaCaseSensitive, useFieldId, readDataSchema, clippedSchema, files,
debugDumpPrefix, debugDumpAlways)
@@ -3039,21 +3043,20 @@ class ParquetPartitionReader(
val (dataBuffer, _) = metrics(BUFFER_TIME).ns {
readPartFile(currentChunkedBlocks, clippedParquetSchema, filePath)
}
if (dataBuffer.getLength == 0) {
if (dataBuffer.length == 0) {
dataBuffer.close()
CachedGpuBatchIterator(EmptyTableReader, colTypes)
} else {
// about to start using the GPU
GpuSemaphore.acquireIfNecessary(TaskContext.get())

RmmRapidsRetryIterator.withRetryNoSplit(dataBuffer) { _ =>
// Inc the ref count because MakeParquetTableProducer will try to close the dataBuffer
// which we don't want until we know that the retry is done with it.
dataBuffer.incRefCount()
// MakeParquetTableProducer will try to close the hostBuf
val hostBuf = dataBuffer.getDataHostBuffer()
// Duplicate request is ok, and start to use the GPU just after the host
// buffer is ready to not block CPU things.
GpuSemaphore.acquireIfNecessary(TaskContext.get())
val producer = MakeParquetTableProducer(useChunkedReader,
maxChunkedReaderMemoryUsageSizeBytes, conf,
targetBatchSizeBytes, parseOpts,
Array(dataBuffer), metrics,
Array(hostBuf), metrics,
dateRebaseMode, timestampRebaseMode,
hasInt96Timestamps, isSchemaCaseSensitive,
useFieldId, readDataSchema,
@@ -17,7 +17,8 @@
package com.nvidia.spark.rapids

import ai.rapids.cudf.{ContiguousTable, Cuda, DeviceMemoryBuffer, HostMemoryBuffer}
import com.nvidia.spark.rapids.Arm.closeOnExcept
import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource}
import com.nvidia.spark.rapids.RmmRapidsRetryIterator.withRetryNoSplit
import com.nvidia.spark.rapids.spill.{SpillableColumnarBatchFromBufferHandle, SpillableColumnarBatchHandle, SpillableCompressedColumnarBatchHandle, SpillableDeviceBufferHandle, SpillableHostBufferHandle, SpillableHostColumnarBatchHandle}

import org.apache.spark.TaskContext
@@ -478,6 +479,19 @@ class SpillableHostBuffer(handle: SpillableHostBufferHandle,
handle.materialize()
}

/**
* Get the host buffer for the data part only, sliced to the range [0, length),
* since a spillable buffer may be larger than the actual data size.
*/
def getDataHostBuffer(): HostMemoryBuffer = {
val buf = getHostBuffer()
if (length < buf.getLength) {
withResource(buf)(_.slice(0, length))
} else { // length == buf.getLength
buf
}
}

override def toString: String =
s"SpillableHostBuffer length:$length, handle:$handle"
}
@@ -520,4 +534,10 @@ object SpillableHostBuffer {
}
new SpillableHostBuffer(SpillableHostBufferHandle(buffer), length)
}

def sliceWithRetry(shb: SpillableHostBuffer, start: Long, len: Long): HostMemoryBuffer = {
withRetryNoSplit[HostMemoryBuffer] {
withResource(shb.getHostBuffer())(_.slice(start, len))
}
}
}