diff --git a/src/main/java/com/uber/cadence/internal/worker/PollTaskExecutor.java b/src/main/java/com/uber/cadence/internal/worker/PollTaskExecutor.java index b55bd6a5f..587bca131 100644 --- a/src/main/java/com/uber/cadence/internal/worker/PollTaskExecutor.java +++ b/src/main/java/com/uber/cadence/internal/worker/PollTaskExecutor.java @@ -39,6 +39,8 @@ public interface TaskHandler { private final String taskList; private final TaskHandler handler; + private Throttler taskRateThrottler; + PollTaskExecutor( String domain, String taskList, SingleWorkerOptions options, TaskHandler handler) { this.domain = domain; @@ -59,6 +61,10 @@ public interface TaskHandler { options.getPollerOptions().getPollThreadNamePrefix().replaceFirst("Poller", "Executor"), options.getPollerOptions().getUncaughtExceptionHandler())); taskExecutor.setRejectedExecutionHandler(new BlockCallerPolicy()); + + if (options.getWorkerActivitiesPerSecond() > 0.0) { + taskRateThrottler = new Throttler("PollTaskExecutor", options.getWorkerActivitiesPerSecond()); + } } @Override @@ -68,6 +74,9 @@ public void process(T task) { MDC.put(LoggerTag.DOMAIN, domain); MDC.put(LoggerTag.TASK_LIST, taskList); try { + if (taskRateThrottler != null) { + taskRateThrottler.throttle(); + } handler.handle(task); } catch (Throwable ee) { options diff --git a/src/main/java/com/uber/cadence/internal/worker/SingleWorkerOptions.java b/src/main/java/com/uber/cadence/internal/worker/SingleWorkerOptions.java index e898d5646..9a05c3943 100644 --- a/src/main/java/com/uber/cadence/internal/worker/SingleWorkerOptions.java +++ b/src/main/java/com/uber/cadence/internal/worker/SingleWorkerOptions.java @@ -31,6 +31,7 @@ public final class SingleWorkerOptions { public static final class Builder { + private double workerActivitiesPerSecond; private String identity; private DataConverter dataConverter; private int taskExecutorThreadPoolSize = 100; @@ -47,6 +48,7 @@ public static final class Builder { public Builder() {} public Builder(SingleWorkerOptions options) { + this.workerActivitiesPerSecond = options.getWorkerActivitiesPerSecond(); this.identity = options.getIdentity(); this.dataConverter = options.getDataConverter(); this.pollerOptions = options.getPollerOptions(); @@ -59,6 +61,11 @@ public Builder(SingleWorkerOptions options) { this.contextPropagators = options.getContextPropagators(); } + public Builder setWorkerActivitiesPerSecond(double workerActivitiesPerSecond) { + this.workerActivitiesPerSecond = workerActivitiesPerSecond; + return this; + } + public Builder setIdentity(String identity) { this.identity = identity; return this; @@ -137,6 +144,7 @@ public SingleWorkerOptions build() { } return new SingleWorkerOptions( + workerActivitiesPerSecond, identity, dataConverter, taskExecutorThreadPoolSize, @@ -150,6 +158,7 @@ public SingleWorkerOptions build() { } } + private final double workerActivitiesPerSecond; private final String identity; private final DataConverter dataConverter; private final int taskExecutorThreadPoolSize; @@ -162,6 +171,7 @@ public SingleWorkerOptions build() { private List contextPropagators; private SingleWorkerOptions( + double workerActivitiesPerSecond, String identity, DataConverter dataConverter, int taskExecutorThreadPoolSize, @@ -172,6 +182,7 @@ private SingleWorkerOptions( Scope metricsScope, boolean enableLoggingInReplay, List contextPropagators) { + this.workerActivitiesPerSecond = workerActivitiesPerSecond; this.identity = identity; this.dataConverter = dataConverter; this.taskExecutorThreadPoolSize = taskExecutorThreadPoolSize; @@ -184,6 +195,10 @@ private SingleWorkerOptions( this.contextPropagators = contextPropagators; } + double getWorkerActivitiesPerSecond() { + return workerActivitiesPerSecond; + } + public String getIdentity() { return identity; } diff --git a/src/main/java/com/uber/cadence/internal/worker/Throttler.java b/src/main/java/com/uber/cadence/internal/worker/Throttler.java index 0d7d97776..b6cadde89 100644 --- a/src/main/java/com/uber/cadence/internal/worker/Throttler.java +++ b/src/main/java/com/uber/cadence/internal/worker/Throttler.java @@ -38,8 +38,30 @@ final class Throttler { private final long rateIntervalMilliseconds; + /** Default 1s interval when interval per message is less than 1ms */ + private static final long defaultRateIntervalMilliseconds = 1000L; + private long overslept; + private static long calculateIntervalMillisecondsPerMessage(double maxRatePerSecond) { + return (long) (1 / maxRatePerSecond * 1000.0); + } + + /** + * Construct throttler. + * + * @param name Human readable name of the resource being throttled. Used for logging only. + * @param maxRatePerSecond maximum rate allowed + */ + public Throttler(String name, double maxRatePerSecond) { + this( + name, + maxRatePerSecond, + calculateIntervalMillisecondsPerMessage(maxRatePerSecond) <= 0L + ? defaultRateIntervalMilliseconds + : calculateIntervalMillisecondsPerMessage(maxRatePerSecond)); + } + /** * Construct throttler. * @@ -69,7 +91,7 @@ public synchronized void setMaxRatePerSecond(double maxRatePerSecond) { int maxMessagesPerRateInterval = (int) (maxRatePerSecond * rateIntervalMilliseconds / 1000); if (maxMessagesPerRateInterval == 0) { maxMessagesPerRateInterval = 1; - rateInterval = (long) (1.0 / maxRatePerSecond * 1000.0); + rateInterval = calculateIntervalMillisecondsPerMessage(maxRatePerSecond); } else { rateInterval = rateIntervalMilliseconds; } diff --git a/src/main/java/com/uber/cadence/worker/Worker.java b/src/main/java/com/uber/cadence/worker/Worker.java index a9e68b3d8..fc7ac890b 100644 --- a/src/main/java/com/uber/cadence/worker/Worker.java +++ b/src/main/java/com/uber/cadence/worker/Worker.java @@ -141,6 +141,7 @@ private static SingleWorkerOptions toActivityOptions( .put(MetricsTag.TASK_LIST, taskList) .build(); return new SingleWorkerOptions.Builder() + .setWorkerActivitiesPerSecond(options.getWorkerActivitiesPerSecond()) .setDataConverter(options.getDataConverter()) .setIdentity(options.getIdentity()) .setPollerOptions(options.getActivityPollerOptions())