Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fixed tracing and moving traces to their correct parent in many places #2060

Merged
merged 1 commit into from
Nov 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,21 @@
import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;

import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.actor.Props;
import org.apache.pekko.actor.Status;
import org.apache.pekko.actor.SupervisorStrategy;
import org.apache.pekko.cluster.Cluster;
import org.apache.pekko.cluster.routing.ClusterRouterPool;
import org.apache.pekko.cluster.routing.ClusterRouterPoolSettings;
import org.apache.pekko.japi.pf.ReceiveBuilder;
import org.apache.pekko.pattern.Patterns;
import org.apache.pekko.persistence.RecoveryCompleted;
import org.apache.pekko.routing.Broadcast;
import org.apache.pekko.routing.ConsistentHashingPool;
import org.apache.pekko.routing.ConsistentHashingRouter;
import org.apache.pekko.routing.Pool;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeExceptionBuilder;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
Expand Down Expand Up @@ -108,14 +123,14 @@
import org.eclipse.ditto.connectivity.service.messaging.validation.DittoConnectivityCommandValidator;
import org.eclipse.ditto.connectivity.service.util.ConnectionPubSub;
import org.eclipse.ditto.connectivity.service.util.ConnectivityMdcEntryKey;
import org.eclipse.ditto.internal.utils.pekko.PingCommand;
import org.eclipse.ditto.internal.utils.pekko.logging.CommonMdcEntryKey;
import org.eclipse.ditto.internal.utils.pekko.logging.DittoDiagnosticLoggingAdapter;
import org.eclipse.ditto.internal.utils.pekko.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.cluster.DistPubSubAccess;
import org.eclipse.ditto.internal.utils.config.InstanceIdentifierSupplier;
import org.eclipse.ditto.internal.utils.config.ScopedConfig;
import org.eclipse.ditto.internal.utils.metrics.DittoMetrics;
import org.eclipse.ditto.internal.utils.pekko.PingCommand;
import org.eclipse.ditto.internal.utils.pekko.logging.CommonMdcEntryKey;
import org.eclipse.ditto.internal.utils.pekko.logging.DittoDiagnosticLoggingAdapter;
import org.eclipse.ditto.internal.utils.pekko.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.persistence.mongo.config.ActivityCheckConfig;
import org.eclipse.ditto.internal.utils.persistence.mongo.config.SnapshotConfig;
import org.eclipse.ditto.internal.utils.persistence.mongo.streaming.MongoReadJournal;
Expand All @@ -124,27 +139,12 @@
import org.eclipse.ditto.internal.utils.persistentactors.commands.CommandStrategy;
import org.eclipse.ditto.internal.utils.persistentactors.commands.DefaultContext;
import org.eclipse.ditto.internal.utils.persistentactors.events.EventStrategy;
import org.eclipse.ditto.internal.utils.tracing.span.StartedSpan;
import org.eclipse.ditto.json.JsonValue;
import org.eclipse.ditto.thingsearch.model.signals.commands.subscription.CreateSubscription;

import com.typesafe.config.Config;

import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.actor.Props;
import org.apache.pekko.actor.Status;
import org.apache.pekko.actor.SupervisorStrategy;
import org.apache.pekko.cluster.Cluster;
import org.apache.pekko.cluster.routing.ClusterRouterPool;
import org.apache.pekko.cluster.routing.ClusterRouterPoolSettings;
import org.apache.pekko.japi.pf.ReceiveBuilder;
import org.apache.pekko.pattern.Patterns;
import org.apache.pekko.persistence.RecoveryCompleted;
import org.apache.pekko.routing.Broadcast;
import org.apache.pekko.routing.ConsistentHashingPool;
import org.apache.pekko.routing.ConsistentHashingRouter;
import org.apache.pekko.routing.Pool;

/**
* Handles {@code *Connection} commands and manages the persistence of connection. The actual connection handling to the
* remote server is delegated to a child actor that uses a specific client (AMQP 1.0 or 0.9.1).
Expand Down Expand Up @@ -572,22 +572,29 @@ private void askSelfForRetrieveConnectionStatus(@Nullable final CharSequence cor

@Override
public void onMutation(final Command<?> command, final ConnectivityEvent<?> event,
final WithDittoHeaders response, final boolean becomeCreated, final boolean becomeDeleted) {
final WithDittoHeaders response, final boolean becomeCreated, final boolean becomeDeleted,
@Nullable final StartedSpan startedSpan) {
if (command instanceof StagedCommand stagedCommand) {
interpretStagedCommand(stagedCommand.withSenderUnlessDefined(getSender()));
if (startedSpan != null) {
startedSpan.finish();
}
} else {
super.onMutation(command, event, response, becomeCreated, becomeDeleted);
super.onMutation(command, event, response, becomeCreated, becomeDeleted, startedSpan);
}
}

@Override
public void onStagedMutation(final Command<?> command, final CompletionStage<ConnectivityEvent<?>> event,
final CompletionStage<WithDittoHeaders> response, final boolean becomeCreated,
final boolean becomeDeleted) {
final boolean becomeDeleted, @Nullable final StartedSpan startedSpan) {
if (command instanceof StagedCommand stagedCommand) {
interpretStagedCommand(stagedCommand.withSenderUnlessDefined(getSender()));
if (startedSpan != null) {
startedSpan.finish();
}
} else {
super.onStagedMutation(command, event, response, becomeCreated, becomeDeleted);
super.onStagedMutation(command, event, response, becomeCreated, becomeDeleted, startedSpan);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,30 @@

import javax.annotation.Nullable;

import org.apache.pekko.NotUsed;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.Props;
import org.apache.pekko.cluster.pubsub.DistributedPubSubMediator;
import org.apache.pekko.japi.pf.PFBuilder;
import org.apache.pekko.japi.pf.ReceiveBuilder;
import org.apache.pekko.pattern.Patterns;
import org.apache.pekko.stream.Materializer;
import org.apache.pekko.stream.SourceRef;
import org.apache.pekko.stream.javadsl.Sink;
import org.apache.pekko.stream.javadsl.Source;
import org.eclipse.ditto.base.model.entity.id.WithEntityId;
import org.eclipse.ditto.base.model.exceptions.DittoInternalErrorException;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.headers.DittoHeadersSettable;
import org.eclipse.ditto.base.model.json.Jsonifiable;
import org.eclipse.ditto.base.model.signals.commands.Command;
import org.eclipse.ditto.base.model.signals.commands.CommandResponse;
import org.eclipse.ditto.base.model.signals.commands.WithEntity;
import org.eclipse.ditto.internal.utils.cluster.DistPubSubAccess;
import org.eclipse.ditto.internal.utils.pekko.actors.AbstractActorWithShutdownBehaviorAndRequestCounting;
import org.eclipse.ditto.internal.utils.pekko.logging.DittoDiagnosticLoggingAdapter;
import org.eclipse.ditto.internal.utils.pekko.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.cluster.DistPubSubAccess;
import org.eclipse.ditto.internal.utils.tracing.DittoTracing;
import org.eclipse.ditto.internal.utils.tracing.span.SpanOperationName;
import org.eclipse.ditto.internal.utils.tracing.span.StartedSpan;
Expand All @@ -53,18 +64,6 @@
import org.eclipse.ditto.things.model.signals.commands.query.RetrieveThings;
import org.eclipse.ditto.things.model.signals.commands.query.RetrieveThingsResponse;

import org.apache.pekko.NotUsed;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.Props;
import org.apache.pekko.cluster.pubsub.DistributedPubSubMediator;
import org.apache.pekko.japi.pf.PFBuilder;
import org.apache.pekko.japi.pf.ReceiveBuilder;
import org.apache.pekko.pattern.Patterns;
import org.apache.pekko.stream.Materializer;
import org.apache.pekko.stream.SourceRef;
import org.apache.pekko.stream.javadsl.Sink;
import org.apache.pekko.stream.javadsl.Source;

/**
* Acts as a client for {@code ThingsAggregatorActor} which responds
* to a {@link RetrieveThings} command via a {@link SourceRef} which is a pointer in the cluster emitting the retrieved
Expand Down Expand Up @@ -108,8 +107,8 @@ public static Props props(final ActorRef pubSubMediator) {
@Override
public Receive handleMessage() {
return ReceiveBuilder.create()
.match(RetrieveThings.class, rt -> handleRetrieveThings(rt, rt))
.match(SudoRetrieveThings.class, srt -> handleSudoRetrieveThings(srt, srt))
.match(RetrieveThings.class, this::handleRetrieveThings)
.match(SudoRetrieveThings.class, this::handleSudoRetrieveThings)
.matchAny(m -> {
log.warning("Got unknown message: {}", m);
unhandled(m);
Expand All @@ -122,52 +121,47 @@ public void serviceUnbind(final Control serviceUnbind) {
// nothing to do
}

private void handleRetrieveThings(final RetrieveThings rt, final Object msgToAsk) {
private void handleRetrieveThings(final RetrieveThings rt) {
final List<ThingId> thingIds = rt.getEntityIds();
log.withCorrelationId(rt)
.info("Got '{}' message. Retrieving requested '{}' Things..",
RetrieveThings.class.getSimpleName(), thingIds.size());

final ActorRef sender = getSender();
askTargetActor(rt, thingIds, msgToAsk, sender);
askTargetActor(rt, thingIds, getSender());
}

private void handleSudoRetrieveThings(final SudoRetrieveThings srt, final Object msgToAsk) {
private void handleSudoRetrieveThings(final SudoRetrieveThings srt) {
final List<ThingId> thingIds = srt.getThingIds();
log.withCorrelationId(srt)
.info("Got '{}' message. Retrieving requested '{}' Things..",
SudoRetrieveThings.class.getSimpleName(), thingIds.size());

final ActorRef sender = getSender();
askTargetActor(srt, thingIds, msgToAsk, sender);
askTargetActor(srt, thingIds, getSender());
}

private void askTargetActor(final Command<?> command, final List<ThingId> thingIds,
final Object msgToAsk, final ActorRef sender) {

final Object tracedMsgToAsk;
private void askTargetActor(final Command<?> command, final List<ThingId> thingIds, final ActorRef sender)
{
final DittoHeaders dittoHeaders = command.getDittoHeaders();
final var startedSpan = DittoTracing.newPreparedSpan(
command.getDittoHeaders(),
dittoHeaders,
TRACE_AGGREGATOR_RETRIEVE_THINGS
)
.tag("size", Integer.toString(thingIds.size()))
.start();
if (msgToAsk instanceof DittoHeadersSettable<?> dittoHeadersSettable) {
tracedMsgToAsk = dittoHeadersSettable.setDittoHeaders(
DittoHeaders.of(startedSpan.propagateContext(dittoHeadersSettable.getDittoHeaders()))
);
} else {
tracedMsgToAsk = msgToAsk;
}
final Command<?> tracedCommand = command.setDittoHeaders(
DittoHeaders.of(startedSpan.propagateContext(
dittoHeaders.toBuilder()
.removeHeader(DittoHeaderDefinition.W3C_TRACEPARENT.getKey())
.build()
))
);

final DistributedPubSubMediator.Publish pubSubMsg =
DistPubSubAccess.publishViaGroup(command.getType(), tracedMsgToAsk);
DistPubSubAccess.publishViaGroup(tracedCommand.getType(), tracedCommand);

withRequestCounting(
Patterns.ask(pubSubMediator, pubSubMsg, Duration.ofSeconds(ASK_TIMEOUT))
.thenAccept(response -> {
if (response instanceof SourceRef) {
handleSourceRef((SourceRef<?>) response, thingIds, command, sender, startedSpan);
handleSourceRef((SourceRef<?>) response, thingIds, tracedCommand, sender, startedSpan);
} else if (response instanceof DittoRuntimeException dre) {
startedSpan.tagAsFailed(dre).finish();
sender.tell(response, getSelf());
Expand All @@ -177,7 +171,7 @@ private void askTargetActor(final Command<?> command, final List<ThingId> thingI
response.getClass().getSimpleName(), response);
final DittoInternalErrorException responseEx =
DittoInternalErrorException.newBuilder()
.dittoHeaders(command.getDittoHeaders())
.dittoHeaders(tracedCommand.getDittoHeaders())
.build();
startedSpan.tagAsFailed(responseEx).finish();
sender.tell(responseEx, getSelf());
Expand All @@ -187,7 +181,8 @@ private void askTargetActor(final Command<?> command, final List<ThingId> thingI
}

private void handleSourceRef(final SourceRef<?> sourceRef, final List<ThingId> thingIds,
final Command<?> originatingCommand, final ActorRef originatingSender, final StartedSpan startedSpan) {
final Command<?> originatingCommand, final ActorRef originatingSender, final StartedSpan startedSpan)
{
final Function<Jsonifiable<?>, PlainJson> thingPlainJsonSupplier;
final Function<List<PlainJson>, CommandResponse<?>> overallResponseSupplier;
final UnaryOperator<List<PlainJson>> plainJsonSorter = supplyPlainJsonSorter(thingIds);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@
import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;

import org.apache.pekko.http.javadsl.model.HttpHeader;
import org.apache.pekko.http.javadsl.model.HttpRequest;
import org.apache.pekko.http.javadsl.model.HttpResponse;
import org.apache.pekko.http.javadsl.model.Uri;
import org.apache.pekko.http.javadsl.server.Complete;
import org.apache.pekko.http.javadsl.server.Route;
import org.apache.pekko.http.javadsl.server.RouteResult;
import org.eclipse.ditto.base.model.common.HttpStatus;
import org.eclipse.ditto.base.model.common.HttpStatusCodeOutOfRangeException;
import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition;
Expand All @@ -43,14 +50,6 @@
import org.eclipse.ditto.internal.utils.tracing.span.SpanTagKey;
import org.eclipse.ditto.internal.utils.tracing.span.StartedSpan;

import org.apache.pekko.http.javadsl.model.HttpHeader;
import org.apache.pekko.http.javadsl.model.HttpRequest;
import org.apache.pekko.http.javadsl.model.HttpResponse;
import org.apache.pekko.http.javadsl.model.Uri;
import org.apache.pekko.http.javadsl.server.Complete;
import org.apache.pekko.http.javadsl.server.Route;
import org.apache.pekko.http.javadsl.server.RouteResult;

/**
* Custom Pekko Http directive tracing the request.
*/
Expand Down Expand Up @@ -116,7 +115,7 @@ private static SpanOperationName resolveSpanOperationName(final HttpRequest http

private static URI getTraceUri(final HttpRequest httpRequest) {
final var traceInformationGenerator = TraceInformationGenerator.getInstance();
final var traceInformation = traceInformationGenerator.apply(String.valueOf(getRelativeUri(httpRequest)));
final var traceInformation = traceInformationGenerator.apply(getRelativeUriPath(httpRequest));
return traceInformation.getTraceUri();
}

Expand All @@ -125,6 +124,12 @@ private static Uri getRelativeUri(final HttpRequest httpRequest) {
return uri.toRelative();
}

private static String getRelativeUriPath(final HttpRequest httpRequest) {
final var uri = httpRequest.getUri();
final var relativeUri = uri.toRelative();
return relativeUri.path();
}

private static String getRequestMethodName(final HttpRequest httpRequest) {
final var httpMethod = httpRequest.method();
return httpMethod.name();
Expand Down Expand Up @@ -246,10 +251,7 @@ private static void addRequestResponseTags(
@Nullable final CharSequence correlationId
) {
startedSpan.tag(SpanTagKey.REQUEST_METHOD_NAME.getTagForValue(getRequestMethodName(httpRequest)));
@Nullable final var relativeRequestUri = tryToGetRelativeRequestUri(httpRequest, correlationId);
if (null != relativeRequestUri) {
startedSpan.tag(SpanTagKey.REQUEST_URI.getTagForValue(relativeRequestUri));
}
startedSpan.tag(SpanTagKey.REQUEST_URI.getTagForValue(URI.create(getRelativeUri(httpRequest).toString())));
@Nullable final var httpStatus = tryToGetResponseHttpStatus(httpResponse, correlationId);
if (null != httpStatus) {
startedSpan.tag(SpanTagKey.HTTP_STATUS.getTagForValue(httpStatus));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import java.util.Collection;
import java.util.concurrent.Executor;

import javax.annotation.Nullable;

import org.apache.pekko.actor.ActorSystem;
import org.eclipse.ditto.gateway.service.security.authentication.AuthenticationChain;
import org.eclipse.ditto.gateway.service.security.authentication.AuthenticationFailureAggregator;
Expand All @@ -32,7 +34,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.mongodb.lang.Nullable;
import com.typesafe.config.Config;

/**
Expand Down
4 changes: 4 additions & 0 deletions internal/models/signalenrichment/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-internal-models-signal</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-internal-utils-tracing</artifactId>
</dependency>

<!-- test-only -->
<dependency>
Expand Down
Loading
Loading