From 77c676af73dc7ba1014d97fef7acd7b379e32bb5 Mon Sep 17 00:00:00 2001 From: Tigran Manasyan Date: Tue, 16 Jan 2024 19:32:55 +0400 Subject: [PATCH] [ADH-4027] Parametrize hardcoded attributes --- conf/smart-default.xml | 80 +++++++++++++++++++ .../org/smartdata/conf/SmartConfKeys.java | 40 ++++++++++ .../server/engine/StatesManager.java | 2 +- .../engine/cmdlet/agent/AgentMaster.java | 17 +++- .../engine/data/AccessEventFetcher.java | 27 ++----- .../hdfs/HdfsStatesUpdateService.java | 2 +- .../metric/fetcher/CachedListFetcher.java | 29 +++---- .../hdfs/metric/fetcher/NamespaceFetcher.java | 30 ++----- .../hdfs/scheduler/MoverScheduler.java | 17 ++-- .../hdfs/scheduler/SmallFileScheduler.java | 12 ++- .../metric/fetcher/TestCachedListFetcher.java | 4 +- .../metric/fetcher/TestNamespaceFetcher.java | 15 ++-- .../dao/AccessCountTableManager.java | 53 +++++++----- 13 files changed, 226 insertions(+), 102 deletions(-) diff --git a/conf/smart-default.xml b/conf/smart-default.xml index fc74671e76e..a70401a960f 100644 --- a/conf/smart-default.xml +++ b/conf/smart-default.xml @@ -391,4 +391,84 @@ Comma-separated list of regex templates of internal files to be completely ignored by SSM. + + + smart.access.count.day.tables.num + 30 + + The max number of access count per day tables in the Metastore. + + + + + smart.access.count.hour.tables.num + 48 + + The max number of access count per hour tables in the Metastore. + + + + + smart.access.count.minute.tables.num + 120 + + The max number of access count per minute tables in the Metastore. + + + + + smart.access.count.second.tables.num + 30 + + The max number of access count per second tables in the Metastore. + + + + + smart.access.event.fetch.interval.ms + 1000 + + The interval in milliseconds between access event fetches. + + + + + smart.cached.file.fetch.interval.ms + 5000 + + The interval in milliseconds between cached files fetches from HDFS. + + + + + smart.namespace.fetch.interval.ms + 1 + + The interval in milliseconds between namespace fetches from HDFS. + + + + + smart.mover.scheduler.storage.report.fetch.interval.ms + 120000 + + The interval in milliseconds between storage report fetches from HDFS DataNode in mover scheduler. + + + + + smart.metastore.small-file.insert.batch.size + 200 + + The max size of small file insert batch to the Metastore. + + + + + smart.agent.master.ask.timeout.ms + 5000 + + The max time in milliseconds to wait an answer from the SmartAgent master actor during action submission. + + diff --git a/smart-common/src/main/java/org/smartdata/conf/SmartConfKeys.java b/smart-common/src/main/java/org/smartdata/conf/SmartConfKeys.java index ad721d898cf..9caac0aa7c6 100644 --- a/smart-common/src/main/java/org/smartdata/conf/SmartConfKeys.java +++ b/smart-common/src/main/java/org/smartdata/conf/SmartConfKeys.java @@ -90,6 +90,34 @@ public class SmartConfKeys { "smart.metastore.mysql.legacy.enabled"; public static final boolean SMART_METASTORE_LEGACY_MYSQL_SUPPORT_DEFAULT = false; + public static final String SMART_NUM_DAY_TABLES_TO_KEEP_KEY = + "smart.access.count.day.tables.num"; + public static final int SMART_NUM_DAY_TABLES_TO_KEEP_DEFAULT = 30; + + public static final String SMART_NUM_HOUR_TABLES_TO_KEEP_KEY = + "smart.access.count.hour.tables.num"; + public static final int SMART_NUM_HOUR_TABLES_TO_KEEP_DEFAULT = 48; + + public static final String SMART_NUM_MINUTE_TABLES_TO_KEEP_KEY = + "smart.access.count.minute.tables.num"; + public static final int SMART_NUM_MINUTE_TABLES_TO_KEEP_DEFAULT = 120; + + public static final String SMART_NUM_SECOND_TABLES_TO_KEEP_KEY = + "smart.access.count.second.tables.num"; + public static final int SMART_NUM_SECOND_TABLES_TO_KEEP_DEFAULT = 30; + + public static final String SMART_ACCESS_EVENT_FETCH_INTERVAL_MS_KEY = + "smart.access.event.fetch.interval.ms"; + public static final long SMART_ACCESS_EVENT_FETCH_INTERVAL_MS_DEFAULT = 1000L; + + public static final String SMART_CACHED_FILE_FETCH_INTERVAL_MS_KEY = + "smart.cached.file.fetch.interval.ms"; + public static final long SMART_CACHED_FILE_FETCH_INTERVAL_MS_DEFAULT = 5 * 1000L; + + public static final String SMART_NAMESPACE_FETCH_INTERVAL_MS_KEY = + "smart.namespace.fetch.interval.ms"; + public static final long SMART_NAMESPACE_FETCH_INTERVAL_MS_DEFAULT = 1L; + // StatesManager // RuleManager @@ -140,6 +168,14 @@ public class SmartConfKeys { public static final int SMART_FILE_DIFF_MAX_NUM_RECORDS_DEFAULT = 10000; + public static final String SMART_MOVER_SCHEDULER_REPORT_FETCH_INTERVAL_MS_KEY = + "smart.mover.scheduler.storage.report.fetch.interval.ms"; + public static final long SMART_MOVER_SCHEDULER_REPORT_FETCH_INTERVAL_MS_DEFAULT = 2 * 60 * 1000; + + public static final String SMART_SMALL_FILE_METASTORE_INSERT_BATCH_SIZE_KEY = + "smart.metastore.small-file.insert.batch.size"; + public static final int SMART_SMALL_FILE_METASTORE_INSERT_BATCH_SIZE_DEFAULT = 200; + // Dispatcher public static final String SMART_CMDLET_DISPATCHER_LOG_DISP_RESULT_KEY = "smart.cmdlet.dispatcher.log.disp.result"; @@ -167,6 +203,10 @@ public class SmartConfKeys { public static final String SMART_AGENT_PORT_KEY = "smart.agent.port"; public static final int SMART_AGENT_PORT_DEFAULT = 7048; + public static final String SMART_AGENT_MASTER_ASK_TIMEOUT_MS_KEY = + "smart.agent.master.ask.timeout.ms"; + public static final long SMART_AGENT_MASTER_ASK_TIMEOUT_MS_DEFAULT = 5000L; + /** Do NOT configure the following two options manually. They are set by the boot scripts. **/ public static final String SMART_AGENT_MASTER_ADDRESS_KEY = "smart.agent.master.address"; public static final String SMART_AGENT_ADDRESS_KEY = "smart.agent.address"; diff --git a/smart-engine/src/main/java/org/smartdata/server/engine/StatesManager.java b/smart-engine/src/main/java/org/smartdata/server/engine/StatesManager.java index 3b280de3af4..d53bbe2c494 100644 --- a/smart-engine/src/main/java/org/smartdata/server/engine/StatesManager.java +++ b/smart-engine/src/main/java/org/smartdata/server/engine/StatesManager.java @@ -75,7 +75,7 @@ public void init() throws IOException { LOG.info("Initializing ..."); this.executorService = Executors.newScheduledThreadPool(4); this.accessCountTableManager = new AccessCountTableManager( - serverContext.getMetaStore(), executorService); + serverContext.getMetaStore(), executorService, serverContext.getConf()); this.fileAccessEventSource = MetricsFactory.createAccessEventSource(serverContext.getConf()); this.accessEventFetcher = new AccessEventFetcher( diff --git a/smart-engine/src/main/java/org/smartdata/server/engine/cmdlet/agent/AgentMaster.java b/smart-engine/src/main/java/org/smartdata/server/engine/cmdlet/agent/AgentMaster.java index 08294fed593..d932c6ac9e6 100644 --- a/smart-engine/src/main/java/org/smartdata/server/engine/cmdlet/agent/AgentMaster.java +++ b/smart-engine/src/main/java/org/smartdata/server/engine/cmdlet/agent/AgentMaster.java @@ -53,11 +53,13 @@ import scala.concurrent.Future; import scala.concurrent.duration.Duration; +import static org.smartdata.conf.SmartConfKeys.SMART_AGENT_MASTER_ASK_TIMEOUT_MS_DEFAULT; +import static org.smartdata.conf.SmartConfKeys.SMART_AGENT_MASTER_ASK_TIMEOUT_MS_KEY; + public class AgentMaster { private static final Logger LOG = LoggerFactory.getLogger(AgentMaster.class); - public static final Timeout TIMEOUT = - new Timeout(Duration.create(5, TimeUnit.SECONDS)); + public Timeout masterAskTimeout; private ActorSystem system; private ActorRef master; @@ -71,6 +73,13 @@ private AgentMaster(SmartConf conf) throws IOException { if (addresses == null) { throw new IOException("AgentMaster address not configured!"); } + + long masterAskTimeoutMs = conf.getLong( + SMART_AGENT_MASTER_ASK_TIMEOUT_MS_KEY, + SMART_AGENT_MASTER_ASK_TIMEOUT_MS_DEFAULT + ); + masterAskTimeout = new Timeout(Duration.create(masterAskTimeoutMs, TimeUnit.MILLISECONDS)); + String address = addresses[0]; LOG.info("Agent master: " + address); Config config = AgentUtils.overrideRemoteAddress( @@ -159,8 +168,8 @@ ActorRef getMasterActor() { } Object askMaster(Object message) throws Exception { - Future answer = Patterns.ask(master, message, TIMEOUT); - return Await.result(answer, TIMEOUT.duration()); + Future answer = Patterns.ask(master, message, masterAskTimeout); + return Await.result(answer, masterAskTimeout.duration()); } class ActorSystemLauncher extends Thread { diff --git a/smart-engine/src/main/java/org/smartdata/server/engine/data/AccessEventFetcher.java b/smart-engine/src/main/java/org/smartdata/server/engine/data/AccessEventFetcher.java index 31e2b5afdf5..1e56e3fc651 100644 --- a/smart-engine/src/main/java/org/smartdata/server/engine/data/AccessEventFetcher.java +++ b/smart-engine/src/main/java/org/smartdata/server/engine/data/AccessEventFetcher.java @@ -26,15 +26,16 @@ import java.io.IOException; import java.util.List; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import static org.smartdata.conf.SmartConfKeys.SMART_ACCESS_EVENT_FETCH_INTERVAL_MS_DEFAULT; +import static org.smartdata.conf.SmartConfKeys.SMART_ACCESS_EVENT_FETCH_INTERVAL_MS_KEY; + public class AccessEventFetcher { static final Logger LOG = LoggerFactory.getLogger(AccessEventFetcher.class); - private static final Long DEFAULT_INTERVAL = 1 * 1000L; private final ScheduledExecutorService scheduledExecutorService; private final Long fetchInterval; private ScheduledFuture scheduledFuture; @@ -45,24 +46,10 @@ public AccessEventFetcher( AccessCountTableManager manager, ScheduledExecutorService service, FileAccessEventCollector collector) { - this(DEFAULT_INTERVAL, conf, manager, service, collector); - } - - public AccessEventFetcher( - Long fetchInterval, - Configuration conf, - AccessCountTableManager manager, - FileAccessEventCollector collector) { - this(fetchInterval, conf, manager, Executors.newSingleThreadScheduledExecutor(), collector); - } - - public AccessEventFetcher( - Long fetchInterval, - Configuration conf, - AccessCountTableManager manager, - ScheduledExecutorService service, - FileAccessEventCollector collector) { - this.fetchInterval = fetchInterval; + this.fetchInterval = conf.getLong( + SMART_ACCESS_EVENT_FETCH_INTERVAL_MS_KEY, + SMART_ACCESS_EVENT_FETCH_INTERVAL_MS_DEFAULT + ); this.fetchTask = new FetchTask(conf, manager, collector); this.scheduledExecutorService = service; } diff --git a/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/HdfsStatesUpdateService.java b/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/HdfsStatesUpdateService.java index 638429c715d..1c8b3d852f0 100644 --- a/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/HdfsStatesUpdateService.java +++ b/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/HdfsStatesUpdateService.java @@ -91,7 +91,7 @@ public void init() throws IOException { client = HadoopUtil.getDFSClient(nnUri, conf); checkAndCreateIdFiles(nnUri, context.getConf()); this.executorService = Executors.newScheduledThreadPool(4); - this.cachedListFetcher = new CachedListFetcher(client, metaStore); + this.cachedListFetcher = new CachedListFetcher(conf, client, metaStore); this.inotifyEventFetcher = new InotifyEventFetcher(client, metaStore, executorService, new FetchFinishedCallBack(), context.getConf()); this.dataNodeInfoFetcher = new DataNodeInfoFetcher( diff --git a/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/metric/fetcher/CachedListFetcher.java b/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/metric/fetcher/CachedListFetcher.java index b44aead1642..63aa49baf89 100644 --- a/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/metric/fetcher/CachedListFetcher.java +++ b/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/metric/fetcher/CachedListFetcher.java @@ -17,6 +17,7 @@ */ package org.smartdata.hdfs.metric.fetcher; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry; @@ -42,9 +43,11 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import static org.smartdata.conf.SmartConfKeys.SMART_CACHED_FILE_FETCH_INTERVAL_MS_DEFAULT; +import static org.smartdata.conf.SmartConfKeys.SMART_CACHED_FILE_FETCH_INTERVAL_MS_KEY; + public class CachedListFetcher { - private static final Long DEFAULT_INTERVAL = 5 * 1000L; private final ScheduledExecutorService scheduledExecutorService; private final Long fetchInterval; private FetchTask fetchTask; @@ -55,32 +58,22 @@ public class CachedListFetcher { LoggerFactory.getLogger(CachedListFetcher.class); public CachedListFetcher( - Long fetchInterval, + Configuration configuration, DFSClient dfsClient, MetaStore metaStore, ScheduledExecutorService service) { - this.fetchInterval = fetchInterval; + this.fetchInterval = configuration.getLong( + SMART_CACHED_FILE_FETCH_INTERVAL_MS_KEY, + SMART_CACHED_FILE_FETCH_INTERVAL_MS_DEFAULT + ); this.metaStore = metaStore; this.fetchTask = new FetchTask(dfsClient, metaStore); this.scheduledExecutorService = service; } public CachedListFetcher( - Long fetchInterval, - DFSClient dfsClient, MetaStore metaStore) { - this(fetchInterval, dfsClient, metaStore, - Executors.newSingleThreadScheduledExecutor()); - } - - public CachedListFetcher( + Configuration configuration, DFSClient dfsClient, MetaStore metaStore) { - this(DEFAULT_INTERVAL, dfsClient, metaStore, - Executors.newSingleThreadScheduledExecutor()); - } - - public CachedListFetcher( - DFSClient dfsClient, MetaStore metaStore, - ScheduledExecutorService service) { - this(DEFAULT_INTERVAL, dfsClient, metaStore, service); + this(configuration, dfsClient, metaStore, Executors.newSingleThreadScheduledExecutor()); } public void start() { diff --git a/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/metric/fetcher/NamespaceFetcher.java b/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/metric/fetcher/NamespaceFetcher.java index 53fbc9cb13e..158eb08dfbc 100644 --- a/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/metric/fetcher/NamespaceFetcher.java +++ b/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/metric/fetcher/NamespaceFetcher.java @@ -45,8 +45,6 @@ import static org.smartdata.hdfs.CompatibilityHelperLoader.getHelper; public class NamespaceFetcher { - private static final Long DEFAULT_INTERVAL = 1L; - private final ScheduledExecutorService scheduledExecutorService; private final long fetchInterval; private ScheduledFuture[] fetchTaskFutures; @@ -57,28 +55,13 @@ public class NamespaceFetcher { private MetaStore metaStore; private SmartConf conf; - public static final Logger LOG = - LoggerFactory.getLogger(NamespaceFetcher.class); - - public NamespaceFetcher(DFSClient client, MetaStore metaStore, ScheduledExecutorService service) { - this(client, metaStore, DEFAULT_INTERVAL, service, new SmartConf()); - } - - public NamespaceFetcher(DFSClient client, MetaStore metaStore, ScheduledExecutorService service, - SmartConf conf) { - this(client, metaStore, DEFAULT_INTERVAL, service, conf); - } - - public NamespaceFetcher(DFSClient client, MetaStore metaStore, long fetchInterval) { - this(client, metaStore, fetchInterval, null, new SmartConf()); - } + public static final Logger LOG = LoggerFactory.getLogger(NamespaceFetcher.class); - public NamespaceFetcher(DFSClient client, MetaStore metaStore, long fetchInterval, - SmartConf conf) { - this(client, metaStore, fetchInterval, null, conf); + public NamespaceFetcher(DFSClient client, MetaStore metaStore, SmartConf conf) { + this(client, metaStore, null, conf); } - public NamespaceFetcher(DFSClient client, MetaStore metaStore, long fetchInterval, + public NamespaceFetcher(DFSClient client, MetaStore metaStore, ScheduledExecutorService service, SmartConf conf) { int numProducers = conf.getInt(SmartConfKeys.SMART_NAMESPACE_FETCHER_PRODUCERS_NUM_KEY, SmartConfKeys.SMART_NAMESPACE_FETCHER_PRODUCERS_NUM_DEFAULT); @@ -96,7 +79,10 @@ public NamespaceFetcher(DFSClient client, MetaStore metaStore, long fetchInterva for (int i = 0; i < numConsumers; i++) { consumers[i] = new FileStatusIngester(metaStore); } - this.fetchInterval = fetchInterval; + this.fetchInterval = conf.getLong( + SmartConfKeys.SMART_NAMESPACE_FETCH_INTERVAL_MS_KEY, + SmartConfKeys.SMART_NAMESPACE_FETCH_INTERVAL_MS_DEFAULT + ); if (service != null) { this.scheduledExecutorService = service; } else { diff --git a/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/scheduler/MoverScheduler.java b/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/scheduler/MoverScheduler.java index 87851987b1b..52fb87c88d6 100644 --- a/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/scheduler/MoverScheduler.java +++ b/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/scheduler/MoverScheduler.java @@ -23,6 +23,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.smartdata.SmartContext; +import org.smartdata.conf.SmartConf; import org.smartdata.conf.SmartConfKeys; import org.smartdata.hdfs.HadoopUtil; import org.smartdata.hdfs.action.HdfsAction; @@ -53,7 +54,7 @@ public class MoverScheduler extends ActionSchedulerService { private MovePlanStatistics statistics; private MovePlanMaker planMaker; private final URI nnUri; - private long dnInfoUpdateInterval = 2 * 60 * 1000; + private final long dnInfoUpdateInterval; private ScheduledExecutorService updateService; private ScheduledFuture updateServiceFuture; private long throttleInMb; @@ -67,10 +68,16 @@ public class MoverScheduler extends ActionSchedulerService { public MoverScheduler(SmartContext context, MetaStore metaStore) throws IOException { super(context, metaStore); - nnUri = HadoopUtil.getNameNodeUri(getContext().getConf()); - throttleInMb = getContext().getConf() - .getLong(SmartConfKeys.SMART_ACTION_MOVE_THROTTLE_MB_KEY, - SmartConfKeys.SMART_ACTION_MOVE_THROTTLE_MB_DEFAULT); + SmartConf conf = getContext().getConf(); + nnUri = HadoopUtil.getNameNodeUri(conf); + throttleInMb = conf.getLong( + SmartConfKeys.SMART_ACTION_MOVE_THROTTLE_MB_KEY, + SmartConfKeys.SMART_ACTION_MOVE_THROTTLE_MB_DEFAULT + ); + dnInfoUpdateInterval = conf.getLong( + SmartConfKeys.SMART_MOVER_SCHEDULER_REPORT_FETCH_INTERVAL_MS_KEY, + SmartConfKeys.SMART_MOVER_SCHEDULER_REPORT_FETCH_INTERVAL_MS_DEFAULT + ); if (throttleInMb > 0) { rateLimiter = RateLimiter.create(throttleInMb); } diff --git a/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/scheduler/SmallFileScheduler.java b/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/scheduler/SmallFileScheduler.java index 32606a73273..61bef8cead9 100644 --- a/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/scheduler/SmallFileScheduler.java +++ b/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/scheduler/SmallFileScheduler.java @@ -56,6 +56,8 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import static org.smartdata.conf.SmartConfKeys.SMART_SMALL_FILE_METASTORE_INSERT_BATCH_SIZE_DEFAULT; +import static org.smartdata.conf.SmartConfKeys.SMART_SMALL_FILE_METASTORE_INSERT_BATCH_SIZE_KEY; import static org.smartdata.model.ActionInfo.OLD_FILE_ID; public class SmallFileScheduler extends ActionSchedulerService { @@ -91,17 +93,21 @@ public class SmallFileScheduler extends ActionSchedulerService { */ private ScheduledExecutorService executorService; - private static final int META_STORE_INSERT_BATCH_SIZE = 200; public static final String COMPACT_ACTION_NAME = "compact"; public static final String UNCOMPACT_ACTION_NAME = "uncompact"; public static final List ACTIONS = Arrays.asList(COMPACT_ACTION_NAME, UNCOMPACT_ACTION_NAME); - private DFSClient dfsClient; private static final Logger LOG = LoggerFactory.getLogger(SmallFileScheduler.class); + private final int metastoreInsertBatchSize; + private DFSClient dfsClient; public SmallFileScheduler(SmartContext context, MetaStore metaStore) { super(context, metaStore); this.metaStore = metaStore; + this.metastoreInsertBatchSize = context.getConf().getInt( + SMART_SMALL_FILE_METASTORE_INSERT_BATCH_SIZE_KEY, + SMART_SMALL_FILE_METASTORE_INSERT_BATCH_SIZE_DEFAULT + ); } @Override @@ -661,7 +667,7 @@ private void syncMetaStore() { List compactFileStates = new ArrayList<>(); // Get compact file states from compactFileStateQueue - for (int i = 0; i < META_STORE_INSERT_BATCH_SIZE; i++) { + for (int i = 0; i < metastoreInsertBatchSize; i++) { CompactFileState compactFileState = compactFileStateQueue.poll(); if (compactFileState != null) { try { diff --git a/smart-hadoop-support/smart-hadoop/src/test/java/org/smartdata/hdfs/metric/fetcher/TestCachedListFetcher.java b/smart-hadoop-support/smart-hadoop/src/test/java/org/smartdata/hdfs/metric/fetcher/TestCachedListFetcher.java index 46a2ab3e027..4d918d620e4 100644 --- a/smart-hadoop-support/smart-hadoop/src/test/java/org/smartdata/hdfs/metric/fetcher/TestCachedListFetcher.java +++ b/smart-hadoop-support/smart-hadoop/src/test/java/org/smartdata/hdfs/metric/fetcher/TestCachedListFetcher.java @@ -31,6 +31,7 @@ import org.junit.Test; import org.smartdata.SmartContext; import org.smartdata.conf.SmartConf; +import org.smartdata.conf.SmartConfKeys; import org.smartdata.hdfs.MiniClusterFactory; import org.smartdata.hdfs.action.CacheFileAction; import org.smartdata.hdfs.action.UncacheFileAction; @@ -71,7 +72,7 @@ public void init() throws Exception { dfs = cluster.getFileSystem(); dfsClient = dfs.getClient(); smartContext = new SmartContext(conf); - cachedListFetcher = new CachedListFetcher(600l, dfsClient, metaStore); + cachedListFetcher = new CachedListFetcher(conf, dfsClient, metaStore); } static void initConf(Configuration conf) { @@ -79,6 +80,7 @@ static void initConf(Configuration conf) { conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, DEFAULT_BLOCK_SIZE); conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L); conf.setLong(DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY, 2000L); + conf.setLong(SmartConfKeys.SMART_CACHED_FILE_FETCH_INTERVAL_MS_KEY, 600); } @After diff --git a/smart-hadoop-support/smart-hadoop/src/test/java/org/smartdata/hdfs/metric/fetcher/TestNamespaceFetcher.java b/smart-hadoop-support/smart-hadoop/src/test/java/org/smartdata/hdfs/metric/fetcher/TestNamespaceFetcher.java index 791216f316f..abf8f74d4ba 100644 --- a/smart-hadoop-support/smart-hadoop/src/test/java/org/smartdata/hdfs/metric/fetcher/TestNamespaceFetcher.java +++ b/smart-hadoop-support/smart-hadoop/src/test/java/org/smartdata/hdfs/metric/fetcher/TestNamespaceFetcher.java @@ -20,6 +20,7 @@ import com.google.common.collect.Sets; import java.util.HashSet; import java.util.Set; +import java.util.Optional; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; @@ -39,6 +40,7 @@ import static org.mockito.Mockito.*; import static org.smartdata.conf.SmartConfKeys.SMART_IGNORED_PATH_TEMPLATES_KEY; import static org.smartdata.conf.SmartConfKeys.SMART_IGNORE_DIRS_KEY; +import static org.smartdata.conf.SmartConfKeys.SMART_NAMESPACE_FETCH_INTERVAL_MS_KEY; import java.io.IOException; import java.util.ArrayList; @@ -73,13 +75,12 @@ public Void answer(InvocationOnMock invocationOnMock) { return null; } }).when(adapter).insertFiles(any(FileInfo[].class)); - NamespaceFetcher fetcher; - if(conf != null) { - fetcher = new NamespaceFetcher(client, adapter, 100, conf); - } else { - fetcher = new NamespaceFetcher(client, adapter, 100); - } - return fetcher; + + SmartConf nonNullConfig = Optional.ofNullable(conf) + .orElseGet(SmartConf::new); + nonNullConfig.setLong(SMART_NAMESPACE_FETCH_INTERVAL_MS_KEY, 100L); + + return new NamespaceFetcher(client, adapter, nonNullConfig); } @Test diff --git a/smart-metastore/src/main/java/org/smartdata/metastore/dao/AccessCountTableManager.java b/smart-metastore/src/main/java/org/smartdata/metastore/dao/AccessCountTableManager.java index 10e802c1f90..21556b464b7 100644 --- a/smart-metastore/src/main/java/org/smartdata/metastore/dao/AccessCountTableManager.java +++ b/smart-metastore/src/main/java/org/smartdata/metastore/dao/AccessCountTableManager.java @@ -18,6 +18,7 @@ package org.smartdata.metastore.dao; import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.smartdata.metastore.MetaStore; @@ -33,12 +34,16 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -public class AccessCountTableManager { - private static final int NUM_DAY_TABLES_TO_KEEP = 30; - private static final int NUM_HOUR_TABLES_TO_KEEP = 48; - private static final int NUM_MINUTE_TABLES_TO_KEEP = 120; - private static final int NUM_SECOND_TABLES_TO_KEEP = 30; +import static org.smartdata.conf.SmartConfKeys.SMART_NUM_DAY_TABLES_TO_KEEP_DEFAULT; +import static org.smartdata.conf.SmartConfKeys.SMART_NUM_DAY_TABLES_TO_KEEP_KEY; +import static org.smartdata.conf.SmartConfKeys.SMART_NUM_HOUR_TABLES_TO_KEEP_DEFAULT; +import static org.smartdata.conf.SmartConfKeys.SMART_NUM_HOUR_TABLES_TO_KEEP_KEY; +import static org.smartdata.conf.SmartConfKeys.SMART_NUM_MINUTE_TABLES_TO_KEEP_DEFAULT; +import static org.smartdata.conf.SmartConfKeys.SMART_NUM_MINUTE_TABLES_TO_KEEP_KEY; +import static org.smartdata.conf.SmartConfKeys.SMART_NUM_SECOND_TABLES_TO_KEEP_DEFAULT; +import static org.smartdata.conf.SmartConfKeys.SMART_NUM_SECOND_TABLES_TO_KEEP_KEY; +public class AccessCountTableManager { private final MetaStore metaStore; private final Map tableDeques; private final AccessEventAggregator accessEventAggregator; @@ -49,40 +54,48 @@ public class AccessCountTableManager { LoggerFactory.getLogger(AccessCountTableManager.class); public AccessCountTableManager(MetaStore adapter) { - this(adapter, Executors.newFixedThreadPool(4)); + this(adapter, Executors.newFixedThreadPool(4), new Configuration()); } - public AccessCountTableManager(MetaStore adapter, ExecutorService service) { + public AccessCountTableManager(MetaStore adapter, + ExecutorService service, Configuration configuration) { this.metaStore = adapter; this.tableDeques = new HashMap<>(); this.executorService = service; this.accessEventAggregator = new AccessEventAggregator(adapter, this); - this.initTables(); + this.initTables(configuration); } - private void initTables() { + private void initTables(Configuration configuration) { AccessCountTableAggregator aggregator = new AccessCountTableAggregator(metaStore); - AccessCountTableDeque dayTableDeque = - new AccessCountTableDeque(new CountEvictor(metaStore, NUM_DAY_TABLES_TO_KEEP)); + + int perDayAccessTablesCount = configuration.getInt(SMART_NUM_DAY_TABLES_TO_KEEP_KEY, + SMART_NUM_DAY_TABLES_TO_KEEP_DEFAULT); + AccessCountTableDeque dayTableDeque = new AccessCountTableDeque( + new CountEvictor(metaStore, perDayAccessTablesCount)); TableAddOpListener dayTableListener = new TableAddOpListener.DayTableListener(dayTableDeque, aggregator, executorService); - AccessCountTableDeque hourTableDeque = - new AccessCountTableDeque( - new CountEvictor(metaStore, NUM_HOUR_TABLES_TO_KEEP), dayTableListener); + int perHourAccessTablesCount = configuration.getInt(SMART_NUM_HOUR_TABLES_TO_KEEP_KEY, + SMART_NUM_HOUR_TABLES_TO_KEEP_DEFAULT); + AccessCountTableDeque hourTableDeque = new AccessCountTableDeque( + new CountEvictor(metaStore, perHourAccessTablesCount), dayTableListener); TableAddOpListener hourTableListener = new TableAddOpListener.HourTableListener(hourTableDeque, aggregator, executorService); - AccessCountTableDeque minuteTableDeque = - new AccessCountTableDeque( - new CountEvictor(metaStore, NUM_MINUTE_TABLES_TO_KEEP), hourTableListener); + int perMinuteAccessTablesCount = configuration.getInt(SMART_NUM_MINUTE_TABLES_TO_KEEP_KEY, + SMART_NUM_MINUTE_TABLES_TO_KEEP_DEFAULT); + AccessCountTableDeque minuteTableDeque = new AccessCountTableDeque( + new CountEvictor(metaStore, perMinuteAccessTablesCount), hourTableListener); TableAddOpListener minuteTableListener = new TableAddOpListener.MinuteTableListener(minuteTableDeque, aggregator, executorService); - this.secondTableDeque = - new AccessCountTableDeque( - new CountEvictor(metaStore, NUM_SECOND_TABLES_TO_KEEP), minuteTableListener); + int perSecondAccessTablesCount = configuration.getInt(SMART_NUM_SECOND_TABLES_TO_KEEP_KEY, + SMART_NUM_SECOND_TABLES_TO_KEEP_DEFAULT); + this.secondTableDeque = new AccessCountTableDeque( + new CountEvictor(metaStore, perSecondAccessTablesCount), minuteTableListener); + this.tableDeques.put(TimeGranularity.SECOND, this.secondTableDeque); this.tableDeques.put(TimeGranularity.MINUTE, minuteTableDeque); this.tableDeques.put(TimeGranularity.HOUR, hourTableDeque);