From 281ea00c22f287fe4b02d169b4d2d3add304307d Mon Sep 17 00:00:00 2001
From: Scott Carey
Date: Mon, 2 Dec 2019 12:33:31 -0800
Subject: [PATCH] Add Manual Compaction to HaloDB (#45)

Add the 'forceCompaction' method to HaloDB, which takes a compactionThreshold
value and will trigger compaction on all files that have stale data above the
threshold. Additionally, clean up how pauseCompaction() works so that it can
await pending compactions.
---
 .../com/oath/halodb/CompactionManager.java    | 102 +++++++++---------
 src/main/java/com/oath/halodb/HaloDB.java     |  28 +++--
 .../java/com/oath/halodb/HaloDBInternal.java  |  24 +++--
 .../oath/halodb/CompactionWithErrorsTest.java |  47 ++++----
 .../com/oath/halodb/HaloDBCompactionTest.java |  52 +++++++--
 src/test/java/com/oath/halodb/TestUtils.java  |  20 ++--
 6 files changed, 158 insertions(+), 115 deletions(-)

diff --git a/src/main/java/com/oath/halodb/CompactionManager.java b/src/main/java/com/oath/halodb/CompactionManager.java
index 57ab5be..0b8aa92 100644
--- a/src/main/java/com/oath/halodb/CompactionManager.java
+++ b/src/main/java/com/oath/halodb/CompactionManager.java
@@ -5,18 +5,18 @@
 package com.oath.halodb;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.util.concurrent.RateLimiter;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.IOException;
 import java.nio.channels.FileChannel;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantLock;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.util.concurrent.RateLimiter;
+
 class CompactionManager {
 
     private static final Logger logger = LoggerFactory.getLogger(CompactionManager.class);
@@ -41,7 +41,11 @@ class CompactionManager {
     private volatile long totalSizeOfRecordsCopied = 0;
     private volatile long compactionStartTime = System.currentTimeMillis();
 
-    private static final int STOP_SIGNAL = -10101;
+    // These are purposely 'newed' up because we use reference equality to check the signals; the value does not matter.
+    // Signal for the compactor to stop its thread after finishing the already queued tasks.
+    private static final Integer STOP_SIGNAL = new Integer(-1);
+    // Signal for the compactor to stop its thread after finishing any active task, without taking more tasks.
+    private static final Integer WAKE_SIGNAL = new Integer(-1);
 
     private final ReentrantLock startStopLock = new ReentrantLock();
     private volatile boolean stopInProgress = false;
@@ -53,16 +57,21 @@ class CompactionManager {
     }
 
     // If a file is being compacted we wait for it complete before stopping.
-    boolean stopCompactionThread(boolean closeCurrentWriteFile) throws IOException {
+    boolean stopCompactionThread(boolean closeCurrentWriteFile, boolean awaitPending) throws IOException {
        stopInProgress = true;
        startStopLock.lock();
        try {
-            isRunning = false;
            if (isCompactionRunning()) {
-                // We don't want to call interrupt on compaction thread as it
-                // may interrupt IO operations and leave files in an inconsistent state.
-                // instead we use -10101 as a stop signal.
-                compactionQueue.put(STOP_SIGNAL);
+                if (awaitPending) {
+                    // we send a stop signal that will stop the thread after existing items in the queue complete
+                    compactionQueue.put(STOP_SIGNAL);
+                } else {
+                    // set the running flag to false, then send the wake signal. If the queue is empty it will immediately
+                    // consume the signal to wake up the thread and stop.
+                    // if the queue is not empty, then after the current task completes the 'isRunning' flag will stop it.
+                    isRunning = false;
+                    compactionQueue.put(WAKE_SIGNAL);
+                }
                compactionThread.join();
                if (closeCurrentWriteFile && currentWriteFile != null) {
                    currentWriteFile.flushToDisk();
@@ -95,9 +104,14 @@ void startCompactionThread() {
        }
    }
 
-    void pauseCompactionThread() throws IOException {
+    /**
+     * Stop the compaction thread, blocking until it has stopped.
+     * If awaitPending is true, stops after all outstanding compaction tasks in the queue
+     * have completed. Otherwise, stops after the current task completes.
+     **/
+    void pauseCompactionThread(boolean awaitPending) throws IOException {
        logger.info("Pausing compaction thread ...");
-        stopCompactionThread(false);
+        stopCompactionThread(false, awaitPending);
    }
 
    void resumeCompaction() {
@@ -109,7 +123,7 @@ int getCurrentWriteFileId() {
        return currentWriteFile != null ? currentWriteFile.getFileId() : -1;
    }
 
-    boolean submitFileForCompaction(int fileId) {
+    boolean submitFileForCompaction(Integer fileId) {
        return compactionQueue.offer(fileId);
    }
 
@@ -178,7 +192,9 @@ private class CompactionThread extends Thread {
                    startStopLock.lock();
                    try {
                        compactionThread = null;
-                        startCompactionThread();
+                        if (isRunning) {
+                            startCompactionThread();
+                        }
                    } finally {
                        startStopLock.unlock();
                    }
@@ -186,22 +202,32 @@ private class CompactionThread extends Thread {
                }
                else {
                    logger.info("Not restarting thread as the lock is held by stop compaction method.");
                }
-
            });
        }
 
        @Override
        public void run() {
            logger.info("Starting compaction thread ...");
-            int fileToCompact = -1;
-
            while (isRunning) {
+                Integer fileToCompact = null;
                try {
-                    fileToCompact = compactionQueue.take();
-                    if (fileToCompact == STOP_SIGNAL) {
+                    fileToCompact = compactionQueue.poll(1, TimeUnit.SECONDS);
+                    if (fileToCompact == STOP_SIGNAL) { // reference, not value equality on purpose; these are sentinel objects
                        logger.debug("Received a stop signal.");
-                        // skip rest of the steps and check status of isRunning flag.
-                        // while pausing/stopping compaction isRunning flag must be set to false.
+                        // in this case, isRunning was not set to false already. The signal had to work its way through the
+                        // queue behind the other tasks. So set 'isRunning' to false and break out of the loop to halt.
+                        isRunning = false;
+                        break;
+                    }
+                    if (fileToCompact == WAKE_SIGNAL || fileToCompact == null) {
+                        // scenario: the queue has a long list of files to compact. We add this signal to the queue after
+                        // setting 'isRunning' to false, so the loop condition check below will shut the thread down
+                        // without processing more tasks.
+                        // If we do break out of this loop with tasks in the queue, then this signal may still be in the queue
+                        // behind those tasks.
+                        // If the thread is resumed later, the signal will be processed after resuming the compactor.
+                        // If we were to set 'isRunning' to false here, that would shut down the
+                        // recently resumed thread when this signal arrived.
                        continue;
                    }
                    logger.debug("Compacting {} ...", fileToCompact);
@@ -209,8 +235,10 @@ public void run() {
                    logger.debug("Completed compacting {} to {}", fileToCompact, getCurrentWriteFileId());
                    dbInternal.markFileAsCompacted(fileToCompact);
                    dbInternal.deleteHaloDBFile(fileToCompact);
-                }
-                catch (Exception e) {
+                    fileToCompact = null;
+                } catch (InterruptedException ie) {
+                    break;
+                } catch (Exception e) {
                    logger.error(String.format("Error while compacting file %d to %d", fileToCompact, getCurrentWriteFileId()), e);
                }
            }
@@ -322,26 +350,4 @@ void forceRolloverCurrentWriteFile() throws IOException {
        dbInternal.getDbDirectory().syncMetaData();
        currentWriteFileOffset = 0;
    }
-
-    // Used only for tests. to be called only after all writes in the test have been performed.
-    @VisibleForTesting
-    synchronized boolean isCompactionComplete() {
-
-        if (!isCompactionRunning())
-            return true;
-
-        if (compactionQueue.isEmpty()) {
-            try {
-                isRunning = false;
-                submitFileForCompaction(STOP_SIGNAL);
-                compactionThread.join();
-            } catch (InterruptedException e) {
-                logger.error("Error in isCompactionComplete", e);
-            }
-
-            return true;
-        }
-
-        return false;
-    }
 }
diff --git a/src/main/java/com/oath/halodb/HaloDB.java b/src/main/java/com/oath/halodb/HaloDB.java
index c16d304..4b64f11 100644
--- a/src/main/java/com/oath/halodb/HaloDB.java
+++ b/src/main/java/com/oath/halodb/HaloDB.java
@@ -5,11 +5,10 @@
 package com.oath.halodb;
 
-import com.google.common.annotations.VisibleForTesting;
-
 import java.io.File;
 import java.io.IOException;
-import java.util.Set;
+
+import com.google.common.annotations.VisibleForTesting;
 
 public final class HaloDB {
 
@@ -80,14 +79,27 @@ public HaloDBIterator newIterator() throws HaloDBException {
        return new HaloDBIterator(dbInternal);
    }
 
-    public void pauseCompaction() throws HaloDBException {
+    /**
+     * Force a compaction on all data files that have more stale data than the provided threshold ratio.
+     * A compactionThreshold of 0 would force all files that have any stale data to compact;
+     * 0.1 would force those that have more than 10% of their space stale to compact.
+     **/
+    public void forceCompaction(float compactionThreshold) {
+        dbInternal.forceCompaction(compactionThreshold);
+    }
+
+    public void pauseCompaction(boolean awaitPending) throws HaloDBException {
        try {
-            dbInternal.pauseCompaction();
+            dbInternal.pauseCompaction(awaitPending);
        } catch (IOException e) {
            throw new HaloDBException("Error while trying to pause compaction thread", e);
        }
    }
 
+    public void pauseCompaction() throws HaloDBException {
+        pauseCompaction(false);
+    }
+
    public boolean snapshot() {
        return dbInternal.takeSnapshot();
    }
@@ -105,12 +117,6 @@ public void resumeCompaction() {
    }
 
    // methods used in tests.
-
-    @VisibleForTesting
-    boolean isCompactionComplete() {
-        return dbInternal.isCompactionComplete();
-    }
-
    @VisibleForTesting
    boolean isTombstoneFilesMerging() {
        return dbInternal.isTombstoneFilesMerging();
diff --git a/src/main/java/com/oath/halodb/HaloDBInternal.java b/src/main/java/com/oath/halodb/HaloDBInternal.java
index a558cf0..b15a795 100644
--- a/src/main/java/com/oath/halodb/HaloDBInternal.java
+++ b/src/main/java/com/oath/halodb/HaloDBInternal.java
@@ -51,7 +51,7 @@ class HaloDBInternal {
 
    private volatile Thread tombstoneMergeThread;
 
-    private Map<Integer, HaloDBFile> readFileMap = new ConcurrentHashMap<>();
+    private final Map<Integer, HaloDBFile> readFileMap = new ConcurrentHashMap<>();
 
    HaloDBOptions options;
@@ -167,7 +167,7 @@ synchronized void close() throws IOException {
        isClosing = true;
 
        try {
-            if(!compactionManager.stopCompactionThread(true))
+            if(!compactionManager.stopCompactionThread(true, false))
                setIOErrorFlag();
        } catch (IOException e) {
            logger.error("Error while stopping compaction thread. Setting IOError flag", e);
@@ -314,7 +314,7 @@ synchronized boolean takeSnapshot() {
        try {
            final int currentWriteFileId;
 
-            compactionManager.pauseCompactionThread();
+            compactionManager.pauseCompactionThread(false);
 
            // Only support one snapshot now
            // TODO: support multiple snapshots if needed
@@ -417,8 +417,8 @@ void setIOErrorFlag() throws IOException {
        metaData.storeToFile();
    }
 
-    void pauseCompaction() throws IOException {
-        compactionManager.pauseCompactionThread();
+    void pauseCompaction(boolean awaitPending) throws IOException {
+        compactionManager.pauseCompactionThread(awaitPending);
    }
 
    void resumeCompaction() {
@@ -481,6 +481,15 @@ private void markPreviousVersionAsStale(byte[] key, InMemoryIndexMetaData record
        addFileToCompactionQueueIfThresholdCrossed(recordMetaData.getFileId(), staleRecordSize);
    }
 
+    void forceCompaction(float compactionThreshold) {
+        staleDataPerFileMap.forEach((fileId, staleData) -> {
+            HaloDBFile file = readFileMap.get(fileId);
+            if (staleData > 0 && staleData >= file.getSize() * compactionThreshold) {
+                compactionManager.submitFileForCompaction(fileId);
+            }
+        });
+    }
+
    void addFileToCompactionQueueIfThresholdCrossed(int fileId, int staleRecordSize) {
        HaloDBFile file = readFileMap.get(fileId);
        if (file == null)
@@ -955,11 +964,6 @@ private Map computeStaleDataMapForStats() {
    }
 
    // Used only in tests.
-    @VisibleForTesting
-    boolean isCompactionComplete() {
-        return compactionManager.isCompactionComplete();
-    }
-
    @VisibleForTesting
    boolean isTombstoneFilesMerging() {
        return isTombstoneFilesMerging;
diff --git a/src/test/java/com/oath/halodb/CompactionWithErrorsTest.java b/src/test/java/com/oath/halodb/CompactionWithErrorsTest.java
index 7390a80..64fb31f 100644
--- a/src/test/java/com/oath/halodb/CompactionWithErrorsTest.java
+++ b/src/test/java/com/oath/halodb/CompactionWithErrorsTest.java
@@ -5,20 +5,15 @@
 package com.oath.halodb;
 
-import com.google.common.util.concurrent.RateLimiter;
+import java.io.IOException;
+import java.util.List;
 
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
-import sun.nio.ch.FileChannelImpl;
-
-import java.io.IOException;
-import java.nio.channels.WritableByteChannel;
-import java.nio.file.Paths;
-import java.util.List;
+import com.google.common.util.concurrent.RateLimiter;
 
 import mockit.Expectations;
-import mockit.Invocation;
 import mockit.Mock;
 import mockit.MockUp;
 import mockit.Mocked;
@@ -35,7 +30,7 @@ public void testCompactionWithException() throws HaloDBException, InterruptedExc
            @Mock
            public double acquire(int permits) {
                if (++callCount == 3) {
-                    // throw an exception when copying the third record. 
+                    // throw an exception when copying the third record.
                    throw new OutOfMemoryError("Throwing mock exception form compaction thread.");
                }
                return 10;
@@ -56,7 +51,7 @@ public double acquire(int permits) {
        TestUtils.waitForCompactionToComplete(db);
 
        // An exception was thrown while copying a record in the compaction thread.
-        // Make sure that all records are still correct. 
+        // Make sure that all records are still correct.
        Assert.assertEquals(db.size(), records.size());
        for (Record r : records) {
            Assert.assertEquals(db.get(r.getKey()), r.getValue());
@@ -67,7 +62,7 @@ public double acquire(int permits) {
 
        // Make sure that everything is good after
        // we open the db again. Since compaction had failed
-        // there would be two copies of the same record in two different files. 
+        // there would be two copies of the same record in two different files.
        Assert.assertEquals(db.size(), records.size());
        for (Record r : records) {
            Assert.assertEquals(db.get(r.getKey()), r.getValue());
@@ -88,7 +83,7 @@ public void testRestartCompactionThreadAfterCrash(@Mocked CompactionManager comp
            @Mock
            public double acquire(int permits) {
                if (++callCount == 3 || callCount == 8) {
-                    // throw exceptions twice, each time compaction thread should crash and restart. 
+                    // throw exceptions twice, each time compaction thread should crash and restart.
                    throw new OutOfMemoryError("Throwing mock exception from compaction thread.");
                }
                return 10;
@@ -106,6 +101,7 @@ public double acquire(int permits) {
 
        List<Record> records = insertAndUpdate(db, numberOfRecords);
 
+        Thread.sleep(200);
        TestUtils.waitForCompactionToComplete(db);
 
        // An exception was thrown while copying a record in the compaction thread.
@@ -130,14 +126,14 @@ public double acquire(int permits) {
            // called when db.open()
            compactionManager.startCompactionThread();
 
-            // compaction thread should have crashed twice and each time it should have been restarted. 
+            // compaction thread should have crashed twice and each time it should have been restarted.
            compactionManager.startCompactionThread();
            compactionManager.startCompactionThread();
 
            // called after db.close()
-            compactionManager.stopCompactionThread(true);
+            compactionManager.stopCompactionThread(true, false);
 
-            // called when db.open() the second time. 
+            // called when db.open() the second time.
            compactionManager.startCompactionThread();
        }};
 
@@ -150,15 +146,6 @@ public double acquire(int permits) {
 
    @Test
    public void testCompactionThreadStopWithIOException() throws HaloDBException, InterruptedException, IOException {
-        // Throw an IOException while stopping compaction thread.
-        new MockUp<CompactionManager>() {
-
-            @Mock
-            boolean stopCompactionThread(boolean flag) throws IOException {
-                throw new IOException("Throwing mock IOException while stopping compaction thread.");
-
-            }
-        };
 
        String directory = TestUtils.getTestDirectory("CompactionManagerTest", "testCompactionThreadStopWithIOException");
@@ -171,12 +158,22 @@ boolean stopCompactionThread(boolean flag) throws IOException {
        insertAndUpdate(db, numberOfRecords);
 
        TestUtils.waitForCompactionToComplete(db);
+
+        // Throw an IOException while stopping compaction thread.
+        new MockUp<CompactionManager>() {
+
+            @Mock
+            boolean stopCompactionThread(boolean flag, boolean otherFlag) throws IOException {
+                throw new IOException("Throwing mock IOException while stopping compaction thread.");
+
+            }
+        };
 
        db.close();
 
        DBMetaData dbMetaData = new DBMetaData(dbDirectory);
        dbMetaData.loadFromFileIfExists();
 
-        // Since there was an IOException while stopping compaction IOError flag must have been set. 
+        // Since there was an IOException while stopping compaction IOError flag must have been set.
        Assert.assertTrue(dbMetaData.isIOError());
    }
diff --git a/src/test/java/com/oath/halodb/HaloDBCompactionTest.java b/src/test/java/com/oath/halodb/HaloDBCompactionTest.java
index cee8325..48e068f 100644
--- a/src/test/java/com/oath/halodb/HaloDBCompactionTest.java
+++ b/src/test/java/com/oath/halodb/HaloDBCompactionTest.java
@@ -5,17 +5,17 @@
 package com.oath.halodb;
 
-import com.google.common.primitives.Longs;
-
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
 import java.io.File;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.primitives.Longs;
+
 public class HaloDBCompactionTest extends TestBase {
 
    private final int recordSize = 1024;
@@ -145,14 +145,14 @@ public void testFilesWithStaleDataAddedToCompactionQueueDuringDBOpen(HaloDBOptio
        List<Record> records = TestUtils.insertRandomRecordsOfSize(db, 50, 1024-Record.Header.HEADER_SIZE);
 
        // Delete all records, which means that all data files would have crossed the
-        // stale data threshold. 
+        // stale data threshold.
        for (Record r : records) {
            db.delete(r.getKey());
        }
 
        db.close();
 
-        // open the db withe compaction enabled. 
+        // open the db with compaction enabled.
        options.setCompactionDisabled(false);
        options.setMaxFileSize(10 * 1024);
@@ -172,7 +172,7 @@ public void testFilesWithStaleDataAddedToCompactionQueueDuringDBOpen(HaloDBOptio
 
        db = getTestDBWithoutDeletingFiles(directory, options);
 
-        // insert 20 records into two files. 
+        // insert 20 records into two files.
        records = TestUtils.insertRandomRecordsOfSize(db, 20, 1024-Record.Header.HEADER_SIZE);
 
        File[] dataFilesToDelete = FileUtils.listDataFiles(new File(directory));
@@ -261,6 +261,42 @@ public void testPauseAndResumeCompaction() throws HaloDBException, InterruptedEx
        dataFiles.forEach(f -> Assert.assertFalse(f.exists(), "data file " + f.getName() + " still exists"));
    }
 
+    @Test
+    public void testForceCompaction() throws HaloDBException, InterruptedException {
+        String directory = TestUtils.getTestDirectory("HaloDBCompactionTest", "testForceCompaction");
+
+        HaloDBOptions options = new HaloDBOptions();
+        options.setCompactionThresholdPerFile(1.0);
+
+        HaloDB db = getTestDB(directory, options);
+
+        List<Record> records = TestUtils.insertRandomRecordsOfSize(db, 10, 1000);
+
+        db.close();
+
+        db = getTestDBWithoutDeletingFiles(directory, options);
+        // nothing deleted, should not trigger compaction
+        db.forceCompaction(0f);
+        db.pauseCompaction(true);
+        Assert.assertEquals(db.stats().getNumberOfRecordsScanned(), 0);
+        db.resumeCompaction();
+        db.delete(records.get(0).getKey());
+
+        // should not compact, only 1 in 10 records are deleted, but the size threshold is 50%
+        db.forceCompaction(0.5f);
+        db.pauseCompaction(true);
+        Assert.assertEquals(db.stats().getNumberOfRecordsScanned(), 0);
+        db.resumeCompaction();
+
+        db.forceCompaction(0.01f);
+        db.pauseCompaction(true);
+        Assert.assertEquals(db.stats().getNumberOfRecordsScanned(), 10);
+        Assert.assertEquals(db.stats().getNumberOfRecordsCopied(), 9);
+        Assert.assertEquals(db.stats().getNumberOfRecordsReplaced(), 9);
+    }
+
+
+
    private Record[] insertAndUpdateRecords(int numberOfRecords, HaloDB db) throws HaloDBException {
 
        int valueSize = recordSize - Record.Header.HEADER_SIZE - 8; // 8 is the key size.
diff --git a/src/test/java/com/oath/halodb/TestUtils.java b/src/test/java/com/oath/halodb/TestUtils.java
index 3ade02c..4dad8fa 100644
--- a/src/test/java/com/oath/halodb/TestUtils.java
+++ b/src/test/java/com/oath/halodb/TestUtils.java
@@ -5,9 +5,6 @@
 package com.oath.halodb;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -28,6 +25,9 @@
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 public class TestUtils {
 
    private static final Logger logger = LoggerFactory.getLogger(TestUtils.class);
@@ -212,22 +212,16 @@ public static byte[] generateRandomByteArray() {
 
    /**
     * This method will work correctly only after all the writes to the db have been completed.
+     * @throws HaloDBException
     */
-    static void waitForCompactionToComplete(HaloDB db) {
-        while (!db.isCompactionComplete()) {
-            try {
-                Thread.sleep(1_000);
-            } catch (InterruptedException e) {
-                logger.error("Thread interrupted while waiting for compaction to complete");
-                throw new RuntimeException(e);
-            }
-        }
+    static void waitForCompactionToComplete(HaloDB db) throws HaloDBException {
+        db.pauseCompaction(true);
    }
 
    static void waitForTombstoneFileMergeComplete(HaloDB db) {
        while (db.isTombstoneFilesMerging()) {
            try {
-                Thread.sleep(1_000);
+                Thread.sleep(100);
            } catch (InterruptedException e) {
                logger.error("Thread interrupted while waiting for tombstone file merge to complete");
                throw new RuntimeException(e);
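
Usage sketch (illustrative only, not part of the patch): a minimal Java example of how the manual-compaction API introduced above could be driven from application code. It assumes the project's existing HaloDB.open(String, HaloDBOptions) entry point, and the directory path and class name are arbitrary examples. forceCompaction() only enqueues the qualifying files; pauseCompaction(true) is what blocks until that queued work drains, which is the same pattern TestUtils.waitForCompactionToComplete() now uses.

    import com.oath.halodb.HaloDB;
    import com.oath.halodb.HaloDBException;
    import com.oath.halodb.HaloDBOptions;

    public class ForceCompactionExample {
        public static void main(String[] args) throws HaloDBException {
            HaloDBOptions options = new HaloDBOptions();
            // Keep the automatic per-file threshold high; compaction is triggered manually below.
            options.setCompactionThresholdPerFile(1.0);

            // "/tmp/halodb-example" is an arbitrary example path.
            HaloDB db = HaloDB.open("/tmp/halodb-example", options);
            try {
                // ... normal writes and deletes happen here ...

                // Queue every data file that is at least 10% stale for compaction,
                // then block until the queued compaction tasks have completed.
                db.forceCompaction(0.1f);
                db.pauseCompaction(true); // awaitPending = true: drain the queue before stopping

                // Restart the background compaction thread for normal operation.
                db.resumeCompaction();
            } finally {
                db.close();
            }
        }
    }

The no-argument pauseCompaction() keeps its old semantics (awaitPending = false): the compactor stops after the file it is currently working on, and any files still queued are picked up again after resumeCompaction().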