diff --git a/build.gradle b/build.gradle index ad90c0802..0fed3a527 100644 --- a/build.gradle +++ b/build.gradle @@ -53,7 +53,7 @@ dependencies { errorproneJavac('com.google.errorprone:javac:9+181-r4173-1') errorprone('com.google.errorprone:error_prone_core:2.3.4') - compile group: 'com.uber.tchannel', name: 'tchannel-core', version: '0.8.5' + compile group: 'com.uber.tchannel', name: 'tchannel-core', version: '0.8.30' compile group: 'org.slf4j', name: 'slf4j-api', version: '1.7.25' compile group: 'org.apache.thrift', name: 'libthrift', version: '0.9.3' compile group: 'com.google.code.gson', name: 'gson', version: '2.8.6' diff --git a/src/main/java/com/uber/cadence/internal/sync/SyncWorkflowWorker.java b/src/main/java/com/uber/cadence/internal/sync/SyncWorkflowWorker.java index 56636d514..5bd25f76a 100644 --- a/src/main/java/com/uber/cadence/internal/sync/SyncWorkflowWorker.java +++ b/src/main/java/com/uber/cadence/internal/sync/SyncWorkflowWorker.java @@ -39,10 +39,7 @@ import java.lang.reflect.Type; import java.time.Duration; import java.util.Objects; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.function.Consumer; import java.util.function.Function; @@ -59,6 +56,7 @@ public class SyncWorkflowWorker private final ScheduledExecutorService ldaHeartbeatExecutor = Executors.newScheduledThreadPool(4); private SuspendableWorker ldaWorker; private POJOActivityTaskHandler ldaTaskHandler; + private final IWorkflowService service; public SyncWorkflowWorker( IWorkflowService service, @@ -74,6 +72,7 @@ public SyncWorkflowWorker( ThreadPoolExecutor workflowThreadPool) { Objects.requireNonNull(workflowThreadPool); this.dataConverter = workflowOptions.getDataConverter(); + this.service = service; factory = new POJOWorkflowImplementationFactory( @@ -252,4 +251,8 @@ public R queryWorkflowExecution( public void accept(PollForDecisionTaskResponse pollForDecisionTaskResponse) { workflowWorker.accept(pollForDecisionTaskResponse); } + + public CompletableFuture isHealthy() { + return service.isHealthy(); + } } diff --git a/src/main/java/com/uber/cadence/internal/sync/TestActivityEnvironmentInternal.java b/src/main/java/com/uber/cadence/internal/sync/TestActivityEnvironmentInternal.java index 613f4d776..d313aa4c7 100644 --- a/src/main/java/com/uber/cadence/internal/sync/TestActivityEnvironmentInternal.java +++ b/src/main/java/com/uber/cadence/internal/sync/TestActivityEnvironmentInternal.java @@ -46,6 +46,7 @@ import java.util.Random; import java.util.UUID; import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicInteger; @@ -317,6 +318,11 @@ private class WorkflowServiceWrapper implements IWorkflowService { private final IWorkflowService impl; + @Override + public CompletableFuture isHealthy() { + return impl.isHealthy(); + } + private WorkflowServiceWrapper(IWorkflowService impl) { if (impl == null) { // Create empty implementation that just ignores all requests. diff --git a/src/main/java/com/uber/cadence/internal/sync/TestWorkflowEnvironmentInternal.java b/src/main/java/com/uber/cadence/internal/sync/TestWorkflowEnvironmentInternal.java index 90a9b00c8..e5f5cf8a0 100644 --- a/src/main/java/com/uber/cadence/internal/sync/TestWorkflowEnvironmentInternal.java +++ b/src/main/java/com/uber/cadence/internal/sync/TestWorkflowEnvironmentInternal.java @@ -484,6 +484,11 @@ public void GetWorkflowExecutionHistoryWithTimeout( impl.GetWorkflowExecutionHistoryWithTimeout(getRequest, resultHandler, timeoutInMillis); } + @Override + public CompletableFuture isHealthy() { + return impl.isHealthy(); + } + @Override public void PollForDecisionTask( PollForDecisionTaskRequest pollRequest, AsyncMethodCallback resultHandler) diff --git a/src/main/java/com/uber/cadence/internal/testservice/TestWorkflowService.java b/src/main/java/com/uber/cadence/internal/testservice/TestWorkflowService.java index 79f3385fb..dfd59136f 100644 --- a/src/main/java/com/uber/cadence/internal/testservice/TestWorkflowService.java +++ b/src/main/java/com/uber/cadence/internal/testservice/TestWorkflowService.java @@ -845,6 +845,13 @@ public void GetWorkflowExecutionHistoryWithTimeout( GetWorkflowExecutionHistory(getRequest, resultHandler); } + @Override + public CompletableFuture isHealthy() { + CompletableFuture rval = new CompletableFuture<>(); + rval.complete(Boolean.TRUE); + return rval; + } + @Override public void PollForDecisionTask( PollForDecisionTaskRequest pollRequest, AsyncMethodCallback resultHandler) throws TException { diff --git a/src/main/java/com/uber/cadence/serviceclient/IWorkflowService.java b/src/main/java/com/uber/cadence/serviceclient/IWorkflowService.java index d275ac8f5..b04c1e2df 100644 --- a/src/main/java/com/uber/cadence/serviceclient/IWorkflowService.java +++ b/src/main/java/com/uber/cadence/serviceclient/IWorkflowService.java @@ -23,6 +23,7 @@ import com.uber.cadence.StartWorkflowExecutionRequest; import com.uber.cadence.WorkflowService.AsyncIface; import com.uber.cadence.WorkflowService.Iface; +import java.util.concurrent.CompletableFuture; import org.apache.thrift.TException; import org.apache.thrift.async.AsyncMethodCallback; @@ -70,6 +71,7 @@ void GetWorkflowExecutionHistoryWithTimeout( AsyncMethodCallback resultHandler, Long timeoutInMillis) throws TException; + /** * SignalWorkflowExecutionWithTimeout signal workflow same as SignalWorkflowExecution but with * timeout @@ -84,4 +86,10 @@ void SignalWorkflowExecutionWithTimeout( AsyncMethodCallback resultHandler, Long timeoutInMillis) throws TException; + + /** + * Checks if we have a valid connection to the Cadence cluster, and potentially resets the peer + * list + */ + CompletableFuture isHealthy(); } diff --git a/src/main/java/com/uber/cadence/serviceclient/WorkflowServiceTChannel.java b/src/main/java/com/uber/cadence/serviceclient/WorkflowServiceTChannel.java index 9f950b21c..00a3bf661 100644 --- a/src/main/java/com/uber/cadence/serviceclient/WorkflowServiceTChannel.java +++ b/src/main/java/com/uber/cadence/serviceclient/WorkflowServiceTChannel.java @@ -104,6 +104,7 @@ import com.uber.tchannel.errors.ErrorType; import com.uber.tchannel.messages.ThriftRequest; import com.uber.tchannel.messages.ThriftResponse; +import com.uber.tchannel.messages.generated.Meta; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.UnknownHostException; @@ -127,7 +128,7 @@ public class WorkflowServiceTChannel implements IWorkflowService { private final ClientOptions options; private final Map thriftHeaders; private final TChannel tChannel; - private final SubChannel subChannel; + private SubChannel subChannel; /** * Creates Cadence client that connects to the specified host and port using specified options. @@ -159,6 +160,13 @@ public WorkflowServiceTChannel(ClientOptions options) { + Version.FEATURE_VERSION); } + public void resetSubchannelPeers() throws UnknownHostException { + InetAddress address = InetAddress.getByName(options.getHost()); + ArrayList peers = new ArrayList<>(); + peers.add(new InetSocketAddress(address, options.getPort())); + this.subChannel.setPeers(peers); + } + /** * Creates Cadence client with specified sub channel and options. * @@ -207,6 +215,49 @@ private ThriftRequest buildThriftRequest(String apiName, T body) { return buildThriftRequest(apiName, body, null); } + /** + * Checks if we have a valid connection to the Cadence cluster, and potentially resets the peer + * list + */ + @Override + public CompletableFuture isHealthy() { + final ThriftRequest req = + new ThriftRequest.Builder(options.getServiceName(), "Meta::health") + .setBody(new Meta.health_args()) + .build(); + final CompletableFuture result = new CompletableFuture<>(); + try { + + final TFuture> future = this.subChannel.send(req); + future.addCallback( + response -> { + req.releaseQuietly(); + if (response.isError()) { + try { + this.resetSubchannelPeers(); + } catch (final Exception inner_e) { + } + result.completeExceptionally(new TException("Rpc error:" + response.getError())); + } else { + result.complete(response.getBody(Meta.health_result.class).getSuccess().isOk()); + } + try { + response.release(); + } catch (final Exception e) { + // ignore + } + }); + } catch (final TChannelError e) { + req.releaseQuietly(); + try { + this.resetSubchannelPeers(); + } catch (final Exception inner_e) { + } + result.complete(Boolean.FALSE); + } + return result; + } + private ThriftRequest buildThriftRequest(String apiName, T body, Long rpcTimeoutOverride) { String endpoint = getEndpoint(INTERFACE_NAME, apiName); ThriftRequest.Builder builder = diff --git a/src/main/java/com/uber/cadence/worker/Worker.java b/src/main/java/com/uber/cadence/worker/Worker.java index 8ce5956a3..0e2073151 100644 --- a/src/main/java/com/uber/cadence/worker/Worker.java +++ b/src/main/java/com/uber/cadence/worker/Worker.java @@ -330,4 +330,12 @@ public void resumePolling() { public boolean isSuspended() { return workflowWorker.isSuspended() && activityWorker.isSuspended(); } + + /** + * Checks if we have a valid connection to the Cadence cluster, and potentially resets the peer + * list + */ + public CompletableFuture isHealthy() { + return workflowWorker.isHealthy(); + } } diff --git a/src/main/java/com/uber/cadence/worker/WorkerFactory.java b/src/main/java/com/uber/cadence/worker/WorkerFactory.java index 9a3d7d5c0..a1916661e 100644 --- a/src/main/java/com/uber/cadence/worker/WorkerFactory.java +++ b/src/main/java/com/uber/cadence/worker/WorkerFactory.java @@ -39,10 +39,12 @@ import java.util.List; import java.util.Objects; import java.util.UUID; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -287,6 +289,20 @@ public synchronized void shutdownNow() { } } + /** + * Checks if we have a valid connection to the Cadence cluster, and potentially resets the peer + * list + */ + public CompletableFuture isHealthy() { + List> healthyList = + workers.stream().map(Worker::isHealthy).collect(Collectors.toList()); + CompletableFuture result = CompletableFuture.supplyAsync(() -> true); + for (CompletableFuture future : healthyList) { + result = result.thenCombine(future, (current, other) -> current && other); + } + return result; + } + /** * Blocks until all tasks have completed execution after a shutdown request, or the timeout * occurs, or the current thread is interrupted, whichever happens first. diff --git a/src/test/java/com/uber/cadence/workerFactory/WorkerFactoryTests.java b/src/test/java/com/uber/cadence/workerFactory/WorkerFactoryTests.java index 3b6135f60..eb105a4e3 100644 --- a/src/test/java/com/uber/cadence/workerFactory/WorkerFactoryTests.java +++ b/src/test/java/com/uber/cadence/workerFactory/WorkerFactoryTests.java @@ -17,8 +17,7 @@ package com.uber.cadence.workerFactory; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.*; import com.uber.cadence.client.WorkflowClient; import com.uber.cadence.serviceclient.ClientOptions; @@ -65,6 +64,11 @@ public void whenAFactoryIsStartedAllWorkersStart() { factory.start(); assertTrue(factory.isStarted()); + try { + assertTrue(factory.isHealthy().get()); + } catch (Exception e) { + assertNull("Failed to check if cluster is health!", e); + } factory.shutdown(); factory.awaitTermination(1, TimeUnit.SECONDS); } diff --git a/src/test/resources/logback-test.xml b/src/test/resources/logback-test.xml index b87fe0374..19ed8e82a 100644 --- a/src/test/resources/logback-test.xml +++ b/src/test/resources/logback-test.xml @@ -28,4 +28,4 @@ - \ No newline at end of file +