Skip to content

Commit

Permalink
[Transform] Make default transform scheduler frequency 1s again (elas…
Browse files Browse the repository at this point in the history
  • Loading branch information
przemekwitek authored Jun 30, 2022
1 parent d84c9e4 commit 2e02ae9
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -148,12 +148,12 @@ public class Transform extends Plugin implements SystemIndexPlugin, PersistentTa
Setting.Property.Dynamic
);

public static final TimeValue DEFAULT_SCHEDULER_FREQUENCY = TimeValue.timeValueMillis(500);
public static final TimeValue DEFAULT_SCHEDULER_FREQUENCY = TimeValue.timeValueSeconds(1);
// How often does the transform scheduler process the tasks
public static final Setting<TimeValue> SCHEDULER_FREQUENCY = Setting.timeSetting(
"xpack.transform.transform_scheduler_frequency",
DEFAULT_SCHEDULER_FREQUENCY,
TimeValue.timeValueMillis(500),
TimeValue.timeValueSeconds(1),
TimeValue.timeValueMinutes(1),
Setting.Property.NodeScope
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ protected void init(

@Override
public void triggered(TransformScheduler.Event event) {
logger.trace(() -> format("[{}] triggered(event={}) ", getTransformId(), event));
logger.trace(() -> format("[%s] triggered(event=%s) ", getTransformId(), event));
// Ignore if event is not for this job
if (event.transformId().equals(getTransformId()) == false) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Strings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool;
Expand All @@ -24,6 +23,8 @@
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.elasticsearch.core.Strings.format;

/**
* {@link TransformScheduler} class is responsible for scheduling transform tasks according to their configured frequency as well as
* retrying policy.
Expand Down Expand Up @@ -115,10 +116,7 @@ void processScheduledTasks() {
if (isTraceEnabled) {
Instant processingFinished = clock.instant();
logger.trace(
Strings.format(
"Processing scheduled tasks finished, took {}ms",
Duration.between(processingStarted, processingFinished).toMillis()
)
format("Processing scheduled tasks finished, took %dms", Duration.between(processingStarted, processingFinished).toMillis())
);
}
if (taskWasProcessed == false) {
Expand Down Expand Up @@ -152,8 +150,8 @@ private boolean processScheduledTasksInternal() {
scheduledTasks.update(scheduledTask.getTransformId(), task -> {
if (task.equals(scheduledTask) == false) {
logger.debug(
() -> Strings.format(
"[{}] task object got modified while processing. Expected: {}, was: {}",
() -> format(
"[%s] task object got modified while processing. Expected: %s, was: %s",
scheduledTask.getTransformId(),
scheduledTask,
task
Expand Down Expand Up @@ -191,7 +189,7 @@ public void stop() {
*/
public void registerTransform(TransformTaskParams transformTaskParams, Listener listener) {
String transformId = transformTaskParams.getId();
logger.trace(() -> Strings.format("[{}] register the transform", transformId));
logger.trace(() -> format("[%s] register the transform", transformId));
long currentTimeMillis = clock.millis();
TransformScheduledTask transformScheduledTask = new TransformScheduledTask(
transformId,
Expand All @@ -214,7 +212,7 @@ public void registerTransform(TransformTaskParams transformTaskParams, Listener
* @param failureCount new value of transform task's failure count
*/
public void handleTransformFailureCountChanged(String transformId, int failureCount) {
logger.trace(() -> Strings.format("[{}] handle transform failure count change to {}", transformId, failureCount));
logger.trace(() -> format("[%s] handle transform failure count change to %d", transformId, failureCount));
// Update the task's failure count (next_scheduled_time gets automatically re-calculated)
scheduledTasks.update(
transformId,
Expand All @@ -235,7 +233,7 @@ public void handleTransformFailureCountChanged(String transformId, int failureCo
*/
public void deregisterTransform(String transformId) {
Objects.requireNonNull(transformId);
logger.trace(() -> Strings.format("[{}] de-register the transform", transformId));
logger.trace(() -> format("[%s] de-register the transform", transformId));
scheduledTasks.remove(transformId);
}

Expand Down

0 comments on commit 2e02ae9

Please sign in to comment.