From 2e02ae9bd2d3da99ff58df3352bebf7528393165 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Przemys=C5=82aw=20Witek?= Date: Thu, 30 Jun 2022 20:16:07 +0200 Subject: [PATCH] [Transform] Make default transform scheduler frequency 1s again (#88215) --- .../xpack/transform/Transform.java | 4 ++-- .../transform/transforms/TransformTask.java | 2 +- .../scheduling/TransformScheduler.java | 18 ++++++++---------- 3 files changed, 11 insertions(+), 13 deletions(-) diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java index 2988726c07ad2..ea31d2e76c296 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java @@ -148,12 +148,12 @@ public class Transform extends Plugin implements SystemIndexPlugin, PersistentTa Setting.Property.Dynamic ); - public static final TimeValue DEFAULT_SCHEDULER_FREQUENCY = TimeValue.timeValueMillis(500); + public static final TimeValue DEFAULT_SCHEDULER_FREQUENCY = TimeValue.timeValueSeconds(1); // How often does the transform scheduler process the tasks public static final Setting SCHEDULER_FREQUENCY = Setting.timeSetting( "xpack.transform.transform_scheduler_frequency", DEFAULT_SCHEDULER_FREQUENCY, - TimeValue.timeValueMillis(500), + TimeValue.timeValueSeconds(1), TimeValue.timeValueMinutes(1), Setting.Property.NodeScope ); diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformTask.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformTask.java index d960c8a8f40b2..a381c1d86c89a 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformTask.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformTask.java @@ -395,7 +395,7 @@ protected void init( @Override public void triggered(TransformScheduler.Event event) { - logger.trace(() -> format("[{}] triggered(event={}) ", getTransformId(), event)); + logger.trace(() -> format("[%s] triggered(event=%s) ", getTransformId(), event)); // Ignore if event is not for this job if (event.transformId().equals(getTransformId()) == false) { return; diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/scheduling/TransformScheduler.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/scheduling/TransformScheduler.java index 9c8d309453217..20c3f9b3d0e15 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/scheduling/TransformScheduler.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/scheduling/TransformScheduler.java @@ -10,7 +10,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.core.Strings; import org.elasticsearch.core.TimeValue; import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.ThreadPool; @@ -24,6 +23,8 @@ import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; +import static org.elasticsearch.core.Strings.format; + /** * {@link TransformScheduler} class is responsible for scheduling transform tasks according to their configured frequency as well as * retrying policy. @@ -115,10 +116,7 @@ void processScheduledTasks() { if (isTraceEnabled) { Instant processingFinished = clock.instant(); logger.trace( - Strings.format( - "Processing scheduled tasks finished, took {}ms", - Duration.between(processingStarted, processingFinished).toMillis() - ) + format("Processing scheduled tasks finished, took %dms", Duration.between(processingStarted, processingFinished).toMillis()) ); } if (taskWasProcessed == false) { @@ -152,8 +150,8 @@ private boolean processScheduledTasksInternal() { scheduledTasks.update(scheduledTask.getTransformId(), task -> { if (task.equals(scheduledTask) == false) { logger.debug( - () -> Strings.format( - "[{}] task object got modified while processing. Expected: {}, was: {}", + () -> format( + "[%s] task object got modified while processing. Expected: %s, was: %s", scheduledTask.getTransformId(), scheduledTask, task @@ -191,7 +189,7 @@ public void stop() { */ public void registerTransform(TransformTaskParams transformTaskParams, Listener listener) { String transformId = transformTaskParams.getId(); - logger.trace(() -> Strings.format("[{}] register the transform", transformId)); + logger.trace(() -> format("[%s] register the transform", transformId)); long currentTimeMillis = clock.millis(); TransformScheduledTask transformScheduledTask = new TransformScheduledTask( transformId, @@ -214,7 +212,7 @@ public void registerTransform(TransformTaskParams transformTaskParams, Listener * @param failureCount new value of transform task's failure count */ public void handleTransformFailureCountChanged(String transformId, int failureCount) { - logger.trace(() -> Strings.format("[{}] handle transform failure count change to {}", transformId, failureCount)); + logger.trace(() -> format("[%s] handle transform failure count change to %d", transformId, failureCount)); // Update the task's failure count (next_scheduled_time gets automatically re-calculated) scheduledTasks.update( transformId, @@ -235,7 +233,7 @@ public void handleTransformFailureCountChanged(String transformId, int failureCo */ public void deregisterTransform(String transformId) { Objects.requireNonNull(transformId); - logger.trace(() -> Strings.format("[{}] de-register the transform", transformId)); + logger.trace(() -> format("[%s] de-register the transform", transformId)); scheduledTasks.remove(transformId); }