Skip to content

Commit

Permalink
Merge branch 'master' into manger-otel-context-prop
Browse files Browse the repository at this point in the history
  • Loading branch information
AngerM-DD authored Jun 25, 2021
2 parents 16e84dd + 63a34bc commit 9002821
Show file tree
Hide file tree
Showing 11 changed files with 117 additions and 9 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ dependencies {
errorproneJavac('com.google.errorprone:javac:9+181-r4173-1')
errorprone('com.google.errorprone:error_prone_core:2.3.4')

compile group: 'com.uber.tchannel', name: 'tchannel-core', version: '0.8.5'
compile group: 'com.uber.tchannel', name: 'tchannel-core', version: '0.8.30'
compile group: 'org.slf4j', name: 'slf4j-api', version: '1.7.25'
compile group: 'org.apache.thrift', name: 'libthrift', version: '0.9.3'
compile group: 'com.google.code.gson', name: 'gson', version: '2.8.6'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,7 @@
import java.lang.reflect.Type;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
import java.util.function.Consumer;
import java.util.function.Function;

Expand All @@ -59,6 +56,7 @@ public class SyncWorkflowWorker
private final ScheduledExecutorService ldaHeartbeatExecutor = Executors.newScheduledThreadPool(4);
private SuspendableWorker ldaWorker;
private POJOActivityTaskHandler ldaTaskHandler;
private final IWorkflowService service;

public SyncWorkflowWorker(
IWorkflowService service,
Expand All @@ -74,6 +72,7 @@ public SyncWorkflowWorker(
ThreadPoolExecutor workflowThreadPool) {
Objects.requireNonNull(workflowThreadPool);
this.dataConverter = workflowOptions.getDataConverter();
this.service = service;

factory =
new POJOWorkflowImplementationFactory(
Expand Down Expand Up @@ -252,4 +251,8 @@ public <R> R queryWorkflowExecution(
public void accept(PollForDecisionTaskResponse pollForDecisionTaskResponse) {
workflowWorker.accept(pollForDecisionTaskResponse);
}

public CompletableFuture<Boolean> isHealthy() {
return service.isHealthy();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -317,6 +318,11 @@ private class WorkflowServiceWrapper implements IWorkflowService {

private final IWorkflowService impl;

@Override
public CompletableFuture<Boolean> isHealthy() {
return impl.isHealthy();
}

private WorkflowServiceWrapper(IWorkflowService impl) {
if (impl == null) {
// Create empty implementation that just ignores all requests.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,11 @@ public void GetWorkflowExecutionHistoryWithTimeout(
impl.GetWorkflowExecutionHistoryWithTimeout(getRequest, resultHandler, timeoutInMillis);
}

@Override
public CompletableFuture<Boolean> isHealthy() {
return impl.isHealthy();
}

@Override
public void PollForDecisionTask(
PollForDecisionTaskRequest pollRequest, AsyncMethodCallback resultHandler)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -845,6 +845,13 @@ public void GetWorkflowExecutionHistoryWithTimeout(
GetWorkflowExecutionHistory(getRequest, resultHandler);
}

@Override
public CompletableFuture<Boolean> isHealthy() {
CompletableFuture<Boolean> rval = new CompletableFuture<>();
rval.complete(Boolean.TRUE);
return rval;
}

@Override
public void PollForDecisionTask(
PollForDecisionTaskRequest pollRequest, AsyncMethodCallback resultHandler) throws TException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.uber.cadence.StartWorkflowExecutionRequest;
import com.uber.cadence.WorkflowService.AsyncIface;
import com.uber.cadence.WorkflowService.Iface;
import java.util.concurrent.CompletableFuture;
import org.apache.thrift.TException;
import org.apache.thrift.async.AsyncMethodCallback;

Expand Down Expand Up @@ -70,6 +71,7 @@ void GetWorkflowExecutionHistoryWithTimeout(
AsyncMethodCallback resultHandler,
Long timeoutInMillis)
throws TException;

/**
* SignalWorkflowExecutionWithTimeout signal workflow same as SignalWorkflowExecution but with
* timeout
Expand All @@ -84,4 +86,10 @@ void SignalWorkflowExecutionWithTimeout(
AsyncMethodCallback resultHandler,
Long timeoutInMillis)
throws TException;

/**
* Checks if we have a valid connection to the Cadence cluster, and potentially resets the peer
* list
*/
CompletableFuture<Boolean> isHealthy();
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@
import com.uber.tchannel.errors.ErrorType;
import com.uber.tchannel.messages.ThriftRequest;
import com.uber.tchannel.messages.ThriftResponse;
import com.uber.tchannel.messages.generated.Meta;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
Expand All @@ -127,7 +128,7 @@ public class WorkflowServiceTChannel implements IWorkflowService {
private final ClientOptions options;
private final Map<String, String> thriftHeaders;
private final TChannel tChannel;
private final SubChannel subChannel;
private SubChannel subChannel;

/**
* Creates Cadence client that connects to the specified host and port using specified options.
Expand Down Expand Up @@ -159,6 +160,13 @@ public WorkflowServiceTChannel(ClientOptions options) {
+ Version.FEATURE_VERSION);
}

public void resetSubchannelPeers() throws UnknownHostException {
InetAddress address = InetAddress.getByName(options.getHost());
ArrayList<InetSocketAddress> peers = new ArrayList<>();
peers.add(new InetSocketAddress(address, options.getPort()));
this.subChannel.setPeers(peers);
}

/**
* Creates Cadence client with specified sub channel and options.
*
Expand Down Expand Up @@ -207,6 +215,49 @@ private <T> ThriftRequest<T> buildThriftRequest(String apiName, T body) {
return buildThriftRequest(apiName, body, null);
}

/**
* Checks if we have a valid connection to the Cadence cluster, and potentially resets the peer
* list
*/
@Override
public CompletableFuture<Boolean> isHealthy() {
final ThriftRequest<Meta.health_args> req =
new ThriftRequest.Builder<Meta.health_args>(options.getServiceName(), "Meta::health")
.setBody(new Meta.health_args())
.build();
final CompletableFuture<Boolean> result = new CompletableFuture<>();
try {

final TFuture<ThriftResponse<Meta.health_result>> future = this.subChannel.send(req);
future.addCallback(
response -> {
req.releaseQuietly();
if (response.isError()) {
try {
this.resetSubchannelPeers();
} catch (final Exception inner_e) {
}
result.completeExceptionally(new TException("Rpc error:" + response.getError()));
} else {
result.complete(response.getBody(Meta.health_result.class).getSuccess().isOk());
}
try {
response.release();
} catch (final Exception e) {
// ignore
}
});
} catch (final TChannelError e) {
req.releaseQuietly();
try {
this.resetSubchannelPeers();
} catch (final Exception inner_e) {
}
result.complete(Boolean.FALSE);
}
return result;
}

private <T> ThriftRequest<T> buildThriftRequest(String apiName, T body, Long rpcTimeoutOverride) {
String endpoint = getEndpoint(INTERFACE_NAME, apiName);
ThriftRequest.Builder<T> builder =
Expand Down
8 changes: 8 additions & 0 deletions src/main/java/com/uber/cadence/worker/Worker.java
Original file line number Diff line number Diff line change
Expand Up @@ -330,4 +330,12 @@ public void resumePolling() {
public boolean isSuspended() {
return workflowWorker.isSuspended() && activityWorker.isSuspended();
}

/**
* Checks if we have a valid connection to the Cadence cluster, and potentially resets the peer
* list
*/
public CompletableFuture<Boolean> isHealthy() {
return workflowWorker.isHealthy();
}
}
16 changes: 16 additions & 0 deletions src/main/java/com/uber/cadence/worker/WorkerFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,12 @@
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -287,6 +289,20 @@ public synchronized void shutdownNow() {
}
}

/**
* Checks if we have a valid connection to the Cadence cluster, and potentially resets the peer
* list
*/
public CompletableFuture<Boolean> isHealthy() {
List<CompletableFuture<Boolean>> healthyList =
workers.stream().map(Worker::isHealthy).collect(Collectors.toList());
CompletableFuture<Boolean> result = CompletableFuture.supplyAsync(() -> true);
for (CompletableFuture<Boolean> future : healthyList) {
result = result.thenCombine(future, (current, other) -> current && other);
}
return result;
}

/**
* Blocks until all tasks have completed execution after a shutdown request, or the timeout
* occurs, or the current thread is interrupted, whichever happens first.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@

package com.uber.cadence.workerFactory;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.*;

import com.uber.cadence.client.WorkflowClient;
import com.uber.cadence.serviceclient.ClientOptions;
Expand Down Expand Up @@ -65,6 +64,11 @@ public void whenAFactoryIsStartedAllWorkersStart() {

factory.start();
assertTrue(factory.isStarted());
try {
assertTrue(factory.isHealthy().get());
} catch (Exception e) {
assertNull("Failed to check if cluster is health!", e);
}
factory.shutdown();
factory.awaitTermination(1, TimeUnit.SECONDS);
}
Expand Down
2 changes: 1 addition & 1 deletion src/test/resources/logback-test.xml
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,4 @@
<root level="INFO">
<appender-ref ref="STDOUT" />
</root>
</configuration>
</configuration>

0 comments on commit 9002821

Please sign in to comment.