Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor context prop support to allow things like opentelemtry to work properly with the cadence sdk #618

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ dependencies {
compile group: 'io.micrometer', name: 'micrometer-core', version: '1.1.2'
compile group: 'javax.annotation', name: 'javax.annotation-api', version: '1.3.2'
compile group: 'com.auth0', name: 'java-jwt', version:'3.10.2'
compile group: 'io.opentelemetry', name: 'opentelemetry-sdk', version: '1.1.0'

testCompile group: 'junit', name: 'junit', version: '4.12'
testCompile group: 'com.googlecode.junit-toolbox', name: 'junit-toolbox', version: '2.4'
Expand Down
30 changes: 30 additions & 0 deletions src/main/java/com/uber/cadence/context/ContextPropagator.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
* Context Propagators are used to propagate information from workflow to activity, workflow to
* child workflow, and workflow to child thread (using {@link com.uber.cadence.workflow.Async}).
*
* <p>It is important to note that all threads share one ContextPropagator instance, so your
* implementation <b>must</b> be thread-safe and store any state in ThreadLocal variables.
*
* <p>A sample <code>ContextPropagator</code> that copies all {@link org.slf4j.MDC} entries starting
* with a given prefix along the code path looks like this:
*
Expand Down Expand Up @@ -136,4 +139,31 @@ public interface ContextPropagator {

/** Sets the current context */
void setCurrentContext(Object context);

/**
* This is a lifecycle method, called after the context has been propagated to the
* workflow/activity thread but the workflow/activity has not yet started.
*/
default void setUp() {
// No-op
}

/**
* This is a lifecycle method, called after the workflow/activity has completed. If the method
* finished without exception, {@code successful} will be true. Otherwise, it will be false and
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where/what is {@code successful}?

* {@link #onError(Throwable)} will have already been called.
*/
default void finish() {
// No-op
}

/**
* This is a lifecycle method, called when the workflow/activity finishes by throwing an unhandled
* exception. {@link #finish()} is called after this method.
*
* @param t The unhandled exception that caused the workflow/activity to terminate
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: terminate is a special in Cadence only for specific cases of closed. You can use either close or stop to describe different cases finish with exceptions

*/
default void onError(Throwable t) {
// No-op
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
/*
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
* use this file except in compliance with the License. A copy of the License is
* located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.uber.cadence.context;

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.baggage.Baggage;
import io.opentelemetry.api.baggage.propagation.W3CBaggagePropagator;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.context.propagation.TextMapGetter;
import io.opentelemetry.context.propagation.TextMapPropagator;
import io.opentelemetry.context.propagation.TextMapSetter;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;
import javax.annotation.Nullable;
import org.slf4j.MDC;

public class OpenTelemetryContextPropagator implements ContextPropagator {

private static final TextMapPropagator w3cTraceContextPropagator =
W3CTraceContextPropagator.getInstance();
private static final TextMapPropagator w3cBaggagePropagator = W3CBaggagePropagator.getInstance();
private static ThreadLocal<Scope> currentContextOtelScope = new ThreadLocal<>();
private static ThreadLocal<Span> currentOtelSpan = new ThreadLocal<>();
private static ThreadLocal<Scope> currentOtelScope = new ThreadLocal<>();
private static ThreadLocal<Iterable<String>> otelKeySet = new ThreadLocal<>();
private static final TextMapSetter<Map<String, String>> setter = Map::put;
private static final TextMapGetter<Map<String, String>> getter =
new TextMapGetter<Map<String, String>>() {
@Override
public Iterable<String> keys(Map<String, String> carrier) {
return otelKeySet.get();
}

@Nullable
@Override
public String get(Map<String, String> carrier, String key) {
return MDC.get(key);
}
};

@Override
public String getName() {
return "OpenTelemetry";
}

@Override
public Map<String, byte[]> serializeContext(Object context) {
Map<String, byte[]> serializedContext = new HashMap<>();
Map<String, String> contextMap = (Map<String, String>) context;
if (contextMap != null) {
for (Map.Entry<String, String> entry : contextMap.entrySet()) {
serializedContext.put(entry.getKey(), entry.getValue().getBytes(Charset.defaultCharset()));
}
}
return serializedContext;
}

@Override
public Object deserializeContext(Map<String, byte[]> context) {
Map<String, String> contextMap = new HashMap<>();
for (Map.Entry<String, byte[]> entry : context.entrySet()) {
contextMap.put(entry.getKey(), new String(entry.getValue(), Charset.defaultCharset()));
}
return contextMap;
}

@Override
public Object getCurrentContext() {
Map<String, String> carrier = new HashMap<>();
w3cTraceContextPropagator.inject(Context.current(), carrier, setter);
w3cBaggagePropagator.inject(Context.current(), carrier, setter);
return carrier;
}

@Override
public void setCurrentContext(Object context) {
Map<String, String> contextMap = (Map<String, String>) context;
if (contextMap != null) {
for (Map.Entry<String, String> entry : contextMap.entrySet()) {
MDC.put(entry.getKey(), entry.getValue());
}
otelKeySet.set(contextMap.keySet());
}
}

@Override
@SuppressWarnings("MustBeClosedChecker")
public void setUp() {
Context context =
Baggage.fromContext(w3cBaggagePropagator.extract(Context.current(), null, getter))
.toBuilder()
.build()
.storeInContext(w3cTraceContextPropagator.extract(Context.current(), null, getter));

currentContextOtelScope.set(context.makeCurrent());

Span span =
GlobalOpenTelemetry.getTracer("cadence-client")
.spanBuilder("cadence.workflow")
Comment on lines +118 to +119
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not familar with opentelemtry -- How are these two properties used? I wonder if we should avoid harhcoding here because users may need to customize them.

.setParent(context)
.setSpanKind(SpanKind.CLIENT)
.startSpan();

Scope scope = span.makeCurrent();
currentOtelSpan.set(span);
currentOtelScope.set(scope);
}

@Override
public void finish() {
Scope scope = currentOtelScope.get();
if (scope != null) {
scope.close();
}
Span span = currentOtelSpan.get();
if (span != null) {
span.end();
}
Scope contextScope = currentContextOtelScope.get();
if (contextScope != null) {
contextScope.close();
}
}
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why the onError is not implemented? is it not needed for OpenTelemetry?

Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.uber.cadence.SearchAttributes;
import com.uber.cadence.TaskList;
import com.uber.cadence.TaskListKind;
import com.uber.cadence.context.ContextPropagator;
import com.uber.cadence.converter.DataConverter;
import com.uber.cadence.converter.JsonDataConverter;
import com.uber.cadence.internal.worker.Shutdownable;
Expand All @@ -39,6 +40,8 @@
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import org.apache.thrift.TDeserializer;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;
Expand Down Expand Up @@ -251,6 +254,21 @@ public static List<HistoryEvent> DeserializeFromBlobDataToHistoryEvents(List<Dat
return events;
}

// Filters the data based upon which propagator it is for
public static Map<String, byte[]> FilterForPropagator(
Map<String, byte[]> data,
ContextPropagator propagator
) {
return data
.entrySet()
.stream()
.filter(e -> e.getKey().startsWith(propagator.getName()))
.collect(
Collectors.toMap(
e -> e.getKey().substring(propagator.getName().length() + 1),
Map.Entry::getValue));
}

/** Prohibit instantiation */
private InternalUtils() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,14 @@
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** This class holds the current set of context propagators */
/** This class holds the current set of context propagators. */
public class ContextThreadLocal {

private static final Logger log = LoggerFactory.getLogger(ContextThreadLocal.class);

private static WorkflowThreadLocal<List<ContextPropagator>> contextPropagators =
WorkflowThreadLocal.withInitial(
new Supplier<List<ContextPropagator>>() {
Expand All @@ -37,7 +41,7 @@ public List<ContextPropagator> get() {
}
});

/** Sets the list of context propagators for the thread */
/** Sets the list of context propagators for the thread. */
public static void setContextPropagators(List<ContextPropagator> propagators) {
if (propagators == null || propagators.isEmpty()) {
return;
Expand All @@ -57,6 +61,11 @@ public static Map<String, Object> getCurrentContextForPropagation() {
return contextData;
}

/**
* Injects the context data into the thread for each configured context propagator.
*
* @param contextData The context data received from the server.
*/
public static void propagateContextToCurrentThread(Map<String, Object> contextData) {
if (contextData == null || contextData.isEmpty()) {
return;
Expand All @@ -67,4 +76,44 @@ public static void propagateContextToCurrentThread(Map<String, Object> contextDa
}
}
}

/** Calls {@link ContextPropagator#setUp()} for each propagator. */
public static void setUpContextPropagators() {
for (ContextPropagator propagator : contextPropagators.get()) {
try {
propagator.setUp();
} catch (Throwable t) {
// Don't let an error in one propagator block the others
log.error("Error calling setUp() on a contextpropagator", t);
}
}
}

/**
* Calls {@link ContextPropagator#onError(Throwable)} for each propagator.
*
* @param t The Throwable that caused the workflow/activity to finish.
*/
public static void onErrorContextPropagators(Throwable t) {
for (ContextPropagator propagator : contextPropagators.get()) {
try {
propagator.onError(t);
} catch (Throwable t1) {
// Don't let an error in one propagator block the others
log.error("Error calling onError() on a contextpropagator", t1);
}
}
}

/** Calls {@link ContextPropagator#finish()} for each propagator. */
public static void finishContextPropagators() {
for (ContextPropagator propagator : contextPropagators.get()) {
try {
propagator.finish();
} catch (Throwable t) {
// Don't let an error in one propagator block the others
log.error("Error calling finish() on a contextpropagator", t);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@

import com.uber.cadence.*;
import com.uber.cadence.context.ContextPropagator;
import com.uber.cadence.internal.common.InternalUtils;

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

final class WorkflowContext {

Expand Down Expand Up @@ -166,7 +169,10 @@ Map<String, Object> getPropagatedContexts() {

Map<String, Object> contextData = new HashMap<>();
for (ContextPropagator propagator : contextPropagators) {
contextData.put(propagator.getName(), propagator.deserializeContext(headerData));
// Only send the context propagator the fields that belong to them
// Change the map from MyPropagator:foo -> bar to foo -> bar
Map<String, byte[]> filteredData = InternalUtils.FilterForPropagator(headerData, propagator);
contextData.put(propagator.getName(), propagator.deserializeContext(filteredData));
}

return contextData;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -453,7 +454,18 @@ private Map<String, byte[]> extractContextsAndConvertToBytes(
}
Map<String, byte[]> result = new HashMap<>();
for (ContextPropagator propagator : contextPropagators) {
result.putAll(propagator.serializeContext(propagator.getCurrentContext()));
// Get the serialized context from the propagator
Map<String, byte[]> serializedContext =
propagator.serializeContext(propagator.getCurrentContext());
// Namespace each entry in case of overlaps, so foo -> bar becomes MyPropagator:foo -> bar
Map<String, byte[]> namespacedSerializedContext =
serializedContext
.entrySet()
.stream()
.collect(
Collectors.toMap(
e -> propagator.getName() + ":" + e.getKey(), Map.Entry::getValue));
result.putAll(namespacedSerializedContext);
}
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

class WorkflowStubImpl implements WorkflowStub {

Expand Down Expand Up @@ -204,7 +205,18 @@ private Map<String, byte[]> extractContextsAndConvertToBytes(
}
Map<String, byte[]> result = new HashMap<>();
for (ContextPropagator propagator : contextPropagators) {
result.putAll(propagator.serializeContext(propagator.getCurrentContext()));
// Get the serialized context from the propagator
Map<String, byte[]> serializedContext =
propagator.serializeContext(propagator.getCurrentContext());
// Namespace each entry in case of overlaps, so foo -> bar becomes MyPropagator:foo -> bar
Map<String, byte[]> namespacedSerializedContext =
serializedContext
.entrySet()
.stream()
.collect(
Collectors.toMap(
k -> propagator.getName() + ":" + k.getKey(), Map.Entry::getValue));
result.putAll(namespacedSerializedContext);
Comment on lines +208 to +219
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe move to a util to reuse the code. It's repeated 4 times for ChildWF, WF, activity and localActivity.

}
return result;
}
Expand Down
Loading