From 61ffa9890534398f722640b611d21d3bf23770e7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Thomas=20J=C3=A4ckle?= Date: Thu, 14 Nov 2024 15:05:07 +0100 Subject: [PATCH] fixed tracing and moving traces to their correct parent in many places --- .../ConnectionPersistenceActor.java | 55 ++++++++------ .../ThingsAggregatorProxyActor.java | 75 +++++++++---------- .../directives/RequestTracingDirective.java | 28 +++---- ...GatewayAuthenticationDirectiveFactory.java | 3 +- internal/models/signalenrichment/pom.xml | 4 + .../ByRoundTripSignalEnrichmentFacade.java | 24 ++++-- .../DittoCachingSignalEnrichmentFacade.java | 19 ++++- .../AbstractSignalEnrichmentFacadeTest.java | 20 +++++ ...JsonifiableWithDittoHeadersSerializer.java | 13 +++- .../AbstractPersistenceActor.java | 47 ++++++++---- .../persistentactors/results/EmptyResult.java | 8 +- .../persistentactors/results/ErrorResult.java | 8 +- .../results/MutationResult.java | 7 +- .../persistentactors/results/QueryResult.java | 8 +- .../persistentactors/results/Result.java | 6 +- .../results/ResultVisitor.java | 14 +++- .../internal/utils/tracing/DittoTracing.java | 2 +- .../tracing/TraceInformationGenerator.java | 13 ++-- .../TraceInformationGeneratorTest.java | 6 +- .../actors/PolicyPersistenceActor.java | 11 ++- .../TopLevelPolicyActionCommandStrategy.java | 19 ++++- .../AbstractPolicyCommandStrategyTest.java | 26 ++++--- .../commands/PolicyConflictStrategyTest.java | 14 ++-- .../actors/ThingPersistenceActor.java | 20 +++-- .../commands/AbstractCommandStrategyTest.java | 18 +++-- .../commands/ResultFactoryTest.java | 8 +- .../commands/ThingConflictStrategyTest.java | 14 ++-- .../service/starter/actors/SearchActor.java | 73 +++++++++++------- .../service/updater/actors/ThingUpdater.java | 12 ++- .../write/streaming/EnforcementFlowTest.java | 19 +++++ 30 files changed, 399 insertions(+), 195 deletions(-) diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/persistence/ConnectionPersistenceActor.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/persistence/ConnectionPersistenceActor.java index 9e6d360eba..01ccb06b43 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/persistence/ConnectionPersistenceActor.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/persistence/ConnectionPersistenceActor.java @@ -37,6 +37,21 @@ import javax.annotation.Nullable; import javax.annotation.concurrent.Immutable; +import org.apache.pekko.actor.ActorRef; +import org.apache.pekko.actor.ActorSystem; +import org.apache.pekko.actor.Props; +import org.apache.pekko.actor.Status; +import org.apache.pekko.actor.SupervisorStrategy; +import org.apache.pekko.cluster.Cluster; +import org.apache.pekko.cluster.routing.ClusterRouterPool; +import org.apache.pekko.cluster.routing.ClusterRouterPoolSettings; +import org.apache.pekko.japi.pf.ReceiveBuilder; +import org.apache.pekko.pattern.Patterns; +import org.apache.pekko.persistence.RecoveryCompleted; +import org.apache.pekko.routing.Broadcast; +import org.apache.pekko.routing.ConsistentHashingPool; +import org.apache.pekko.routing.ConsistentHashingRouter; +import org.apache.pekko.routing.Pool; import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException; import org.eclipse.ditto.base.model.exceptions.DittoRuntimeExceptionBuilder; import org.eclipse.ditto.base.model.headers.DittoHeaders; @@ -108,14 +123,14 @@ import org.eclipse.ditto.connectivity.service.messaging.validation.DittoConnectivityCommandValidator; import org.eclipse.ditto.connectivity.service.util.ConnectionPubSub; import org.eclipse.ditto.connectivity.service.util.ConnectivityMdcEntryKey; -import org.eclipse.ditto.internal.utils.pekko.PingCommand; -import org.eclipse.ditto.internal.utils.pekko.logging.CommonMdcEntryKey; -import org.eclipse.ditto.internal.utils.pekko.logging.DittoDiagnosticLoggingAdapter; -import org.eclipse.ditto.internal.utils.pekko.logging.DittoLoggerFactory; import org.eclipse.ditto.internal.utils.cluster.DistPubSubAccess; import org.eclipse.ditto.internal.utils.config.InstanceIdentifierSupplier; import org.eclipse.ditto.internal.utils.config.ScopedConfig; import org.eclipse.ditto.internal.utils.metrics.DittoMetrics; +import org.eclipse.ditto.internal.utils.pekko.PingCommand; +import org.eclipse.ditto.internal.utils.pekko.logging.CommonMdcEntryKey; +import org.eclipse.ditto.internal.utils.pekko.logging.DittoDiagnosticLoggingAdapter; +import org.eclipse.ditto.internal.utils.pekko.logging.DittoLoggerFactory; import org.eclipse.ditto.internal.utils.persistence.mongo.config.ActivityCheckConfig; import org.eclipse.ditto.internal.utils.persistence.mongo.config.SnapshotConfig; import org.eclipse.ditto.internal.utils.persistence.mongo.streaming.MongoReadJournal; @@ -124,27 +139,12 @@ import org.eclipse.ditto.internal.utils.persistentactors.commands.CommandStrategy; import org.eclipse.ditto.internal.utils.persistentactors.commands.DefaultContext; import org.eclipse.ditto.internal.utils.persistentactors.events.EventStrategy; +import org.eclipse.ditto.internal.utils.tracing.span.StartedSpan; import org.eclipse.ditto.json.JsonValue; import org.eclipse.ditto.thingsearch.model.signals.commands.subscription.CreateSubscription; import com.typesafe.config.Config; -import org.apache.pekko.actor.ActorRef; -import org.apache.pekko.actor.ActorSystem; -import org.apache.pekko.actor.Props; -import org.apache.pekko.actor.Status; -import org.apache.pekko.actor.SupervisorStrategy; -import org.apache.pekko.cluster.Cluster; -import org.apache.pekko.cluster.routing.ClusterRouterPool; -import org.apache.pekko.cluster.routing.ClusterRouterPoolSettings; -import org.apache.pekko.japi.pf.ReceiveBuilder; -import org.apache.pekko.pattern.Patterns; -import org.apache.pekko.persistence.RecoveryCompleted; -import org.apache.pekko.routing.Broadcast; -import org.apache.pekko.routing.ConsistentHashingPool; -import org.apache.pekko.routing.ConsistentHashingRouter; -import org.apache.pekko.routing.Pool; - /** * Handles {@code *Connection} commands and manages the persistence of connection. The actual connection handling to the * remote server is delegated to a child actor that uses a specific client (AMQP 1.0 or 0.9.1). @@ -572,22 +572,29 @@ private void askSelfForRetrieveConnectionStatus(@Nullable final CharSequence cor @Override public void onMutation(final Command command, final ConnectivityEvent event, - final WithDittoHeaders response, final boolean becomeCreated, final boolean becomeDeleted) { + final WithDittoHeaders response, final boolean becomeCreated, final boolean becomeDeleted, + @Nullable final StartedSpan startedSpan) { if (command instanceof StagedCommand stagedCommand) { interpretStagedCommand(stagedCommand.withSenderUnlessDefined(getSender())); + if (startedSpan != null) { + startedSpan.finish(); + } } else { - super.onMutation(command, event, response, becomeCreated, becomeDeleted); + super.onMutation(command, event, response, becomeCreated, becomeDeleted, startedSpan); } } @Override public void onStagedMutation(final Command command, final CompletionStage> event, final CompletionStage response, final boolean becomeCreated, - final boolean becomeDeleted) { + final boolean becomeDeleted, @Nullable final StartedSpan startedSpan) { if (command instanceof StagedCommand stagedCommand) { interpretStagedCommand(stagedCommand.withSenderUnlessDefined(getSender())); + if (startedSpan != null) { + startedSpan.finish(); + } } else { - super.onStagedMutation(command, event, response, becomeCreated, becomeDeleted); + super.onStagedMutation(command, event, response, becomeCreated, becomeDeleted, startedSpan); } } diff --git a/edge/service/src/main/java/org/eclipse/ditto/edge/service/dispatching/ThingsAggregatorProxyActor.java b/edge/service/src/main/java/org/eclipse/ditto/edge/service/dispatching/ThingsAggregatorProxyActor.java index 837aa939db..da2a4897f7 100644 --- a/edge/service/src/main/java/org/eclipse/ditto/edge/service/dispatching/ThingsAggregatorProxyActor.java +++ b/edge/service/src/main/java/org/eclipse/ditto/edge/service/dispatching/ThingsAggregatorProxyActor.java @@ -27,19 +27,30 @@ import javax.annotation.Nullable; +import org.apache.pekko.NotUsed; +import org.apache.pekko.actor.ActorRef; +import org.apache.pekko.actor.Props; +import org.apache.pekko.cluster.pubsub.DistributedPubSubMediator; +import org.apache.pekko.japi.pf.PFBuilder; +import org.apache.pekko.japi.pf.ReceiveBuilder; +import org.apache.pekko.pattern.Patterns; +import org.apache.pekko.stream.Materializer; +import org.apache.pekko.stream.SourceRef; +import org.apache.pekko.stream.javadsl.Sink; +import org.apache.pekko.stream.javadsl.Source; import org.eclipse.ditto.base.model.entity.id.WithEntityId; import org.eclipse.ditto.base.model.exceptions.DittoInternalErrorException; import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException; +import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition; import org.eclipse.ditto.base.model.headers.DittoHeaders; -import org.eclipse.ditto.base.model.headers.DittoHeadersSettable; import org.eclipse.ditto.base.model.json.Jsonifiable; import org.eclipse.ditto.base.model.signals.commands.Command; import org.eclipse.ditto.base.model.signals.commands.CommandResponse; import org.eclipse.ditto.base.model.signals.commands.WithEntity; +import org.eclipse.ditto.internal.utils.cluster.DistPubSubAccess; import org.eclipse.ditto.internal.utils.pekko.actors.AbstractActorWithShutdownBehaviorAndRequestCounting; import org.eclipse.ditto.internal.utils.pekko.logging.DittoDiagnosticLoggingAdapter; import org.eclipse.ditto.internal.utils.pekko.logging.DittoLoggerFactory; -import org.eclipse.ditto.internal.utils.cluster.DistPubSubAccess; import org.eclipse.ditto.internal.utils.tracing.DittoTracing; import org.eclipse.ditto.internal.utils.tracing.span.SpanOperationName; import org.eclipse.ditto.internal.utils.tracing.span.StartedSpan; @@ -53,18 +64,6 @@ import org.eclipse.ditto.things.model.signals.commands.query.RetrieveThings; import org.eclipse.ditto.things.model.signals.commands.query.RetrieveThingsResponse; -import org.apache.pekko.NotUsed; -import org.apache.pekko.actor.ActorRef; -import org.apache.pekko.actor.Props; -import org.apache.pekko.cluster.pubsub.DistributedPubSubMediator; -import org.apache.pekko.japi.pf.PFBuilder; -import org.apache.pekko.japi.pf.ReceiveBuilder; -import org.apache.pekko.pattern.Patterns; -import org.apache.pekko.stream.Materializer; -import org.apache.pekko.stream.SourceRef; -import org.apache.pekko.stream.javadsl.Sink; -import org.apache.pekko.stream.javadsl.Source; - /** * Acts as a client for {@code ThingsAggregatorActor} which responds * to a {@link RetrieveThings} command via a {@link SourceRef} which is a pointer in the cluster emitting the retrieved @@ -108,8 +107,8 @@ public static Props props(final ActorRef pubSubMediator) { @Override public Receive handleMessage() { return ReceiveBuilder.create() - .match(RetrieveThings.class, rt -> handleRetrieveThings(rt, rt)) - .match(SudoRetrieveThings.class, srt -> handleSudoRetrieveThings(srt, srt)) + .match(RetrieveThings.class, this::handleRetrieveThings) + .match(SudoRetrieveThings.class, this::handleSudoRetrieveThings) .matchAny(m -> { log.warning("Got unknown message: {}", m); unhandled(m); @@ -122,52 +121,47 @@ public void serviceUnbind(final Control serviceUnbind) { // nothing to do } - private void handleRetrieveThings(final RetrieveThings rt, final Object msgToAsk) { + private void handleRetrieveThings(final RetrieveThings rt) { final List thingIds = rt.getEntityIds(); log.withCorrelationId(rt) .info("Got '{}' message. Retrieving requested '{}' Things..", RetrieveThings.class.getSimpleName(), thingIds.size()); - - final ActorRef sender = getSender(); - askTargetActor(rt, thingIds, msgToAsk, sender); + askTargetActor(rt, thingIds, getSender()); } - private void handleSudoRetrieveThings(final SudoRetrieveThings srt, final Object msgToAsk) { + private void handleSudoRetrieveThings(final SudoRetrieveThings srt) { final List thingIds = srt.getThingIds(); log.withCorrelationId(srt) .info("Got '{}' message. Retrieving requested '{}' Things..", SudoRetrieveThings.class.getSimpleName(), thingIds.size()); - - final ActorRef sender = getSender(); - askTargetActor(srt, thingIds, msgToAsk, sender); + askTargetActor(srt, thingIds, getSender()); } - private void askTargetActor(final Command command, final List thingIds, - final Object msgToAsk, final ActorRef sender) { - - final Object tracedMsgToAsk; + private void askTargetActor(final Command command, final List thingIds, final ActorRef sender) + { + final DittoHeaders dittoHeaders = command.getDittoHeaders(); final var startedSpan = DittoTracing.newPreparedSpan( - command.getDittoHeaders(), + dittoHeaders, TRACE_AGGREGATOR_RETRIEVE_THINGS ) .tag("size", Integer.toString(thingIds.size())) .start(); - if (msgToAsk instanceof DittoHeadersSettable dittoHeadersSettable) { - tracedMsgToAsk = dittoHeadersSettable.setDittoHeaders( - DittoHeaders.of(startedSpan.propagateContext(dittoHeadersSettable.getDittoHeaders())) - ); - } else { - tracedMsgToAsk = msgToAsk; - } + final Command tracedCommand = command.setDittoHeaders( + DittoHeaders.of(startedSpan.propagateContext( + dittoHeaders.toBuilder() + .removeHeader(DittoHeaderDefinition.W3C_TRACEPARENT.getKey()) + .build() + )) + ); final DistributedPubSubMediator.Publish pubSubMsg = - DistPubSubAccess.publishViaGroup(command.getType(), tracedMsgToAsk); + DistPubSubAccess.publishViaGroup(tracedCommand.getType(), tracedCommand); withRequestCounting( Patterns.ask(pubSubMediator, pubSubMsg, Duration.ofSeconds(ASK_TIMEOUT)) .thenAccept(response -> { if (response instanceof SourceRef) { - handleSourceRef((SourceRef) response, thingIds, command, sender, startedSpan); + handleSourceRef((SourceRef) response, thingIds, tracedCommand, sender, startedSpan); } else if (response instanceof DittoRuntimeException dre) { startedSpan.tagAsFailed(dre).finish(); sender.tell(response, getSelf()); @@ -177,7 +171,7 @@ private void askTargetActor(final Command command, final List thingI response.getClass().getSimpleName(), response); final DittoInternalErrorException responseEx = DittoInternalErrorException.newBuilder() - .dittoHeaders(command.getDittoHeaders()) + .dittoHeaders(tracedCommand.getDittoHeaders()) .build(); startedSpan.tagAsFailed(responseEx).finish(); sender.tell(responseEx, getSelf()); @@ -187,7 +181,8 @@ private void askTargetActor(final Command command, final List thingI } private void handleSourceRef(final SourceRef sourceRef, final List thingIds, - final Command originatingCommand, final ActorRef originatingSender, final StartedSpan startedSpan) { + final Command originatingCommand, final ActorRef originatingSender, final StartedSpan startedSpan) + { final Function, PlainJson> thingPlainJsonSupplier; final Function, CommandResponse> overallResponseSupplier; final UnaryOperator> plainJsonSorter = supplyPlainJsonSorter(thingIds); diff --git a/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/endpoints/directives/RequestTracingDirective.java b/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/endpoints/directives/RequestTracingDirective.java index d13624b0d5..d476e9b938 100644 --- a/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/endpoints/directives/RequestTracingDirective.java +++ b/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/endpoints/directives/RequestTracingDirective.java @@ -30,6 +30,13 @@ import javax.annotation.Nullable; import javax.annotation.concurrent.Immutable; +import org.apache.pekko.http.javadsl.model.HttpHeader; +import org.apache.pekko.http.javadsl.model.HttpRequest; +import org.apache.pekko.http.javadsl.model.HttpResponse; +import org.apache.pekko.http.javadsl.model.Uri; +import org.apache.pekko.http.javadsl.server.Complete; +import org.apache.pekko.http.javadsl.server.Route; +import org.apache.pekko.http.javadsl.server.RouteResult; import org.eclipse.ditto.base.model.common.HttpStatus; import org.eclipse.ditto.base.model.common.HttpStatusCodeOutOfRangeException; import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition; @@ -43,14 +50,6 @@ import org.eclipse.ditto.internal.utils.tracing.span.SpanTagKey; import org.eclipse.ditto.internal.utils.tracing.span.StartedSpan; -import org.apache.pekko.http.javadsl.model.HttpHeader; -import org.apache.pekko.http.javadsl.model.HttpRequest; -import org.apache.pekko.http.javadsl.model.HttpResponse; -import org.apache.pekko.http.javadsl.model.Uri; -import org.apache.pekko.http.javadsl.server.Complete; -import org.apache.pekko.http.javadsl.server.Route; -import org.apache.pekko.http.javadsl.server.RouteResult; - /** * Custom Pekko Http directive tracing the request. */ @@ -116,7 +115,7 @@ private static SpanOperationName resolveSpanOperationName(final HttpRequest http private static URI getTraceUri(final HttpRequest httpRequest) { final var traceInformationGenerator = TraceInformationGenerator.getInstance(); - final var traceInformation = traceInformationGenerator.apply(String.valueOf(getRelativeUri(httpRequest))); + final var traceInformation = traceInformationGenerator.apply(getRelativeUriPath(httpRequest)); return traceInformation.getTraceUri(); } @@ -125,6 +124,12 @@ private static Uri getRelativeUri(final HttpRequest httpRequest) { return uri.toRelative(); } + private static String getRelativeUriPath(final HttpRequest httpRequest) { + final var uri = httpRequest.getUri(); + final var relativeUri = uri.toRelative(); + return relativeUri.path(); + } + private static String getRequestMethodName(final HttpRequest httpRequest) { final var httpMethod = httpRequest.method(); return httpMethod.name(); @@ -246,10 +251,7 @@ private static void addRequestResponseTags( @Nullable final CharSequence correlationId ) { startedSpan.tag(SpanTagKey.REQUEST_METHOD_NAME.getTagForValue(getRequestMethodName(httpRequest))); - @Nullable final var relativeRequestUri = tryToGetRelativeRequestUri(httpRequest, correlationId); - if (null != relativeRequestUri) { - startedSpan.tag(SpanTagKey.REQUEST_URI.getTagForValue(relativeRequestUri)); - } + startedSpan.tag(SpanTagKey.REQUEST_URI.getTagForValue(URI.create(getRelativeUri(httpRequest).toString()))); @Nullable final var httpStatus = tryToGetResponseHttpStatus(httpResponse, correlationId); if (null != httpStatus) { startedSpan.tag(SpanTagKey.HTTP_STATUS.getTagForValue(httpStatus)); diff --git a/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/endpoints/directives/auth/DittoGatewayAuthenticationDirectiveFactory.java b/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/endpoints/directives/auth/DittoGatewayAuthenticationDirectiveFactory.java index 9eab428bcb..4c9652e348 100644 --- a/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/endpoints/directives/auth/DittoGatewayAuthenticationDirectiveFactory.java +++ b/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/endpoints/directives/auth/DittoGatewayAuthenticationDirectiveFactory.java @@ -16,6 +16,8 @@ import java.util.Collection; import java.util.concurrent.Executor; +import javax.annotation.Nullable; + import org.apache.pekko.actor.ActorSystem; import org.eclipse.ditto.gateway.service.security.authentication.AuthenticationChain; import org.eclipse.ditto.gateway.service.security.authentication.AuthenticationFailureAggregator; @@ -32,7 +34,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.mongodb.lang.Nullable; import com.typesafe.config.Config; /** diff --git a/internal/models/signalenrichment/pom.xml b/internal/models/signalenrichment/pom.xml index 546413788c..34027c17e4 100755 --- a/internal/models/signalenrichment/pom.xml +++ b/internal/models/signalenrichment/pom.xml @@ -46,6 +46,10 @@ org.eclipse.ditto ditto-internal-models-signal + + org.eclipse.ditto + ditto-internal-utils-tracing + diff --git a/internal/models/signalenrichment/src/main/java/org/eclipse/ditto/internal/models/signalenrichment/ByRoundTripSignalEnrichmentFacade.java b/internal/models/signalenrichment/src/main/java/org/eclipse/ditto/internal/models/signalenrichment/ByRoundTripSignalEnrichmentFacade.java index 237ec09dbe..65bac9c234 100644 --- a/internal/models/signalenrichment/src/main/java/org/eclipse/ditto/internal/models/signalenrichment/ByRoundTripSignalEnrichmentFacade.java +++ b/internal/models/signalenrichment/src/main/java/org/eclipse/ditto/internal/models/signalenrichment/ByRoundTripSignalEnrichmentFacade.java @@ -22,20 +22,21 @@ import javax.annotation.Nullable; +import org.apache.pekko.actor.ActorSelection; +import org.apache.pekko.pattern.Patterns; import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition; import org.eclipse.ditto.base.model.headers.DittoHeaders; import org.eclipse.ditto.base.model.headers.DittoHeadersBuilder; import org.eclipse.ditto.base.model.json.JsonSchemaVersion; import org.eclipse.ditto.base.model.signals.Signal; +import org.eclipse.ditto.internal.utils.tracing.DittoTracing; +import org.eclipse.ditto.internal.utils.tracing.span.SpanOperationName; import org.eclipse.ditto.json.JsonFieldSelector; import org.eclipse.ditto.json.JsonObject; import org.eclipse.ditto.things.model.ThingId; import org.eclipse.ditto.things.model.signals.commands.query.RetrieveThing; import org.eclipse.ditto.things.model.signals.commands.query.RetrieveThingResponse; -import org.apache.pekko.actor.ActorSelection; -import org.apache.pekko.pattern.Patterns; - /** * Retrieve fixed parts of things by asking an actor. */ @@ -71,16 +72,29 @@ public CompletionStage retrievePartialThing(final ThingId thingId, final DittoHeadersBuilder dittoHeadersBuilder = dittoHeaders.toBuilder() .channel(null) .putHeader(DittoHeaderDefinition.DITTO_RETRIEVE_DELETED.getKey(), Boolean.TRUE.toString()); + if (dittoHeaders.getCorrelationId().isEmpty()) { dittoHeadersBuilder.correlationId(Optional.ofNullable(concernedSignal) .map(Signal::getDittoHeaders) .flatMap(DittoHeaders::getCorrelationId) .orElseGet(() -> UUID.randomUUID().toString()) + "-enrichment"); } - final DittoHeaders headersWithoutChannel = dittoHeadersBuilder.build(); + + final var startedSpan = DittoTracing.newPreparedSpan( + dittoHeaders, + SpanOperationName.of("retrieve_partial_thing") + ) + .start(); + if (null != jsonFieldSelector) { + startedSpan.tag("fields", jsonFieldSelector.toString()); + } final RetrieveThing command = - RetrieveThing.getBuilder(thingId, headersWithoutChannel) + RetrieveThing.getBuilder(thingId, DittoHeaders.of(startedSpan.propagateContext( + dittoHeadersBuilder + .removeHeader(DittoHeaderDefinition.W3C_TRACEPARENT.getKey()) + .build() + ))) .withSelectedFields(jsonFieldSelector) .build(); diff --git a/internal/models/signalenrichment/src/main/java/org/eclipse/ditto/internal/models/signalenrichment/DittoCachingSignalEnrichmentFacade.java b/internal/models/signalenrichment/src/main/java/org/eclipse/ditto/internal/models/signalenrichment/DittoCachingSignalEnrichmentFacade.java index 7e393470a3..fc37e2586d 100644 --- a/internal/models/signalenrichment/src/main/java/org/eclipse/ditto/internal/models/signalenrichment/DittoCachingSignalEnrichmentFacade.java +++ b/internal/models/signalenrichment/src/main/java/org/eclipse/ditto/internal/models/signalenrichment/DittoCachingSignalEnrichmentFacade.java @@ -36,6 +36,8 @@ import org.eclipse.ditto.internal.utils.cache.config.CacheConfig; import org.eclipse.ditto.internal.utils.pekko.logging.DittoLoggerFactory; import org.eclipse.ditto.internal.utils.pekko.logging.ThreadSafeDittoLogger; +import org.eclipse.ditto.internal.utils.tracing.DittoTracing; +import org.eclipse.ditto.internal.utils.tracing.span.SpanOperationName; import org.eclipse.ditto.json.JsonFactory; import org.eclipse.ditto.json.JsonFieldSelector; import org.eclipse.ditto.json.JsonObject; @@ -124,18 +126,29 @@ private static DittoHeaders buildDittoHeadersNotAddedToCacheKey(final List dittoHeadersBuilder = DittoHeaders.newBuilder(); if (!events.isEmpty()) { dittoHeadersBuilder.correlationId( - events.get(events.size() - 1) - .getDittoHeaders() + events.getLast().getDittoHeaders() .getCorrelationId() .orElseGet(() -> UUID.randomUUID().toString()) + "-enrichment" ); } + if (atRevisionNumber > 0) { dittoHeadersBuilder .putHeader(DittoHeaderDefinition.AT_HISTORICAL_REVISION.getKey(), String.valueOf(atRevisionNumber)); } - return dittoHeadersBuilder.build(); + + final var startedSpan = DittoTracing.newPreparedSpan( + dittoHeadersBuilder.build(), + SpanOperationName.of("caching_enrichment_facade_retrieve_thing") + ) + .start(); + + return DittoHeaders.of(startedSpan.propagateContext( + dittoHeadersBuilder + .removeHeader(DittoHeaderDefinition.W3C_TRACEPARENT.getKey()) + .build() + )); } @Override diff --git a/internal/models/signalenrichment/src/test/java/org/eclipse/ditto/internal/models/signalenrichment/AbstractSignalEnrichmentFacadeTest.java b/internal/models/signalenrichment/src/test/java/org/eclipse/ditto/internal/models/signalenrichment/AbstractSignalEnrichmentFacadeTest.java index d3b81ce0db..cea3204ee2 100644 --- a/internal/models/signalenrichment/src/test/java/org/eclipse/ditto/internal/models/signalenrichment/AbstractSignalEnrichmentFacadeTest.java +++ b/internal/models/signalenrichment/src/test/java/org/eclipse/ditto/internal/models/signalenrichment/AbstractSignalEnrichmentFacadeTest.java @@ -24,6 +24,9 @@ import org.eclipse.ditto.base.model.entity.metadata.MetadataModelFactory; import org.eclipse.ditto.base.model.headers.DittoHeaders; import org.eclipse.ditto.base.model.signals.DittoTestSystem; +import org.eclipse.ditto.internal.utils.tracing.DittoTracing; +import org.eclipse.ditto.internal.utils.tracing.config.TracingConfig; +import org.eclipse.ditto.internal.utils.tracing.filter.AcceptAllTracingFilter; import org.eclipse.ditto.json.JsonFieldSelector; import org.eclipse.ditto.json.JsonObject; import org.eclipse.ditto.json.JsonPointer; @@ -34,7 +37,10 @@ import org.eclipse.ditto.things.model.signals.commands.query.RetrieveThingResponse; import org.eclipse.ditto.things.model.signals.events.AttributeModified; import org.eclipse.ditto.things.model.signals.events.ThingDeleted; +import org.junit.AfterClass; +import org.junit.BeforeClass; import org.junit.Test; +import org.mockito.Mockito; /** * Abstract base test for different {@link SignalEnrichmentFacade} implementations. @@ -55,6 +61,20 @@ abstract class AbstractSignalEnrichmentFacadeTest { .set("type", "x attribute") .build()); + @BeforeClass + public static void beforeClass() { + final TracingConfig tracingConfigMock = Mockito.mock(TracingConfig.class); + Mockito.when(tracingConfigMock.isTracingEnabled()).thenReturn(true); + Mockito.when(tracingConfigMock.getPropagationChannel()).thenReturn("default"); + Mockito.when(tracingConfigMock.getTracingFilter()).thenReturn(AcceptAllTracingFilter.getInstance()); + DittoTracing.init(tracingConfigMock); + } + + @AfterClass + public static void afterClass() { + DittoTracing.reset(); + } + @Test public void success() { DittoTestSystem.run(this, kit -> { diff --git a/internal/utils/cluster/src/main/java/org/eclipse/ditto/internal/utils/cluster/AbstractJsonifiableWithDittoHeadersSerializer.java b/internal/utils/cluster/src/main/java/org/eclipse/ditto/internal/utils/cluster/AbstractJsonifiableWithDittoHeadersSerializer.java index 71cd5fe63f..034e60be6f 100755 --- a/internal/utils/cluster/src/main/java/org/eclipse/ditto/internal/utils/cluster/AbstractJsonifiableWithDittoHeadersSerializer.java +++ b/internal/utils/cluster/src/main/java/org/eclipse/ditto/internal/utils/cluster/AbstractJsonifiableWithDittoHeadersSerializer.java @@ -33,6 +33,7 @@ import org.apache.pekko.serialization.SerializerWithStringManifest; import org.eclipse.ditto.base.model.exceptions.DittoJsonException; import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException; +import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition; import org.eclipse.ditto.base.model.headers.DittoHeaders; import org.eclipse.ditto.base.model.headers.DittoHeadersBuilder; import org.eclipse.ditto.base.model.headers.WithDittoHeaders; @@ -210,7 +211,11 @@ private static JsonObject getDittoHeadersWithSpanContextAsJson( final DittoHeaders dittoHeaders, final StartedSpan startedSpan ) { - final var dittoHeadersWithSpanContext = DittoHeaders.of(startedSpan.propagateContext(dittoHeaders)); + final var dittoHeadersWithSpanContext = DittoHeaders.of(startedSpan.propagateContext( + dittoHeaders.toBuilder() + .removeHeader(DittoHeaderDefinition.W3C_TRACEPARENT.getKey()) + .build() + )); return dittoHeadersWithSpanContext.toJson(); } @@ -330,7 +335,11 @@ private Jsonifiable createJsonifiableFrom( beforeDeserializeInstant ); final var result = - deserializeJson(payload, manifest, DittoHeaders.of(startedSpan.propagateContext(dittoHeaders))); + deserializeJson(payload, manifest, DittoHeaders.of(startedSpan.propagateContext( + dittoHeaders.toBuilder() + .removeHeader(DittoHeaderDefinition.W3C_TRACEPARENT.getKey()) + .build() + ))); try { return result; } finally { diff --git a/internal/utils/persistent-actors/src/main/java/org/eclipse/ditto/internal/utils/persistentactors/AbstractPersistenceActor.java b/internal/utils/persistent-actors/src/main/java/org/eclipse/ditto/internal/utils/persistentactors/AbstractPersistenceActor.java index cb5c45017b..3053b4156b 100755 --- a/internal/utils/persistent-actors/src/main/java/org/eclipse/ditto/internal/utils/persistentactors/AbstractPersistenceActor.java +++ b/internal/utils/persistent-actors/src/main/java/org/eclipse/ditto/internal/utils/persistentactors/AbstractPersistenceActor.java @@ -507,7 +507,7 @@ private void historicalRetrieveHandleLoadSnapshotResult(final C command, entityWithEvent.revision, command ).accept(new HistoricalResultListener(sender, - entityWithEvent.event.getDittoHeaders())); + entityWithEvent.event.getDittoHeaders()), null); } else { if (!atHistoricalTimestamp.equals(Instant.EPOCH)) { sender.tell(newHistoryNotAccessibleExceptionBuilder(atHistoricalTimestamp).build(), @@ -730,31 +730,33 @@ private > void handleByStrategy(final T command, @Nullable .start(); final var tracedCommand = - command.setDittoHeaders(DittoHeaders.of(startedSpan.propagateContext(command.getDittoHeaders()))); + command.setDittoHeaders(DittoHeaders.of(startedSpan.propagateContext( + command.getDittoHeaders() + .toBuilder() + .removeHeader(DittoHeaderDefinition.W3C_TRACEPARENT.getKey()) + .build() + ))); accessCounter++; Result result; try { result = strategy.apply(getStrategyContext(), workEntity, getNextRevisionNumber(), (T) tracedCommand); - result.accept(this); + result.accept(this, startedSpan); } catch (final CompletionException | DittoRuntimeException e) { final DittoRuntimeException dittoRuntimeException = DittoRuntimeException.asDittoRuntimeException(e, throwable -> DittoInternalErrorException.newBuilder() .dittoHeaders(tracedCommand.getDittoHeaders()) .build()); - startedSpan.tagAsFailed(e); result = ResultFactory.newErrorResult(dittoRuntimeException, tracedCommand); - result.accept(this); - } finally { - startedSpan.finish(); + result.accept(this, startedSpan); } reportSudoCommandDone(tracedCommand); } @Override public void onMutation(final Command command, final E event, final WithDittoHeaders response, - final boolean becomeCreated, final boolean becomeDeleted) { + final boolean becomeCreated, final boolean becomeDeleted, @Nullable final StartedSpan startedSpan) { final ActorRef sender = getSender(); persistAndApplyEvent(event, (persistedEvent, resultingEntity) -> { @@ -767,6 +769,9 @@ public void onMutation(final Command command, final E event, final WithDittoH if (becomeCreated) { becomeCreatedHandler(); } + if (startedSpan != null) { + startedSpan.finish(); + } }); } @@ -775,7 +780,8 @@ public void onStagedMutation(final Command command, final CompletionStage event, final CompletionStage response, final boolean becomeCreated, - final boolean becomeDeleted) { + final boolean becomeDeleted, + @Nullable final StartedSpan startedSpan) { final ActorRef sender = getSender(); persistAndApplyEventAsync(event, (persistedEvent, resultingEntity) -> { @@ -788,6 +794,9 @@ public void onStagedMutation(final Command command, if (becomeCreated) { becomeCreatedHandler(); } + if (startedSpan != null) { + startedSpan.finish(); + } }, throwable -> { final DittoRuntimeException dittoRuntimeException = DittoRuntimeException.asDittoRuntimeException(throwable, t -> @@ -810,7 +819,8 @@ public void onQuery(final Command command, final WithDittoHeaders response) { } @Override - public void onStagedQuery(final Command command, final CompletionStage response) { + public void onStagedQuery(final Command command, final CompletionStage response, + @Nullable final StartedSpan startedSpan) { if (command.getDittoHeaders().isResponseRequired()) { final ActorRef sender = getSender(); response.whenComplete((r, throwable) -> { @@ -819,7 +829,14 @@ public void onStagedQuery(final Command command, final CompletionStage command, final E event, final WithDittoHeaders response, - final boolean becomeCreated, final boolean becomeDeleted) { + final boolean becomeCreated, final boolean becomeDeleted, @Nullable final StartedSpan startedSpan) { throw new UnsupportedOperationException("Mutating historical entity not supported."); } @Override public void onStagedMutation(final Command command, final CompletionStage event, final CompletionStage response, - final boolean becomeCreated, final boolean becomeDeleted) { + final boolean becomeCreated, final boolean becomeDeleted, @Nullable final StartedSpan startedSpan) { throw new UnsupportedOperationException("Mutating historical entity not supported."); } @@ -1155,7 +1172,8 @@ public void onQuery(final Command command, final WithDittoHeaders response) { } @Override - public void onStagedQuery(final Command command, final CompletionStage response) { + public void onStagedQuery(final Command command, final CompletionStage response, + @Nullable final StartedSpan startedSpan) { if (command.getDittoHeaders().isResponseRequired()) { response.whenComplete((r, throwable) -> { if (throwable instanceof DittoRuntimeException dittoRuntimeException) { @@ -1174,6 +1192,9 @@ public void onStagedQuery(final Command command, final CompletionStage> EmptyResult getInstance() { } @Override - public void accept(final ResultVisitor visitor) { + public void accept(final ResultVisitor visitor, @Nullable final StartedSpan startedSpan) { visitor.onEmpty(); + if (startedSpan != null) { + startedSpan.finish(); + } } @Override diff --git a/internal/utils/persistent-actors/src/main/java/org/eclipse/ditto/internal/utils/persistentactors/results/ErrorResult.java b/internal/utils/persistent-actors/src/main/java/org/eclipse/ditto/internal/utils/persistentactors/results/ErrorResult.java index 4cf6822691..d1c3d531a9 100644 --- a/internal/utils/persistent-actors/src/main/java/org/eclipse/ditto/internal/utils/persistentactors/results/ErrorResult.java +++ b/internal/utils/persistent-actors/src/main/java/org/eclipse/ditto/internal/utils/persistentactors/results/ErrorResult.java @@ -15,9 +15,12 @@ import java.util.concurrent.CompletionStage; import java.util.function.Function; +import javax.annotation.Nullable; + import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException; import org.eclipse.ditto.base.model.signals.commands.Command; import org.eclipse.ditto.base.model.signals.events.Event; +import org.eclipse.ditto.internal.utils.tracing.span.StartedSpan; /** * Result signifying an error. @@ -41,8 +44,11 @@ public String toString() { } @Override - public void accept(final ResultVisitor visitor) { + public void accept(final ResultVisitor visitor, @Nullable final StartedSpan startedSpan) { visitor.onError(dittoRuntimeException, errorCausingCommand); + if (startedSpan != null) { + startedSpan.tagAsFailed(dittoRuntimeException).finish(); + } } @Override diff --git a/internal/utils/persistent-actors/src/main/java/org/eclipse/ditto/internal/utils/persistentactors/results/MutationResult.java b/internal/utils/persistent-actors/src/main/java/org/eclipse/ditto/internal/utils/persistentactors/results/MutationResult.java index afdbd71156..fa51f72704 100644 --- a/internal/utils/persistent-actors/src/main/java/org/eclipse/ditto/internal/utils/persistentactors/results/MutationResult.java +++ b/internal/utils/persistent-actors/src/main/java/org/eclipse/ditto/internal/utils/persistentactors/results/MutationResult.java @@ -20,6 +20,7 @@ import org.eclipse.ditto.base.model.headers.WithDittoHeaders; import org.eclipse.ditto.base.model.signals.commands.Command; import org.eclipse.ditto.base.model.signals.events.Event; +import org.eclipse.ditto.internal.utils.tracing.span.StartedSpan; /** * Result that demands persistence of a mutation event. @@ -52,11 +53,11 @@ public final class MutationResult> implements Result { } @Override - public void accept(final ResultVisitor visitor) { + public void accept(final ResultVisitor visitor, @Nullable final StartedSpan startedSpan) { if (eventToPersistStage != null && responseStage != null) { - visitor.onStagedMutation(command, eventToPersistStage, responseStage, becomeCreated, becomeDeleted); + visitor.onStagedMutation(command, eventToPersistStage, responseStage, becomeCreated, becomeDeleted, startedSpan); } else { - visitor.onMutation(command, eventToPersist, response, becomeCreated, becomeDeleted); + visitor.onMutation(command, eventToPersist, response, becomeCreated, becomeDeleted, startedSpan); } } diff --git a/internal/utils/persistent-actors/src/main/java/org/eclipse/ditto/internal/utils/persistentactors/results/QueryResult.java b/internal/utils/persistent-actors/src/main/java/org/eclipse/ditto/internal/utils/persistentactors/results/QueryResult.java index 9a82284f88..92ed9064f4 100644 --- a/internal/utils/persistent-actors/src/main/java/org/eclipse/ditto/internal/utils/persistentactors/results/QueryResult.java +++ b/internal/utils/persistent-actors/src/main/java/org/eclipse/ditto/internal/utils/persistentactors/results/QueryResult.java @@ -20,6 +20,7 @@ import org.eclipse.ditto.base.model.headers.WithDittoHeaders; import org.eclipse.ditto.base.model.signals.commands.Command; import org.eclipse.ditto.base.model.signals.events.Event; +import org.eclipse.ditto.internal.utils.tracing.span.StartedSpan; /** * Result for query commands. @@ -50,11 +51,14 @@ public String toString() { } @Override - public void accept(final ResultVisitor visitor) { + public void accept(final ResultVisitor visitor, @Nullable final StartedSpan startedSpan) { if (responseStage != null) { - visitor.onStagedQuery(command, responseStage); + visitor.onStagedQuery(command, responseStage, startedSpan); } else { visitor.onQuery(command, response); + if (startedSpan != null) { + startedSpan.finish(); + } } } diff --git a/internal/utils/persistent-actors/src/main/java/org/eclipse/ditto/internal/utils/persistentactors/results/Result.java b/internal/utils/persistent-actors/src/main/java/org/eclipse/ditto/internal/utils/persistentactors/results/Result.java index cb92d2b907..eaefad3faa 100644 --- a/internal/utils/persistent-actors/src/main/java/org/eclipse/ditto/internal/utils/persistentactors/results/Result.java +++ b/internal/utils/persistent-actors/src/main/java/org/eclipse/ditto/internal/utils/persistentactors/results/Result.java @@ -15,7 +15,10 @@ import java.util.concurrent.CompletionStage; import java.util.function.Function; +import javax.annotation.Nullable; + import org.eclipse.ditto.base.model.signals.events.Event; +import org.eclipse.ditto.internal.utils.tracing.span.StartedSpan; /** * The result of applying the strategy to the given command. @@ -26,8 +29,9 @@ public interface Result> { * Evaluate the result by a visitor. * * @param visitor the visitor to evaluate the result, typically the persistent actor itself. + * @param startedSpan the tracing span started for the command before applying the strategy. */ - void accept(ResultVisitor visitor); + void accept(ResultVisitor visitor, @Nullable StartedSpan startedSpan); /** * Convert the result with a function. diff --git a/internal/utils/persistent-actors/src/main/java/org/eclipse/ditto/internal/utils/persistentactors/results/ResultVisitor.java b/internal/utils/persistent-actors/src/main/java/org/eclipse/ditto/internal/utils/persistentactors/results/ResultVisitor.java index cd3322d5f8..032b6c050b 100644 --- a/internal/utils/persistent-actors/src/main/java/org/eclipse/ditto/internal/utils/persistentactors/results/ResultVisitor.java +++ b/internal/utils/persistent-actors/src/main/java/org/eclipse/ditto/internal/utils/persistentactors/results/ResultVisitor.java @@ -14,10 +14,13 @@ import java.util.concurrent.CompletionStage; +import javax.annotation.Nullable; + import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException; import org.eclipse.ditto.base.model.headers.WithDittoHeaders; import org.eclipse.ditto.base.model.signals.commands.Command; import org.eclipse.ditto.base.model.signals.events.Event; +import org.eclipse.ditto.internal.utils.tracing.span.StartedSpan; /** * Evaluator of results of command strategies. @@ -41,8 +44,10 @@ default void onEmpty() { * @param response response of the command. * @param becomeCreated whether the actor should behave as if the entity is created. * @param becomeDeleted whether the actor should behave as if the entity is deleted. + * @param startedSpan the tracing span started for the command before applying the strategy. */ - void onMutation(Command command, E event, WithDittoHeaders response, boolean becomeCreated, boolean becomeDeleted); + void onMutation(Command command, E event, WithDittoHeaders response, boolean becomeCreated, boolean becomeDeleted, + @Nullable StartedSpan startedSpan); /** * Evaluate a mutation result. @@ -52,9 +57,10 @@ default void onEmpty() { * @param response response of the command. * @param becomeCreated whether the actor should behave as if the entity is created. * @param becomeDeleted whether the actor should behave as if the entity is deleted. + * @param startedSpan the tracing span started for the command before applying the strategy. */ void onStagedMutation(Command command, CompletionStage event, CompletionStage response, - boolean becomeCreated, boolean becomeDeleted); + boolean becomeCreated, boolean becomeDeleted, @Nullable StartedSpan startedSpan); /** * Evaluate a query result. @@ -69,8 +75,10 @@ void onStagedMutation(Command command, CompletionStage event, CompletionSt * * @param command the query command. * @param response the response. + * @param startedSpan the tracing span started for the command before applying the strategy. */ - void onStagedQuery(Command command, CompletionStage response); + void onStagedQuery(Command command, CompletionStage response, + @Nullable StartedSpan startedSpan); /** * Evaluate an error result. diff --git a/internal/utils/tracing/src/main/java/org/eclipse/ditto/internal/utils/tracing/DittoTracing.java b/internal/utils/tracing/src/main/java/org/eclipse/ditto/internal/utils/tracing/DittoTracing.java index 5fd51b0bfb..81ea28fb67 100644 --- a/internal/utils/tracing/src/main/java/org/eclipse/ditto/internal/utils/tracing/DittoTracing.java +++ b/internal/utils/tracing/src/main/java/org/eclipse/ditto/internal/utils/tracing/DittoTracing.java @@ -112,7 +112,7 @@ public static StartedSpan newStartedSpanByTimer( * Resets DittoTracing to uninitialized state. * This is the inverse function of {@link DittoTracing#init(TracingConfig)}. */ - static void reset() { + public static void reset() { final var instance = getInstance(); instance.stateHolder.set(new UninitializedState(instance.stateHolder::set)); } diff --git a/internal/utils/tracing/src/main/java/org/eclipse/ditto/internal/utils/tracing/TraceInformationGenerator.java b/internal/utils/tracing/src/main/java/org/eclipse/ditto/internal/utils/tracing/TraceInformationGenerator.java index d5b9251f5a..0e8aecf40c 100644 --- a/internal/utils/tracing/src/main/java/org/eclipse/ditto/internal/utils/tracing/TraceInformationGenerator.java +++ b/internal/utils/tracing/src/main/java/org/eclipse/ditto/internal/utils/tracing/TraceInformationGenerator.java @@ -53,7 +53,7 @@ public final class TraceInformationGenerator implements Function[" + FIRST_API_VERSION + "-" + LATEST_API_VERSION + "])"; - private static final Set SUB_PATHS_TO_SHORTEN = Set.of("things", "policies", "search/things"); + private static final Set SUB_PATHS_TO_SHORTEN = Set.of("things", "policies"); private static final String PATHS_TO_SHORTEN_GROUP = "shorten"; private static final String PATHS_TO_SHORTEN_REGEX_TEMPLATE = "(?<" + PATHS_TO_SHORTEN_GROUP + ">^/(api)/" + API_VERSIONS + @@ -63,6 +63,8 @@ public final class TraceInformationGenerator implements Function URI.create(traceUri + "/" + s + SHORTENED_PATH_SUFFIX)) - .orElse(traceUri); + traceUri = getMatcherValue("subEntityType", matcher) + .map(s -> + URI.create(pathToShorten + SHORTENED_PATH_SUFFIX + "/" + s + SHORTENED_PATH_SUFFIX)) + .orElseGet(() -> URI.create(pathToShorten + SHORTENED_PATH_SUFFIX)); + sanitizedUri = traceUri; } } else { final var pathFullLength = matcher.group(PATHS_EXACT_LENGTH_GROUP); diff --git a/internal/utils/tracing/src/test/java/org/eclipse/ditto/internal/utils/tracing/TraceInformationGeneratorTest.java b/internal/utils/tracing/src/test/java/org/eclipse/ditto/internal/utils/tracing/TraceInformationGeneratorTest.java index dd3e3ebea5..a6b769ae63 100644 --- a/internal/utils/tracing/src/test/java/org/eclipse/ditto/internal/utils/tracing/TraceInformationGeneratorTest.java +++ b/internal/utils/tracing/src/test/java/org/eclipse/ditto/internal/utils/tracing/TraceInformationGeneratorTest.java @@ -60,7 +60,7 @@ public void api2FeaturePropertyUpdate() { ); assertThat(traceInformation) - .isEqualTo(TraceInformation.newInstance(traceUri, TagSet.ofTag(getRequestUriTag(sanitizedUri)))); + .isEqualTo(TraceInformation.newInstance(sanitizedUri, TagSet.ofTag(getRequestUriTag(sanitizedUri)))); } private static Tag getRequestUriTag(final URI requestUri) { @@ -76,7 +76,7 @@ public void api2FeatureDefinitionUpdate() { underTest.apply("/api/2/things/abc:1a4ed3df-308b-462e-9cfc-b78891f18c39/features/Vehicle/definition"); assertThat(traceInformation) - .isEqualTo(TraceInformation.newInstance(traceUri, TagSet.ofTag(getRequestUriTag(sanitizedUri)))); + .isEqualTo(TraceInformation.newInstance(sanitizedUri, TagSet.ofTag(getRequestUriTag(sanitizedUri)))); } @Test @@ -97,7 +97,7 @@ public void api2ThingsSearchUriIsShortened() { @Test public void api2ThingsSearchCountUriIsShortened() { - final var expectedUri = URI.create("/api/2/search/things" + TraceInformationGenerator.SHORTENED_PATH_SUFFIX); + final var expectedUri = URI.create("/api/2/search/things/count"); assertThat(underTest.apply("/api/2/search/things/count")) .isEqualTo(TraceInformation.newInstance(expectedUri, TagSet.ofTag(getRequestUriTag(expectedUri)))); diff --git a/policies/service/src/main/java/org/eclipse/ditto/policies/service/persistence/actors/PolicyPersistenceActor.java b/policies/service/src/main/java/org/eclipse/ditto/policies/service/persistence/actors/PolicyPersistenceActor.java index dffd048f47..86444f9783 100755 --- a/policies/service/src/main/java/org/eclipse/ditto/policies/service/persistence/actors/PolicyPersistenceActor.java +++ b/policies/service/src/main/java/org/eclipse/ditto/policies/service/persistence/actors/PolicyPersistenceActor.java @@ -39,6 +39,7 @@ import org.eclipse.ditto.internal.utils.persistentactors.commands.CommandStrategy; import org.eclipse.ditto.internal.utils.persistentactors.commands.DefaultContext; import org.eclipse.ditto.internal.utils.persistentactors.events.EventStrategy; +import org.eclipse.ditto.internal.utils.tracing.span.StartedSpan; import org.eclipse.ditto.policies.api.PolicyTag; import org.eclipse.ditto.policies.model.Policy; import org.eclipse.ditto.policies.model.PolicyEntry; @@ -248,7 +249,7 @@ private void publishPolicyTag(final PolicyEvent event) { @Override public void onMutation(final Command command, final PolicyEvent event, final WithDittoHeaders response, - final boolean becomeCreated, final boolean becomeDeleted) { + final boolean becomeCreated, final boolean becomeDeleted, @Nullable final StartedSpan startedSpan) { final ActorRef sender = getSender(); persistAndApplyEvent(event, (persistedEvent, resultingEntity) -> { @@ -261,13 +262,16 @@ public void onMutation(final Command command, final PolicyEvent event, fin if (becomeCreated) { becomeCreatedHandler(); } + if (startedSpan != null) { + startedSpan.finish(); + } }); } @Override public void onStagedMutation(final Command command, final CompletionStage> event, final CompletionStage response, - final boolean becomeCreated, final boolean becomeDeleted) { + final boolean becomeCreated, final boolean becomeDeleted, @Nullable final StartedSpan startedSpan) { final ActorRef sender = getSender(); persistAndApplyEventAsync(event, (persistedEvent, resultingEntity) -> { @@ -280,6 +284,9 @@ public void onStagedMutation(final Command command, final CompletionStage { final DittoRuntimeException dittoRuntimeException = DittoRuntimeException.asDittoRuntimeException(throwable, t -> diff --git a/policies/service/src/main/java/org/eclipse/ditto/policies/service/persistence/actors/strategies/commands/TopLevelPolicyActionCommandStrategy.java b/policies/service/src/main/java/org/eclipse/ditto/policies/service/persistence/actors/strategies/commands/TopLevelPolicyActionCommandStrategy.java index b01fa52ca7..236b809c31 100644 --- a/policies/service/src/main/java/org/eclipse/ditto/policies/service/persistence/actors/strategies/commands/TopLevelPolicyActionCommandStrategy.java +++ b/policies/service/src/main/java/org/eclipse/ditto/policies/service/persistence/actors/strategies/commands/TopLevelPolicyActionCommandStrategy.java @@ -35,6 +35,7 @@ import org.eclipse.ditto.internal.utils.persistentactors.results.Result; import org.eclipse.ditto.internal.utils.persistentactors.results.ResultFactory; import org.eclipse.ditto.internal.utils.persistentactors.results.ResultVisitor; +import org.eclipse.ditto.internal.utils.tracing.span.StartedSpan; import org.eclipse.ditto.policies.model.Policy; import org.eclipse.ditto.policies.model.PolicyEntry; import org.eclipse.ditto.policies.model.PolicyId; @@ -155,7 +156,7 @@ private static ResultCollectionVisitor collectResults( final ResultCollectionVisitor visitor = new ResultCollectionVisitor(); for (final PolicyActionCommand command : commands) { strategy.typeCheckAndApply(context, policy, nextRevision, command) - .ifPresent(result -> result.accept(visitor)); + .ifPresent(result -> result.accept(visitor, null)); } return visitor; } @@ -177,23 +178,29 @@ private Optional> aggregateEvents() { @Override public void onMutation(final Command command, final PolicyActionEvent event, final WithDittoHeaders response, final boolean becomeCreated, - final boolean becomeDeleted) { + final boolean becomeDeleted, @Nullable final StartedSpan startedSpan) { if (firstEvent == null) { firstEvent = event; } else { otherEvents.add(event); } + if (startedSpan != null) { + startedSpan.finish(); + } } @Override public void onStagedMutation(final Command command, final CompletionStage> event, final CompletionStage response, final boolean becomeCreated, - final boolean becomeDeleted) { + final boolean becomeDeleted, @Nullable final StartedSpan startedSpan) { if (firstEvent == null) { firstEvent = event.toCompletableFuture().join(); } else { otherEvents.add(event.thenApply(x -> (PolicyActionEvent) x).toCompletableFuture().join()); } + if (startedSpan != null) { + startedSpan.finish(); + } } @Override @@ -202,8 +209,12 @@ public void onQuery(final Command command, final WithDittoHeaders response) { } @Override - public void onStagedQuery(final Command command, final CompletionStage response) { + public void onStagedQuery(final Command command, final CompletionStage response, + @Nullable final StartedSpan startedSpan) { // do nothing + if (startedSpan != null) { + startedSpan.finish(); + } } @Override diff --git a/policies/service/src/test/java/org/eclipse/ditto/policies/service/persistence/actors/strategies/commands/AbstractPolicyCommandStrategyTest.java b/policies/service/src/test/java/org/eclipse/ditto/policies/service/persistence/actors/strategies/commands/AbstractPolicyCommandStrategyTest.java index 1bb02fa8ea..85e500b246 100644 --- a/policies/service/src/test/java/org/eclipse/ditto/policies/service/persistence/actors/strategies/commands/AbstractPolicyCommandStrategyTest.java +++ b/policies/service/src/test/java/org/eclipse/ditto/policies/service/persistence/actors/strategies/commands/AbstractPolicyCommandStrategyTest.java @@ -42,6 +42,7 @@ import org.eclipse.ditto.internal.utils.persistentactors.commands.DefaultContext; import org.eclipse.ditto.internal.utils.persistentactors.results.Result; import org.eclipse.ditto.internal.utils.persistentactors.results.ResultVisitor; +import org.eclipse.ditto.internal.utils.tracing.span.StartedSpan; import org.eclipse.ditto.policies.model.Policy; import org.eclipse.ditto.policies.model.PolicyId; import org.eclipse.ditto.policies.model.signals.commands.PolicyCommandResponse; @@ -110,10 +111,10 @@ void assertModificationResult( final ArgumentCaptor event = ArgumentCaptor.forClass(expectedEventClass); final ArgumentCaptor response = ArgumentCaptor.forClass(expectedResponseClass); final Dummy mock = Dummy.mock(); - result.accept(cast(mock)); + result.accept(cast(mock), null); verify(mock).onMutation(any(), event.capture(), response.capture(), - anyBoolean(), eq(false)); + anyBoolean(), eq(false), eq(null)); assertThat(event.getValue()).isInstanceOf(expectedEventClass); assertThat(response.getValue()).isInstanceOf(expectedResponseClass); @@ -148,7 +149,7 @@ protected static > void assertErrorResult( final DittoRuntimeException expectedException) { final Dummy mock = Dummy.mock(); - applyStrategy(underTest, getDefaultContext(), policy, command).accept(cast(mock)); + applyStrategy(underTest, getDefaultContext(), policy, command).accept(cast(mock), null); verify(mock).onError(eq(expectedException), eq(command)); } @@ -160,7 +161,8 @@ protected static > void assertExceptionIsThrown( final Dummy mock = Dummy.mock(); assertThatException() - .isThrownBy(() -> applyStrategy(underTest, getDefaultContext(), policy, command).accept(cast(mock))) + .isThrownBy(() -> applyStrategy(underTest, getDefaultContext(), policy, command).accept(cast(mock), + null)) .isEqualTo(expectedException); } @@ -173,9 +175,10 @@ private static > T assertModificationResult(final Resul final Dummy mock = Dummy.mock(); - result.accept(cast(mock)); + result.accept(cast(mock), null); - verify(mock).onMutation(any(), event.capture(), eq(expectedResponse), anyBoolean(), eq(becomeDeleted)); + verify(mock).onMutation(any(), event.capture(), eq(expectedResponse), anyBoolean(), eq(becomeDeleted), + eq(null)); assertThat(event.getValue()).isInstanceOf(eventClazz); return event.getValue(); } @@ -196,7 +199,8 @@ static > E getEvent(final Result result) { public void onMutation(final Command command, final E event, final WithDittoHeaders response, final boolean becomeCreated, - final boolean becomeDeleted) { + final boolean becomeDeleted, + @Nullable final StartedSpan startedSpan) { box.add(event); } @@ -205,7 +209,8 @@ public void onMutation(final Command command, final E event, public void onStagedMutation(final Command command, final CompletionStage event, final CompletionStage response, final boolean becomeCreated, - final boolean becomeDeleted) { + final boolean becomeDeleted, + @Nullable final StartedSpan startedSpan) { box.add(event.toCompletableFuture().join()); } @@ -216,7 +221,8 @@ public void onQuery(final Command command, final WithDittoHeaders response) { } @Override - public void onStagedQuery(final Command command, final CompletionStage response) { + public void onStagedQuery(final Command command, final CompletionStage response, + @Nullable final StartedSpan startedSpan) { throw new AssertionError("Expect mutation result, got query response: " + response); } @@ -224,7 +230,7 @@ public void onStagedQuery(final Command command, final CompletionStage errorCausingCommand) { throw new AssertionError("Expect mutation result, got error: " + error); } - }); + }, null); return box.get(0); } diff --git a/policies/service/src/test/java/org/eclipse/ditto/policies/service/persistence/actors/strategies/commands/PolicyConflictStrategyTest.java b/policies/service/src/test/java/org/eclipse/ditto/policies/service/persistence/actors/strategies/commands/PolicyConflictStrategyTest.java index 354faf2b0d..ad5b734cad 100644 --- a/policies/service/src/test/java/org/eclipse/ditto/policies/service/persistence/actors/strategies/commands/PolicyConflictStrategyTest.java +++ b/policies/service/src/test/java/org/eclipse/ditto/policies/service/persistence/actors/strategies/commands/PolicyConflictStrategyTest.java @@ -18,6 +18,8 @@ import java.util.concurrent.CompletionStage; +import javax.annotation.Nullable; + import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException; import org.eclipse.ditto.base.model.headers.DittoHeaders; import org.eclipse.ditto.base.model.headers.WithDittoHeaders; @@ -29,6 +31,7 @@ import org.eclipse.ditto.internal.utils.persistentactors.commands.DefaultContext; import org.eclipse.ditto.internal.utils.persistentactors.results.Result; import org.eclipse.ditto.internal.utils.persistentactors.results.ResultVisitor; +import org.eclipse.ditto.internal.utils.tracing.span.StartedSpan; import org.eclipse.ditto.policies.model.PoliciesModelFactory; import org.eclipse.ditto.policies.model.Policy; import org.eclipse.ditto.policies.model.PolicyId; @@ -62,7 +65,7 @@ public void createConflictResultWithoutPrecondition() { mockLoggingAdapter(), ACTOR_SYSTEM_RESOURCE.getActorSystem()); final CreatePolicy command = CreatePolicy.of(policy, DittoHeaders.empty()); final Result> result = underTest.apply(context, policy, 26L, command); - result.accept(new ExpectErrorVisitor(PolicyConflictException.class)); + result.accept(new ExpectErrorVisitor(PolicyConflictException.class), null); } @Test @@ -77,7 +80,7 @@ public void createPreconditionFailedResultWithPrecondition() { .ifNoneMatch(EntityTagMatchers.fromStrings("*")) .build()); final Result> result = underTest.apply(context, policy, 26L, command); - result.accept(new ExpectErrorVisitor(PolicyPreconditionFailedException.class)); + result.accept(new ExpectErrorVisitor(PolicyPreconditionFailedException.class), null); } private static DittoDiagnosticLoggingAdapter mockLoggingAdapter() { @@ -97,14 +100,14 @@ private ExpectErrorVisitor(final Class clazz) { @Override public void onMutation(final Command command, final PolicyEvent event, final WithDittoHeaders response, final boolean becomeCreated, - final boolean becomeDeleted) { + final boolean becomeDeleted, final StartedSpan startedSpan) { throw new AssertionError("Expect error, got mutation: " + event); } @Override public void onStagedMutation(final Command command, final CompletionStage> event, final CompletionStage response, final boolean becomeCreated, - final boolean becomeDeleted) { + final boolean becomeDeleted, @Nullable final StartedSpan startedSpan) { throw new AssertionError("Expect error, got mutation: " + event); } @@ -114,7 +117,8 @@ public void onQuery(final Command command, final WithDittoHeaders response) { } @Override - public void onStagedQuery(final Command command, final CompletionStage response) { + public void onStagedQuery(final Command command, final CompletionStage response, + @Nullable final StartedSpan startedSpan) { throw new AssertionError("Expect error, got query: " + response); } diff --git a/things/service/src/main/java/org/eclipse/ditto/things/service/persistence/actors/ThingPersistenceActor.java b/things/service/src/main/java/org/eclipse/ditto/things/service/persistence/actors/ThingPersistenceActor.java index fd973e4410..146e4f5c1e 100755 --- a/things/service/src/main/java/org/eclipse/ditto/things/service/persistence/actors/ThingPersistenceActor.java +++ b/things/service/src/main/java/org/eclipse/ditto/things/service/persistence/actors/ThingPersistenceActor.java @@ -17,6 +17,10 @@ import javax.annotation.Nullable; +import org.apache.pekko.actor.ActorRef; +import org.apache.pekko.actor.Props; +import org.apache.pekko.japi.pf.ReceiveBuilder; +import org.apache.pekko.persistence.RecoveryCompleted; import org.eclipse.ditto.base.model.acks.DittoAcknowledgementLabel; import org.eclipse.ditto.base.model.exceptions.DittoRuntimeExceptionBuilder; import org.eclipse.ditto.base.model.headers.DittoHeaders; @@ -34,6 +38,7 @@ import org.eclipse.ditto.internal.utils.persistentactors.events.EventStrategy; import org.eclipse.ditto.internal.utils.pubsub.DistributedPub; import org.eclipse.ditto.internal.utils.pubsub.extractors.AckExtractor; +import org.eclipse.ditto.internal.utils.tracing.span.StartedSpan; import org.eclipse.ditto.json.JsonFactory; import org.eclipse.ditto.things.api.commands.sudo.SudoRetrieveThing; import org.eclipse.ditto.things.model.Thing; @@ -53,11 +58,6 @@ import org.eclipse.ditto.things.service.persistence.actors.strategies.commands.ThingCommandStrategies; import org.eclipse.ditto.things.service.persistence.actors.strategies.events.ThingEventStrategies; -import org.apache.pekko.actor.ActorRef; -import org.apache.pekko.actor.Props; -import org.apache.pekko.japi.pf.ReceiveBuilder; -import org.apache.pekko.persistence.RecoveryCompleted; - /** * PersistentActor which "knows" the state of a single {@link Thing}. */ @@ -125,9 +125,15 @@ public void onQuery(final Command command, final WithDittoHeaders response) { } @Override - public void onStagedQuery(final Command command, final CompletionStage response) { + public void onStagedQuery(final Command command, final CompletionStage response, + @Nullable final StartedSpan startedSpan) { final ActorRef sender = getSender(); - response.thenAccept(r -> doOnQuery(command, r, sender)); + response.thenAccept(r -> { + doOnQuery(command, r, sender); + if (startedSpan != null) { + startedSpan.finish(); + } + }); } private void doOnQuery(final Command command, final WithDittoHeaders response, final ActorRef sender) { diff --git a/things/service/src/test/java/org/eclipse/ditto/things/service/persistence/actors/strategies/commands/AbstractCommandStrategyTest.java b/things/service/src/test/java/org/eclipse/ditto/things/service/persistence/actors/strategies/commands/AbstractCommandStrategyTest.java index a5cbd7ac97..58cefd237b 100644 --- a/things/service/src/test/java/org/eclipse/ditto/things/service/persistence/actors/strategies/commands/AbstractCommandStrategyTest.java +++ b/things/service/src/test/java/org/eclipse/ditto/things/service/persistence/actors/strategies/commands/AbstractCommandStrategyTest.java @@ -138,7 +138,7 @@ protected static > void assertErrorResult( final DittoRuntimeException expectedException) { final ResultVisitor> mock = mock(Dummy.class); - applyStrategy(underTest, getDefaultContext(), thing, command).accept(mock); + applyStrategy(underTest, getDefaultContext(), thing, command).accept(mock, null); verify(mock).onError(eq(expectedException), eq(command)); } @@ -159,7 +159,7 @@ protected static > void assertQueryResult( final Result> thingEventResult = applyStrategy(underTest, getDefaultContext(), thing, command); final ResultVisitor> mock = mock(Dummy.class); - thingEventResult.accept(mock); + thingEventResult.accept(mock, null); final ArgumentCaptor> captor = ArgumentCaptor.forClass(CommandResponse.class); verify(mock).onQuery(any(), captor.capture()); commandResponseAssertions.accept(captor.getValue()); @@ -173,7 +173,7 @@ protected static > void assertUnhandledResult( final DittoRuntimeException expectedResponse) { final ResultVisitor> mock = mock(Dummy.class); - underTest.unhandled(getDefaultContext(), thing, NEXT_REVISION, command).accept(mock); + underTest.unhandled(getDefaultContext(), thing, NEXT_REVISION, command).accept(mock, null); verify(mock).onError(eq(expectedResponse), eq(command)); } @@ -186,9 +186,10 @@ private static > T assertModificationResult(fina final ResultVisitor> mock = mock(Dummy.class); - result.accept(mock); + result.accept(mock, null); - verify(mock).onMutation(any(), event.capture(), eq(expectedResponse), anyBoolean(), eq(becomeDeleted)); + verify(mock).onMutation(any(), event.capture(), eq(expectedResponse), anyBoolean(), eq(becomeDeleted), + eq(null)); assertThat(event.getValue()).isInstanceOf(eventClazz); return event.getValue(); } @@ -203,9 +204,10 @@ private static > T assertStagedModificationResul final ResultVisitor> mock = mock(Dummy.class); - result.accept(mock); + result.accept(mock, null); - verify(mock).onStagedMutation(any(), eventStage.capture(), responseStage.capture(), anyBoolean(), eq(becomeDeleted)); + verify(mock).onStagedMutation(any(), eventStage.capture(), responseStage.capture(), anyBoolean(), eq(becomeDeleted), + eq(null)); assertThat(eventStage.getValue()).isInstanceOf(CompletionStage.class); CompletableFutureAssert.assertThatCompletionStage(eventStage.getValue()) .isCompletedWithValueMatching(t -> eventClazz.isAssignableFrom(t.getClass())); @@ -217,7 +219,7 @@ private static > T assertStagedModificationResul private static void assertInfoResult(final Result> result, final WithDittoHeaders infoResponse) { final ResultVisitor> mock = mock(Dummy.class); - result.accept(mock); + result.accept(mock, null); verify(mock).onQuery(any(), eq(infoResponse)); } diff --git a/things/service/src/test/java/org/eclipse/ditto/things/service/persistence/actors/strategies/commands/ResultFactoryTest.java b/things/service/src/test/java/org/eclipse/ditto/things/service/persistence/actors/strategies/commands/ResultFactoryTest.java index 76bf0ab8c2..e82ba1f725 100644 --- a/things/service/src/test/java/org/eclipse/ditto/things/service/persistence/actors/strategies/commands/ResultFactoryTest.java +++ b/things/service/src/test/java/org/eclipse/ditto/things/service/persistence/actors/strategies/commands/ResultFactoryTest.java @@ -44,7 +44,7 @@ public final class ResultFactoryTest { @Test public void notifyQueryResponse() { final Result> result = ResultFactory.newQueryResult(thingQueryCommand, response); - result.accept(mock); + result.accept(mock, null); verify(mock).onQuery(eq(thingQueryCommand), eq(response)); } @@ -52,7 +52,7 @@ public void notifyQueryResponse() { public void notifyException() { final Command command = mock(Command.class); final Result> result = ResultFactory.newErrorResult(exception, command); - result.accept(this.mock); + result.accept(this.mock, null); verify(this.mock).onError(eq(exception), eq(command)); } @@ -76,9 +76,9 @@ private void assertNotifyMutationResponse(final boolean becomeCreated, final boo ResultFactory.newMutationResult(thingModifyCommand, thingModifiedEvent, response, becomeCreated, becomeDeleted); - result.accept(mock); + result.accept(mock, null); verify(mock).onMutation(eq(thingModifyCommand), eq(thingModifiedEvent), eq(response), eq(becomeCreated), - eq(becomeDeleted)); + eq(becomeDeleted), eq(null)); } interface Dummy extends ResultVisitor> { diff --git a/things/service/src/test/java/org/eclipse/ditto/things/service/persistence/actors/strategies/commands/ThingConflictStrategyTest.java b/things/service/src/test/java/org/eclipse/ditto/things/service/persistence/actors/strategies/commands/ThingConflictStrategyTest.java index 2909b153e2..7dfa78758c 100644 --- a/things/service/src/test/java/org/eclipse/ditto/things/service/persistence/actors/strategies/commands/ThingConflictStrategyTest.java +++ b/things/service/src/test/java/org/eclipse/ditto/things/service/persistence/actors/strategies/commands/ThingConflictStrategyTest.java @@ -18,6 +18,8 @@ import java.util.concurrent.CompletionStage; +import javax.annotation.Nullable; + import org.apache.pekko.actor.ActorSystem; import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException; import org.eclipse.ditto.base.model.headers.DittoHeaders; @@ -30,6 +32,7 @@ import org.eclipse.ditto.internal.utils.persistentactors.commands.DefaultContext; import org.eclipse.ditto.internal.utils.persistentactors.results.Result; import org.eclipse.ditto.internal.utils.persistentactors.results.ResultVisitor; +import org.eclipse.ditto.internal.utils.tracing.span.StartedSpan; import org.eclipse.ditto.things.model.Thing; import org.eclipse.ditto.things.model.ThingId; import org.eclipse.ditto.things.model.ThingsModelFactory; @@ -62,7 +65,7 @@ public void createConflictResultWithoutPrecondition() { mockLoggingAdapter(), ACTOR_SYSTEM_RESOURCE.getActorSystem()); final CreateThing command = CreateThing.of(thing, null, DittoHeaders.empty()); final Result> result = underTest.apply(context, thing, 26L, command); - result.accept(new ExpectErrorVisitor(ThingConflictException.class)); + result.accept(new ExpectErrorVisitor(ThingConflictException.class), null); } @Test @@ -77,7 +80,7 @@ public void createPreconditionFailedResultWithPrecondition() { .ifNoneMatch(EntityTagMatchers.fromStrings("*")) .build()); final Result> result = underTest.apply(context, thing, 26L, command); - result.accept(new ExpectErrorVisitor(ThingPreconditionFailedException.class)); + result.accept(new ExpectErrorVisitor(ThingPreconditionFailedException.class), null); } private static DittoDiagnosticLoggingAdapter mockLoggingAdapter() { @@ -97,14 +100,14 @@ private ExpectErrorVisitor(final Class clazz) { @Override public void onMutation(final Command command, final ThingEvent event, final WithDittoHeaders response, final boolean becomeCreated, - final boolean becomeDeleted) { + final boolean becomeDeleted, final StartedSpan startedSpan) { throw new AssertionError("Expect error, got mutation: " + event); } @Override public void onStagedMutation(final Command command, final CompletionStage> event, final CompletionStage response, final boolean becomeCreated, - final boolean becomeDeleted) { + final boolean becomeDeleted, @Nullable final StartedSpan startedSpan) { throw new AssertionError("Expect error, got mutation: " + event); } @@ -114,7 +117,8 @@ public void onQuery(final Command command, final WithDittoHeaders response) { } @Override - public void onStagedQuery(final Command command, final CompletionStage response) { + public void onStagedQuery(final Command command, final CompletionStage response, + @Nullable final StartedSpan startedSpan) { throw new AssertionError("Expect error, got query: " + response); } diff --git a/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/starter/actors/SearchActor.java b/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/starter/actors/SearchActor.java index 5ad7c4108d..982a3312b7 100644 --- a/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/starter/actors/SearchActor.java +++ b/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/starter/actors/SearchActor.java @@ -45,9 +45,9 @@ import org.eclipse.ditto.base.model.exceptions.DittoInternalErrorException; import org.eclipse.ditto.base.model.exceptions.DittoJsonException; import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException; +import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition; import org.eclipse.ditto.base.model.headers.DittoHeaders; import org.eclipse.ditto.base.model.headers.WithDittoHeaders; -import org.eclipse.ditto.base.model.json.JsonSchemaVersion; import org.eclipse.ditto.base.model.signals.Signal; import org.eclipse.ditto.base.model.signals.commands.Command; import org.eclipse.ditto.base.service.signaltransformer.SignalTransformer; @@ -62,6 +62,7 @@ import org.eclipse.ditto.internal.utils.pekko.logging.ThreadSafeDittoLogger; import org.eclipse.ditto.internal.utils.pekko.logging.ThreadSafeDittoLoggingAdapter; import org.eclipse.ditto.internal.utils.tracing.DittoTracing; +import org.eclipse.ditto.internal.utils.tracing.span.StartedSpan; import org.eclipse.ditto.json.JsonArray; import org.eclipse.ditto.json.JsonCollectors; import org.eclipse.ditto.json.JsonObject; @@ -118,7 +119,6 @@ public final class SearchActor extends AbstractActorWithShutdownBehaviorAndReque private static final String QUERY_PARSING_SEGMENT_NAME = "query_parsing"; private static final String DATABASE_ACCESS_SEGMENT_NAME = "database_access"; private static final String QUERY_TYPE_TAG = "query_type"; - private static final String API_VERSION_TAG = "api_version"; private static final Map NAMESPACE_INSPECTION_LOGGERS = new HashMap<>(); @@ -297,20 +297,27 @@ private > CompletionStage executeCount(final T coun final ActorRef sender) { final var dittoHeaders = countCommand.getDittoHeaders(); - final JsonSchemaVersion version = countCommand.getImplementedSchemaVersion(); final var queryType = "count"; - final StartedTimer countTimer = startNewTimer(version, queryType, countCommand); + final SpanWithTimer spanWithTimer = startNewTimer(queryType, countCommand); + final StartedTimer countTimer = spanWithTimer.startedTimer; final StartedTimer queryParsingTimer = countTimer.startNewSegment(QUERY_PARSING_SEGMENT_NAME); + @SuppressWarnings("unchecked") + final T tracedCountCommand = (T) countCommand.setDittoHeaders( + DittoHeaders.of(spanWithTimer.startedSpan.propagateContext(dittoHeaders.toBuilder() + .removeHeader(DittoHeaderDefinition.W3C_TRACEPARENT.getKey()) + .build() + ))); + final Source countThingsResponseSource = - createQuerySource(queryParseFunction, countCommand) + createQuerySource(queryParseFunction, tracedCountCommand) .flatMapConcat(query -> { stopTimer(queryParsingTimer); final StartedTimer databaseAccessTimer = countTimer.startNewSegment(DATABASE_ACCESS_SEGMENT_NAME); final Source countResultSource = - DittoJsonException.wrapJsonRuntimeException(query, countCommand.getDittoHeaders(), + DittoJsonException.wrapJsonRuntimeException(query, tracedCountCommand.getDittoHeaders(), (theQuery, headers) -> isSudo ? searchPersistence.sudoCount(theQuery) : searchPersistence.count(theQuery, @@ -327,7 +334,7 @@ private > CompletionStage executeCount(final T coun }); final Source replySourceWithErrorHandling = - countThingsResponseSource.via(stopTimerAndHandleError(countTimer, countCommand)); + countThingsResponseSource.via(stopTimerAndHandleError(countTimer, tracedCountCommand)); final var replyFuture = replySourceWithErrorHandling.runWith(Sink.head(), SystemMaterializer.get(getSystem()).materializer()); @@ -339,28 +346,34 @@ private CompletionStage performStream(final StreamThings streamThings, f final ThreadSafeDittoLoggingAdapter l) { final var queryType = "query"; // same as queryThings - final var searchTimer = - startNewTimer(streamThings.getImplementedSchemaVersion(), queryType, streamThings); + final SpanWithTimer spanWithTimer = startNewTimer(queryType, streamThings); + final var searchTimer = spanWithTimer.startedTimer; final var queryParsingTimer = searchTimer.startNewSegment(QUERY_PARSING_SEGMENT_NAME); final var namespaces = streamThings.getNamespaces().orElse(null); + final StreamThings tracedStreamThings = streamThings.setDittoHeaders( + DittoHeaders.of(spanWithTimer.startedSpan.propagateContext(streamThings.getDittoHeaders().toBuilder() + .removeHeader(DittoHeaderDefinition.W3C_TRACEPARENT.getKey()) + .build() + ))); + final Source, NotUsed> thingIdSourceRefSource = - ThingsSearchCursor.extractCursor(streamThings).flatMapConcat(cursor -> { + ThingsSearchCursor.extractCursor(tracedStreamThings).flatMapConcat(cursor -> { cursor.ifPresent(c -> c.logCursorCorrelationId(l)); - return createQuerySource(queryParser::parse, streamThings).map(parsedQuery -> { + return createQuerySource(queryParser::parse, tracedStreamThings).map(parsedQuery -> { final var query = ThingsSearchCursor.adjust(cursor, parsedQuery, queryParser.getCriteriaFactory()); stopTimer(queryParsingTimer); searchTimer.startNewSegment( DATABASE_ACCESS_SEGMENT_NAME); // segment stopped by stopTimerAndHandleError final List subjectIds = - streamThings.getDittoHeaders() + tracedStreamThings.getDittoHeaders() .getAuthorizationContext() .getAuthorizationSubjectIds(); final Source findAllUnlimitedResult = - DittoJsonException.wrapJsonRuntimeException(query, streamThings.getDittoHeaders(), + DittoJsonException.wrapJsonRuntimeException(query, tracedStreamThings.getDittoHeaders(), (theQuery, headers) -> searchPersistence.findAllUnlimited(theQuery, subjectIds, namespaces) ); @@ -374,7 +387,7 @@ private CompletionStage performStream(final StreamThings streamThings, f final Source replySourceWithErrorHandling = thingIdSourceRefSource.via(streamKillSwitch.flow()) - .via(stopTimerAndHandleError(searchTimer, streamThings)); + .via(stopTimerAndHandleError(searchTimer, tracedStreamThings)); final var replyFuture = replySourceWithErrorHandling.runWith(Sink.head(), SystemMaterializer.get(getSystem()).materializer()); @@ -453,19 +466,25 @@ private CompletionStage performQuery(final QueryThings queryThings, fina l.debug("Starting to process QueryThings command: {}", queryThings); final var queryType = "query"; - final var searchTimer = - startNewTimer(queryThings.getImplementedSchemaVersion(), queryType, queryThings); + final SpanWithTimer spanWithTimer = startNewTimer(queryType, queryThings); + final var searchTimer = spanWithTimer.startedTimer; final var queryParsingTimer = searchTimer.startNewSegment(QUERY_PARSING_SEGMENT_NAME); final var namespaces = queryThings.getNamespaces().orElse(null); + final QueryThings tracedQueryThings = queryThings.setDittoHeaders( + DittoHeaders.of(spanWithTimer.startedSpan.propagateContext(queryThings.getDittoHeaders().toBuilder() + .removeHeader(DittoHeaderDefinition.W3C_TRACEPARENT.getKey()) + .build() + ))); + final Source queryThingsResponseSource = - ThingsSearchCursor.extractCursor(queryThings, getSystem()).flatMapConcat(cursor -> { + ThingsSearchCursor.extractCursor(tracedQueryThings, getSystem()).flatMapConcat(cursor -> { cursor.ifPresent(c -> c.logCursorCorrelationId(l)); - final QueryThings command = ThingsSearchCursor.adjust(cursor, queryThings); + final QueryThings command = ThingsSearchCursor.adjust(cursor, tracedQueryThings); final var dittoHeaders = command.getDittoHeaders(); l.info("Processing QueryThings command with namespaces <{}> and filter: <{}>", - queryThings.getNamespaces(), queryThings.getFilter()); - l.debug("Processing QueryThings command: <{}>", queryThings); + tracedQueryThings.getNamespaces(), tracedQueryThings.getFilter()); + l.debug("Processing QueryThings command: <{}>", tracedQueryThings); return createQuerySource(queryParser::parse, command) .flatMapConcat(parsedQuery -> { @@ -477,7 +496,7 @@ private CompletionStage performQuery(final QueryThings queryThings, fina final StartedTimer databaseAccessTimer = searchTimer.startNewSegment(DATABASE_ACCESS_SEGMENT_NAME); - final boolean isSudo = queryThings.getDittoHeaders() + final boolean isSudo = tracedQueryThings.getDittoHeaders() .isSudo(); final List subjectIds = isSudo ? null @@ -499,7 +518,7 @@ private CompletionStage performQuery(final QueryThings queryThings, fina }); final Source replySourceWithErrorHandling = - queryThingsResponseSource.via(stopTimerAndHandleError(searchTimer, queryThings)); + queryThingsResponseSource.via(stopTimerAndHandleError(searchTimer, tracedQueryThings)); final var replyFuture = replySourceWithErrorHandling.runWith(Sink.head(), SystemMaterializer.get(getSystem()).materializer()); @@ -561,15 +580,15 @@ private static Source createQuerySource(final Function computeEventMetadata(final ThingEvent thingEvent, .tag(ConsistencyLag.TAG_SHOULD_ACK, Boolean.toString(shouldAcknowledge)) .onExpiration(t -> l.warning("Timer measuring consistency lag timed out for event <{}>", thingEvent)) .start(); - DittoTracing.newStartedSpanByTimer(thingEvent.getDittoHeaders(), startedTimer); + final StartedSpan startedSpan = DittoTracing.newStartedSpanByTimer(thingEvent.getDittoHeaders(), startedTimer); ConsistencyLag.startS1InUpdater(startedTimer); - final var metadata = exportMetadataWithSender(shouldAcknowledge, thingEvent, getAckRecipient( - thingEvent.getDittoHeaders()), startedTimer, data) + final var tracedEvent = thingEvent.setDittoHeaders(DittoHeaders.of(startedSpan.propagateContext( + thingEvent.getDittoHeaders().toBuilder() + .removeHeader(DittoHeaderDefinition.W3C_TRACEPARENT.getKey()) + .build() + ))); + final var metadata = exportMetadataWithSender(shouldAcknowledge, tracedEvent, getAckRecipient( + tracedEvent.getDittoHeaders()), startedTimer, data) .withUpdateReason(UpdateReason.THING_UPDATE); return Optional.of(metadata); diff --git a/thingsearch/service/src/test/java/org/eclipse/ditto/thingsearch/service/persistence/write/streaming/EnforcementFlowTest.java b/thingsearch/service/src/test/java/org/eclipse/ditto/thingsearch/service/persistence/write/streaming/EnforcementFlowTest.java index ed23a869f8..02add8e1f9 100644 --- a/thingsearch/service/src/test/java/org/eclipse/ditto/thingsearch/service/persistence/write/streaming/EnforcementFlowTest.java +++ b/thingsearch/service/src/test/java/org/eclipse/ditto/thingsearch/service/persistence/write/streaming/EnforcementFlowTest.java @@ -35,6 +35,9 @@ import org.apache.pekko.testkit.javadsl.TestKit; import org.eclipse.ditto.base.model.headers.DittoHeaders; import org.eclipse.ditto.base.model.json.FieldType; +import org.eclipse.ditto.internal.utils.tracing.DittoTracing; +import org.eclipse.ditto.internal.utils.tracing.config.TracingConfig; +import org.eclipse.ditto.internal.utils.tracing.filter.AcceptAllTracingFilter; import org.eclipse.ditto.json.JsonArray; import org.eclipse.ditto.json.JsonObject; import org.eclipse.ditto.json.JsonPointer; @@ -64,10 +67,13 @@ import org.eclipse.ditto.thingsearch.service.persistence.write.model.ThingDeleteModel; import org.eclipse.ditto.thingsearch.service.persistence.write.model.ThingWriteModel; import org.junit.After; +import org.junit.AfterClass; import org.junit.Before; +import org.junit.BeforeClass; import org.junit.FixMethodOrder; import org.junit.Test; import org.junit.runners.MethodSorters; +import org.mockito.Mockito; import com.typesafe.config.ConfigFactory; @@ -85,6 +91,19 @@ public final class EnforcementFlowTest { private TestPublisher.Probe> sourceProbe; private TestSubscriber.Probe> sinkProbe; + @BeforeClass + public static void beforeClass() { + final TracingConfig tracingConfigMock = Mockito.mock(TracingConfig.class); + Mockito.when(tracingConfigMock.isTracingEnabled()).thenReturn(true); + Mockito.when(tracingConfigMock.getPropagationChannel()).thenReturn("default"); + Mockito.when(tracingConfigMock.getTracingFilter()).thenReturn(AcceptAllTracingFilter.getInstance()); + DittoTracing.init(tracingConfigMock); + } + + @AfterClass + public static void afterClass() { + DittoTracing.reset(); + } @Before public void init() { system = ActorSystem.create("test", ConfigFactory.load("actors-test.conf"));