From 4a880792ba03437239bf4c7ae9e811d210576ed5 Mon Sep 17 00:00:00 2001
From: Ethan Feng
Date: Sat, 23 Jul 2022 11:17:38 +0800
Subject: [PATCH] [BUG] multi-thread flusher causes data inconsistent with
 chunk offsets (#275)

(cherry picked from commit cb42b2fa5cf1113bf315d6bd09a8cb6154518622)
---
 .../rss/service/deploy/worker/FileWriter.java |  8 +-
 .../deploy/worker/LocalStorageManager.scala   | 89 +++++++++++--------
 2 files changed, 57 insertions(+), 40 deletions(-)

diff --git a/server-worker/src/main/java/com/aliyun/emr/rss/service/deploy/worker/FileWriter.java b/server-worker/src/main/java/com/aliyun/emr/rss/service/deploy/worker/FileWriter.java
index c30849d3952..1f35d482aff 100644
--- a/server-worker/src/main/java/com/aliyun/emr/rss/service/deploy/worker/FileWriter.java
+++ b/server-worker/src/main/java/com/aliyun/emr/rss/service/deploy/worker/FileWriter.java
@@ -60,6 +60,7 @@ public final class FileWriter extends DeviceObserver {
   private long bytesFlushed;
 
   private final DiskFlusher flusher;
+  private final int flushWorkerIndex;
   private CompositeByteBuf flushBuffer;
 
   private final long chunkSize;
@@ -119,6 +120,7 @@ public FileWriter(
       PartitionSplitMode splitMode) throws IOException {
     this.file = file;
     this.flusher = flusher;
+    this.flushWorkerIndex = flusher.getWorkerIndex();
     this.dataRootDir = workingDir;
     this.chunkSize = chunkSize;
     this.nextBoundary = chunkSize;
@@ -315,7 +317,7 @@ private void takeBuffer() {
     }
 
     // real action
-    flushBuffer = flusher.takeBuffer(timeoutMs);
+    flushBuffer = flusher.takeBuffer(timeoutMs, flushWorkerIndex);
 
     // metrics end
     if (source.samplePerfCritical()) {
@@ -330,7 +332,7 @@
   }
 
   private void addTask(FlushTask task) throws IOException {
-    if (!flusher.addTask(task, timeoutMs)) {
+    if (!flusher.addTask(task, timeoutMs, flushWorkerIndex)) {
       IOException e = new IOException("Add flush task timeout.");
       notifier.setException(e);
       throw e;
@@ -339,7 +341,7 @@ private void addTask(FlushTask task) throws IOException {
 
   private synchronized void returnBuffer() {
     if (flushBuffer != null) {
-      flusher.returnBuffer(flushBuffer);
+      flusher.returnBuffer(flushBuffer, flushWorkerIndex);
       flushBuffer = null;
     }
   }
diff --git a/server-worker/src/main/scala/com/aliyun/emr/rss/service/deploy/worker/LocalStorageManager.scala b/server-worker/src/main/scala/com/aliyun/emr/rss/service/deploy/worker/LocalStorageManager.scala
index 03ce0df2aa5..40ec5dcb339 100644
--- a/server-worker/src/main/scala/com/aliyun/emr/rss/service/deploy/worker/LocalStorageManager.scala
+++ b/server-worker/src/main/scala/com/aliyun/emr/rss/service/deploy/worker/LocalStorageManager.scala
@@ -53,13 +53,10 @@ private[worker] final class DiskFlusher(
     val deviceMonitor: DeviceMonitor,
     val threadCount: Int) extends DeviceObserver with Logging {
   private lazy val diskFlusherId = System.identityHashCode(this)
-  private val workingQueue = new LinkedBlockingQueue[FlushTask](queueCapacity)
-  private val bufferQueue = new LinkedBlockingQueue[CompositeByteBuf](queueCapacity)
-  private val writeActionPool = ThreadUtils.newDaemonFixedThreadPool(threadCount,
-    workingDir.getName + "-flusher")
-  for (_ <- 0 until queueCapacity) {
-    bufferQueue.put(Unpooled.compositeBuffer(256))
-  }
+  private val workingQueues = new Array[LinkedBlockingQueue[FlushTask]](threadCount)
+  private val bufferQueues = new Array[LinkedBlockingQueue[CompositeByteBuf]](threadCount)
+  private val workers = new Array[Thread](threadCount)
+  private val nextWorkerIndex = new AtomicInteger()
 
   @volatile
   private var lastBeginFlushTime: Long = -1
@@ -68,12 +65,21 @@ private[worker] final class DiskFlusher(
   val stopFlag = new AtomicBoolean(false)
   val rand = new Random()
 
-  private val worker = new Thread(s"$this") {
-    override def run(): Unit = {
-      while (!stopFlag.get()) {
-        val task = workingQueue.take()
-        writeActionPool.submit(new Runnable {
-          override def run(): Unit = {
+  init()
+
+  private def init(): Unit = {
+    val actualQueueSize = queueCapacity / threadCount + 1
+    for (index <- 0 until (threadCount)) {
+      workingQueues(index) = new LinkedBlockingQueue[FlushTask](actualQueueSize)
+      bufferQueues(index) = new LinkedBlockingQueue[CompositeByteBuf](actualQueueSize)
+      for (_ <- 0 until actualQueueSize) {
+        bufferQueues(index).put(Unpooled.compositeBuffer(256))
+      }
+
+      workers(index) = new Thread(s"$this-$index") {
+        override def run(): Unit = {
+          while (!stopFlag.get()) {
+            val task = workingQueues(index).take()
             val key = s"DiskFlusher-$workingDir-${rand.nextInt()}"
             workerSource.sample(WorkerSource.FlushDataTime, key) {
               if (!task.notifier.hasException) {
@@ -90,38 +96,46 @@ private[worker] final class DiskFlusher(
                 }
                 lastBeginFlushTime = -1
               }
-              returnBuffer(task.buffer)
+              returnBuffer(task.buffer, index)
               task.notifier.numPendingFlushes.decrementAndGet()
             }
           }
-        })
+        }
       }
+      workers(index).setDaemon(true)
+      workers(index).setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler {
+        override def uncaughtException(t: Thread, e: Throwable): Unit = {
+          logError(s"$this thread terminated.", e)
+        }
+      })
+      workers(index).start()
     }
+
+    deviceMonitor.registerDiskFlusher(this)
   }
-  worker.setDaemon(true)
-  worker.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler {
-    override def uncaughtException(t: Thread, e: Throwable): Unit = {
-      logError(s"$this thread terminated.", e)
-    }
-  })
-  worker.start()
-  deviceMonitor.registerDiskFlusher(this)
+  def getWorkerIndex: Int = {
+    val nextIndex = nextWorkerIndex.getAndIncrement()
+    if (nextIndex > threadCount) {
+      nextWorkerIndex.set(0)
+    }
+    nextIndex % threadCount
+  }
 
-  def takeBuffer(timeoutMs: Long): CompositeByteBuf = {
-    bufferQueue.poll(timeoutMs, TimeUnit.MILLISECONDS)
+  def takeBuffer(timeoutMs: Long, workerIndex: Int): CompositeByteBuf = {
+    bufferQueues(workerIndex).poll(timeoutMs, TimeUnit.MILLISECONDS)
   }
 
-  def returnBuffer(buffer: CompositeByteBuf): Unit = {
+  def returnBuffer(buffer: CompositeByteBuf, workerIndex: Int): Unit = {
     MemoryTracker.instance().releaseDiskBuffer(buffer.readableBytes())
     buffer.removeComponents(0, buffer.numComponents())
     buffer.clear()
-    bufferQueue.put(buffer)
+    bufferQueues(workerIndex).put(buffer)
   }
 
-  def addTask(task: FlushTask, timeoutMs: Long): Boolean = {
-    workingQueue.offer(task, timeoutMs, TimeUnit.MILLISECONDS)
+  def addTask(task: FlushTask, timeoutMs: Long, workerIndex: Int): Boolean = {
+    workingQueues(workerIndex).offer(task, timeoutMs, TimeUnit.MILLISECONDS)
  }
 
   override def notifyError(deviceName: String, dirs: ListBuffer[File] = null,
@@ -129,16 +143,17 @@ private[worker] final class DiskFlusher(
     logError(s"$this is notified Device $deviceName Error $deviceErrorType! Stop Flusher.")
     stopFlag.set(true)
     try {
-      worker.interrupt()
-      writeActionPool.shutdown()
+      workers.foreach(_.interrupt())
     } catch {
       case e: Exception =>
-        logError(s"Exception when interrupt worker: $worker, $e")
+        logError(s"Exception when interrupt worker: $workers, $e")
+    }
+    workingQueues.foreach { queue =>
+      queue.asScala.foreach { task =>
+        task.buffer.removeComponents(0, task.buffer.numComponents())
+        task.buffer.clear()
+      }
     }
-    workingQueue.asScala.foreach(task => {
-      task.buffer.removeComponents(0, task.buffer.numComponents())
-      task.buffer.clear()
-    })
 
     deviceMonitor.unregisterDiskFlusher(this)
   }
@@ -147,7 +162,7 @@ private[worker] final class DiskFlusher(
     deviceMonitor.reportDeviceError(workingDir, e, deviceErrorType)
   }
 
-  def bufferQueueInfo(): String = s"$this available buffers: ${bufferQueue.size()}"
+  def bufferQueueInfo(): String = s"$this used buffers: ${bufferQueues.map(_.size()).toList}"
 
   override def hashCode(): Int = {
     workingDir.hashCode()
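
Context for reviewers (not part of the patch): the old DiskFlusher funneled every
FlushTask through one shared workingQueue and dispatched them onto a fixed thread
pool (writeActionPool). With threadCount > 1, two flush tasks belonging to the same
FileWriter could run on different pool threads and complete out of order, so bytes
landed in the file in a different order than the chunk offsets recorded when the
tasks were queued -- the inconsistency named in the subject. The patch drops the
pool and gives each flusher thread a private task queue and buffer queue; every
FileWriter captures one worker index up front (flusher.getWorkerIndex() in its
constructor) and routes all takeBuffer/addTask/returnBuffer calls through that
index, so its tasks are drained by a single thread in strict FIFO order.

The sketch below illustrates that pattern in isolation. All names (PinnedFlusher,
addTask with a plain Runnable, the demo object) are hypothetical stand-ins, not the
actual RSS API:

    import java.util.concurrent.LinkedBlockingQueue
    import java.util.concurrent.atomic.AtomicInteger

    // One FIFO queue per worker thread. A writer that always submits to the
    // same queue gets its tasks executed in submission order, which is what
    // keeps on-disk bytes consistent with the recorded chunk offsets.
    final class PinnedFlusher(threadCount: Int, queueCapacity: Int) {
      private val queues = Array.fill(threadCount)(
        new LinkedBlockingQueue[Runnable](queueCapacity / threadCount + 1))
      private val nextWorkerIndex = new AtomicInteger()

      queues.zipWithIndex.foreach { case (queue, i) =>
        val worker = new Thread(s"pinned-flusher-$i") {
          override def run(): Unit = while (true) queue.take().run()
        }
        worker.setDaemon(true)
        worker.start()
      }

      // Round-robin assignment; each writer calls this once and caches the
      // result so all of its tasks land on the same worker. Masking the sign
      // bit keeps the index non-negative after counter overflow (the patch
      // instead resets the counter once it passes threadCount).
      def getWorkerIndex: Int =
        (nextWorkerIndex.getAndIncrement() & Int.MaxValue) % threadCount

      // Non-blocking submit; returns false when the pinned queue is full.
      def addTask(task: Runnable, workerIndex: Int): Boolean =
        queues(workerIndex).offer(task)
    }

    object PinnedFlusherDemo {
      def main(args: Array[String]): Unit = {
        val flusher = new PinnedFlusher(threadCount = 4, queueCapacity = 256)
        val myIndex = flusher.getWorkerIndex // captured once per writer
        (1 to 3).foreach { n =>
          flusher.addTask(() => println(s"flush chunk $n"), myIndex)
        }
        Thread.sleep(100) // let the daemon worker drain the queue
      }
    }

Because ordering is enforced by queue affinity rather than locking, the patch also
splits queueCapacity across the threadCount queues (queueCapacity / threadCount + 1
each), so the total number of pre-allocated CompositeByteBufs stays roughly the
same as before.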