From 04341de56e60a1a401cd0ab57ea0fc57a1e028f1 Mon Sep 17 00:00:00 2001 From: Colbert Guan Date: Mon, 3 Jun 2019 13:36:31 -0700 Subject: [PATCH] W-6164717: Make BF flushing period once per day, separate from clearing period (#469) * make BF flushing period once per day, separate from clearing period * address typo --- .../service/schema/AbstractSchemaService.java | 95 +++++++++++-------- .../schema/AbstractSchemaServiceTest.java | 17 ++++ 2 files changed, 73 insertions(+), 39 deletions(-) diff --git a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/AbstractSchemaService.java b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/AbstractSchemaService.java index 5670c0e3d..f0686ad19 100644 --- a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/AbstractSchemaService.java +++ b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/AbstractSchemaService.java @@ -34,6 +34,7 @@ import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.Random; import java.util.Set; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -64,7 +65,8 @@ public abstract class AbstractSchemaService extends DefaultService implements Sc private String modifiedBloomFileName; private final boolean bloomFileWritingEnabled; private boolean modifiedBloomClearingEnabled; - private int modifiedBloomClearPeriodHours; + private int modifiedBloomClearingPeriodHours; + private int modifiedBloomFlushPeriodHours; protected AbstractSchemaService(SystemConfiguration config, MonitorService monitorService) { super(config); @@ -75,8 +77,10 @@ protected AbstractSchemaService(SystemConfiguration config, MonitorService monit Property.BLOOM_FILE_WRITING_ENABLED.getDefaultValue())); modifiedBloomClearingEnabled = Boolean.parseBoolean(config.getValue(Property.MODIFIED_BLOOM_CLEARING_ENABLED.getName(), Property.MODIFIED_BLOOM_CLEARING_ENABLED.getDefaultValue())); - modifiedBloomClearPeriodHours = 
Integer.parseInt(config.getValue(Property.MODIFIED_BLOOM_CLEARING_PERIOD_HOURS.getName(), + modifiedBloomClearingPeriodHours = Integer.parseInt(config.getValue(Property.MODIFIED_BLOOM_CLEARING_PERIOD_HOURS.getName(), Property.MODIFIED_BLOOM_CLEARING_PERIOD_HOURS.getDefaultValue())); + modifiedBloomFlushPeriodHours = Integer.parseInt(config.getValue(Property.MODIFIED_BLOOM_FLUSH_PERIOD_HOURS.getName(), + Property.MODIFIED_BLOOM_FLUSH_PERIOD_HOURS.getDefaultValue())); String bfStateBaseDir = config.getValue(Property.BF_STATE_BASE_DIR.getName(), Property.BF_STATE_BASE_DIR.getDefaultValue()); @@ -97,8 +101,7 @@ protected AbstractSchemaService(SystemConfiguration config, MonitorService monit _bloomFilterMonitorThread = new Thread(new BloomFilterMonitorThread(), "bloom-filter-monitor"); _bloomFilterMonitorThread.start(); - int bloomFilterFlushHourToStartAt = getBloomFilterFlushHourToStartAt(); - createScheduledExecutorService(bloomFilterFlushHourToStartAt); + createScheduledExecutorService(); } void clearBlooms() { @@ -206,8 +209,15 @@ public void dispose() { @Override public abstract List keywordSearch(KeywordQuery query); + protected int getNumSecondsUntilNthHourOfDay(int nthHour, Calendar fromCalendar) { + int hour = fromCalendar.get(Calendar.HOUR_OF_DAY); + int secondsPastHour = fromCalendar.get(Calendar.MINUTE) * 60; + int hoursUntil = hour < nthHour ? 
(nthHour - hour) : (nthHour + 24 - hour); + return hoursUntil * 60 * 60 - secondsPastHour; + } + protected int getNumSecondsUntilNthHourOfWeek(int nthHour, Calendar fromCalendar) { - _logger.info("Initialized bloom filter flushing out once a week, at {} days + {} hours after Sunday 12AM", nthHour / 24, nthHour % 24); + // Sunday == 1; Saturday == 7 int day = fromCalendar.get(Calendar.DAY_OF_WEEK) - 1; int hour = fromCalendar.get(Calendar.HOUR_OF_DAY); @@ -219,22 +229,6 @@ protected int getNumSecondsUntilNthHourOfWeek(int nthHour, Calendar fromCalendar return hoursUntil * 60 * 60 - secondsPastHour; } - /* - * Have a different flush start hour to prevent thundering herd problem. - */ - private int getBloomFilterFlushHourToStartAt() { - int bloomFilterFlushHourToStartAt = 0; - try { - String toHash = InetAddress.getLocalHost().getHostName() + config.getValue(config.ARGUS_INSTANCE_ID, "noid"); - HashFunction hf = Hashing.murmur3_128(); - bloomFilterFlushHourToStartAt = Math.abs(hf.newHasher().putString(toHash, Charset.defaultCharset()).hash().asInt() % modifiedBloomClearPeriodHours); - } catch (UnknownHostException e) { - _logger.warn("BloomFilter UnknownHostException", e); - } - _logger.info("BloomFilter flush hour to start at {}th hour of day", bloomFilterFlushHourToStartAt); - return bloomFilterFlushHourToStartAt; - } - private BloomFilter createOrReadBloomFilter(String filename, int expectedNumberInsertions, double errorRate) { File bfFile = new File(filename); if (bloomFileWritingEnabled && bfFile.exists()) { @@ -268,11 +262,28 @@ private void writeBloomsToFile() { } - private void createScheduledExecutorService(int targetHourToStartAt){ + private void createScheduledExecutorService() { + String toHash; + try { + toHash = InetAddress.getLocalHost().getHostName() + config.getValue(config.ARGUS_INSTANCE_ID, "noid"); + } catch (UnknownHostException ex) { + toHash = String.valueOf(new Random().nextDouble()); + } + HashFunction hf = Hashing.murmur3_128(); + int 
hourHash = Math.abs(hf.newHasher().putString(toHash, Charset.defaultCharset()).hash().asInt()); scheduledExecutorService = Executors.newScheduledThreadPool(1); - int initialDelayInSeconds = getNumSecondsUntilNthHourOfWeek(targetHourToStartAt, Calendar.getInstance()); - BloomFilterFlushThread bloomFilterFlushThread = new BloomFilterFlushThread(); - scheduledExecutorService.scheduleAtFixedRate(bloomFilterFlushThread, initialDelayInSeconds, modifiedBloomClearPeriodHours * 60 * 60, TimeUnit.SECONDS); + + int nthFlushHour = hourHash % modifiedBloomFlushPeriodHours; + _logger.info("Bloom filter will flush to disk at hour {} of every day", nthFlushHour); + int flushDelaySeconds = getNumSecondsUntilNthHourOfDay(nthFlushHour, Calendar.getInstance()); + BloomFilterFlushThread flushingThread = new BloomFilterFlushThread(); + scheduledExecutorService.scheduleAtFixedRate(flushingThread, flushDelaySeconds, modifiedBloomFlushPeriodHours * 60 * 60, TimeUnit.SECONDS); + + BloomFilterClearThread clearingThread = new BloomFilterClearThread(); + int nthClearHour = hourHash % modifiedBloomClearingPeriodHours; + _logger.info("Bloom filter will clear once a week, at {} days + {} hours after Sunday 12AM", nthClearHour / 24, nthClearHour % 24); + int clearDelaySeconds = getNumSecondsUntilNthHourOfWeek(nthClearHour, Calendar.getInstance()); + scheduledExecutorService.scheduleAtFixedRate(clearingThread, clearDelaySeconds, modifiedBloomClearingPeriodHours * 60 * 60, TimeUnit.SECONDS); } private void shutdownScheduledExecutorService(){ @@ -307,7 +318,8 @@ public enum Property { MODIFIED_BLOOM_EXPECTED_NUMBER_INSERTIONS("service.property.schema.bloomfilter.modified.expected.number.insertions", "40"), MODIFIED_BLOOM_ERROR_RATE("service.property.schema.bloomfilter.modified.error.rate", "0.00001"), MODIFIED_BLOOM_CLEARING_ENABLED("service.property.schema.bloomfilter.modified.clearing.enabled", "true"), - 
MODIFIED_BLOOM_CLEARING_PERIOD_HOURS("service.property.schema.bloomfilter.modified.clearing.period.hours", String.valueOf(7 * 24)); + MODIFIED_BLOOM_CLEARING_PERIOD_HOURS("service.property.schema.bloomfilter.modified.clearing.period.hours", String.valueOf(7 * 24)), + MODIFIED_BLOOM_FLUSH_PERIOD_HOURS("service.property.schema.bloomfilter.modified.flush.period.hours", "24"); private final String _name; @@ -379,29 +391,34 @@ private void _sleepForPollPeriod() { } /** - * Writes bloomFilter to disk then clears it by creating a new bloomFilter. + * Clears bloomfilters to allow for modified-timestamp updating */ - private class BloomFilterFlushThread implements Runnable { + private class BloomFilterClearThread implements Runnable { @Override public void run() { - try { - _flushBloomFilter(); - } catch (Exception ex) { - _logger.warn("Exception occurred while flushing bloom filter.", ex); - } - } - - private void _flushBloomFilter() { - _logger.info("Flushing out bloom filter entries"); - writeBloomsToFile(); modifiedBloomClearingEnabled = Boolean.valueOf(config.refreshAndGetValue( SystemConfiguration.Property.SCHEMA_SERVICE_PROPERTY_FILE, Property.MODIFIED_BLOOM_CLEARING_ENABLED.getName(), Property.MODIFIED_BLOOM_CLEARING_ENABLED.getDefaultValue())); _logger.info("Refreshed {} property and got {}.", Property.MODIFIED_BLOOM_CLEARING_ENABLED.getName(), modifiedBloomClearingEnabled); if (modifiedBloomClearingEnabled) { + _logger.info("Clearing modifiedBloom filter entries"); bloomFilter = BloomFilter.create(Funnels.stringFunnel(Charset.defaultCharset()), modifiedBloomExpectedNumberInsertions, modifiedBloomErrorRate); } - /* Don't need explicit synchronization to prevent slowness majority of the time*/ + } + } + + /** + * Writes bloomFilter to disk to allow process restarts to synchronize themselves + */ + private class BloomFilterFlushThread implements Runnable { + @Override + public void run() { + try { + _logger.info("Flushing bloom filter entries to disk"); + 
writeBloomsToFile(); + } catch (Exception ex) { + _logger.warn("Exception occurred while flushing bloom filter.", ex); + } } } } diff --git a/ArgusCore/src/test/java/com/salesforce/dva/argus/service/schema/AbstractSchemaServiceTest.java b/ArgusCore/src/test/java/com/salesforce/dva/argus/service/schema/AbstractSchemaServiceTest.java index 0cafaa3ee..371b2607d 100644 --- a/ArgusCore/src/test/java/com/salesforce/dva/argus/service/schema/AbstractSchemaServiceTest.java +++ b/ArgusCore/src/test/java/com/salesforce/dva/argus/service/schema/AbstractSchemaServiceTest.java @@ -224,6 +224,23 @@ private void initCounters() { metricsCreatedCount = 0; } + @Test + public void testNumHoursUntilNextClearBloomFilter() { + Calendar calendar = Calendar.getInstance(); + int hour = calendar.get(Calendar.HOUR_OF_DAY); + // Will wait 24 hours before next flush if at same hour boundary + int secondsUntil = _esSchemaService.getNumSecondsUntilNthHourOfDay(hour, calendar); + assertTrue(secondsUntil >= 23 * 60 * 60 && secondsUntil <= 24 * 60 * 60); + + calendar.set(Calendar.HOUR_OF_DAY, Math.floorMod(hour - 2, 24)); + secondsUntil = _esSchemaService.getNumSecondsUntilNthHourOfDay(hour, calendar); + assertTrue(secondsUntil >= 1 * 60 * 60 && secondsUntil <= 2 * 60 * 60); + + calendar.set(Calendar.HOUR_OF_DAY, Math.floorMod(hour + 2, 24)); + secondsUntil = _esSchemaService.getNumSecondsUntilNthHourOfDay(hour, calendar); + assertTrue(secondsUntil >= 21 * 60 * 60 && secondsUntil < 22 * 60 * 60); + } + @Test public void testNumHoursUntilNextFlushBloomFilter() { // use Wednesday 6 AM this week as start date