Skip to content

Commit

Permalink
Standardized duration conversion and always make it coarsest possible. (
Browse files Browse the repository at this point in the history
  • Loading branch information
akara authored and sebady committed Mar 14, 2019
1 parent 6065cc1 commit 3183441
Show file tree
Hide file tree
Showing 9 changed files with 104 additions and 42 deletions.
7 changes: 4 additions & 3 deletions squbs-ext/src/main/scala/org/squbs/streams/Timeout.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import akka.stream._
import akka.stream.scaladsl.{BidiFlow, Flow}
import akka.stream.stage.{GraphStage, _}
import com.typesafe.scalalogging.LazyLogging
import org.squbs.util.DurationConverters
import org.squbs.streams.TimeoutBidi._

import scala.collection.mutable
Expand Down Expand Up @@ -193,7 +194,7 @@ object Timeout {
*/
def create[In, Out, Context](timeout: java.time.Duration):
javadsl.BidiFlow[Pair[In, Context], Pair[In, Context], Pair[Out, Context], Pair[Try[Out], Context], NotUsed] =
toJava(apply[In, Out, Context](FiniteDuration(timeout.toMillis, TimeUnit.MILLISECONDS)))
toJava(apply[In, Out, Context](DurationConverters.toScala(timeout)))

}

Expand Down Expand Up @@ -355,7 +356,7 @@ object TimeoutOrdered {
def create[In, Out](timeout: java.time.Duration,
cleanUp: Consumer[Out]):
akka.stream.javadsl.BidiFlow[In, In, Out, Try[Out], NotUsed] = {
apply(FiniteDuration(timeout.toMillis, TimeUnit.MILLISECONDS), (out: Out) => cleanUp.accept(out)).asJava
apply(DurationConverters.toScala(timeout), (out: Out) => cleanUp.accept(out)).asJava
}

/**
Expand All @@ -367,7 +368,7 @@ object TimeoutOrdered {
*/
def create[In, Out](timeout: java.time.Duration):
akka.stream.javadsl.BidiFlow[In, In, Out, Try[Out], NotUsed] = {
apply(FiniteDuration(timeout.toMillis, TimeUnit.MILLISECONDS), (_: Out) => ()).asJava
apply(DurationConverters.toScala(timeout), (_: Out) => ()).asJava
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ import org.squbs.metrics.MetricsExtension
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

import org.squbs.util.DurationConverters._

object AtomicCircuitBreakerState {

/**
Expand Down Expand Up @@ -92,11 +94,7 @@ object AtomicCircuitBreakerState {
resetTimeout: java.time.Duration,
executor: ExecutionContext,
scheduler: Scheduler): CircuitBreakerState =
apply(name,
maxFailures,
FiniteDuration(callTimeout.toMillis, TimeUnit.MILLISECONDS),
FiniteDuration(resetTimeout.toMillis, TimeUnit.MILLISECONDS)
)(executor, scheduler)
apply(name, maxFailures, toScala(callTimeout), toScala(resetTimeout))(executor, scheduler)

/**
* Java API
Expand All @@ -119,13 +117,8 @@ object AtomicCircuitBreakerState {
exponentialBackoffFactor: Double,
executor: ExecutionContext,
scheduler: Scheduler): CircuitBreakerState =
apply(name,
maxFailures,
FiniteDuration(callTimeout.toMillis, TimeUnit.MILLISECONDS),
FiniteDuration(resetTimeout.toMillis, TimeUnit.MILLISECONDS),
FiniteDuration(maxResetTimeout.toMillis, TimeUnit.MILLISECONDS),
exponentialBackoffFactor
)(executor, scheduler)
apply(name, maxFailures, toScala(callTimeout), toScala(resetTimeout), toScala(maxResetTimeout),
exponentialBackoffFactor)(executor, scheduler)

/**
* Java API
Expand Down
67 changes: 67 additions & 0 deletions squbs-ext/src/main/scala/org/squbs/util/DurationConverters.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright (C) 2012-2017 Typesafe Inc. <http://www.typesafe.com>
*/
package org.squbs.util

import java.time.temporal.ChronoUnit
import java.util.concurrent.TimeUnit
import java.time.{Duration => JavaDuration}

import scala.concurrent.duration.{FiniteDuration, Duration => ScalaDuration}


/**
* This class contains static methods which convert between Java Durations
* and the durations from the Scala concurrency package. This is useful when mediating between Scala and Java
* libraries with asynchronous APIs where timeouts for example are often expressed as durations.
*/
object DurationConverters {

/**
* Transform a Java duration into a Scala duration. If the nanosecond part of the Java duration is zero the returned
* duration will have a time unit of seconds and if nthere is a nanoseconds part the Scala duration will have a time
* unit of nanoseconds.
*
* @throws IllegalArgumentException If the given Java Duration is out of bounds of what can be expressed with the
* Scala FiniteDuration.
*/
final def toScala(duration: java.time.Duration): scala.concurrent.duration.FiniteDuration = {
val originalSeconds = duration.getSeconds
val originalNanos = duration.getNano
if (originalNanos == 0) {
if (originalSeconds == 0) ScalaDuration.Zero
else FiniteDuration(originalSeconds, TimeUnit.SECONDS).toCoarsest.asInstanceOf[FiniteDuration]
} else if (originalSeconds == 0) {
FiniteDuration(originalNanos, TimeUnit.NANOSECONDS).toCoarsest.asInstanceOf[FiniteDuration]
} else {
try {
val secondsAsNanos = Math.multiplyExact(originalSeconds, 1000000000)
val totalNanos = secondsAsNanos + originalNanos
if ((totalNanos < 0 && secondsAsNanos < 0) || (totalNanos > 0 && secondsAsNanos > 0))
FiniteDuration(totalNanos, TimeUnit.NANOSECONDS).toCoarsest.asInstanceOf[FiniteDuration]
else
throw new ArithmeticException()
} catch {
case _: ArithmeticException =>
throw new IllegalArgumentException(s"Java duration $duration cannot be expressed as a Scala duration")
}
}
}

/**
* Transform a Scala FiniteDuration into a Java duration. Note that the Scala duration keeps the time unit it was created
* with while a Java duration always is a pair of seconds and nanos, so the unit it lost.
*/
final def toJava(duration: scala.concurrent.duration.FiniteDuration): java.time.Duration = {
if (duration.length == 0) JavaDuration.ZERO
else duration.unit match {
case TimeUnit.NANOSECONDS => JavaDuration.ofNanos(duration.length)
case TimeUnit.MICROSECONDS => JavaDuration.of(duration.length, ChronoUnit.MICROS)
case TimeUnit.MILLISECONDS => JavaDuration.ofMillis(duration.length)
case TimeUnit.SECONDS => JavaDuration.ofSeconds(duration.length)
case TimeUnit.MINUTES => JavaDuration.ofMinutes(duration.length)
case TimeUnit.HOURS => JavaDuration.ofHours(duration.length)
case TimeUnit.DAYS => JavaDuration.ofDays(duration.length)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public void testCreateCircuitBreakerStateWithExponentialBackoff() throws Excepti
assertJmxValue("java-params-with-custom-exponential-backoff", "MaxFailures", 1);
assertJmxValue("java-params-with-custom-exponential-backoff", "CallTimeout", "50 milliseconds");
assertJmxValue("java-params-with-custom-exponential-backoff", "ResetTimeout", "20 milliseconds");
assertJmxValue("java-params-with-custom-exponential-backoff", "MaxResetTimeout", "120000 milliseconds"); // 2 minutes
assertJmxValue("java-params-with-custom-exponential-backoff", "MaxResetTimeout", "2 minutes");
assertJmxValue("java-params-with-custom-exponential-backoff", "ExponentialBackoffFactor", 16.0);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,9 +142,9 @@ public void testBuilderAPI() throws Exception {
assertJmxValue(oNameHC, "MaxConnections", 41);
assertJmxValue(oNameHC, "Environment", "PROD");
assertJmxValue(oNameCB, "MaxFailures", 11);
assertJmxValue(oNameCB, "CallTimeout", "12000 milliseconds"); // 12 seconds
assertJmxValue(oNameCB, "ResetTimeout", "780000 milliseconds"); // 13 minutes
assertJmxValue(oNameCB, "MaxResetTimeout", "1209600000 milliseconds"); // 14 days
assertJmxValue(oNameCB, "CallTimeout", "12 seconds");
assertJmxValue(oNameCB, "ResetTimeout", "13 minutes");
assertJmxValue(oNameCB, "MaxResetTimeout", "14 days");
assertJmxValue(oNameCB, "ExponentialBackoffFactor", 16.0);
}

Expand Down Expand Up @@ -189,9 +189,9 @@ public void testAllInputsAPI() throws Exception {
assertJmxValue(oNameHC, "MaxConnections", 41);
assertJmxValue(oNameHC, "Environment", "PROD");
assertJmxValue(oNameCB, "MaxFailures", 11);
assertJmxValue(oNameCB, "CallTimeout", "12000 milliseconds"); // 12 seconds
assertJmxValue(oNameCB, "ResetTimeout", "780000 milliseconds"); // 13 minutes
assertJmxValue(oNameCB, "MaxResetTimeout", "1209600000 milliseconds"); // 14 days
assertJmxValue(oNameCB, "CallTimeout", "12 seconds");
assertJmxValue(oNameCB, "ResetTimeout", "13 minutes");
assertJmxValue(oNameCB, "MaxResetTimeout", "14 days");
assertJmxValue(oNameCB, "ExponentialBackoffFactor", 16.0);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import com.typesafe.scalalogging.LazyLogging
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import scala.language.postfixOps
import org.squbs.util.DurationConverters._

/**
*
Expand Down Expand Up @@ -93,7 +94,7 @@ abstract class TimeoutPolicy(name: Option[String], initial: FiniteDuration, star

// API for java
def execute[T](f: TimedFn[T]): T = {
execute((t: FiniteDuration) => f.get(java.time.Duration.ofNanos(t.toNanos)))
execute((t: FiniteDuration) => f.get(toJava(t)))
}

/**
Expand Down Expand Up @@ -278,7 +279,7 @@ object TimeoutPolicy extends LazyLogging {
*/
object TimeoutPolicyBuilder {
def create(initial: java.time.Duration, ec: ExecutionContext) =
TimeoutPolicyBuilder(initial = FiniteDuration(initial.getNano, TimeUnit.NANOSECONDS))(ec)
TimeoutPolicyBuilder(initial = toScala(initial))(ec)
}

case class TimeoutPolicyBuilder(name: Option[String] = None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.squbs.pattern.timeoutpolicy;

import akka.actor.ActorSystem;
import org.squbs.util.DurationConverters;

import java.time.Duration;
import java.util.concurrent.Callable;
Expand Down Expand Up @@ -73,7 +74,7 @@ public Duration invokeExecute(TimeoutPolicy policy) {
} catch (Exception e) {
System.out.println(e);
}
return Duration.ofMillis(policy.waitTime().toMillis());
return DurationConverters.toJava(policy.waitTime());
}

public Duration invokeExecuteClosure(TimeoutPolicy policy) {
Expand All @@ -85,20 +86,20 @@ public Duration invokeExecuteClosure(TimeoutPolicy policy) {
} catch (Exception e) {
System.out.println(e);
}
return Duration.ofMillis(policy.waitTime().toMillis());
return DurationConverters.toJava(policy.waitTime());
}

public Duration invokeInline(TimeoutPolicy policy) {
TimeoutPolicy.TimeoutTransaction tx = policy.transaction();
try {
return timedFn.get(Duration.ofMillis(tx.waitTime().toMillis()));
return timedFn.get(DurationConverters.toJava(tx.waitTime()));
} catch (Exception e) {
System.out.println(e);
} finally {
tx.end();
}
return Duration.ofMillis(policy.waitTime().toMillis());
}
return DurationConverters.toJava(policy.waitTime());
}

public TimeoutPolicy getFixedTimeoutPolicy() {
return fixedTimeoutPolicy;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import java.util.concurrent.TimeUnit
import akka.actor._
import akka.pattern.GracefulStopSupport
import org.squbs.unicomplex.{StopTimeout, Unicomplex}
import org.squbs.util.DurationConverters

import scala.compat.java8.FutureConverters._
import scala.compat.java8.FunctionConverters._
Expand Down Expand Up @@ -96,7 +97,7 @@ trait GracefulStopHelper extends GracefulStopSupport with ActorLogging{this: Act
def stopDependencies(msg: Any) = {
// Note: We need to call the Java API as that will again call the Scala API.
// Any overrides will now come into picture.
Future.sequence(dependencies.map(gracefulStop(_, java.time.Duration.ofMillis(timeout.toMillis), msg).toScala))
Future.sequence(dependencies.map(gracefulStop(_, DurationConverters.toJava(timeout), msg).toScala))
}

stopDependencies(GracefulStop).onComplete({
Expand All @@ -119,10 +120,9 @@ trait GracefulStopHelper extends GracefulStopSupport with ActorLogging{this: Act
* Java API stopping non-leaf actors.
* @param dependencies All non-leaf actors to be stopped.
* @param timeout The timeout.
* @param unit The time unit of the timeout.
*/
protected final def defaultMidActorStop(dependencies: java.util.List[ActorRef], duration: java.time.Duration): Unit =
defaultMidActorStop(dependencies.asScala, Duration.fromNanos(duration.toNanos))
protected final def defaultMidActorStop(dependencies: java.util.List[ActorRef], timeout: java.time.Duration): Unit =
defaultMidActorStop(dependencies.asScala, DurationConverters.toScala(timeout))

/**
* Java API stopping non-leaf actors with default timeout.
Expand All @@ -135,25 +135,23 @@ trait GracefulStopHelper extends GracefulStopSupport with ActorLogging{this: Act
* Java API for gracefulStop.
* @param target The target actor to stop.
* @param timeout The timeout.
* @param unit The time unit of the timeout.
* @param stopMessage The message to send to the actor for stopping.
* @return A CompletionStage carrying `true` for success stopping the target.
*/
protected def gracefulStop(target: ActorRef, duration: java.time.Duration, stopMessage: Any):
java.util.concurrent.CompletionStage[java.lang.Boolean] =
gracefulStop(target, Duration.fromNanos(duration.toNanos), stopMessage).toJava
.thenApply(asJavaFunction((t: Boolean) => t:java.lang.Boolean))
protected def gracefulStop(target: ActorRef, timeout: java.time.Duration, stopMessage: Any):
java.util.concurrent.CompletionStage[java.lang.Boolean] =
gracefulStop(target, DurationConverters.toScala(timeout), stopMessage).toJava
.thenApply(asJavaFunction((t: Boolean) => t:java.lang.Boolean))

/**
* Java API for gracefulStop using the default message - PoisonPill.
* @param target The target actor to stop.
* @param timeout The timeout.
* @param unit The time unit of the timeout.
* @return A CompletionStage carrying `true` for success stopping the target.
*/
protected final def gracefulStop(target: ActorRef, duration: java.time.Duration):
protected final def gracefulStop(target: ActorRef, timeout: java.time.Duration):
java.util.concurrent.CompletionStage[java.lang.Boolean] =
gracefulStop(target, duration, PoisonPill)
gracefulStop(target, timeout, PoisonPill)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import akka.stream.ThrottleMode;
import akka.stream.javadsl.*;
import org.squbs.unicomplex.Timeouts;
import org.squbs.util.DurationConverters;

import java.time.Duration;
import java.util.Optional;
Expand Down Expand Up @@ -73,7 +74,7 @@ public CompletionStage<Done> shutdown() {
ActorRef actorRef = matValue().first();
CompletionStage<Long> fCount = matValue().second();
CompletionStage<Boolean> fStopped =
gracefulStop(actorRef, Duration.ofSeconds(Timeouts.awaitMax().toSeconds()));
gracefulStop(actorRef, DurationConverters.toJava(Timeouts.awaitMax()));
return fCount.thenCombine(fStopped, (count, stopped) -> Done.getInstance());
}
}

0 comments on commit 3183441

Please sign in to comment.