From fd8217ae269a1d36fc09926000f0a6fdbb4d15eb Mon Sep 17 00:00:00 2001 From: James M Snell Date: Fri, 25 Oct 2024 22:02:56 -0700 Subject: [PATCH 1/2] Implement SpanContext for user tracing / streaming tail workers --- src/workerd/io/BUILD.bazel | 1 + src/workerd/io/trace-test.c++ | 50 +++++++++++++ src/workerd/io/trace.c++ | 67 +++++++++++++++++ src/workerd/io/trace.h | 101 ++++++++++++++++++++++++++ src/workerd/io/worker-interface.capnp | 10 +++ 5 files changed, 229 insertions(+) diff --git a/src/workerd/io/BUILD.bazel b/src/workerd/io/BUILD.bazel index de35dfc0d2b..91c7a7c3fa7 100644 --- a/src/workerd/io/BUILD.bazel +++ b/src/workerd/io/BUILD.bazel @@ -334,5 +334,6 @@ kj_test( src = "trace-test.c++", deps = [ ":trace", + "//src/workerd/util:thread-scopes", ], ) diff --git a/src/workerd/io/trace-test.c++ b/src/workerd/io/trace-test.c++ index 045f03081dc..974f9c2cc24 100644 --- a/src/workerd/io/trace-test.c++ +++ b/src/workerd/io/trace-test.c++ @@ -3,7 +3,9 @@ // https://opensource.org/licenses/Apache-2.0 #include +#include +#include #include namespace workerd::tracing { @@ -62,5 +64,53 @@ KJ_TEST("can write trace ID protobuf format") { "\xfe\xdc\xba\x98\x76\x54\x32\x12\xfe\xdc\xba\x98\x76\x54\x32\x11"_kjb); } +KJ_TEST("InvocationSpanContext") { + setPredictableModeForTest(); + auto sc = InvocationSpanContext::newForInvocation(); + + // We can create an InvocationSpanContext... + static constexpr auto kCheck = TraceId(0x2a2a2a2a2a2a2a2a, 0x2a2a2a2a2a2a2a2a); + KJ_EXPECT(sc->getTraceId() == kCheck); + KJ_EXPECT(sc->getInvocationId() == kCheck); + KJ_EXPECT(sc->getSpanId() == 0); + + // And serialize that to a capnp struct... + capnp::MallocMessageBuilder builder; + auto root = builder.initRoot(); + sc->toCapnp(root); + + // Then back again... + auto sc2 = KJ_ASSERT_NONNULL(InvocationSpanContext::fromCapnp(root.asReader())); + KJ_EXPECT(sc2->getTraceId() == kCheck); + KJ_EXPECT(sc2->getInvocationId() == kCheck); + KJ_EXPECT(sc2->getSpanId() == 0); + KJ_EXPECT(sc2->isTrigger()); + + // The one that has been deserialized from capnp cannot create children... + try { + sc2->newChild(); + KJ_FAIL_ASSERT("should not be able to create child span with SpanContext from capnp"); + } catch (kj::Exception& ex) { + KJ_EXPECT(ex.getDescription() == + "expected counter != nullptr; unable to create child spans on this context"_kj); + } + + auto sc3 = sc->newChild(); + KJ_EXPECT(sc3->getTraceId() == kCheck); + KJ_EXPECT(sc3->getInvocationId() == kCheck); + KJ_EXPECT(sc3->getSpanId() == 1); + + auto sc4 = InvocationSpanContext::newForInvocation(sc2); + KJ_EXPECT(sc4->getTraceId() == kCheck); + KJ_EXPECT(sc4->getInvocationId() == kCheck); + KJ_EXPECT(sc4->getSpanId() == 0); + + auto& sc5 = KJ_ASSERT_NONNULL(sc4->getParent()); + KJ_EXPECT(sc5->getTraceId() == kCheck); + KJ_EXPECT(sc5->getInvocationId() == kCheck); + KJ_EXPECT(sc5->getSpanId() == 0); + KJ_EXPECT(sc5->isTrigger()); +} + } // namespace } // namespace workerd::tracing diff --git a/src/workerd/io/trace.c++ b/src/workerd/io/trace.c++ index 22860300422..3c280371115 100644 --- a/src/workerd/io/trace.c++ +++ b/src/workerd/io/trace.c++ @@ -155,6 +155,73 @@ kj::String KJ_STRINGIFY(const TraceId& id) { return id; } +InvocationSpanContext::InvocationSpanContext(kj::Badge, + kj::Maybe> counter, + TraceId traceId, + TraceId invocationId, + kj::uint spanId, + kj::Maybe> parentSpanContext) + : counter(kj::mv(counter)), + traceId(kj::mv(traceId)), + invocationId(kj::mv(invocationId)), + spanId(spanId), + parentSpanContext(kj::mv(parentSpanContext)) {} + +kj::Rc InvocationSpanContext::newChild() { + auto& c = KJ_ASSERT_NONNULL(counter, "unable to create child spans on this context"); + return kj::rc(kj::Badge(), c.addRef(), traceId, + invocationId, c->next(), addRefToThis()); +} + +kj::Rc InvocationSpanContext::newForInvocation( + kj::Maybe&> triggerContext, + kj::Maybe entropySource) { + kj::Maybe> parent; + auto traceId = triggerContext + .map([&](kj::Rc& ctx) { + parent = ctx.addRef(); + return ctx->traceId; + }).orDefault([&] { return TraceId::fromEntropy(entropySource); }); + return kj::rc(kj::Badge(), kj::rc(), + kj::mv(traceId), TraceId::fromEntropy(entropySource), 0, kj::mv(parent)); +} + +TraceId TraceId::fromCapnp(rpc::InvocationSpanContext::TraceId::Reader reader) { + return TraceId(reader.getLow(), reader.getHigh()); +} + +void TraceId::toCapnp(rpc::InvocationSpanContext::TraceId::Builder writer) const { + writer.setLow(low); + writer.setHigh(high); +} + +kj::Maybe> InvocationSpanContext::fromCapnp( + rpc::InvocationSpanContext::Reader reader) { + if (!reader.hasTraceId() || !reader.hasInvocationId()) { + // If the reaer does not have a traceId or invocationId field then it is + // invalid and we will just ignore it. + return kj::none; + } + + auto sc = kj::rc(kj::Badge(), kj::none, + TraceId::fromCapnp(reader.getTraceId()), TraceId::fromCapnp(reader.getInvocationId()), + reader.getSpanId()); + // If the traceId or invocationId are invalid, then we'll ignore them. + if (!sc->getTraceId() || !sc->getInvocationId()) return kj::none; + return kj::mv(sc); +} + +void InvocationSpanContext::toCapnp(rpc::InvocationSpanContext::Builder writer) const { + traceId.toCapnp(writer.initTraceId()); + invocationId.toCapnp(writer.initInvocationId()); + writer.setSpanId(spanId); + kj::mv(getParent()); +} + +kj::String KJ_STRINGIFY(const kj::Rc& context) { + return kj::str(context->getTraceId(), "-", context->getInvocationId(), "-", context->getSpanId()); +} + } // namespace tracing // Approximately how much external data we allow in a trace before we start ignoring requests. We diff --git a/src/workerd/io/trace.h b/src/workerd/io/trace.h index 61ecf9c6297..50193ea7ed2 100644 --- a/src/workerd/io/trace.h +++ b/src/workerd/io/trace.h @@ -109,13 +109,114 @@ class TraceId final { return high; } + static TraceId fromCapnp(rpc::InvocationSpanContext::TraceId::Reader reader); + void toCapnp(rpc::InvocationSpanContext::TraceId::Builder writer) const; + private: uint64_t low = 0; uint64_t high = 0; }; constexpr TraceId TraceId::nullId = nullptr; +// The InvocationSpanContext is a tuple of a trace id, invocation id, and span id. +// The trace id represents a top-level request and should be shared across all +// invocation spans and events within those spans. The invocation id identifies +// a specific worker invocation. The span id identifies a specific span within an +// invocation. Every invocation of every worker should have an InvocationSpanContext. +// That may or may not have a trigger InvocationSpanContext. +class InvocationSpanContext final: public kj::Refcounted, + public kj::EnableAddRefToThis { +public: + // Spans within a InvocationSpanContext are identified by a span id that is a + // monotically increasing number. Every InvocationSpanContext has a root span + // whose ID is zero. Every child span context created within that context will + // have a span id that is one greater than the previously created one. + class SpanIdCounter final: public kj::Refcounted { + public: + SpanIdCounter() = default; + KJ_DISALLOW_COPY_AND_MOVE(SpanIdCounter); + + inline kj::uint next() { + static constexpr kj::uint kMax = kj::maxValue; + KJ_ASSERT(id < kMax, "max number of spans exceeded"); + return id++; + } + + private: + kj::uint id = 1; + }; + + // The constructor is public only so kj::rc can see it and create a new instance. + // User code should use the static factory methods or the newChild method. + InvocationSpanContext(kj::Badge, + kj::Maybe> counter, + TraceId traceId, + TraceId invocationId, + kj::uint spanId = 0, + kj::Maybe> parentSpanContext = kj::none); + KJ_DISALLOW_COPY_AND_MOVE(InvocationSpanContext); + + inline const TraceId& getTraceId() const { + return traceId; + } + + inline const TraceId& getInvocationId() const { + return invocationId; + } + + inline const kj::uint getSpanId() const { + return spanId; + } + + inline const kj::Maybe>& getParent() const { + return parentSpanContext; + } + + // Creates a new child span. If the current context does not have a counter, + // then this will assert. If isTrigger() is true then it will not have a + // counter. + kj::Rc newChild(); + + // An InvocationSpanContext is a trigger context if it has no counter. This + // generally means the SpanContext was create from a capnp message and + // represents an InvocationSpanContext that was propagated from a parent + // or triggering context. + bool isTrigger() const { + return counter == kj::none; + } + + // Creates a new InvocationSpanContext. If the triggerContext is given, then its + // traceId is used as the traceId for the newly created context. Otherwise a new + // traceId is generated. The invocationId is always generated new and the spanId + // will be 0 with no parent span. + static kj::Rc newForInvocation( + kj::Maybe&> triggerContext = kj::none, + kj::Maybe entropySource = kj::none); + + // Creates a new InvocationSpanContext from a capnp message. The returned + // InvocationSpanContext will not be capable of creating child spans and + // is considered only a "trigger" span. + static kj::Maybe> fromCapnp( + rpc::InvocationSpanContext::Reader reader); + void toCapnp(rpc::InvocationSpanContext::Builder writer) const; + +private: + // If there is no counter, then child spans cannot be created from + // this InvocationSpanContext. + kj::Maybe> counter; + const TraceId traceId; + const TraceId invocationId; + const kj::uint spanId; + + // The parentSpanContext can be either a direct parent or a trigger + // context. If it is a trigger context, then it should have the same + // traceId but a different invocationId (unless predictable mode for + // testing is enabled). The isTrigger() should also return true. + const kj::Maybe> parentSpanContext; +}; + kj::String KJ_STRINGIFY(const TraceId& id); +kj::String KJ_STRINGIFY(const kj::Rc& context); } // namespace tracing enum class PipelineLogLevel { diff --git a/src/workerd/io/worker-interface.capnp b/src/workerd/io/worker-interface.capnp index 0875f0f4ecb..10c8b4a60e2 100644 --- a/src/workerd/io/worker-interface.capnp +++ b/src/workerd/io/worker-interface.capnp @@ -15,6 +15,16 @@ using import "/capnp/compat/byte-stream.capnp".ByteStream; using import "/workerd/io/outcome.capnp".EventOutcome; using import "/workerd/io/script-version.capnp".ScriptVersion; +struct InvocationSpanContext { + struct TraceId { + high @0 :UInt64; + low @1 :UInt64; + } + traceId @0 :TraceId; + invocationId @1 :TraceId; + spanId @2 :UInt32; +} + struct Trace @0x8e8d911203762d34 { logs @0 :List(Log); struct Log { From b59843c6a183a18824e3e2a7bd14bdf36a34c0ec Mon Sep 17 00:00:00 2001 From: James M Snell Date: Wed, 30 Oct 2024 15:39:02 -0700 Subject: [PATCH 2/2] Apply suggestions from code review Co-authored-by: Yagiz Nizipli --- src/workerd/io/trace.c++ | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/workerd/io/trace.c++ b/src/workerd/io/trace.c++ index 3c280371115..65d60193764 100644 --- a/src/workerd/io/trace.c++ +++ b/src/workerd/io/trace.c++ @@ -198,7 +198,7 @@ void TraceId::toCapnp(rpc::InvocationSpanContext::TraceId::Builder writer) const kj::Maybe> InvocationSpanContext::fromCapnp( rpc::InvocationSpanContext::Reader reader) { if (!reader.hasTraceId() || !reader.hasInvocationId()) { - // If the reaer does not have a traceId or invocationId field then it is + // If the reader does not have a traceId or invocationId field then it is // invalid and we will just ignore it. return kj::none; }