Skip to content
This repository has been archived by the owner on Feb 12, 2022. It is now read-only.

Commit

Permalink
W-6164717: Make BF flushing period once per day, separate from cleari…
Browse files Browse the repository at this point in the history
…ng period (#469)

* make BF flushing period once per day, separate from clearing period

* address typo
  • Loading branch information
colbyguan authored and GitHub Enterprise committed Jun 3, 2019
1 parent 840f627 commit 04341de
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -64,7 +65,8 @@ public abstract class AbstractSchemaService extends DefaultService implements Sc
private String modifiedBloomFileName;
private final boolean bloomFileWritingEnabled;
private boolean modifiedBloomClearingEnabled;
private int modifiedBloomClearPeriodHours;
private int modifiedBloomClearingPeriodHours;
private int modifiedBloomFlushPeriodHours;

protected AbstractSchemaService(SystemConfiguration config, MonitorService monitorService) {
super(config);
Expand All @@ -75,8 +77,10 @@ protected AbstractSchemaService(SystemConfiguration config, MonitorService monit
Property.BLOOM_FILE_WRITING_ENABLED.getDefaultValue()));
modifiedBloomClearingEnabled = Boolean.parseBoolean(config.getValue(Property.MODIFIED_BLOOM_CLEARING_ENABLED.getName(),
Property.MODIFIED_BLOOM_CLEARING_ENABLED.getDefaultValue()));
modifiedBloomClearPeriodHours = Integer.parseInt(config.getValue(Property.MODIFIED_BLOOM_CLEARING_PERIOD_HOURS.getName(),
modifiedBloomClearingPeriodHours = Integer.parseInt(config.getValue(Property.MODIFIED_BLOOM_CLEARING_PERIOD_HOURS.getName(),
Property.MODIFIED_BLOOM_CLEARING_PERIOD_HOURS.getDefaultValue()));
modifiedBloomFlushPeriodHours = Integer.parseInt(config.getValue(Property.MODIFIED_BLOOM_FLUSH_PERIOD_HOURS.getName(),
Property.MODIFIED_BLOOM_FLUSH_PERIOD_HOURS.getDefaultValue()));

String bfStateBaseDir = config.getValue(Property.BF_STATE_BASE_DIR.getName(),
Property.BF_STATE_BASE_DIR.getDefaultValue());
Expand All @@ -97,8 +101,7 @@ protected AbstractSchemaService(SystemConfiguration config, MonitorService monit
_bloomFilterMonitorThread = new Thread(new BloomFilterMonitorThread(), "bloom-filter-monitor");
_bloomFilterMonitorThread.start();

int bloomFilterFlushHourToStartAt = getBloomFilterFlushHourToStartAt();
createScheduledExecutorService(bloomFilterFlushHourToStartAt);
createScheduledExecutorService();
}

void clearBlooms() {
Expand Down Expand Up @@ -206,8 +209,15 @@ public void dispose() {
@Override
public abstract List<MetricSchemaRecord> keywordSearch(KeywordQuery query);

protected int getNumSecondsUntilNthHourOfDay(int nthHour, Calendar fromCalendar) {
int hour = fromCalendar.get(Calendar.HOUR_OF_DAY);
int secondsPastHour = fromCalendar.get(Calendar.MINUTE) * 60;
int hoursUntil = hour < nthHour ? (nthHour - hour) : (nthHour + 24 - hour);
return hoursUntil * 60 * 60 - secondsPastHour;
}

protected int getNumSecondsUntilNthHourOfWeek(int nthHour, Calendar fromCalendar) {
_logger.info("Initialized bloom filter flushing out once a week, at {} days + {} hours after Sunday 12AM", nthHour / 24, nthHour % 24);

// Sunday == 1; Saturday == 7
int day = fromCalendar.get(Calendar.DAY_OF_WEEK) - 1;
int hour = fromCalendar.get(Calendar.HOUR_OF_DAY);
Expand All @@ -219,22 +229,6 @@ protected int getNumSecondsUntilNthHourOfWeek(int nthHour, Calendar fromCalendar
return hoursUntil * 60 * 60 - secondsPastHour;
}

/*
* Have a different flush start hour to prevent thundering herd problem.
*/
private int getBloomFilterFlushHourToStartAt() {
int bloomFilterFlushHourToStartAt = 0;
try {
String toHash = InetAddress.getLocalHost().getHostName() + config.getValue(config.ARGUS_INSTANCE_ID, "noid");
HashFunction hf = Hashing.murmur3_128();
bloomFilterFlushHourToStartAt = Math.abs(hf.newHasher().putString(toHash, Charset.defaultCharset()).hash().asInt() % modifiedBloomClearPeriodHours);
} catch (UnknownHostException e) {
_logger.warn("BloomFilter UnknownHostException", e);
}
_logger.info("BloomFilter flush hour to start at {}th hour of day", bloomFilterFlushHourToStartAt);
return bloomFilterFlushHourToStartAt;
}

private BloomFilter<CharSequence> createOrReadBloomFilter(String filename, int expectedNumberInsertions, double errorRate) {
File bfFile = new File(filename);
if (bloomFileWritingEnabled && bfFile.exists()) {
Expand Down Expand Up @@ -268,11 +262,28 @@ private void writeBloomsToFile() {
}


private void createScheduledExecutorService(int targetHourToStartAt){
private void createScheduledExecutorService() {
String toHash;
try {
toHash = InetAddress.getLocalHost().getHostName() + config.getValue(config.ARGUS_INSTANCE_ID, "noid");
} catch (UnknownHostException ex) {
toHash = String.valueOf(new Random().nextDouble());
}
HashFunction hf = Hashing.murmur3_128();
int hourHash = Math.abs(hf.newHasher().putString(toHash, Charset.defaultCharset()).hash().asInt());
scheduledExecutorService = Executors.newScheduledThreadPool(1);
int initialDelayInSeconds = getNumSecondsUntilNthHourOfWeek(targetHourToStartAt, Calendar.getInstance());
BloomFilterFlushThread bloomFilterFlushThread = new BloomFilterFlushThread();
scheduledExecutorService.scheduleAtFixedRate(bloomFilterFlushThread, initialDelayInSeconds, modifiedBloomClearPeriodHours * 60 * 60, TimeUnit.SECONDS);

int nthFlushHour = hourHash % modifiedBloomFlushPeriodHours;
_logger.info("Bloom filter will flush to disk at hour {} of every day", nthFlushHour);
int flushDelaySeconds = getNumSecondsUntilNthHourOfDay(nthFlushHour, Calendar.getInstance());
BloomFilterFlushThread flushingThread = new BloomFilterFlushThread();
scheduledExecutorService.scheduleAtFixedRate(flushingThread, flushDelaySeconds, modifiedBloomFlushPeriodHours * 60 * 60, TimeUnit.SECONDS);

BloomFilterClearThread clearingThread = new BloomFilterClearThread();
int nthClearHour = hourHash % modifiedBloomClearingPeriodHours;
_logger.info("Bloom filter will clear once a week, at {} days + {} hours after Sunday 12AM", nthClearHour / 24, nthClearHour % 24);
int clearDelaySeconds = getNumSecondsUntilNthHourOfWeek(nthClearHour, Calendar.getInstance());
scheduledExecutorService.scheduleAtFixedRate(clearingThread, clearDelaySeconds, modifiedBloomClearingPeriodHours * 60 * 60, TimeUnit.SECONDS);
}

private void shutdownScheduledExecutorService(){
Expand Down Expand Up @@ -307,7 +318,8 @@ public enum Property {
MODIFIED_BLOOM_EXPECTED_NUMBER_INSERTIONS("service.property.schema.bloomfilter.modified.expected.number.insertions", "40"),
MODIFIED_BLOOM_ERROR_RATE("service.property.schema.bloomfilter.modified.error.rate", "0.00001"),
MODIFIED_BLOOM_CLEARING_ENABLED("service.property.schema.bloomfilter.modified.clearing.enabled", "true"),
MODIFIED_BLOOM_CLEARING_PERIOD_HOURS("service.property.schema.bloomfilter.modified.clearing.period.hours", String.valueOf(7 * 24));
MODIFIED_BLOOM_CLEARING_PERIOD_HOURS("service.property.schema.bloomfilter.modified.clearing.period.hours", String.valueOf(7 * 24)),
MODIFIED_BLOOM_FLUSH_PERIOD_HOURS("service.property.schema.bloomfilter.modified.flush.period.hours", "24");


private final String _name;
Expand Down Expand Up @@ -379,29 +391,34 @@ private void _sleepForPollPeriod() {
}

/**
* Writes bloomFilter to disk then clears it by creating a new bloomFilter.
* Clears bloomfilters to allow for modified-timestamp updating
*/
private class BloomFilterFlushThread implements Runnable {
private class BloomFilterClearThread implements Runnable {
@Override
public void run() {
try {
_flushBloomFilter();
} catch (Exception ex) {
_logger.warn("Exception occurred while flushing bloom filter.", ex);
}
}

private void _flushBloomFilter() {
_logger.info("Flushing out bloom filter entries");
writeBloomsToFile();
modifiedBloomClearingEnabled = Boolean.valueOf(config.refreshAndGetValue(
SystemConfiguration.Property.SCHEMA_SERVICE_PROPERTY_FILE,
Property.MODIFIED_BLOOM_CLEARING_ENABLED.getName(), Property.MODIFIED_BLOOM_CLEARING_ENABLED.getDefaultValue()));
_logger.info("Refreshed {} property and got {}.", Property.MODIFIED_BLOOM_CLEARING_ENABLED.getName(), modifiedBloomClearingEnabled);
if (modifiedBloomClearingEnabled) {
_logger.info("Clearing modifiedBloom filter entries");
bloomFilter = BloomFilter.create(Funnels.stringFunnel(Charset.defaultCharset()), modifiedBloomExpectedNumberInsertions, modifiedBloomErrorRate);
}
/* Don't need explicit synchronization to prevent slowness majority of the time*/
}
}

/**
* Writes bloomFilter to disk to allow processes restarts to synchornize themselves
*/
private class BloomFilterFlushThread implements Runnable {
@Override
public void run() {
try {
_logger.info("Flushing bloom filter entries to disk");
writeBloomsToFile();
} catch (Exception ex) {
_logger.warn("Exception occurred while flushing bloom filter.", ex);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,23 @@ private void initCounters() {
metricsCreatedCount = 0;
}

@Test
public void testNumHoursUntilNextClearBloomFilter() {
Calendar calendar = Calendar.getInstance();
int hour = calendar.get(Calendar.HOUR_OF_DAY);
// Will wait 24 hours before next flush if at same hour boundary
int secondsUntil = _esSchemaService.getNumSecondsUntilNthHourOfDay(hour, calendar);
assertTrue(secondsUntil >= 23 * 60 * 60 && secondsUntil <= 24 * 60 * 60);

calendar.set(Calendar.HOUR_OF_DAY, Math.floorMod(hour - 2, 24));
secondsUntil = _esSchemaService.getNumSecondsUntilNthHourOfDay(hour, calendar);
assertTrue(secondsUntil >= 1 * 60 * 60 && secondsUntil <= 2 * 60 * 60);

calendar.set(Calendar.HOUR_OF_DAY, Math.floorMod(hour + 2, 24));
secondsUntil = _esSchemaService.getNumSecondsUntilNthHourOfDay(hour, calendar);
assertTrue(secondsUntil >= 21 * 60 * 60 && secondsUntil < 22 * 60 * 60);
}

@Test
public void testNumHoursUntilNextFlushBloomFilter() {
// use Wednesday 6 AM this week as start date
Expand Down

0 comments on commit 04341de

Please sign in to comment.