From 3dd5aa2e595a07f7d5afa3714061bcac4ada637d Mon Sep 17 00:00:00 2001 From: Anuj Modi Date: Tue, 7 Jan 2025 01:05:10 -0800 Subject: [PATCH 1/5] Make ReadBufferManager Interface --- .../fs/azurebfs/services/AbfsInputStream.java | 8 +- .../azurebfs/services/ReadBufferManager.java | 648 +---------------- .../services/ReadBufferManagerV1.java | 651 ++++++++++++++++++ .../services/ReadBufferManagerV2.java | 516 ++++++++++++++ .../azurebfs/services/ReadBufferWorker.java | 4 +- ...ger.java => ITestReadBufferManagerV1.java} | 12 +- .../services/TestAbfsInputStream.java | 59 +- 7 files changed, 1210 insertions(+), 688 deletions(-) create mode 100644 hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV1.java create mode 100644 hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java rename hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/{ITestReadBufferManager.java => ITestReadBufferManagerV1.java} (96%) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java index 5d770805a51c4..0468acd66e020 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java @@ -175,7 +175,7 @@ public AbfsInputStream( // Propagate the config values to ReadBufferManager so that the first instance // to initialize can set the readAheadBlockSize - ReadBufferManager.setReadBufferManagerConfigs(readAheadBlockSize); + ReadBufferManagerV1.setReadBufferManagerConfigs(readAheadBlockSize); if (streamStatistics != null) { ioStatistics = streamStatistics.getIOStatistics(); } @@ -510,7 +510,7 @@ private int readInternal(final long position, final byte[] b, final int offset, 
while (numReadAheads > 0 && nextOffset < contentLength) { LOG.debug("issuing read ahead requestedOffset = {} requested size {}", nextOffset, nextSize); - ReadBufferManager.getBufferManager().queueReadAhead(this, nextOffset, (int) nextSize, + ReadBufferManagerV1.getBufferManager().queueReadAhead(this, nextOffset, (int) nextSize, new TracingContext(readAheadTracingContext)); nextOffset = nextOffset + nextSize; numReadAheads--; @@ -519,7 +519,7 @@ private int readInternal(final long position, final byte[] b, final int offset, } // try reading from buffers first - receivedBytes = ReadBufferManager.getBufferManager().getBlock(this, position, length, b); + receivedBytes = ReadBufferManagerV1.getBufferManager().getBlock(this, position, length, b); bytesFromReadAhead += receivedBytes; if (receivedBytes > 0) { incrementReadOps(); @@ -720,7 +720,7 @@ public boolean seekToNewSource(long l) throws IOException { public synchronized void close() throws IOException { LOG.debug("Closing {}", this); closed = true; - ReadBufferManager.getBufferManager().purgeBuffersForStream(this); + ReadBufferManagerV1.getBufferManager().purgeBuffersForStream(this); buffer = null; // de-reference the buffer so it can be GC'ed sooner if (contextEncryptionAdapter != null) { contextEncryptionAdapter.destroy(); diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java index 031545f57a193..ba83520b63c2d 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java @@ -1,650 +1,4 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ package org.apache.hadoop.fs.azurebfs.services; -import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Queue; -import java.util.Stack; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.locks.ReentrantLock; - -import org.apache.hadoop.fs.azurebfs.utils.TracingContext; -import org.apache.hadoop.classification.VisibleForTesting; - -/** - * The Read Buffer Manager for Rest AbfsClient. 
- */ -final class ReadBufferManager { - private static final Logger LOGGER = LoggerFactory.getLogger(ReadBufferManager.class); - private static final int ONE_KB = 1024; - private static final int ONE_MB = ONE_KB * ONE_KB; - - private static final int NUM_BUFFERS = 16; - private static final int NUM_THREADS = 8; - private static final int DEFAULT_THRESHOLD_AGE_MILLISECONDS = 3000; // have to see if 3 seconds is a good threshold - - private static int blockSize = 4 * ONE_MB; - private static int thresholdAgeMilliseconds = DEFAULT_THRESHOLD_AGE_MILLISECONDS; - private Thread[] threads = new Thread[NUM_THREADS]; - private byte[][] buffers; // array of byte[] buffers, to hold the data that is read - private Stack freeList = new Stack<>(); // indices in buffers[] array that are available - - private Queue readAheadQueue = new LinkedList<>(); // queue of requests that are not picked up by any worker thread yet - private LinkedList inProgressList = new LinkedList<>(); // requests being processed by worker threads - private LinkedList completedReadList = new LinkedList<>(); // buffers available for reading - private static ReadBufferManager bufferManager; // singleton, initialized in static initialization block - private static final ReentrantLock LOCK = new ReentrantLock(); - - static ReadBufferManager getBufferManager() { - if (bufferManager == null) { - LOCK.lock(); - try { - if (bufferManager == null) { - bufferManager = new ReadBufferManager(); - bufferManager.init(); - } - } finally { - LOCK.unlock(); - } - } - return bufferManager; - } - - static void setReadBufferManagerConfigs(int readAheadBlockSize) { - if (bufferManager == null) { - LOGGER.debug( - "ReadBufferManager not initialized yet. Overriding readAheadBlockSize as {}", - readAheadBlockSize); - blockSize = readAheadBlockSize; - } - } - - private void init() { - buffers = new byte[NUM_BUFFERS][]; - for (int i = 0; i < NUM_BUFFERS; i++) { - buffers[i] = new byte[blockSize]; // same buffers are reused. 
The byte array never goes back to GC - freeList.add(i); - } - for (int i = 0; i < NUM_THREADS; i++) { - Thread t = new Thread(new ReadBufferWorker(i)); - t.setDaemon(true); - threads[i] = t; - t.setName("ABFS-prefetch-" + i); - t.start(); - } - ReadBufferWorker.UNLEASH_WORKERS.countDown(); - } - - // hide instance constructor - private ReadBufferManager() { - LOGGER.trace("Creating readbuffer manager with HADOOP-18546 patch"); - } - - - /* - * - * AbfsInputStream-facing methods - * - */ - - - /** - * {@link AbfsInputStream} calls this method to queue read-aheads. - * - * @param stream The {@link AbfsInputStream} for which to do the read-ahead - * @param requestedOffset The offset in the file which shoukd be read - * @param requestedLength The length to read - */ - void queueReadAhead(final AbfsInputStream stream, final long requestedOffset, final int requestedLength, - TracingContext tracingContext) { - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("Start Queueing readAhead for {} offset {} length {}", - stream.getPath(), requestedOffset, requestedLength); - } - ReadBuffer buffer; - synchronized (this) { - if (isAlreadyQueued(stream, requestedOffset)) { - return; // already queued, do not queue again - } - if (freeList.isEmpty() && !tryEvict()) { - return; // no buffers available, cannot queue anything - } - - buffer = new ReadBuffer(); - buffer.setStream(stream); - buffer.setOffset(requestedOffset); - buffer.setLength(0); - buffer.setRequestedLength(requestedLength); - buffer.setStatus(ReadBufferStatus.NOT_AVAILABLE); - buffer.setLatch(new CountDownLatch(1)); - buffer.setTracingContext(tracingContext); - - Integer bufferIndex = freeList.pop(); // will return a value, since we have checked size > 0 already - - buffer.setBuffer(buffers[bufferIndex]); - buffer.setBufferindex(bufferIndex); - readAheadQueue.add(buffer); - notifyAll(); - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("Done q-ing readAhead for file {} offset {} buffer idx {}", - stream.getPath(), 
requestedOffset, buffer.getBufferindex()); - } - } - } - - - /** - * {@link AbfsInputStream} calls this method read any bytes already available in a buffer (thereby saving a - * remote read). This returns the bytes if the data already exists in buffer. If there is a buffer that is reading - * the requested offset, then this method blocks until that read completes. If the data is queued in a read-ahead - * but not picked up by a worker thread yet, then it cancels that read-ahead and reports cache miss. This is because - * depending on worker thread availability, the read-ahead may take a while - the calling thread can do it's own - * read to get the data faster (copmared to the read waiting in queue for an indeterminate amount of time). - * - * @param stream the file to read bytes for - * @param position the offset in the file to do a read for - * @param length the length to read - * @param buffer the buffer to read data into. Note that the buffer will be written into from offset 0. - * @return the number of bytes read - */ - int getBlock(final AbfsInputStream stream, final long position, final int length, final byte[] buffer) - throws IOException { - // not synchronized, so have to be careful with locking - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("getBlock for file {} position {} thread {}", - stream.getPath(), position, Thread.currentThread().getName()); - } - - waitForProcess(stream, position); - - int bytesRead = 0; - synchronized (this) { - bytesRead = getBlockFromCompletedQueue(stream, position, length, buffer); - } - if (bytesRead > 0) { - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("Done read from Cache for {} position {} length {}", - stream.getPath(), position, bytesRead); - } - return bytesRead; - } - - // otherwise, just say we got nothing - calling thread can do its own read - return 0; - } - - /* - * - * Internal methods - * - */ - - private void waitForProcess(final AbfsInputStream stream, final long position) { - ReadBuffer readBuf; - 
synchronized (this) { - clearFromReadAheadQueue(stream, position); - readBuf = getFromList(inProgressList, stream, position); - } - if (readBuf != null) { // if in in-progress queue, then block for it - try { - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("got a relevant read buffer for file {} offset {} buffer idx {}", - stream.getPath(), readBuf.getOffset(), readBuf.getBufferindex()); - } - readBuf.getLatch().await(); // blocking wait on the caller stream's thread - // Note on correctness: readBuf gets out of inProgressList only in 1 place: after worker thread - // is done processing it (in doneReading). There, the latch is set after removing the buffer from - // inProgressList. So this latch is safe to be outside the synchronized block. - // Putting it in synchronized would result in a deadlock, since this thread would be holding the lock - // while waiting, so no one will be able to change any state. If this becomes more complex in the future, - // then the latch cane be removed and replaced with wait/notify whenever inProgressList is touched. - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - } - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("latch done for file {} buffer idx {} length {}", - stream.getPath(), readBuf.getBufferindex(), readBuf.getLength()); - } - } - } - - /** - * If any buffer in the completedlist can be reclaimed then reclaim it and return the buffer to free list. - * The objective is to find just one buffer - there is no advantage to evicting more than one. 
- * - * @return whether the eviction succeeeded - i.e., were we able to free up one buffer - */ - private synchronized boolean tryEvict() { - ReadBuffer nodeToEvict = null; - if (completedReadList.size() <= 0) { - return false; // there are no evict-able buffers - } - - long currentTimeInMs = currentTimeMillis(); - - // first, try buffers where all bytes have been consumed (approximated as first and last bytes consumed) - for (ReadBuffer buf : completedReadList) { - if (buf.isFirstByteConsumed() && buf.isLastByteConsumed()) { - nodeToEvict = buf; - break; - } - } - if (nodeToEvict != null) { - return evict(nodeToEvict); - } - - // next, try buffers where any bytes have been consumed (may be a bad idea? have to experiment and see) - for (ReadBuffer buf : completedReadList) { - if (buf.isAnyByteConsumed()) { - nodeToEvict = buf; - break; - } - } - - if (nodeToEvict != null) { - return evict(nodeToEvict); - } - - // next, try any old nodes that have not been consumed - // Failed read buffers (with buffer index=-1) that are older than - // thresholdAge should be cleaned up, but at the same time should not - // report successful eviction. - // Queue logic expects that a buffer is freed up for read ahead when - // eviction is successful, whereas a failed ReadBuffer would have released - // its buffer when its status was set to READ_FAILED. 
- long earliestBirthday = Long.MAX_VALUE; - ArrayList oldFailedBuffers = new ArrayList<>(); - for (ReadBuffer buf : completedReadList) { - if ((buf.getBufferindex() != -1) - && (buf.getTimeStamp() < earliestBirthday)) { - nodeToEvict = buf; - earliestBirthday = buf.getTimeStamp(); - } else if ((buf.getBufferindex() == -1) - && (currentTimeInMs - buf.getTimeStamp()) > thresholdAgeMilliseconds) { - oldFailedBuffers.add(buf); - } - } - - for (ReadBuffer buf : oldFailedBuffers) { - evict(buf); - } - - if ((currentTimeInMs - earliestBirthday > thresholdAgeMilliseconds) && (nodeToEvict != null)) { - return evict(nodeToEvict); - } - - LOGGER.trace("No buffer eligible for eviction"); - // nothing can be evicted - return false; - } - - private boolean evict(final ReadBuffer buf) { - // As failed ReadBuffers (bufferIndx = -1) are saved in completedReadList, - // avoid adding it to freeList. - if (buf.getBufferindex() != -1) { - freeList.push(buf.getBufferindex()); - } - - completedReadList.remove(buf); - buf.setTracingContext(null); - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("Evicting buffer idx {}; was used for file {} offset {} length {}", - buf.getBufferindex(), buf.getStream().getPath(), buf.getOffset(), buf.getLength()); - } - return true; - } - - private boolean isAlreadyQueued(final AbfsInputStream stream, final long requestedOffset) { - // returns true if any part of the buffer is already queued - return (isInList(readAheadQueue, stream, requestedOffset) - || isInList(inProgressList, stream, requestedOffset) - || isInList(completedReadList, stream, requestedOffset)); - } - - private boolean isInList(final Collection list, final AbfsInputStream stream, final long requestedOffset) { - return (getFromList(list, stream, requestedOffset) != null); - } - - private ReadBuffer getFromList(final Collection list, final AbfsInputStream stream, final long requestedOffset) { - for (ReadBuffer buffer : list) { - if (buffer.getStream() == stream) { - if (buffer.getStatus() == 
ReadBufferStatus.AVAILABLE - && requestedOffset >= buffer.getOffset() - && requestedOffset < buffer.getOffset() + buffer.getLength()) { - return buffer; - } else if (requestedOffset >= buffer.getOffset() - && requestedOffset < buffer.getOffset() + buffer.getRequestedLength()) { - return buffer; - } - } - } - return null; - } - - /** - * Returns buffers that failed or passed from completed queue. - * @param stream - * @param requestedOffset - * @return - */ - private ReadBuffer getBufferFromCompletedQueue(final AbfsInputStream stream, final long requestedOffset) { - for (ReadBuffer buffer : completedReadList) { - // Buffer is returned if the requestedOffset is at or above buffer's - // offset but less than buffer's length or the actual requestedLength - if ((buffer.getStream() == stream) - && (requestedOffset >= buffer.getOffset()) - && ((requestedOffset < buffer.getOffset() + buffer.getLength()) - || (requestedOffset < buffer.getOffset() + buffer.getRequestedLength()))) { - return buffer; - } - } - - return null; - } - - private void clearFromReadAheadQueue(final AbfsInputStream stream, final long requestedOffset) { - ReadBuffer buffer = getFromList(readAheadQueue, stream, requestedOffset); - if (buffer != null) { - readAheadQueue.remove(buffer); - notifyAll(); // lock is held in calling method - freeList.push(buffer.getBufferindex()); - } - } - - private int getBlockFromCompletedQueue(final AbfsInputStream stream, final long position, final int length, - final byte[] buffer) throws IOException { - ReadBuffer buf = getBufferFromCompletedQueue(stream, position); - - if (buf == null) { - return 0; - } - - if (buf.getStatus() == ReadBufferStatus.READ_FAILED) { - // To prevent new read requests to fail due to old read-ahead attempts, - // return exception only from buffers that failed within last thresholdAgeMilliseconds - if ((currentTimeMillis() - (buf.getTimeStamp()) < thresholdAgeMilliseconds)) { - throw buf.getErrException(); - } else { - return 0; - } - } - - if 
((buf.getStatus() != ReadBufferStatus.AVAILABLE) - || (position >= buf.getOffset() + buf.getLength())) { - return 0; - } - - int cursor = (int) (position - buf.getOffset()); - int availableLengthInBuffer = buf.getLength() - cursor; - int lengthToCopy = Math.min(length, availableLengthInBuffer); - System.arraycopy(buf.getBuffer(), cursor, buffer, 0, lengthToCopy); - if (cursor == 0) { - buf.setFirstByteConsumed(true); - } - if (cursor + lengthToCopy == buf.getLength()) { - buf.setLastByteConsumed(true); - } - buf.setAnyByteConsumed(true); - return lengthToCopy; - } - - /* - * - * ReadBufferWorker-thread-facing methods - * - */ - - /** - * ReadBufferWorker thread calls this to get the next buffer that it should work on. - * - * @return {@link ReadBuffer} - * @throws InterruptedException if thread is interrupted - */ - ReadBuffer getNextBlockToRead() throws InterruptedException { - ReadBuffer buffer = null; - synchronized (this) { - //buffer = readAheadQueue.take(); // blocking method - while (readAheadQueue.size() == 0) { - wait(); - } - buffer = readAheadQueue.remove(); - notifyAll(); - if (buffer == null) { - return null; // should never happen - } - buffer.setStatus(ReadBufferStatus.READING_IN_PROGRESS); - inProgressList.add(buffer); - } - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("ReadBufferWorker picked file {} for offset {}", - buffer.getStream().getPath(), buffer.getOffset()); - } - return buffer; - } - - /** - * ReadBufferWorker thread calls this method to post completion. 
- * - * @param buffer the buffer whose read was completed - * @param result the {@link ReadBufferStatus} after the read operation in the worker thread - * @param bytesActuallyRead the number of bytes that the worker thread was actually able to read - */ - void doneReading(final ReadBuffer buffer, final ReadBufferStatus result, final int bytesActuallyRead) { - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("ReadBufferWorker completed read file {} for offset {} outcome {} bytes {}", - buffer.getStream().getPath(), buffer.getOffset(), result, bytesActuallyRead); - } - synchronized (this) { - // If this buffer has already been purged during - // close of InputStream then we don't update the lists. - if (inProgressList.contains(buffer)) { - inProgressList.remove(buffer); - if (result == ReadBufferStatus.AVAILABLE && bytesActuallyRead > 0) { - buffer.setStatus(ReadBufferStatus.AVAILABLE); - buffer.setLength(bytesActuallyRead); - } else { - freeList.push(buffer.getBufferindex()); - // buffer will be deleted as per the eviction policy. - } - // completed list also contains FAILED read buffers - // for sending exception message to clients. - buffer.setStatus(result); - buffer.setTimeStamp(currentTimeMillis()); - completedReadList.add(buffer); - } - } - - //outside the synchronized, since anyone receiving a wake-up from the latch must see safe-published results - buffer.getLatch().countDown(); // wake up waiting threads (if any) - } - - /** - * Similar to System.currentTimeMillis, except implemented with System.nanoTime(). - * System.currentTimeMillis can go backwards when system clock is changed (e.g., with NTP time synchronization), - * making it unsuitable for measuring time intervals. nanotime is strictly monotonically increasing per CPU core. - * Note: it is not monotonic across Sockets, and even within a CPU, its only the - * more recent parts which share a clock across all cores. 
- * - * @return current time in milliseconds - */ - private long currentTimeMillis() { - return System.nanoTime() / 1000 / 1000; - } - - @VisibleForTesting - int getThresholdAgeMilliseconds() { - return thresholdAgeMilliseconds; - } - - @VisibleForTesting - static void setThresholdAgeMilliseconds(int thresholdAgeMs) { - thresholdAgeMilliseconds = thresholdAgeMs; - } - - @VisibleForTesting - int getCompletedReadListSize() { - return completedReadList.size(); - } - - @VisibleForTesting - public synchronized List getCompletedReadListCopy() { - return new ArrayList<>(completedReadList); - } - - @VisibleForTesting - public synchronized List getFreeListCopy() { - return new ArrayList<>(freeList); - } - - @VisibleForTesting - public synchronized List getReadAheadQueueCopy() { - return new ArrayList<>(readAheadQueue); - } - - @VisibleForTesting - public synchronized List getInProgressCopiedList() { - return new ArrayList<>(inProgressList); - } - - @VisibleForTesting - void callTryEvict() { - tryEvict(); - } - - - /** - * Purging the buffers associated with an {@link AbfsInputStream} - * from {@link ReadBufferManager} when stream is closed. - * @param stream input stream. - */ - public synchronized void purgeBuffersForStream(AbfsInputStream stream) { - LOGGER.debug("Purging stale buffers for AbfsInputStream {} ", stream); - readAheadQueue.removeIf(readBuffer -> readBuffer.getStream() == stream); - purgeList(stream, completedReadList); - } - - /** - * Method to remove buffers associated with a {@link AbfsInputStream} - * when its close method is called. - * NOTE: This method is not threadsafe and must be called inside a - * synchronised block. See caller. - * @param stream associated input stream. - * @param list list of buffers like {@link this#completedReadList} - * or {@link this#inProgressList}. 
- */ - private void purgeList(AbfsInputStream stream, LinkedList list) { - for (Iterator it = list.iterator(); it.hasNext();) { - ReadBuffer readBuffer = it.next(); - if (readBuffer.getStream() == stream) { - it.remove(); - // As failed ReadBuffers (bufferIndex = -1) are already pushed to free - // list in doneReading method, we will skip adding those here again. - if (readBuffer.getBufferindex() != -1) { - freeList.push(readBuffer.getBufferindex()); - } - } - } - } - - /** - * Test method that can clean up the current state of readAhead buffers and - * the lists. Will also trigger a fresh init. - */ - @VisibleForTesting - void testResetReadBufferManager() { - synchronized (this) { - ArrayList completedBuffers = new ArrayList<>(); - for (ReadBuffer buf : completedReadList) { - if (buf != null) { - completedBuffers.add(buf); - } - } - - for (ReadBuffer buf : completedBuffers) { - evict(buf); - } - - readAheadQueue.clear(); - inProgressList.clear(); - completedReadList.clear(); - freeList.clear(); - for (int i = 0; i < NUM_BUFFERS; i++) { - buffers[i] = null; - } - buffers = null; - resetBufferManager(); - } - } - - /** - * Reset buffer manager to null. - */ - @VisibleForTesting - static void resetBufferManager() { - bufferManager = null; - } - - /** - * Reset readAhead buffer to needed readAhead block size and - * thresholdAgeMilliseconds. - * @param readAheadBlockSize - * @param thresholdAgeMilliseconds - */ - @VisibleForTesting - void testResetReadBufferManager(int readAheadBlockSize, int thresholdAgeMilliseconds) { - setBlockSize(readAheadBlockSize); - setThresholdAgeMilliseconds(thresholdAgeMilliseconds); - testResetReadBufferManager(); - } - - @VisibleForTesting - static void setBlockSize(int readAheadBlockSize) { - blockSize = readAheadBlockSize; - } - - @VisibleForTesting - int getReadAheadBlockSize() { - return blockSize; - } - - /** - * Test method that can mimic no free buffers scenario and also add a ReadBuffer - * into completedReadList. 
This readBuffer will get picked up by TryEvict() - * next time a new queue request comes in. - * @param buf that needs to be added to completedReadlist - */ - @VisibleForTesting - void testMimicFullUseAndAddFailedBuffer(ReadBuffer buf) { - freeList.clear(); - completedReadList.add(buf); - } - - @VisibleForTesting - int getNumBuffers() { - return NUM_BUFFERS; - } +public interface ReadBufferManager { } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV1.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV1.java new file mode 100644 index 0000000000000..15ddf48d17af3 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV1.java @@ -0,0 +1,651 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs.azurebfs.services; + +import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Queue; +import java.util.Stack; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.hadoop.fs.azurebfs.utils.TracingContext; +import org.apache.hadoop.classification.VisibleForTesting; + +/** + * The Read Buffer Manager for Rest AbfsClient. 
+ */ +final class ReadBufferManagerV1 implements ReadBufferManager { + private static final Logger LOGGER = LoggerFactory.getLogger( + ReadBufferManagerV1.class); + private static final int ONE_KB = 1024; + private static final int ONE_MB = ONE_KB * ONE_KB; + + private static final int NUM_BUFFERS = 16; + private static final int NUM_THREADS = 8; + private static final int DEFAULT_THRESHOLD_AGE_MILLISECONDS = 3000; // have to see if 3 seconds is a good threshold + + private static int blockSize = 4 * ONE_MB; + private static int thresholdAgeMilliseconds = DEFAULT_THRESHOLD_AGE_MILLISECONDS; + private Thread[] threads = new Thread[NUM_THREADS]; + private byte[][] buffers; // array of byte[] buffers, to hold the data that is read + private Stack freeList = new Stack<>(); // indices in buffers[] array that are available + + private Queue readAheadQueue = new LinkedList<>(); // queue of requests that are not picked up by any worker thread yet + private LinkedList inProgressList = new LinkedList<>(); // requests being processed by worker threads + private LinkedList completedReadList = new LinkedList<>(); // buffers available for reading + private static ReadBufferManagerV1 bufferManager; // singleton, initialized in static initialization block + private static final ReentrantLock LOCK = new ReentrantLock(); + + static ReadBufferManagerV1 getBufferManager() { + if (bufferManager == null) { + LOCK.lock(); + try { + if (bufferManager == null) { + bufferManager = new ReadBufferManagerV1(); + bufferManager.init(); + } + } finally { + LOCK.unlock(); + } + } + return bufferManager; + } + + static void setReadBufferManagerConfigs(int readAheadBlockSize) { + if (bufferManager == null) { + LOGGER.debug( + "ReadBufferManagerV1 not initialized yet. 
Overriding readAheadBlockSize as {}", + readAheadBlockSize); + blockSize = readAheadBlockSize; + } + } + + private void init() { + buffers = new byte[NUM_BUFFERS][]; + for (int i = 0; i < NUM_BUFFERS; i++) { + buffers[i] = new byte[blockSize]; // same buffers are reused. The byte array never goes back to GC + freeList.add(i); + } + for (int i = 0; i < NUM_THREADS; i++) { + Thread t = new Thread(new ReadBufferWorker(i)); + t.setDaemon(true); + threads[i] = t; + t.setName("ABFS-prefetch-" + i); + t.start(); + } + ReadBufferWorker.UNLEASH_WORKERS.countDown(); + } + + // hide instance constructor + private ReadBufferManagerV1() { + LOGGER.trace("Creating readbuffer manager with HADOOP-18546 patch"); + } + + + /* + * + * AbfsInputStream-facing methods + * + */ + + + /** + * {@link AbfsInputStream} calls this method to queue read-aheads. + * + * @param stream The {@link AbfsInputStream} for which to do the read-ahead + * @param requestedOffset The offset in the file which shoukd be read + * @param requestedLength The length to read + */ + void queueReadAhead(final AbfsInputStream stream, final long requestedOffset, final int requestedLength, + TracingContext tracingContext) { + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("Start Queueing readAhead for {} offset {} length {}", + stream.getPath(), requestedOffset, requestedLength); + } + ReadBuffer buffer; + synchronized (this) { + if (isAlreadyQueued(stream, requestedOffset)) { + return; // already queued, do not queue again + } + if (freeList.isEmpty() && !tryEvict()) { + return; // no buffers available, cannot queue anything + } + + buffer = new ReadBuffer(); + buffer.setStream(stream); + buffer.setOffset(requestedOffset); + buffer.setLength(0); + buffer.setRequestedLength(requestedLength); + buffer.setStatus(ReadBufferStatus.NOT_AVAILABLE); + buffer.setLatch(new CountDownLatch(1)); + buffer.setTracingContext(tracingContext); + + Integer bufferIndex = freeList.pop(); // will return a value, since we have checked size > 
0 already + + buffer.setBuffer(buffers[bufferIndex]); + buffer.setBufferindex(bufferIndex); + readAheadQueue.add(buffer); + notifyAll(); + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("Done q-ing readAhead for file {} offset {} buffer idx {}", + stream.getPath(), requestedOffset, buffer.getBufferindex()); + } + } + } + + + /** + * {@link AbfsInputStream} calls this method read any bytes already available in a buffer (thereby saving a + * remote read). This returns the bytes if the data already exists in buffer. If there is a buffer that is reading + * the requested offset, then this method blocks until that read completes. If the data is queued in a read-ahead + * but not picked up by a worker thread yet, then it cancels that read-ahead and reports cache miss. This is because + * depending on worker thread availability, the read-ahead may take a while - the calling thread can do it's own + * read to get the data faster (copmared to the read waiting in queue for an indeterminate amount of time). + * + * @param stream the file to read bytes for + * @param position the offset in the file to do a read for + * @param length the length to read + * @param buffer the buffer to read data into. Note that the buffer will be written into from offset 0. 
+ * @return the number of bytes read + */ + int getBlock(final AbfsInputStream stream, final long position, final int length, final byte[] buffer) + throws IOException { + // not synchronized, so have to be careful with locking + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("getBlock for file {} position {} thread {}", + stream.getPath(), position, Thread.currentThread().getName()); + } + + waitForProcess(stream, position); + + int bytesRead = 0; + synchronized (this) { + bytesRead = getBlockFromCompletedQueue(stream, position, length, buffer); + } + if (bytesRead > 0) { + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("Done read from Cache for {} position {} length {}", + stream.getPath(), position, bytesRead); + } + return bytesRead; + } + + // otherwise, just say we got nothing - calling thread can do its own read + return 0; + } + + /* + * + * Internal methods + * + */ + + private void waitForProcess(final AbfsInputStream stream, final long position) { + ReadBuffer readBuf; + synchronized (this) { + clearFromReadAheadQueue(stream, position); + readBuf = getFromList(inProgressList, stream, position); + } + if (readBuf != null) { // if in in-progress queue, then block for it + try { + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("got a relevant read buffer for file {} offset {} buffer idx {}", + stream.getPath(), readBuf.getOffset(), readBuf.getBufferindex()); + } + readBuf.getLatch().await(); // blocking wait on the caller stream's thread + // Note on correctness: readBuf gets out of inProgressList only in 1 place: after worker thread + // is done processing it (in doneReading). There, the latch is set after removing the buffer from + // inProgressList. So this latch is safe to be outside the synchronized block. + // Putting it in synchronized would result in a deadlock, since this thread would be holding the lock + // while waiting, so no one will be able to change any state. 
If this becomes more complex in the future, + // then the latch cane be removed and replaced with wait/notify whenever inProgressList is touched. + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("latch done for file {} buffer idx {} length {}", + stream.getPath(), readBuf.getBufferindex(), readBuf.getLength()); + } + } + } + + /** + * If any buffer in the completedlist can be reclaimed then reclaim it and return the buffer to free list. + * The objective is to find just one buffer - there is no advantage to evicting more than one. + * + * @return whether the eviction succeeeded - i.e., were we able to free up one buffer + */ + private synchronized boolean tryEvict() { + ReadBuffer nodeToEvict = null; + if (completedReadList.size() <= 0) { + return false; // there are no evict-able buffers + } + + long currentTimeInMs = currentTimeMillis(); + + // first, try buffers where all bytes have been consumed (approximated as first and last bytes consumed) + for (ReadBuffer buf : completedReadList) { + if (buf.isFirstByteConsumed() && buf.isLastByteConsumed()) { + nodeToEvict = buf; + break; + } + } + if (nodeToEvict != null) { + return evict(nodeToEvict); + } + + // next, try buffers where any bytes have been consumed (may be a bad idea? have to experiment and see) + for (ReadBuffer buf : completedReadList) { + if (buf.isAnyByteConsumed()) { + nodeToEvict = buf; + break; + } + } + + if (nodeToEvict != null) { + return evict(nodeToEvict); + } + + // next, try any old nodes that have not been consumed + // Failed read buffers (with buffer index=-1) that are older than + // thresholdAge should be cleaned up, but at the same time should not + // report successful eviction. + // Queue logic expects that a buffer is freed up for read ahead when + // eviction is successful, whereas a failed ReadBuffer would have released + // its buffer when its status was set to READ_FAILED. 
+ long earliestBirthday = Long.MAX_VALUE; + ArrayList oldFailedBuffers = new ArrayList<>(); + for (ReadBuffer buf : completedReadList) { + if ((buf.getBufferindex() != -1) + && (buf.getTimeStamp() < earliestBirthday)) { + nodeToEvict = buf; + earliestBirthday = buf.getTimeStamp(); + } else if ((buf.getBufferindex() == -1) + && (currentTimeInMs - buf.getTimeStamp()) > thresholdAgeMilliseconds) { + oldFailedBuffers.add(buf); + } + } + + for (ReadBuffer buf : oldFailedBuffers) { + evict(buf); + } + + if ((currentTimeInMs - earliestBirthday > thresholdAgeMilliseconds) && (nodeToEvict != null)) { + return evict(nodeToEvict); + } + + LOGGER.trace("No buffer eligible for eviction"); + // nothing can be evicted + return false; + } + + private boolean evict(final ReadBuffer buf) { + // As failed ReadBuffers (bufferIndx = -1) are saved in completedReadList, + // avoid adding it to freeList. + if (buf.getBufferindex() != -1) { + freeList.push(buf.getBufferindex()); + } + + completedReadList.remove(buf); + buf.setTracingContext(null); + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("Evicting buffer idx {}; was used for file {} offset {} length {}", + buf.getBufferindex(), buf.getStream().getPath(), buf.getOffset(), buf.getLength()); + } + return true; + } + + private boolean isAlreadyQueued(final AbfsInputStream stream, final long requestedOffset) { + // returns true if any part of the buffer is already queued + return (isInList(readAheadQueue, stream, requestedOffset) + || isInList(inProgressList, stream, requestedOffset) + || isInList(completedReadList, stream, requestedOffset)); + } + + private boolean isInList(final Collection list, final AbfsInputStream stream, final long requestedOffset) { + return (getFromList(list, stream, requestedOffset) != null); + } + + private ReadBuffer getFromList(final Collection list, final AbfsInputStream stream, final long requestedOffset) { + for (ReadBuffer buffer : list) { + if (buffer.getStream() == stream) { + if (buffer.getStatus() == 
ReadBufferStatus.AVAILABLE + && requestedOffset >= buffer.getOffset() + && requestedOffset < buffer.getOffset() + buffer.getLength()) { + return buffer; + } else if (requestedOffset >= buffer.getOffset() + && requestedOffset < buffer.getOffset() + buffer.getRequestedLength()) { + return buffer; + } + } + } + return null; + } + + /** + * Returns buffers that failed or passed from completed queue. + * @param stream + * @param requestedOffset + * @return + */ + private ReadBuffer getBufferFromCompletedQueue(final AbfsInputStream stream, final long requestedOffset) { + for (ReadBuffer buffer : completedReadList) { + // Buffer is returned if the requestedOffset is at or above buffer's + // offset but less than buffer's length or the actual requestedLength + if ((buffer.getStream() == stream) + && (requestedOffset >= buffer.getOffset()) + && ((requestedOffset < buffer.getOffset() + buffer.getLength()) + || (requestedOffset < buffer.getOffset() + buffer.getRequestedLength()))) { + return buffer; + } + } + + return null; + } + + private void clearFromReadAheadQueue(final AbfsInputStream stream, final long requestedOffset) { + ReadBuffer buffer = getFromList(readAheadQueue, stream, requestedOffset); + if (buffer != null) { + readAheadQueue.remove(buffer); + notifyAll(); // lock is held in calling method + freeList.push(buffer.getBufferindex()); + } + } + + private int getBlockFromCompletedQueue(final AbfsInputStream stream, final long position, final int length, + final byte[] buffer) throws IOException { + ReadBuffer buf = getBufferFromCompletedQueue(stream, position); + + if (buf == null) { + return 0; + } + + if (buf.getStatus() == ReadBufferStatus.READ_FAILED) { + // To prevent new read requests to fail due to old read-ahead attempts, + // return exception only from buffers that failed within last thresholdAgeMilliseconds + if ((currentTimeMillis() - (buf.getTimeStamp()) < thresholdAgeMilliseconds)) { + throw buf.getErrException(); + } else { + return 0; + } + } + + if 
((buf.getStatus() != ReadBufferStatus.AVAILABLE) + || (position >= buf.getOffset() + buf.getLength())) { + return 0; + } + + int cursor = (int) (position - buf.getOffset()); + int availableLengthInBuffer = buf.getLength() - cursor; + int lengthToCopy = Math.min(length, availableLengthInBuffer); + System.arraycopy(buf.getBuffer(), cursor, buffer, 0, lengthToCopy); + if (cursor == 0) { + buf.setFirstByteConsumed(true); + } + if (cursor + lengthToCopy == buf.getLength()) { + buf.setLastByteConsumed(true); + } + buf.setAnyByteConsumed(true); + return lengthToCopy; + } + + /* + * + * ReadBufferWorker-thread-facing methods + * + */ + + /** + * ReadBufferWorker thread calls this to get the next buffer that it should work on. + * + * @return {@link ReadBuffer} + * @throws InterruptedException if thread is interrupted + */ + ReadBuffer getNextBlockToRead() throws InterruptedException { + ReadBuffer buffer = null; + synchronized (this) { + //buffer = readAheadQueue.take(); // blocking method + while (readAheadQueue.size() == 0) { + wait(); + } + buffer = readAheadQueue.remove(); + notifyAll(); + if (buffer == null) { + return null; // should never happen + } + buffer.setStatus(ReadBufferStatus.READING_IN_PROGRESS); + inProgressList.add(buffer); + } + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("ReadBufferWorker picked file {} for offset {}", + buffer.getStream().getPath(), buffer.getOffset()); + } + return buffer; + } + + /** + * ReadBufferWorker thread calls this method to post completion. 
+ * + * @param buffer the buffer whose read was completed + * @param result the {@link ReadBufferStatus} after the read operation in the worker thread + * @param bytesActuallyRead the number of bytes that the worker thread was actually able to read + */ + void doneReading(final ReadBuffer buffer, final ReadBufferStatus result, final int bytesActuallyRead) { + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("ReadBufferWorker completed read file {} for offset {} outcome {} bytes {}", + buffer.getStream().getPath(), buffer.getOffset(), result, bytesActuallyRead); + } + synchronized (this) { + // If this buffer has already been purged during + // close of InputStream then we don't update the lists. + if (inProgressList.contains(buffer)) { + inProgressList.remove(buffer); + if (result == ReadBufferStatus.AVAILABLE && bytesActuallyRead > 0) { + buffer.setStatus(ReadBufferStatus.AVAILABLE); + buffer.setLength(bytesActuallyRead); + } else { + freeList.push(buffer.getBufferindex()); + // buffer will be deleted as per the eviction policy. + } + // completed list also contains FAILED read buffers + // for sending exception message to clients. + buffer.setStatus(result); + buffer.setTimeStamp(currentTimeMillis()); + completedReadList.add(buffer); + } + } + + //outside the synchronized, since anyone receiving a wake-up from the latch must see safe-published results + buffer.getLatch().countDown(); // wake up waiting threads (if any) + } + + /** + * Similar to System.currentTimeMillis, except implemented with System.nanoTime(). + * System.currentTimeMillis can go backwards when system clock is changed (e.g., with NTP time synchronization), + * making it unsuitable for measuring time intervals. nanotime is strictly monotonically increasing per CPU core. + * Note: it is not monotonic across Sockets, and even within a CPU, its only the + * more recent parts which share a clock across all cores. 
+ * + * @return current time in milliseconds + */ + private long currentTimeMillis() { + return System.nanoTime() / 1000 / 1000; + } + + @VisibleForTesting + int getThresholdAgeMilliseconds() { + return thresholdAgeMilliseconds; + } + + @VisibleForTesting + static void setThresholdAgeMilliseconds(int thresholdAgeMs) { + thresholdAgeMilliseconds = thresholdAgeMs; + } + + @VisibleForTesting + int getCompletedReadListSize() { + return completedReadList.size(); + } + + @VisibleForTesting + public synchronized List getCompletedReadListCopy() { + return new ArrayList<>(completedReadList); + } + + @VisibleForTesting + public synchronized List getFreeListCopy() { + return new ArrayList<>(freeList); + } + + @VisibleForTesting + public synchronized List getReadAheadQueueCopy() { + return new ArrayList<>(readAheadQueue); + } + + @VisibleForTesting + public synchronized List getInProgressCopiedList() { + return new ArrayList<>(inProgressList); + } + + @VisibleForTesting + void callTryEvict() { + tryEvict(); + } + + + /** + * Purging the buffers associated with an {@link AbfsInputStream} + * from {@link ReadBufferManagerV1} when stream is closed. + * @param stream input stream. + */ + public synchronized void purgeBuffersForStream(AbfsInputStream stream) { + LOGGER.debug("Purging stale buffers for AbfsInputStream {} ", stream); + readAheadQueue.removeIf(readBuffer -> readBuffer.getStream() == stream); + purgeList(stream, completedReadList); + } + + /** + * Method to remove buffers associated with a {@link AbfsInputStream} + * when its close method is called. + * NOTE: This method is not threadsafe and must be called inside a + * synchronised block. See caller. + * @param stream associated input stream. + * @param list list of buffers like {@link this#completedReadList} + * or {@link this#inProgressList}. 
+ */ + private void purgeList(AbfsInputStream stream, LinkedList list) { + for (Iterator it = list.iterator(); it.hasNext();) { + ReadBuffer readBuffer = it.next(); + if (readBuffer.getStream() == stream) { + it.remove(); + // As failed ReadBuffers (bufferIndex = -1) are already pushed to free + // list in doneReading method, we will skip adding those here again. + if (readBuffer.getBufferindex() != -1) { + freeList.push(readBuffer.getBufferindex()); + } + } + } + } + + /** + * Test method that can clean up the current state of readAhead buffers and + * the lists. Will also trigger a fresh init. + */ + @VisibleForTesting + void testResetReadBufferManager() { + synchronized (this) { + ArrayList completedBuffers = new ArrayList<>(); + for (ReadBuffer buf : completedReadList) { + if (buf != null) { + completedBuffers.add(buf); + } + } + + for (ReadBuffer buf : completedBuffers) { + evict(buf); + } + + readAheadQueue.clear(); + inProgressList.clear(); + completedReadList.clear(); + freeList.clear(); + for (int i = 0; i < NUM_BUFFERS; i++) { + buffers[i] = null; + } + buffers = null; + resetBufferManager(); + } + } + + /** + * Reset buffer manager to null. + */ + @VisibleForTesting + static void resetBufferManager() { + bufferManager = null; + } + + /** + * Reset readAhead buffer to needed readAhead block size and + * thresholdAgeMilliseconds. + * @param readAheadBlockSize + * @param thresholdAgeMilliseconds + */ + @VisibleForTesting + void testResetReadBufferManager(int readAheadBlockSize, int thresholdAgeMilliseconds) { + setBlockSize(readAheadBlockSize); + setThresholdAgeMilliseconds(thresholdAgeMilliseconds); + testResetReadBufferManager(); + } + + @VisibleForTesting + static void setBlockSize(int readAheadBlockSize) { + blockSize = readAheadBlockSize; + } + + @VisibleForTesting + int getReadAheadBlockSize() { + return blockSize; + } + + /** + * Test method that can mimic no free buffers scenario and also add a ReadBuffer + * into completedReadList. 
This readBuffer will get picked up by TryEvict() + * next time a new queue request comes in. + * @param buf that needs to be added to completedReadlist + */ + @VisibleForTesting + void testMimicFullUseAndAddFailedBuffer(ReadBuffer buf) { + freeList.clear(); + completedReadList.add(buf); + } + + @VisibleForTesting + int getNumBuffers() { + return NUM_BUFFERS; + } +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java new file mode 100644 index 0000000000000..c057994c523b0 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java @@ -0,0 +1,516 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs.azurebfs.services; + +import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Queue; +import java.util.Stack; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.hadoop.fs.azurebfs.utils.TracingContext; +import org.apache.hadoop.classification.VisibleForTesting; + +/** + * The Read Buffer Manager for Rest AbfsClient. 
+ */ +final class ReadBufferManagerV2 implements ReadBufferManager { + private static final Logger LOGGER = LoggerFactory.getLogger(ReadBufferManagerV2.class); + private static final int ONE_KB = 1024; + private static final int ONE_MB = ONE_KB * ONE_KB; + + private static final int NUM_BUFFERS = 3; + private static final int NUM_THREADS = 3; + private static final int DEFAULT_THRESHOLD_AGE_MILLISECONDS = 3000; // have to see if 3 seconds is a good threshold + + private int blockSize = 4 * ONE_MB; + private int thresholdAgeMilliseconds = DEFAULT_THRESHOLD_AGE_MILLISECONDS; + private Thread[] threads = new Thread[NUM_THREADS]; + private byte[][] buffers; // array of byte[] buffers, to hold the data that is read + private Stack freeList = new Stack<>(); // indices in buffers[] array that are available + + private Queue readAheadQueue = new LinkedList<>(); // queue of requests that are not picked up by any worker thread yet + private LinkedList inProgressList = new LinkedList<>(); // requests being processed by worker threads + private LinkedList completedReadList = new LinkedList<>(); // buffers available for reading + private final ReentrantLock LOCK = new ReentrantLock(); + + public void setReadBufferManagerConfigs(int readAheadBlockSize) { + blockSize = readAheadBlockSize; + } + + public void init() { + buffers = new byte[NUM_BUFFERS][]; + for (int i = 0; i < NUM_BUFFERS; i++) { + buffers[i] = new byte[blockSize]; // same buffers are reused. The byte array never goes back to GC + freeList.add(i); + } + for (int i = 0; i < NUM_THREADS; i++) { + Thread t = new Thread(new ReadBufferWorker(i)); + t.setDaemon(true); + threads[i] = t; + t.setName("ABFS-prefetch-" + i); + t.start(); + } + ReadBufferWorker.UNLEASH_WORKERS.countDown(); + } + + /** + * {@link AbfsInputStream} calls this method to queue read-aheads. 
 + * + * @param stream The {@link AbfsInputStream} for which to do the read-ahead + * @param requestedOffset The offset in the file which should be read + * @param requestedLength The length to read + */ + public void queueReadAhead(final AbfsInputStream stream, final long requestedOffset, final int requestedLength, + TracingContext tracingContext) { + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("Start Queueing readAhead for {} offset {} length {}", + stream.getPath(), requestedOffset, requestedLength); + } + ReadBuffer buffer; + synchronized (this) { + if (isAlreadyQueued(stream, requestedOffset)) { + return; // already queued, do not queue again + } + if (freeList.isEmpty() && !tryEvict()) { + return; // no buffers available, cannot queue anything + } + + buffer = new ReadBuffer(); + buffer.setStream(stream); + buffer.setOffset(requestedOffset); + buffer.setLength(0); + buffer.setRequestedLength(requestedLength); + buffer.setStatus(ReadBufferStatus.NOT_AVAILABLE); + buffer.setLatch(new CountDownLatch(1)); + buffer.setTracingContext(tracingContext); + + Integer bufferIndex = freeList.pop(); // will return a value, since we have checked size > 0 already + + buffer.setBuffer(buffers[bufferIndex]); + buffer.setBufferindex(bufferIndex); + readAheadQueue.add(buffer); + notifyAll(); + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("Done q-ing readAhead for file {} offset {} buffer idx {}", + stream.getPath(), requestedOffset, buffer.getBufferindex()); + } + } + } + + + /** + * {@link AbfsInputStream} calls this method to read any bytes already available in a buffer (thereby saving a + * remote read). This returns the bytes if the data already exists in buffer. If there is a buffer that is reading + * the requested offset, then this method blocks until that read completes. If the data is queued in a read-ahead + * but not picked up by a worker thread yet, then it cancels that read-ahead and reports cache miss. 
This is because + * depending on worker thread availability, the read-ahead may take a while - the calling thread can do its own + * read to get the data faster (compared to the read waiting in queue for an indeterminate amount of time). + * + * @param stream the file to read bytes for + * @param position the offset in the file to do a read for + * @param length the length to read + * @param buffer the buffer to read data into. Note that the buffer will be written into from offset 0. + * @return the number of bytes read + */ + int getBlock(final AbfsInputStream stream, final long position, final int length, final byte[] buffer) + throws IOException { + // not synchronized, so have to be careful with locking + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("getBlock for file {} position {} thread {}", + stream.getPath(), position, Thread.currentThread().getName()); + } + + waitForProcess(stream, position); + + int bytesRead = 0; + synchronized (this) { + bytesRead = getBlockFromCompletedQueue(stream, position, length, buffer); + } + if (bytesRead > 0) { + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("Done read from Cache for {} position {} length {}", + stream.getPath(), position, bytesRead); + } + return bytesRead; + } + + // otherwise, just say we got nothing - calling thread can do its own read + return 0; + } + + private void waitForProcess(final AbfsInputStream stream, final long position) { + ReadBuffer readBuf; + synchronized (this) { + clearFromReadAheadQueue(stream, position); + readBuf = getFromList(inProgressList, stream, position); + } + if (readBuf != null) { // if in in-progress queue, then block for it + try { + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("got a relevant read buffer for file {} offset {} buffer idx {}", + stream.getPath(), readBuf.getOffset(), readBuf.getBufferindex()); + } + readBuf.getLatch().await(); // blocking wait on the caller stream's thread + // Note on correctness: readBuf gets out of inProgressList only in 1 place: after 
worker thread + // is done processing it (in doneReading). There, the latch is set after removing the buffer from + // inProgressList. So this latch is safe to be outside the synchronized block. + // Putting it in synchronized would result in a deadlock, since this thread would be holding the lock + // while waiting, so no one will be able to change any state. If this becomes more complex in the future, + // then the latch can be removed and replaced with wait/notify whenever inProgressList is touched. + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("latch done for file {} buffer idx {} length {}", + stream.getPath(), readBuf.getBufferindex(), readBuf.getLength()); + } + } + } + + /** + * If any buffer in the completed list can be reclaimed then reclaim it and return the buffer to free list. + * The objective is to find just one buffer - there is no advantage to evicting more than one. + * + * @return whether the eviction succeeded - i.e., were we able to free up one buffer + */ + private synchronized boolean tryEvict() { + ReadBuffer nodeToEvict = null; + if (completedReadList.size() <= 0) { + return false; // there are no evict-able buffers + } + + long currentTimeInMs = currentTimeMillis(); + + // first, try buffers where all bytes have been consumed (approximated as first and last bytes consumed) + for (ReadBuffer buf : completedReadList) { + if (buf.isFirstByteConsumed() && buf.isLastByteConsumed()) { + nodeToEvict = buf; + break; + } + } + if (nodeToEvict != null) { + return evict(nodeToEvict); + } + + // next, try buffers where any bytes have been consumed (may be a bad idea? 
have to experiment and see) + for (ReadBuffer buf : completedReadList) { + if (buf.isAnyByteConsumed()) { + nodeToEvict = buf; + break; + } + } + + if (nodeToEvict != null) { + return evict(nodeToEvict); + } + + // next, try any old nodes that have not been consumed + // Failed read buffers (with buffer index=-1) that are older than + // thresholdAge should be cleaned up, but at the same time should not + // report successful eviction. + // Queue logic expects that a buffer is freed up for read ahead when + // eviction is successful, whereas a failed ReadBuffer would have released + // its buffer when its status was set to READ_FAILED. + long earliestBirthday = Long.MAX_VALUE; + ArrayList oldFailedBuffers = new ArrayList<>(); + for (ReadBuffer buf : completedReadList) { + if ((buf.getBufferindex() != -1) + && (buf.getTimeStamp() < earliestBirthday)) { + nodeToEvict = buf; + earliestBirthday = buf.getTimeStamp(); + } else if ((buf.getBufferindex() == -1) + && (currentTimeInMs - buf.getTimeStamp()) > thresholdAgeMilliseconds) { + oldFailedBuffers.add(buf); + } + } + + for (ReadBuffer buf : oldFailedBuffers) { + evict(buf); + } + + if ((currentTimeInMs - earliestBirthday > thresholdAgeMilliseconds) && (nodeToEvict != null)) { + return evict(nodeToEvict); + } + + LOGGER.trace("No buffer eligible for eviction"); + // nothing can be evicted + return false; + } + + private boolean evict(final ReadBuffer buf) { + // As failed ReadBuffers (bufferIndx = -1) are saved in completedReadList, + // avoid adding it to freeList. 
+ if (buf.getBufferindex() != -1) { + freeList.push(buf.getBufferindex()); + } + + completedReadList.remove(buf); + buf.setTracingContext(null); + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("Evicting buffer idx {}; was used for file {} offset {} length {}", + buf.getBufferindex(), buf.getStream().getPath(), buf.getOffset(), buf.getLength()); + } + return true; + } + + private boolean isAlreadyQueued(final AbfsInputStream stream, final long requestedOffset) { + // returns true if any part of the buffer is already queued + return (isInList(readAheadQueue, stream, requestedOffset) + || isInList(inProgressList, stream, requestedOffset) + || isInList(completedReadList, stream, requestedOffset)); + } + + private boolean isInList(final Collection list, final AbfsInputStream stream, final long requestedOffset) { + return (getFromList(list, stream, requestedOffset) != null); + } + + private ReadBuffer getFromList(final Collection list, final AbfsInputStream stream, final long requestedOffset) { + for (ReadBuffer buffer : list) { + if (buffer.getStream() == stream) { + if (buffer.getStatus() == ReadBufferStatus.AVAILABLE + && requestedOffset >= buffer.getOffset() + && requestedOffset < buffer.getOffset() + buffer.getLength()) { + return buffer; + } else if (requestedOffset >= buffer.getOffset() + && requestedOffset < buffer.getOffset() + buffer.getRequestedLength()) { + return buffer; + } + } + } + return null; + } + + /** + * Returns buffers that failed or passed from completed queue. 
+ * @param stream + * @param requestedOffset + * @return + */ + private ReadBuffer getBufferFromCompletedQueue(final AbfsInputStream stream, final long requestedOffset) { + for (ReadBuffer buffer : completedReadList) { + // Buffer is returned if the requestedOffset is at or above buffer's + // offset but less than buffer's length or the actual requestedLength + if ((buffer.getStream() == stream) + && (requestedOffset >= buffer.getOffset()) + && ((requestedOffset < buffer.getOffset() + buffer.getLength()) + || (requestedOffset < buffer.getOffset() + buffer.getRequestedLength()))) { + return buffer; + } + } + + return null; + } + + private void clearFromReadAheadQueue(final AbfsInputStream stream, final long requestedOffset) { + ReadBuffer buffer = getFromList(readAheadQueue, stream, requestedOffset); + if (buffer != null) { + readAheadQueue.remove(buffer); + notifyAll(); // lock is held in calling method + freeList.push(buffer.getBufferindex()); + } + } + + private int getBlockFromCompletedQueue(final AbfsInputStream stream, final long position, final int length, + final byte[] buffer) throws IOException { + ReadBuffer buf = getBufferFromCompletedQueue(stream, position); + + if (buf == null) { + return 0; + } + + if (buf.getStatus() == ReadBufferStatus.READ_FAILED) { + // To prevent new read requests to fail due to old read-ahead attempts, + // return exception only from buffers that failed within last thresholdAgeMilliseconds + if ((currentTimeMillis() - (buf.getTimeStamp()) < thresholdAgeMilliseconds)) { + throw buf.getErrException(); + } else { + return 0; + } + } + + if ((buf.getStatus() != ReadBufferStatus.AVAILABLE) + || (position >= buf.getOffset() + buf.getLength())) { + return 0; + } + + int cursor = (int) (position - buf.getOffset()); + int availableLengthInBuffer = buf.getLength() - cursor; + int lengthToCopy = Math.min(length, availableLengthInBuffer); + System.arraycopy(buf.getBuffer(), cursor, buffer, 0, lengthToCopy); + if (cursor == 0) { + 
buf.setFirstByteConsumed(true); + } + if (cursor + lengthToCopy == buf.getLength()) { + buf.setLastByteConsumed(true); + } + buf.setAnyByteConsumed(true); + return lengthToCopy; + } + + /** + * ReadBufferWorker thread calls this to get the next buffer that it should work on. + * + * @return {@link ReadBuffer} + * @throws InterruptedException if thread is interrupted + */ + ReadBuffer getNextBlockToRead() throws InterruptedException { + ReadBuffer buffer = null; + synchronized (this) { + //buffer = readAheadQueue.take(); // blocking method + while (readAheadQueue.size() == 0) { + wait(); + } + buffer = readAheadQueue.remove(); + notifyAll(); + if (buffer == null) { + return null; // should never happen + } + buffer.setStatus(ReadBufferStatus.READING_IN_PROGRESS); + inProgressList.add(buffer); + } + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("ReadBufferWorker picked file {} for offset {}", + buffer.getStream().getPath(), buffer.getOffset()); + } + return buffer; + } + + /** + * ReadBufferWorker thread calls this method to post completion. + * + * @param buffer the buffer whose read was completed + * @param result the {@link ReadBufferStatus} after the read operation in the worker thread + * @param bytesActuallyRead the number of bytes that the worker thread was actually able to read + */ + void doneReading(final ReadBuffer buffer, final ReadBufferStatus result, final int bytesActuallyRead) { + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("ReadBufferWorker completed read file {} for offset {} outcome {} bytes {}", + buffer.getStream().getPath(), buffer.getOffset(), result, bytesActuallyRead); + } + synchronized (this) { + // If this buffer has already been purged during + // close of InputStream then we don't update the lists. 
+ if (inProgressList.contains(buffer)) { + inProgressList.remove(buffer); + if (result == ReadBufferStatus.AVAILABLE && bytesActuallyRead > 0) { + buffer.setStatus(ReadBufferStatus.AVAILABLE); + buffer.setLength(bytesActuallyRead); + } else { + freeList.push(buffer.getBufferindex()); + // buffer will be deleted as per the eviction policy. + } + // completed list also contains FAILED read buffers + // for sending exception message to clients. + buffer.setStatus(result); + buffer.setTimeStamp(currentTimeMillis()); + completedReadList.add(buffer); + } + } + + //outside the synchronized, since anyone receiving a wake-up from the latch must see safe-published results + buffer.getLatch().countDown(); // wake up waiting threads (if any) + } + + /** + * Similar to System.currentTimeMillis, except implemented with System.nanoTime(). + * System.currentTimeMillis can go backwards when system clock is changed (e.g., with NTP time synchronization), + * making it unsuitable for measuring time intervals. nanotime is strictly monotonically increasing per CPU core. + * Note: it is not monotonic across Sockets, and even within a CPU, its only the + * more recent parts which share a clock across all cores. 
+ * + * @return current time in milliseconds + */ + private long currentTimeMillis() { + return System.nanoTime() / 1000 / 1000; + } + + @VisibleForTesting + int getThresholdAgeMilliseconds() { + return thresholdAgeMilliseconds; + } + + + @VisibleForTesting + int getCompletedReadListSize() { + return completedReadList.size(); + } + + @VisibleForTesting + public synchronized List getCompletedReadListCopy() { + return new ArrayList<>(completedReadList); + } + + @VisibleForTesting + public synchronized List getFreeListCopy() { + return new ArrayList<>(freeList); + } + + @VisibleForTesting + public synchronized List getReadAheadQueueCopy() { + return new ArrayList<>(readAheadQueue); + } + + @VisibleForTesting + public synchronized List getInProgressCopiedList() { + return new ArrayList<>(inProgressList); + } + + /** + * Purging the buffers associated with an {@link AbfsInputStream} + * from {@link ReadBufferManagerV1} when stream is closed. + * @param stream input stream. + */ + public synchronized void purgeBuffersForStream(AbfsInputStream stream) { + LOGGER.debug("Purging stale buffers for AbfsInputStream {} ", stream); + readAheadQueue.removeIf(readBuffer -> readBuffer.getStream() == stream); + purgeList(stream, completedReadList); + } + + /** + * Method to remove buffers associated with a {@link AbfsInputStream} + * when its close method is called. + * NOTE: This method is not threadsafe and must be called inside a + * synchronised block. See caller. + * @param stream associated input stream. + * @param list list of buffers like {@link this#completedReadList} + * or {@link this#inProgressList}. + */ + private void purgeList(AbfsInputStream stream, LinkedList list) { + for (Iterator it = list.iterator(); it.hasNext();) { + ReadBuffer readBuffer = it.next(); + if (readBuffer.getStream() == stream) { + it.remove(); + // As failed ReadBuffers (bufferIndex = -1) are already pushed to free + // list in doneReading method, we will skip adding those here again. 
+ if (readBuffer.getBufferindex() != -1) { + freeList.push(readBuffer.getBufferindex()); + } + } + } + } +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java index a30f06261ef6f..df1e51495262f 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java @@ -51,7 +51,7 @@ public void run() { } catch (InterruptedException ex) { Thread.currentThread().interrupt(); } - ReadBufferManager bufferManager = ReadBufferManager.getBufferManager(); + ReadBufferManagerV1 bufferManager = ReadBufferManagerV1.getBufferManager(); ReadBuffer buffer; while (true) { try { @@ -73,7 +73,7 @@ public void run() { Math.min(buffer.getRequestedLength(), buffer.getBuffer().length), buffer.getTracingContext()); - bufferManager.doneReading(buffer, ReadBufferStatus.AVAILABLE, bytesRead); // post result back to ReadBufferManager + bufferManager.doneReading(buffer, ReadBufferStatus.AVAILABLE, bytesRead); // post result back to ReadBufferManagerV1 } catch (IOException ex) { buffer.setErrException(ex); bufferManager.doneReading(buffer, ReadBufferStatus.READ_FAILED, 0); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManagerV1.java similarity index 96% rename from hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java rename to hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManagerV1.java index a57430fa808cc..1b6852f3b3e9a 100644 --- 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManagerV1.java @@ -47,7 +47,7 @@ import static org.apache.hadoop.fs.azurebfs.constants.InternalConstants.CAPABILITY_SAFE_READAHEAD; import static org.apache.hadoop.test.LambdaTestUtils.eventually; -public class ITestReadBufferManager extends AbstractAbfsIntegrationTest { +public class ITestReadBufferManagerV1 extends AbstractAbfsIntegrationTest { /** * Time before the JUnit test times out for eventually() clauses @@ -62,12 +62,12 @@ public class ITestReadBufferManager extends AbstractAbfsIntegrationTest { */ public static final int PROBE_INTERVAL_MILLIS = 1_000; - public ITestReadBufferManager() throws Exception { + public ITestReadBufferManagerV1() throws Exception { } @Test public void testPurgeBufferManagerForParallelStreams() throws Exception { - describe("Testing purging of buffers from ReadBufferManager for " + describe("Testing purging of buffers from ReadBufferManagerV1 for " + "parallel input streams"); final int numBuffers = 16; final LinkedList freeList = new LinkedList<>(); @@ -99,7 +99,7 @@ public void testPurgeBufferManagerForParallelStreams() throws Exception { executorService.awaitTermination(1, TimeUnit.MINUTES); } - ReadBufferManager bufferManager = ReadBufferManager.getBufferManager(); + ReadBufferManagerV1 bufferManager = ReadBufferManagerV1.getBufferManager(); // readahead queue is empty assertListEmpty("ReadAheadQueue", bufferManager.getReadAheadQueueCopy()); // verify the in progress list eventually empties out. 
@@ -115,7 +115,7 @@ private void assertListEmpty(String listName, List list) { @Test public void testPurgeBufferManagerForSequentialStream() throws Exception { - describe("Testing purging of buffers in ReadBufferManager for " + describe("Testing purging of buffers in ReadBufferManagerV1 for " + "sequential input streams"); AzureBlobFileSystem fs = getABFSWithReadAheadConfig(); final String fileName = methodName.getMethodName(); @@ -131,7 +131,7 @@ public void testPurgeBufferManagerForSequentialStream() throws Exception { } finally { IOUtils.closeStream(iStream1); } - ReadBufferManager bufferManager = ReadBufferManager.getBufferManager(); + ReadBufferManagerV1 bufferManager = ReadBufferManagerV1.getBufferManager(); AbfsInputStream iStream2 = null; try { iStream2 = (AbfsInputStream) fs.open(testFilePath).getWrappedStream(); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java index e4ed9881ffa4f..4f55d81fb2ff3 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java @@ -89,7 +89,7 @@ public class TestAbfsInputStream extends @Override public void teardown() throws Exception { super.teardown(); - ReadBufferManager.getBufferManager().testResetReadBufferManager(); + ReadBufferManagerV1.getBufferManager().testResetReadBufferManager(); } private AbfsRestOperation getMockRestOp() { @@ -164,12 +164,12 @@ public AbfsInputStream getAbfsInputStream(AbfsClient abfsClient, private void queueReadAheads(AbfsInputStream inputStream) { // Mimic AbfsInputStream readAhead queue requests - ReadBufferManager.getBufferManager() + ReadBufferManagerV1.getBufferManager() .queueReadAhead(inputStream, 0, ONE_KB, inputStream.getTracingContext()); - 
ReadBufferManager.getBufferManager() + ReadBufferManagerV1.getBufferManager() .queueReadAhead(inputStream, ONE_KB, ONE_KB, inputStream.getTracingContext()); - ReadBufferManager.getBufferManager() + ReadBufferManagerV1.getBufferManager() .queueReadAhead(inputStream, TWO_KB, TWO_KB, inputStream.getTracingContext()); } @@ -187,15 +187,16 @@ private void verifyReadCallCount(AbfsClient client, int count) private void checkEvictedStatus(AbfsInputStream inputStream, int position, boolean expectedToThrowException) throws Exception { // Sleep for the eviction threshold time - Thread.sleep(ReadBufferManager.getBufferManager().getThresholdAgeMilliseconds() + 1000); + Thread.sleep( + ReadBufferManagerV1.getBufferManager().getThresholdAgeMilliseconds() + 1000); // Eviction is done only when AbfsInputStream tries to queue new items. // 1 tryEvict will remove 1 eligible item. To ensure that the current test buffer // will get evicted (considering there could be other tests running in parallel), // call tryEvict for the number of items that are there in completedReadList. 
- int numOfCompletedReadListItems = ReadBufferManager.getBufferManager().getCompletedReadListSize(); + int numOfCompletedReadListItems = ReadBufferManagerV1.getBufferManager().getCompletedReadListSize(); while (numOfCompletedReadListItems > 0) { - ReadBufferManager.getBufferManager().callTryEvict(); + ReadBufferManagerV1.getBufferManager().callTryEvict(); numOfCompletedReadListItems--; } @@ -210,7 +211,7 @@ private void checkEvictedStatus(AbfsInputStream inputStream, int position, boole public TestAbfsInputStream() throws Exception { super(); // Reduce thresholdAgeMilliseconds to 3 sec for the tests - ReadBufferManager.getBufferManager().setThresholdAgeMilliseconds(REDUCED_READ_BUFFER_AGE_THRESHOLD); + ReadBufferManagerV1.getBufferManager().setThresholdAgeMilliseconds(REDUCED_READ_BUFFER_AGE_THRESHOLD); } private void writeBufferToNewFile(Path testFile, byte[] buffer) throws IOException { @@ -319,7 +320,7 @@ public void testOpenFileWithOptions() throws Exception { * This test expects AbfsInputStream to throw the exception that readAhead * thread received on read. The readAhead thread must be initiated from the * active read request itself. - * Also checks that the ReadBuffers are evicted as per the ReadBufferManager + * Also checks that the ReadBuffers are evicted as per the ReadBufferManagerV1 * threshold criteria. 
* @throws Exception */ @@ -364,7 +365,7 @@ public void testFailedReadAhead() throws Exception { public void testFailedReadAheadEviction() throws Exception { AbfsClient client = getMockAbfsClient(); AbfsRestOperation successOp = getMockRestOp(); - ReadBufferManager.setThresholdAgeMilliseconds(INCREASED_READ_BUFFER_AGE_THRESHOLD); + ReadBufferManagerV1.setThresholdAgeMilliseconds(INCREASED_READ_BUFFER_AGE_THRESHOLD); // Stub : // Read request leads to 3 readahead calls: Fail all 3 readahead-client.read() // Actual read request fails with the failure in readahead thread @@ -379,15 +380,15 @@ public void testFailedReadAheadEviction() throws Exception { // Add a failed buffer to completed queue and set to no free buffers to read ahead. ReadBuffer buff = new ReadBuffer(); buff.setStatus(ReadBufferStatus.READ_FAILED); - ReadBufferManager.getBufferManager().testMimicFullUseAndAddFailedBuffer(buff); + ReadBufferManagerV1.getBufferManager().testMimicFullUseAndAddFailedBuffer(buff); // if read failed buffer eviction is tagged as a valid eviction, it will lead to // wrong assumption of queue logic that a buffer is freed up and can lead to : // java.util.EmptyStackException // at java.util.Stack.peek(Stack.java:102) // at java.util.Stack.pop(Stack.java:84) - // at org.apache.hadoop.fs.azurebfs.services.ReadBufferManager.queueReadAhead - ReadBufferManager.getBufferManager().queueReadAhead(inputStream, 0, ONE_KB, + // at org.apache.hadoop.fs.azurebfs.services.ReadBufferManagerV1.queueReadAhead + ReadBufferManagerV1.getBufferManager().queueReadAhead(inputStream, 0, ONE_KB, getTestTracingContext(getFileSystem(), true)); } @@ -395,7 +396,7 @@ public void testFailedReadAheadEviction() throws Exception { * * The test expects AbfsInputStream to initiate a remote read request for * the request offset and length when previous read ahead on the offset had failed. 
- * Also checks that the ReadBuffers are evicted as per the ReadBufferManager + * Also checks that the ReadBuffers are evicted as per the ReadBufferManagerV1 * threshold criteria. * @throws Exception */ @@ -429,7 +430,7 @@ public void testOlderReadAheadFailure() throws Exception { verifyReadCallCount(client, 3); // Sleep for thresholdAgeMs so that the read ahead buffer qualifies for being old. - Thread.sleep(ReadBufferManager.getBufferManager().getThresholdAgeMilliseconds()); + Thread.sleep(ReadBufferManagerV1.getBufferManager().getThresholdAgeMilliseconds()); // Second read request should retry the read (and not issue any new readaheads) inputStream.read(ONE_KB, new byte[ONE_KB], 0, ONE_KB); @@ -474,7 +475,7 @@ public void testSuccessfulReadAhead() throws Exception { any(String.class), any(), any(TracingContext.class)); AbfsInputStream inputStream = getAbfsInputStream(client, "testSuccessfulReadAhead.txt"); - int beforeReadCompletedListSize = ReadBufferManager.getBufferManager().getCompletedReadListSize(); + int beforeReadCompletedListSize = ReadBufferManagerV1.getBufferManager().getCompletedReadListSize(); // First read request that triggers readAheads. inputStream.read(new byte[ONE_KB]); @@ -482,9 +483,9 @@ public void testSuccessfulReadAhead() throws Exception { // Only the 3 readAhead threads should have triggered client.read verifyReadCallCount(client, 3); int newAdditionsToCompletedRead = - ReadBufferManager.getBufferManager().getCompletedReadListSize() + ReadBufferManagerV1.getBufferManager().getCompletedReadListSize() - beforeReadCompletedListSize; - // read buffer might be dumped if the ReadBufferManager getblock preceded + // read buffer might be dumped if the ReadBufferManagerV1 getblock preceded // the action of buffer being picked for reading from readaheadqueue, so that // inputstream can proceed with read and not be blocked on readahead thread // availability. 
So the count of buffers in completedReadQueue for the stream @@ -529,8 +530,8 @@ public void testStreamPurgeDuringReadAheadCallExecuting() throws Exception { any(String.class), nullable(ContextEncryptionAdapter.class), any(TracingContext.class)); - final ReadBufferManager readBufferManager - = ReadBufferManager.getBufferManager(); + final ReadBufferManagerV1 readBufferManager + = ReadBufferManagerV1.getBufferManager(); final int readBufferTotal = readBufferManager.getNumBuffers(); final int expectedFreeListBufferCount = readBufferTotal @@ -574,7 +575,7 @@ public void testStreamPurgeDuringReadAheadCallExecuting() throws Exception { /** * This test expects ReadAheadManager to throw exception if the read ahead * thread had failed within the last thresholdAgeMilliseconds. - * Also checks that the ReadBuffers are evicted as per the ReadBufferManager + * Also checks that the ReadBuffers are evicted as per the ReadBufferManagerV1 * threshold criteria. * @throws Exception */ @@ -607,7 +608,7 @@ public void testReadAheadManagerForFailedReadAhead() throws Exception { // if readAhead failed for specific offset, getBlock should // throw exception from the ReadBuffer that failed within last thresholdAgeMilliseconds sec intercept(IOException.class, - () -> ReadBufferManager.getBufferManager().getBlock( + () -> ReadBufferManagerV1.getBufferManager().getBlock( inputStream, 0, ONE_KB, @@ -624,7 +625,7 @@ public void testReadAheadManagerForFailedReadAhead() throws Exception { /** * The test expects ReadAheadManager to return 0 receivedBytes when previous * read ahead on the offset had failed and not throw exception received then. - * Also checks that the ReadBuffers are evicted as per the ReadBufferManager + * Also checks that the ReadBuffers are evicted as per the ReadBufferManagerV1 * threshold criteria. 
* @throws Exception */ @@ -655,14 +656,14 @@ public void testReadAheadManagerForOlderReadAheadFailure() throws Exception { // AbfsInputStream Read would have waited for the read-ahead for the requested offset // as we are testing from ReadAheadManager directly, sleep for thresholdAgeMilliseconds so that // read buffer qualifies for to be an old buffer - Thread.sleep(ReadBufferManager.getBufferManager().getThresholdAgeMilliseconds()); + Thread.sleep(ReadBufferManagerV1.getBufferManager().getThresholdAgeMilliseconds()); // Only the 3 readAhead threads should have triggered client.read verifyReadCallCount(client, 3); // getBlock from a new read request should return 0 if there is a failure // 30 sec before in read ahead buffer for respective offset. - int bytesRead = ReadBufferManager.getBufferManager().getBlock( + int bytesRead = ReadBufferManagerV1.getBufferManager().getBlock( inputStream, ONE_KB, ONE_KB, @@ -715,7 +716,7 @@ public void testReadAheadManagerForSuccessfulReadAhead() throws Exception { verifyReadCallCount(client, 3); // getBlock for a new read should return the buffer read-ahead - int bytesRead = ReadBufferManager.getBufferManager().getBlock( + int bytesRead = ReadBufferManagerV1.getBufferManager().getBlock( inputStream, ONE_KB, ONE_KB, @@ -853,7 +854,7 @@ public AbfsInputStream testReadAheadConfigs(int readRequestSize, .describedAs("Unexpected AlwaysReadBufferSize settings") .isEqualTo(alwaysReadBufferSizeEnabled); - Assertions.assertThat(ReadBufferManager.getBufferManager().getReadAheadBlockSize()) + Assertions.assertThat(ReadBufferManagerV1.getBufferManager().getReadAheadBlockSize()) .describedAs("Unexpected readAhead block size") .isEqualTo(readAheadBlockSize); @@ -921,9 +922,9 @@ private AzureBlobFileSystem createTestFile(Path testFilePath, long testFileSize, } private void resetReadBufferManager(int bufferSize, int threshold) { - ReadBufferManager.getBufferManager() + ReadBufferManagerV1.getBufferManager() .testResetReadBufferManager(bufferSize, 
threshold); - // Trigger GC as aggressive recreation of ReadBufferManager buffers + // Trigger GC as aggressive recreation of ReadBufferManagerV1 buffers // by successive tests can lead to OOM based on the dev VM/machine capacity. System.gc(); } From 86cab1a13a17c76699eb63b666dadec8e23b0be1 Mon Sep 17 00:00:00 2001 From: Anuj Modi Date: Tue, 7 Jan 2025 01:18:49 -0800 Subject: [PATCH 2/5] Making ReadBufferManagerV2 Configurable --- .../hadoop/fs/azurebfs/AbfsConfiguration.java | 14 ++++++++++++++ .../fs/azurebfs/AzureBlobFileSystemStore.java | 1 + .../fs/azurebfs/constants/ConfigurationKeys.java | 6 ++++++ .../constants/FileSystemConfigurations.java | 1 + .../fs/azurebfs/services/AbfsInputStream.java | 2 ++ .../azurebfs/services/AbfsInputStreamContext.java | 12 ++++++++++++ 6 files changed, 36 insertions(+) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java index 6a51f8d902038..3ef3db767fbfc 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java @@ -377,6 +377,11 @@ public class AbfsConfiguration{ DefaultValue = DEFAULT_ENABLE_READAHEAD) private boolean enabledReadAhead; + @BooleanConfigurationValidatorAnnotation( + ConfigurationKey = FS_AZURE_ENABLE_READAHEAD_V2, + DefaultValue = DEFAULT_ENABLE_READAHEAD_V2) + private boolean isReadAheadV2Enabled; + @LongConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS, MinValue = 0, DefaultValue = DEFAULT_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS_IN_SECONDS) @@ -1332,6 +1337,15 @@ void setReadAheadEnabled(final boolean enabledReadAhead) { this.enabledReadAhead = enabledReadAhead; } + public boolean isReadAheadV2Enabled() { + return this.isReadAheadV2Enabled; + } + + @VisibleForTesting + void 
setReadAheadV2Enabled(final boolean isReadAheadV2Enabled) { + this.isReadAheadV2Enabled = isReadAheadV2Enabled; + } + public int getReadAheadRange() { return this.readAheadRange; } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index 867edfd1438fe..276431b28495e 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -962,6 +962,7 @@ AZURE_FOOTER_READ_BUFFER_SIZE, getAbfsConfiguration().getFooterReadBufferSize()) .withReadAheadQueueDepth(getAbfsConfiguration().getReadAheadQueueDepth()) .withTolerateOobAppends(getAbfsConfiguration().getTolerateOobAppends()) .isReadAheadEnabled(getAbfsConfiguration().isReadAheadEnabled()) + .isReadAheadV2Enabled(getAbfsConfiguration().isReadAheadV2Enabled()) .withReadSmallFilesCompletely(getAbfsConfiguration().readSmallFilesCompletely()) .withOptimizeFooterRead(getAbfsConfiguration().optimizeFooterRead()) .withFooterReadBufferSize(footerReadBufferSize) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java index 3742361b48445..72f80dcdb0b9b 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java @@ -262,6 +262,12 @@ public final class ConfigurationKeys { */ public static final String FS_AZURE_ENABLE_READAHEAD = "fs.azure.enable.readahead"; + /** + * Enable or disable readaheadV2 buffer in AbfsInputStream. + * Value: {@value}. 
+ */ + public static final String FS_AZURE_ENABLE_READAHEAD_V2 = "fs.azure.enable.readahead_v2"; + /** Setting this true will make the driver use it's own RemoteIterator implementation */ public static final String FS_AZURE_ENABLE_ABFS_LIST_ITERATOR = "fs.azure.enable.abfslistiterator"; /** Server side encryption key encoded in Base6format {@value}.*/ diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java index 6072fcbb6fa08..1bb9a5820fc51 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java @@ -127,6 +127,7 @@ public final class FileSystemConfigurations { public static final long DEFAULT_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS_IN_SECONDS = 120; public static final boolean DEFAULT_ENABLE_READAHEAD = true; + public static final boolean DEFAULT_ENABLE_READAHEAD_V2 = false; public static final String DEFAULT_FS_AZURE_USER_AGENT_PREFIX = EMPTY_STRING; public static final String DEFAULT_VALUE_UNKNOWN = "UNKNOWN"; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java index 0468acd66e020..1b7dc3a6b34b0 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java @@ -79,6 +79,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer, private final String eTag; // eTag of the path when InputStream are created private final boolean tolerateOobAppends; // whether tolerate Oob Appends 
private final boolean readAheadEnabled; // whether enable readAhead; + private final boolean isReadAheadV2Enabled; // whether enable readAheadV2; private final String inputStreamId; private final boolean alwaysReadBufferSize; /* @@ -150,6 +151,7 @@ public AbfsInputStream( this.eTag = eTag; this.readAheadRange = abfsInputStreamContext.getReadAheadRange(); this.readAheadEnabled = abfsInputStreamContext.isReadAheadEnabled(); + this.isReadAheadV2Enabled = abfsInputStreamContext.isReadAheadV2Enabled(); this.alwaysReadBufferSize = abfsInputStreamContext.shouldReadBufferSizeAlways(); this.bufferedPreadDisabled = abfsInputStreamContext diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java index fdcad5ac3a0d0..f6272492d6081 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java @@ -41,6 +41,8 @@ public class AbfsInputStreamContext extends AbfsStreamContext { private boolean isReadAheadEnabled = true; + private boolean isReadAheadV2Enabled; + private boolean alwaysReadBufferSize; private int readAheadBlockSize; @@ -91,6 +93,12 @@ public AbfsInputStreamContext isReadAheadEnabled( return this; } + public AbfsInputStreamContext isReadAheadV2Enabled( + final boolean isReadAheadV2Enabled) { + this.isReadAheadV2Enabled = isReadAheadV2Enabled; + return this; + } + public AbfsInputStreamContext withReadAheadRange( final int readAheadRange) { this.readAheadRange = readAheadRange; @@ -181,6 +189,10 @@ public boolean isReadAheadEnabled() { return isReadAheadEnabled; } + public boolean isReadAheadV2Enabled() { + return isReadAheadV2Enabled; + } + public int getReadAheadRange() { return readAheadRange; } From 
daa15528f368be0b9853aa890f155402d62ac091 Mon Sep 17 00:00:00 2001 From: Anuj Modi Date: Mon, 10 Mar 2025 21:22:52 -0700 Subject: [PATCH 3/5] Per Stream Buffer Manager --- .../constants/FileSystemConfigurations.java | 2 +- .../fs/azurebfs/services/AbfsInputStream.java | 21 ++++++++++++--- .../fs/azurebfs/services/ReadBuffer.java | 2 +- .../azurebfs/services/ReadBufferManager.java | 12 +++++++++ .../services/ReadBufferManagerV1.java | 22 +++++++++------- .../services/ReadBufferManagerV2.java | 26 ++++++++++++++----- .../azurebfs/services/ReadBufferWorker.java | 5 ++-- .../services/TestAbfsInputStream.java | 2 +- 8 files changed, 68 insertions(+), 24 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java index 1bb9a5820fc51..4a05fa6fffabe 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java @@ -127,7 +127,7 @@ public final class FileSystemConfigurations { public static final long DEFAULT_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS_IN_SECONDS = 120; public static final boolean DEFAULT_ENABLE_READAHEAD = true; - public static final boolean DEFAULT_ENABLE_READAHEAD_V2 = false; + public static final boolean DEFAULT_ENABLE_READAHEAD_V2 = true; public static final String DEFAULT_FS_AZURE_USER_AGENT_PREFIX = EMPTY_STRING; public static final String DEFAULT_VALUE_UNKNOWN = "UNKNOWN"; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java index 1b7dc3a6b34b0..136cd8ea616f0 100644 --- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java @@ -132,6 +132,8 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer, /** ABFS instance to be held by the input stream to avoid GC close. */ private final BackReference fsBackRef; + private ReadBufferManager readBufferManager = null; + public AbfsInputStream( final AbfsClient client, final Statistics statistics, @@ -175,9 +177,15 @@ public AbfsInputStream( this.fsBackRef = abfsInputStreamContext.getFsBackRef(); contextEncryptionAdapter = abfsInputStreamContext.getEncryptionAdapter(); - // Propagate the config values to ReadBufferManager so that the first instance - // to initialize can set the readAheadBlockSize - ReadBufferManagerV1.setReadBufferManagerConfigs(readAheadBlockSize); + if (isReadAheadV2Enabled) { + readBufferManager = new ReadBufferManagerV2(readAheadBlockSize); + } else { + // Propagate the config values to ReadBufferManager so that the first instance + // to initialize can set the readAheadBlockSize + ReadBufferManagerV1.setReadBufferManagerConfigs(readAheadBlockSize); + readBufferManager = ReadBufferManagerV1.getBufferManager(); + } + if (streamStatistics != null) { ioStatistics = streamStatistics.getIOStatistics(); } @@ -512,7 +520,7 @@ private int readInternal(final long position, final byte[] b, final int offset, while (numReadAheads > 0 && nextOffset < contentLength) { LOG.debug("issuing read ahead requestedOffset = {} requested size {}", nextOffset, nextSize); - ReadBufferManagerV1.getBufferManager().queueReadAhead(this, nextOffset, (int) nextSize, + readBufferManager.queueReadAhead(this, nextOffset, (int) nextSize, new TracingContext(readAheadTracingContext)); nextOffset = nextOffset + nextSize; numReadAheads--; @@ -846,6 +854,11 @@ public int getReadAheadQueueDepth() { return readAheadQueueDepth; } + 
@VisibleForTesting + public ReadBufferManager getReadBufferManager() { + return readBufferManager; + } + @VisibleForTesting public boolean shouldAlwaysReadBufferSize() { return alwaysReadBufferSize; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBuffer.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBuffer.java index 9ce926d841c84..43d528a3b189a 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBuffer.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBuffer.java @@ -26,7 +26,7 @@ import static org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus.READ_FAILED; -class ReadBuffer { +public class ReadBuffer { private AbfsInputStream stream; private long offset; // offset within the file for the buffer diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java index ba83520b63c2d..a15c662ad37c3 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java @@ -1,4 +1,16 @@ package org.apache.hadoop.fs.azurebfs.services; +import java.io.IOException; + +import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus; +import org.apache.hadoop.fs.azurebfs.utils.TracingContext; + public interface ReadBufferManager { + void queueReadAhead(final AbfsInputStream stream, final long requestedOffset, final int requestedLength, + TracingContext tracingContext); + int getBlock(final AbfsInputStream stream, final long position, final int length, final byte[] buffer) + throws IOException; + void doneReading(final ReadBuffer buffer, final ReadBufferStatus result, final int 
bytesActuallyRead); + ReadBuffer getNextBlockToRead() throws InterruptedException; + int getReadAheadBlockSize(); } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV1.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV1.java index 15ddf48d17af3..24472131e5602 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV1.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV1.java @@ -39,8 +39,7 @@ * The Read Buffer Manager for Rest AbfsClient. */ final class ReadBufferManagerV1 implements ReadBufferManager { - private static final Logger LOGGER = LoggerFactory.getLogger( - ReadBufferManagerV1.class); + private static final Logger LOGGER = LoggerFactory.getLogger(ReadBufferManagerV1.class); private static final int ONE_KB = 1024; private static final int ONE_MB = ONE_KB * ONE_KB; @@ -91,10 +90,10 @@ private void init() { freeList.add(i); } for (int i = 0; i < NUM_THREADS; i++) { - Thread t = new Thread(new ReadBufferWorker(i)); + Thread t = new Thread(new ReadBufferWorker(i, this)); t.setDaemon(true); threads[i] = t; - t.setName("ABFS-prefetch-" + i); + t.setName("ABFS-prefetch-singleton-" + i); t.start(); } ReadBufferWorker.UNLEASH_WORKERS.countDown(); @@ -120,7 +119,8 @@ private ReadBufferManagerV1() { * @param requestedOffset The offset in the file which should be read * @param requestedLength The length to read */ - void queueReadAhead(final AbfsInputStream stream, final long requestedOffset, final int requestedLength, + @Override + public void queueReadAhead(final AbfsInputStream stream, final long requestedOffset, final int requestedLength, TracingContext tracingContext) { if (LOGGER.isTraceEnabled()) { LOGGER.trace("Start Queueing readAhead for {} offset {} length {}",
long requestedOffset, fi * @param buffer the buffer to read data into. Note that the buffer will be written into from offset 0. * @return the number of bytes read */ - int getBlock(final AbfsInputStream stream, final long position, final int length, final byte[] buffer) + @Override + public int getBlock(final AbfsInputStream stream, final long position, final int length, final byte[] buffer) throws IOException { // not synchronized, so have to be careful with locking if (LOGGER.isTraceEnabled()) { @@ -425,7 +426,8 @@ private int getBlockFromCompletedQueue(final AbfsInputStream stream, final long * @return {@link ReadBuffer} * @throws InterruptedException if thread is interrupted */ - ReadBuffer getNextBlockToRead() throws InterruptedException { + @Override + public ReadBuffer getNextBlockToRead() throws InterruptedException { ReadBuffer buffer = null; synchronized (this) { //buffer = readAheadQueue.take(); // blocking method @@ -454,7 +456,8 @@ ReadBuffer getNextBlockToRead() throws InterruptedException { * @param result the {@link ReadBufferStatus} after the read operation in the worker thread * @param bytesActuallyRead the number of bytes that the worker thread was actually able to read */ - void doneReading(final ReadBuffer buffer, final ReadBufferStatus result, final int bytesActuallyRead) { + @Override + public void doneReading(final ReadBuffer buffer, final ReadBufferStatus result, final int bytesActuallyRead) { if (LOGGER.isTraceEnabled()) { LOGGER.trace("ReadBufferWorker completed read file {} for offset {} outcome {} bytes {}", buffer.getStream().getPath(), buffer.getOffset(), result, bytesActuallyRead); @@ -628,7 +631,8 @@ static void setBlockSize(int readAheadBlockSize) { } @VisibleForTesting - int getReadAheadBlockSize() { + @Override + public int getReadAheadBlockSize() { return blockSize; } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java index c057994c523b0..df2dad490e130 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java @@ -43,8 +43,8 @@ final class ReadBufferManagerV2 implements ReadBufferManager { private static final int ONE_KB = 1024; private static final int ONE_MB = ONE_KB * ONE_KB; - private static final int NUM_BUFFERS = 3; - private static final int NUM_THREADS = 3; + private static final int NUM_BUFFERS = 16; + private static final int NUM_THREADS = 8; private static final int DEFAULT_THRESHOLD_AGE_MILLISECONDS = 3000; // have to see if 3 seconds is a good threshold private int blockSize = 4 * ONE_MB; @@ -58,6 +58,11 @@ final class ReadBufferManagerV2 implements ReadBufferManager { private LinkedList completedReadList = new LinkedList<>(); // buffers available for reading private final ReentrantLock LOCK = new ReentrantLock(); + public ReadBufferManagerV2(int readAheadBlockSize) { + setReadBufferManagerConfigs(readAheadBlockSize); + init(); + } + public void setReadBufferManagerConfigs(int readAheadBlockSize) { blockSize = readAheadBlockSize; } @@ -69,7 +74,7 @@ public void init() { freeList.add(i); } for (int i = 0; i < NUM_THREADS; i++) { - Thread t = new Thread(new ReadBufferWorker(i)); + Thread t = new Thread(new ReadBufferWorker(i, this)); t.setDaemon(true); threads[i] = t; t.setName("ABFS-prefetch-" + i); @@ -85,6 +90,7 @@ public void init() { * @param requestedOffset The offset in the file which should be read * @param requestedLength The length to read */ + @Override public void queueReadAhead(final AbfsInputStream stream, final long requestedOffset, final int requestedLength, TracingContext tracingContext) { if (LOGGER.isTraceEnabled()) { @@ -137,7 +143,8 @@ public void queueReadAhead(final
AbfsInputStream stream, final long requestedOff * @param buffer the buffer to read data into. Note that the buffer will be written into from offset 0. * @return the number of bytes read */ - int getBlock(final AbfsInputStream stream, final long position, final int length, final byte[] buffer) + @Override + public int getBlock(final AbfsInputStream stream, final long position, final int length, final byte[] buffer) throws IOException { // not synchronized, so have to be careful with locking if (LOGGER.isTraceEnabled()) { @@ -378,7 +385,8 @@ private int getBlockFromCompletedQueue(final AbfsInputStream stream, final long * @return {@link ReadBuffer} * @throws InterruptedException if thread is interrupted */ - ReadBuffer getNextBlockToRead() throws InterruptedException { + @Override + public ReadBuffer getNextBlockToRead() throws InterruptedException { ReadBuffer buffer = null; synchronized (this) { //buffer = readAheadQueue.take(); // blocking method @@ -407,7 +415,8 @@ ReadBuffer getNextBlockToRead() throws InterruptedException { * @param result the {@link ReadBufferStatus} after the read operation in the worker thread * @param bytesActuallyRead the number of bytes that the worker thread was actually able to read */ - void doneReading(final ReadBuffer buffer, final ReadBufferStatus result, final int bytesActuallyRead) { + @Override + public void doneReading(final ReadBuffer buffer, final ReadBufferStatus result, final int bytesActuallyRead) { if (LOGGER.isTraceEnabled()) { LOGGER.trace("ReadBufferWorker completed read file {} for offset {} outcome {} bytes {}", buffer.getStream().getPath(), buffer.getOffset(), result, bytesActuallyRead); @@ -513,4 +522,9 @@ private void purgeList(AbfsInputStream stream, LinkedList list) { } } } + + @Override + public int getReadAheadBlockSize() { + return blockSize; + } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java index df1e51495262f..3e8a6334e78ad 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java @@ -28,9 +28,11 @@ class ReadBufferWorker implements Runnable { protected static final CountDownLatch UNLEASH_WORKERS = new CountDownLatch(1); private int id; + private final ReadBufferManager bufferManager; - ReadBufferWorker(final int id) { + ReadBufferWorker(final int id, final ReadBufferManager bufferManager) { this.id = id; + this.bufferManager = bufferManager; } /** @@ -51,7 +53,6 @@ public void run() { } catch (InterruptedException ex) { Thread.currentThread().interrupt(); } - ReadBufferManagerV1 bufferManager = ReadBufferManagerV1.getBufferManager(); ReadBuffer buffer; while (true) { try { diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java index 4f55d81fb2ff3..170668efab8b5 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java @@ -854,7 +854,7 @@ public AbfsInputStream testReadAheadConfigs(int readRequestSize, .describedAs("Unexpected AlwaysReadBufferSize settings") .isEqualTo(alwaysReadBufferSizeEnabled); - Assertions.assertThat(ReadBufferManagerV1.getBufferManager().getReadAheadBlockSize()) + Assertions.assertThat(inputStream.getReadBufferManager().getReadAheadBlockSize()) .describedAs("Unexpected readAhead block size") .isEqualTo(readAheadBlockSize); From 4c9d1f1f9da385c4eafe841e331f53600a170791 Mon Sep 17 00:00:00 2001 From: Anuj Modi Date: Tue, 11 Mar 2025 
05:10:33 -0700 Subject: [PATCH 4/5] Fixing Failing tests --- .../apache/hadoop/fs/azurebfs/services/AbfsInputStream.java | 4 ++-- .../hadoop/fs/azurebfs/services/ReadBufferManager.java | 1 + .../hadoop/fs/azurebfs/services/ReadBufferManagerV1.java | 1 + .../hadoop/fs/azurebfs/services/ReadBufferManagerV2.java | 5 +++-- 4 files changed, 7 insertions(+), 4 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java index 136cd8ea616f0..67a282a6e28a9 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java @@ -529,7 +529,7 @@ private int readInternal(final long position, final byte[] b, final int offset, } // try reading from buffers first - receivedBytes = ReadBufferManagerV1.getBufferManager().getBlock(this, position, length, b); + receivedBytes = readBufferManager.getBlock(this, position, length, b); bytesFromReadAhead += receivedBytes; if (receivedBytes > 0) { incrementReadOps(); @@ -730,7 +730,7 @@ public boolean seekToNewSource(long l) throws IOException { public synchronized void close() throws IOException { LOG.debug("Closing {}", this); closed = true; - ReadBufferManagerV1.getBufferManager().purgeBuffersForStream(this); + readBufferManager.purgeBuffersForStream(this); buffer = null; // de-reference the buffer so it can be GC'ed sooner if (contextEncryptionAdapter != null) { contextEncryptionAdapter.destroy(); diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java index a15c662ad37c3..e2f64b9d66988 100644 --- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java @@ -13,4 +13,5 @@ int getBlock(final AbfsInputStream stream, final long position, final int length void doneReading(final ReadBuffer buffer, final ReadBufferStatus result, final int bytesActuallyRead); ReadBuffer getNextBlockToRead() throws InterruptedException; int getReadAheadBlockSize(); + void purgeBuffersForStream(AbfsInputStream stream); } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV1.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV1.java index 24472131e5602..676f1a324829a 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV1.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV1.java @@ -545,6 +545,7 @@ void callTryEvict() { * from {@link ReadBufferManagerV1} when stream is closed. * @param stream input stream. 
*/ + @Override public synchronized void purgeBuffersForStream(AbfsInputStream stream) { LOGGER.debug("Purging stale buffers for AbfsInputStream {} ", stream); readAheadQueue.removeIf(readBuffer -> readBuffer.getStream() == stream); diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java index df2dad490e130..e3a87f7e8a81e 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java @@ -43,8 +43,8 @@ final class ReadBufferManagerV2 implements ReadBufferManager { private static final int ONE_KB = 1024; private static final int ONE_MB = ONE_KB * ONE_KB; - private static final int NUM_BUFFERS = 16; - private static final int NUM_THREADS = 8; + private static final int NUM_BUFFERS = 6; + private static final int NUM_THREADS = 3; private static final int DEFAULT_THRESHOLD_AGE_MILLISECONDS = 3000; // have to see if 3 seconds is a good threshold private int blockSize = 4 * ONE_MB; @@ -494,6 +494,7 @@ public synchronized List getInProgressCopiedList() { * from {@link ReadBufferManagerV1} when stream is closed. * @param stream input stream. 
*/ + @Override public synchronized void purgeBuffersForStream(AbfsInputStream stream) { LOGGER.debug("Purging stale buffers for AbfsInputStream {} ", stream); readAheadQueue.removeIf(readBuffer -> readBuffer.getStream() == stream); From 79e539dfe299b1609f727c218ebe3697afb4b2aa Mon Sep 17 00:00:00 2001 From: Anuj Modi Date: Tue, 18 Mar 2025 08:53:59 -0700 Subject: [PATCH 5/5] Fixing OOM --- .../constants/FileSystemConfigurations.java | 2 +- .../azurebfs/services/ReadBufferManager.java | 1 + .../services/ReadBufferManagerV1.java | 4 ++++ .../services/ReadBufferManagerV2.java | 22 ++++++++++++++++++- 4 files changed, 27 insertions(+), 2 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java index 4a05fa6fffabe..1bb9a5820fc51 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java @@ -127,7 +127,7 @@ public final class FileSystemConfigurations { public static final long DEFAULT_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS_IN_SECONDS = 120; public static final boolean DEFAULT_ENABLE_READAHEAD = true; - public static final boolean DEFAULT_ENABLE_READAHEAD_V2 = true; + public static final boolean DEFAULT_ENABLE_READAHEAD_V2 = false; public static final String DEFAULT_FS_AZURE_USER_AGENT_PREFIX = EMPTY_STRING; public static final String DEFAULT_VALUE_UNKNOWN = "UNKNOWN"; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java index e2f64b9d66988..2295a3c64ad4c 100644 --- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java @@ -14,4 +14,5 @@ int getBlock(final AbfsInputStream stream, final long position, final int length ReadBuffer getNextBlockToRead() throws InterruptedException; int getReadAheadBlockSize(); void purgeBuffersForStream(AbfsInputStream stream); + boolean hasShutDown(); } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV1.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV1.java index 676f1a324829a..af6d572e55299 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV1.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV1.java @@ -99,6 +99,10 @@ private void init() { ReadBufferWorker.UNLEASH_WORKERS.countDown(); } + public boolean hasShutDown() { + return false; + } + // hide instance constructor private ReadBufferManagerV1() { LOGGER.trace("Creating readbuffer manager with HADOOP-18546 patch"); diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java index e3a87f7e8a81e..3d155e04a5cde 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java @@ -56,7 +56,7 @@ final class ReadBufferManagerV2 implements ReadBufferManager { private Queue readAheadQueue = new LinkedList<>(); // queue of requests that are not picked up by any worker thread yet private LinkedList inProgressList = new LinkedList<>(); // requests being 
processed by worker threads private LinkedList completedReadList = new LinkedList<>(); // buffers available for reading - private final ReentrantLock LOCK = new ReentrantLock(); + private boolean shutdown = false; // flag to signal threads to stop public ReadBufferManagerV2(int readAheadBlockSize) { setReadBufferManagerConfigs(readAheadBlockSize); @@ -83,6 +83,25 @@ public void init() { ReadBufferWorker.UNLEASH_WORKERS.countDown(); } + private void shutdown() { + shutdown = true; + for (Thread thread : threads) { + if (thread != null) { + thread.interrupt(); + } + } + for (int i = 0; i < buffers.length; i++) { + if (buffers[i] != null) { + buffers[i] = null; + } + } + } + + @Override + public boolean hasShutDown() { + return shutdown; + } + /** * {@link AbfsInputStream} calls this method to queue read-aheads. * @@ -499,6 +518,7 @@ public synchronized void purgeBuffersForStream(AbfsInputStream stream) { LOGGER.debug("Purging stale buffers for AbfsInputStream {} ", stream); readAheadQueue.removeIf(readBuffer -> readBuffer.getStream() == stream); purgeList(stream, completedReadList); + shutdown(); } /**