From 5b6914b3e850230a4b3544ae7a9afaf88b89d07b Mon Sep 17 00:00:00 2001 From: Matt Jacobs Date: Mon, 23 May 2016 15:49:47 -0700 Subject: [PATCH] Fixing unsubscription races by modeling explicit FSMs for command and thread execution state * Also made onThreadComplete hook fire as soon as user-provided execution emits terminal or is unsubscribed --- .../com/netflix/hystrix/AbstractCommand.java | 179 +++++++++--------- .../com/netflix/hystrix/HystrixCounters.java | 8 +- .../netflix/hystrix/HystrixCommandTest.java | 8 +- .../hystrix/HystrixObservableCommandTest.java | 16 +- 4 files changed, 104 insertions(+), 107 deletions(-) diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/AbstractCommand.java b/hystrix-core/src/main/java/com/netflix/hystrix/AbstractCommand.java index b4873a1e2..33d2bdfdc 100644 --- a/hystrix-core/src/main/java/com/netflix/hystrix/AbstractCommand.java +++ b/hystrix-core/src/main/java/com/netflix/hystrix/AbstractCommand.java @@ -63,10 +63,18 @@ protected final HystrixThreadPoolKey threadPoolKey; protected final HystrixCommandProperties properties; - protected static enum TimedOutStatus { + protected enum TimedOutStatus { NOT_EXECUTED, COMPLETED, TIMED_OUT } + protected enum CommandState { + NOT_STARTED, OBSERVABLE_CHAIN_CREATED, USER_CODE_EXECUTED, UNSUBSCRIBED, TERMINAL + } + + protected enum ThreadState { + NOT_USING_THREAD, STARTED, UNSUBSCRIBED, TERMINAL + } + protected final HystrixCommandMetrics metrics; protected final HystrixCommandKey commandKey; @@ -93,10 +101,8 @@ protected static enum TimedOutStatus { protected final AtomicReference> timeoutTimer = new AtomicReference>(); - protected final AtomicBoolean commandStarted = new AtomicBoolean(); - protected volatile boolean executionStarted = false; - protected volatile boolean threadExecutionStarted = false; - protected volatile boolean isExecutionComplete = false; + protected AtomicReference commandState = new AtomicReference(CommandState.NOT_STARTED); + protected AtomicReference threadState = new AtomicReference(ThreadState.NOT_USING_THREAD); /* * {@link ExecutionResult} refers to what happened as the user-provided code ran. If request-caching is used, @@ -356,7 +362,7 @@ public void call() { */ public Observable toObservable() { /* this is a stateful object so can only be used once */ - if (!commandStarted.compareAndSet(false, true)) { + if (!commandState.compareAndSet(CommandState.NOT_STARTED, CommandState.OBSERVABLE_CHAIN_CREATED)) { throw new IllegalStateException("This instance can only be executed once. Please instantiate a new instance."); } @@ -382,18 +388,16 @@ public Observable toObservable() { } } - // ensure that cleanup code only runs exactly once - final AtomicBoolean commandCleanupExecuted = new AtomicBoolean(false); - //doOnCompleted handler already did all of the SUCCESS work //doOnError handler already did all of the FAILURE/TIMEOUT/REJECTION/BAD_REQUEST work final Action0 terminateCommandCleanup = new Action0() { @Override public void call() { - if (commandCleanupExecuted.compareAndSet(false, true)) { - isExecutionComplete = true; - handleCommandEnd(_cmd); + if (_cmd.commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.TERMINAL)) { + handleCommandEnd(_cmd, false); //user code never ran + } else if (_cmd.commandState.compareAndSet(CommandState.USER_CODE_EXECUTED, CommandState.TERMINAL)) { + handleCommandEnd(_cmd, true); //user code did run } } }; @@ -402,11 +406,16 @@ public void call() { final Action0 unsubscribeCommandCleanup = new Action0() { @Override public void call() { - if (commandCleanupExecuted.compareAndSet(false, true)) { - eventNotifier.markEvent(HystrixEventType.CANCELLED, commandKey); - executionResultAtTimeOfCancellation = executionResult - .addEvent((int) (System.currentTimeMillis() - commandStartTimestamp), HystrixEventType.CANCELLED); - handleCommandEnd(_cmd); + if (_cmd.commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.UNSUBSCRIBED)) { + _cmd.eventNotifier.markEvent(HystrixEventType.CANCELLED, _cmd.commandKey); + _cmd.executionResultAtTimeOfCancellation = _cmd.executionResult + .addEvent((int) (System.currentTimeMillis() - _cmd.commandStartTimestamp), HystrixEventType.CANCELLED); + handleCommandEnd(_cmd, false); //user code never ran + } else if (_cmd.commandState.compareAndSet(CommandState.USER_CODE_EXECUTED, CommandState.UNSUBSCRIBED)) { + _cmd.eventNotifier.markEvent(HystrixEventType.CANCELLED, _cmd.commandKey); + _cmd.executionResultAtTimeOfCancellation = _cmd.executionResult + .addEvent((int) (System.currentTimeMillis() - _cmd.commandStartTimestamp), HystrixEventType.CANCELLED); + handleCommandEnd(_cmd, true); //user code did run } } }; @@ -449,13 +458,6 @@ public void call() { } }; - final Action1 fireOnErrorHook = new Action1() { - @Override - public void call(Throwable throwable) { - - } - }; - Observable hystrixObservable = Observable.defer(applyHystrixSemantics) .map(wrapWithAllOnNextHooks); @@ -484,8 +486,7 @@ public void call(Throwable throwable) { return afterCache .doOnTerminate(terminateCommandCleanup) // perform cleanup once (either on normal terminal state (this line), or unsubscribe (next line)) .doOnUnsubscribe(unsubscribeCommandCleanup) // perform cleanup once - .doOnCompleted(fireOnCompletedHook) - .doOnError(fireOnErrorHook); + .doOnCompleted(fireOnCompletedHook); } private Observable applyHystrixSemantics(final AbstractCommand _cmd) { @@ -614,16 +615,19 @@ private Observable executeCommandWithSpecifiedIsolation(final AbstractCommand @Override public Observable call() { executionResult = executionResult.setExecutionOccurred(); - executionStarted = true; + if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) { + return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name())); + } + metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.THREAD); if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) { // the command timed out in the wrapping thread so we will return immediately // and not increment any of the counters below or other such logic return Observable.error(new RuntimeException("timed out before executing run()")); - } else { - // not timed out so execute - threadExecutionStarted = true; + } + if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) { + //we have not been unsubscribed, so should proceed HystrixCounters.incrementGlobalConcurrentThreads(); threadPool.markThreadExecution(); // store the command that is being run @@ -640,10 +644,34 @@ public Observable call() { } catch (Throwable ex) { return Observable.error(ex); } + } else { + //command has already been unsubscribed, so return immediately + return Observable.error(new RuntimeException("unsubscribed before executing run()")); } } + }).doOnTerminate(new Action0() { + @Override + public void call() { + if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.TERMINAL)) { + handleThreadEnd(_cmd); + } + if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.TERMINAL)) { + //if it was never started and received terminal, then no need to clean up (I don't think this is possible) + } + //if it was unsubscribed, then other cleanup handled it + } + }).doOnUnsubscribe(new Action0() { + @Override + public void call() { + if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.UNSUBSCRIBED)) { + handleThreadEnd(_cmd); + } + if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.UNSUBSCRIBED)) { + //if it was never started and was cancelled, then no need to clean up + } + //if it was terminal, then other cleanup handled it + } }).subscribeOn(threadPool.getScheduler(new Func0() { - @Override public Boolean call() { return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get().equals(TimedOutStatus.TIMED_OUT); @@ -654,7 +682,10 @@ public Boolean call() { @Override public Observable call() { executionResult = executionResult.setExecutionOccurred(); - executionStarted = true; + if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) { + return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name())); + } + metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.SEMAPHORE); // semaphore isolated // store the command that is being run @@ -821,66 +852,42 @@ private Observable getUserExecutionObservable(final AbstractCommand _cmd) userObservable = Observable.error(ex); } - final AtomicBoolean threadStateCleanedUp = new AtomicBoolean(false); - return userObservable .lift(new ExecutionHookApplication(_cmd)) - .lift(new DeprecatedOnRunHookApplication(_cmd)) + .lift(new DeprecatedOnRunHookApplication(_cmd)); + } + + private Observable handleRequestCacheHitAndEmitValues(final HystrixCommandResponseFromCache fromCache, final AbstractCommand _cmd) { + try { + executionHook.onCacheHit(this); + } catch (Throwable hookEx) { + logger.warn("Error calling HystrixCommandExecutionHook.onCacheHit", hookEx); + } + + return fromCache.toObservableWithStateCopiedInto(this) .doOnTerminate(new Action0() { @Override public void call() { - //If the command timed out, then the calling thread has already walked away so we need - //to handle these markers. Otherwise, the calling thread will perform these for us. - - if (threadExecutionStarted && isCommandTimedOut.get().equals(TimedOutStatus.TIMED_OUT)) { - if (threadStateCleanedUp.compareAndSet(false, true)) { - handleThreadEnd(_cmd); - } + if (commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.TERMINAL)) { + cleanUpAfterResponseFromCache(_cmd, false); //user code never ran + } else if (commandState.compareAndSet(CommandState.USER_CODE_EXECUTED, CommandState.TERMINAL)) { + cleanUpAfterResponseFromCache(_cmd, true); //user code did run } } }) .doOnUnsubscribe(new Action0() { @Override public void call() { - if (threadExecutionStarted && isCommandTimedOut.get().equals(TimedOutStatus.TIMED_OUT)) { - if (threadStateCleanedUp.compareAndSet(false, true)) { - handleThreadEnd(_cmd); - } + if (commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.UNSUBSCRIBED)) { + cleanUpAfterResponseFromCache(_cmd, false); //user code never ran + } else if (commandState.compareAndSet(CommandState.USER_CODE_EXECUTED, CommandState.UNSUBSCRIBED)) { + cleanUpAfterResponseFromCache(_cmd, true); //user code did run } } }); } - private Observable handleRequestCacheHitAndEmitValues(final HystrixCommandResponseFromCache fromCache, final AbstractCommand _cmd) { - try { - executionHook.onCacheHit(this); - } catch (Throwable hookEx) { - logger.warn("Error calling HystrixCommandExecutionHook.onCacheHit", hookEx); - } - - final AtomicBoolean cleanupCompleted = new AtomicBoolean(false); - - return fromCache.toObservableWithStateCopiedInto(this).doOnTerminate(new Action0() { - @Override - public void call() { - if (!cleanupCompleted.get()) { - cleanUpAfterResponseFromCache(_cmd); - isExecutionComplete = true; - cleanupCompleted.set(true); - } - } - }).doOnUnsubscribe(new Action0() { - @Override - public void call() { - if (!cleanupCompleted.get()) { - cleanUpAfterResponseFromCache(_cmd); - cleanupCompleted.set(true); - } - } - }); - } - - private void cleanUpAfterResponseFromCache(AbstractCommand _cmd) { + private void cleanUpAfterResponseFromCache(final AbstractCommand _cmd, boolean commandExecutionStarted) { Reference tl = timeoutTimer.get(); if (tl != null) { tl.clear(); @@ -893,16 +900,11 @@ private void cleanUpAfterResponseFromCache(AbstractCommand _cmd) { .setNotExecutedInThread(); ExecutionResult cacheOnlyForMetrics = ExecutionResult.from(HystrixEventType.RESPONSE_FROM_CACHE) .markUserThreadCompletion(latency); - metrics.markCommandDone(cacheOnlyForMetrics, commandKey, threadPoolKey, executionStarted); + metrics.markCommandDone(cacheOnlyForMetrics, commandKey, threadPoolKey, commandExecutionStarted); eventNotifier.markEvent(HystrixEventType.RESPONSE_FROM_CACHE, commandKey); - - //in case of timeout, the work chained onto the Hystrix thread has the responsibility of this cleanup - if (threadExecutionStarted && !isCommandTimedOut.get().equals(TimedOutStatus.TIMED_OUT)) { - handleThreadEnd(_cmd); - } } - private void handleCommandEnd(AbstractCommand _cmd) { + private void handleCommandEnd(final AbstractCommand _cmd, boolean commandExecutionStarted) { Reference tl = timeoutTimer.get(); if (tl != null) { tl.clear(); @@ -911,19 +913,14 @@ private void handleCommandEnd(AbstractCommand _cmd) { long userThreadLatency = System.currentTimeMillis() - commandStartTimestamp; executionResult = executionResult.markUserThreadCompletion((int) userThreadLatency); if (executionResultAtTimeOfCancellation == null) { - metrics.markCommandDone(executionResult, commandKey, threadPoolKey, executionStarted); + metrics.markCommandDone(executionResult, commandKey, threadPoolKey, commandExecutionStarted); } else { - metrics.markCommandDone(executionResultAtTimeOfCancellation, commandKey, threadPoolKey, executionStarted); + metrics.markCommandDone(executionResultAtTimeOfCancellation, commandKey, threadPoolKey, commandExecutionStarted); } if (endCurrentThreadExecutingCommand != null) { endCurrentThreadExecutingCommand.call(); } - - //in case of timeout, the work chained onto the Hystrix thread has the responsibility of this cleanup - if (threadExecutionStarted && !isCommandTimedOut.get().equals(TimedOutStatus.TIMED_OUT)) { - handleThreadEnd(_cmd); - } } private Observable handleSemaphoreRejectionViaFallback() { @@ -1722,7 +1719,7 @@ public boolean isCircuitBreakerOpen() { * @return boolean */ public boolean isExecutionComplete() { - return isExecutionComplete; + return commandState.get().equals(CommandState.TERMINAL); } /** diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/HystrixCounters.java b/hystrix-core/src/main/java/com/netflix/hystrix/HystrixCounters.java index 47a74eb4e..2b42c2cc6 100644 --- a/hystrix-core/src/main/java/com/netflix/hystrix/HystrixCounters.java +++ b/hystrix-core/src/main/java/com/netflix/hystrix/HystrixCounters.java @@ -24,12 +24,12 @@ public class HystrixCounters { private static final AtomicInteger concurrentThreadsExecuting = new AtomicInteger(0); - /* package-private */ static void incrementGlobalConcurrentThreads() { - concurrentThreadsExecuting.incrementAndGet(); + /* package-private */ static int incrementGlobalConcurrentThreads() { + return concurrentThreadsExecuting.incrementAndGet(); } - /* package-private */ static void decrementGlobalConcurrentThreads() { - concurrentThreadsExecuting.decrementAndGet(); + /* package-private */ static int decrementGlobalConcurrentThreads() { + return concurrentThreadsExecuting.decrementAndGet(); } /** diff --git a/hystrix-core/src/test/java/com/netflix/hystrix/HystrixCommandTest.java b/hystrix-core/src/test/java/com/netflix/hystrix/HystrixCommandTest.java index 36104add3..1c858ecf5 100644 --- a/hystrix-core/src/test/java/com/netflix/hystrix/HystrixCommandTest.java +++ b/hystrix-core/src/test/java/com/netflix/hystrix/HystrixCommandTest.java @@ -3763,7 +3763,7 @@ public void call(TestHystrixCommand command) { assertTrue(hook.fallbackEventsMatch(0, 0, 0)); assertEquals(HystrixBadRequestException.class, hook.getCommandException().getClass()); assertEquals(HystrixBadRequestException.class, hook.getExecutionException().getClass()); - assertEquals("onStart - onThreadStart - !onRunStart - onExecutionStart - onExecutionError - !onRunError - onError - onThreadComplete - ", hook.executionSequence.toString()); + assertEquals("onStart - onThreadStart - !onRunStart - onExecutionStart - onExecutionError - !onRunError - onThreadComplete - onError - ", hook.executionSequence.toString()); } }); } @@ -3798,7 +3798,7 @@ public void call(TestHystrixCommand command) { assertEquals(RuntimeException.class, hook.getCommandException().getClass()); assertEquals(RuntimeException.class, hook.getExecutionException().getClass()); assertNull(hook.getFallbackException()); - assertEquals("onStart - onThreadStart - !onRunStart - onExecutionStart - onExecutionError - !onRunError - onError - onThreadComplete - ", hook.executionSequence.toString()); + assertEquals("onStart - onThreadStart - !onRunStart - onExecutionStart - onExecutionError - !onRunError - onThreadComplete - onError - ", hook.executionSequence.toString()); } }); } @@ -3829,7 +3829,7 @@ public void call(TestHystrixCommand command) { assertTrue(hook.executionEventsMatch(0, 1, 0)); assertTrue(hook.fallbackEventsMatch(1, 0, 1)); assertEquals(RuntimeException.class, hook.getExecutionException().getClass()); - assertEquals("onStart - onThreadStart - !onRunStart - onExecutionStart - onExecutionError - !onRunError - onFallbackStart - onFallbackEmit - !onFallbackSuccess - !onComplete - onEmit - onFallbackSuccess - onThreadComplete - onSuccess - ", hook.executionSequence.toString()); + assertEquals("onStart - onThreadStart - !onRunStart - onExecutionStart - onExecutionError - !onRunError - onThreadComplete - onFallbackStart - onFallbackEmit - !onFallbackSuccess - !onComplete - onEmit - onFallbackSuccess - onSuccess - ", hook.executionSequence.toString()); } }); } @@ -3862,7 +3862,7 @@ public void call(TestHystrixCommand command) { assertEquals(RuntimeException.class, hook.getCommandException().getClass()); assertEquals(RuntimeException.class, hook.getExecutionException().getClass()); assertEquals(RuntimeException.class, hook.getFallbackException().getClass()); - assertEquals("onStart - onThreadStart - !onRunStart - onExecutionStart - onExecutionError - !onRunError - onFallbackStart - onFallbackError - onError - onThreadComplete - ", hook.executionSequence.toString()); + assertEquals("onStart - onThreadStart - !onRunStart - onExecutionStart - onExecutionError - !onRunError - onThreadComplete - onFallbackStart - onFallbackError - onError - ", hook.executionSequence.toString()); } }); } diff --git a/hystrix-core/src/test/java/com/netflix/hystrix/HystrixObservableCommandTest.java b/hystrix-core/src/test/java/com/netflix/hystrix/HystrixObservableCommandTest.java index 5ad8beb94..5589e424f 100644 --- a/hystrix-core/src/test/java/com/netflix/hystrix/HystrixObservableCommandTest.java +++ b/hystrix-core/src/test/java/com/netflix/hystrix/HystrixObservableCommandTest.java @@ -2108,12 +2108,12 @@ public void call(TestHystrixObservableCommand command) { "onExecutionEmit - !onRunSuccess - !onComplete - onEmit - " + "onExecutionEmit - !onRunSuccess - !onComplete - onEmit - " + "onExecutionEmit - !onRunSuccess - !onComplete - onEmit - " + - "onExecutionError - !onRunError - onFallbackStart - " + + "onExecutionError - !onRunError - onThreadComplete - onFallbackStart - " + "onFallbackEmit - !onFallbackSuccess - !onComplete - onEmit - " + "onFallbackEmit - !onFallbackSuccess - !onComplete - onEmit - " + "onFallbackEmit - !onFallbackSuccess - !onComplete - onEmit - " + "onFallbackEmit - !onFallbackSuccess - !onComplete - onEmit - " + - "onFallbackSuccess - onThreadComplete - onSuccess - ", command.getBuilder().executionHook.executionSequence.toString()); + "onFallbackSuccess - onSuccess - ", command.getBuilder().executionHook.executionSequence.toString()); } }); } @@ -2144,7 +2144,7 @@ public void call(TestHystrixObservableCommand command) { assertTrue(hook.fallbackEventsMatch(0, 0, 0)); assertEquals(HystrixBadRequestException.class, hook.getCommandException().getClass()); assertEquals(HystrixBadRequestException.class, hook.getExecutionException().getClass()); - assertEquals("onStart - onThreadStart - !onRunStart - onExecutionStart - onExecutionError - !onRunError - onError - onThreadComplete - ", command.getBuilder().executionHook.executionSequence.toString()); + assertEquals("onStart - onThreadStart - !onRunStart - onExecutionStart - onExecutionError - !onRunError - onThreadComplete - onError - ", command.getBuilder().executionHook.executionSequence.toString()); } }); } @@ -2177,7 +2177,7 @@ public void call(TestHystrixObservableCommand command) { assertEquals(RuntimeException.class, hook.getCommandException().getClass()); assertEquals(RuntimeException.class, hook.getExecutionException().getClass()); assertNull(hook.getFallbackException()); - assertEquals("onStart - onThreadStart - !onRunStart - onExecutionStart - onExecutionError - !onRunError - onError - onThreadComplete - ", command.getBuilder().executionHook.executionSequence.toString()); + assertEquals("onStart - onThreadStart - !onRunStart - onExecutionStart - onExecutionError - !onRunError - onThreadComplete - onError - ", command.getBuilder().executionHook.executionSequence.toString()); } }); } @@ -2208,7 +2208,7 @@ public void call(TestHystrixObservableCommand command) { assertTrue(hook.executionEventsMatch(0, 1, 0)); assertTrue(hook.fallbackEventsMatch(1, 0, 1)); assertEquals(RuntimeException.class, hook.getExecutionException().getClass()); - assertEquals("onStart - onThreadStart - !onRunStart - onExecutionStart - onExecutionError - !onRunError - onFallbackStart - onFallbackEmit - !onFallbackSuccess - !onComplete - onEmit - onFallbackSuccess - onThreadComplete - onSuccess - ", command.getBuilder().executionHook.executionSequence.toString()); + assertEquals("onStart - onThreadStart - !onRunStart - onExecutionStart - onExecutionError - !onRunError - onThreadComplete - onFallbackStart - onFallbackEmit - !onFallbackSuccess - !onComplete - onEmit - onFallbackSuccess - onSuccess - ", command.getBuilder().executionHook.executionSequence.toString()); } }); } @@ -2241,7 +2241,7 @@ public void call(TestHystrixObservableCommand command) { assertEquals(RuntimeException.class, hook.getCommandException().getClass()); assertEquals(RuntimeException.class, hook.getExecutionException().getClass()); assertEquals(RuntimeException.class, hook.getFallbackException().getClass()); - assertEquals("onStart - onThreadStart - !onRunStart - onExecutionStart - onExecutionError - !onRunError - onFallbackStart - onFallbackError - onError - onThreadComplete - ", command.getBuilder().executionHook.executionSequence.toString()); + assertEquals("onStart - onThreadStart - !onRunStart - onExecutionStart - onExecutionError - !onRunError - onThreadComplete - onFallbackStart - onFallbackError - onError - ", command.getBuilder().executionHook.executionSequence.toString()); } }); } @@ -2274,7 +2274,7 @@ public void call(TestHystrixObservableCommand command) { assertEquals(RuntimeException.class, hook.getCommandException().getClass()); assertEquals(RuntimeException.class, hook.getExecutionException().getClass()); assertEquals(RuntimeException.class, hook.getFallbackException().getClass()); - assertEquals("onStart - onThreadStart - !onRunStart - onExecutionStart - onExecutionError - !onRunError - onFallbackStart - onFallbackError - onError - onThreadComplete - ", command.getBuilder().executionHook.executionSequence.toString()); + assertEquals("onStart - onThreadStart - !onRunStart - onExecutionStart - onExecutionError - !onRunError - onThreadComplete - onFallbackStart - onFallbackError - onError - ", command.getBuilder().executionHook.executionSequence.toString()); } }); } @@ -2307,7 +2307,7 @@ public void call(TestHystrixObservableCommand command) { assertEquals(RuntimeException.class, hook.getCommandException().getClass()); assertEquals(RuntimeException.class, hook.getExecutionException().getClass()); assertEquals(RuntimeException.class, hook.getFallbackException().getClass()); - assertEquals("onStart - onThreadStart - !onRunStart - onExecutionStart - onExecutionError - !onRunError - onFallbackStart - onFallbackError - onError - onThreadComplete - ", command.getBuilder().executionHook.executionSequence.toString()); + assertEquals("onStart - onThreadStart - !onRunStart - onExecutionStart - onExecutionError - !onRunError - onThreadComplete - onFallbackStart - onFallbackError - onError - ", command.getBuilder().executionHook.executionSequence.toString()); } }); }