Skip to content

Introduce BraveRpcService #6115

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
1 change: 1 addition & 0 deletions brave/brave5/build.gradle
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
dependencies {
api libs.brave5
api libs.brave5.instrumentation.http
optionalImplementation libs.brave5.instrumentation.rpc

if (project.ext.targetJavaVersion >= 11) {
testImplementation project(':thrift0.18')
Expand Down
1 change: 1 addition & 0 deletions brave/brave6/build.gradle
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
dependencies {
api libs.brave6
api libs.brave6.instrumentation.http
optionalImplementation libs.brave6.instrumentation.rpc

if (project.ext.targetJavaVersion >= 11) {
testImplementation project(':thrift0.18')
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Copyright 2025 LY Corporation
*
* LY Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.armeria.server.brave;

import static com.linecorp.armeria.server.brave.ArmeriaServerParser.annotateWireSpan;
import static com.linecorp.armeria.server.brave.BraveService.SERVICE_REQUEST_DECORATING_SCOPE;

import com.linecorp.armeria.common.Request;
import com.linecorp.armeria.common.Response;
import com.linecorp.armeria.common.brave.RequestContextCurrentTraceContext;
import com.linecorp.armeria.common.logging.RequestLog;
import com.linecorp.armeria.internal.common.RequestContextExtension;
import com.linecorp.armeria.server.Service;
import com.linecorp.armeria.server.ServiceRequestContext;
import com.linecorp.armeria.server.SimpleDecoratingService;
import com.linecorp.armeria.server.TransientServiceOption;

import brave.Span;
import brave.Tracer;
import brave.Tracer.SpanInScope;

abstract class AbstractBraveService<BI extends brave.Request, BO extends brave.Response,
I extends Request, O extends Response> extends SimpleDecoratingService<I, O> {

private final Tracer tracer;
private final RequestContextCurrentTraceContext currentTraceContext;

/**
* Creates a new instance that decorates the specified {@link Service}.
*/
protected AbstractBraveService(Service<I, O> delegate, Tracer tracer,
RequestContextCurrentTraceContext currentTraceContext) {
super(delegate);
this.tracer = tracer;
this.currentTraceContext = currentTraceContext;
}

@Override
public final O serve(ServiceRequestContext ctx, I req) throws Exception {
if (!ctx.config().transientServiceOptions().contains(TransientServiceOption.WITH_TRACING)) {
return unwrap().serve(ctx, req);
}
final BI braveReq = braveRequest(ctx);
final Span span = handleReceive(braveReq);

final RequestContextExtension ctxExtension = ctx.as(RequestContextExtension.class);
if (currentTraceContext.scopeDecoratorAdded() && !span.isNoop() && ctxExtension != null) {
// Run the scope decorators when the ctx is pushed to the thread local.
ctxExtension.hook(() -> currentTraceContext.decorateScope(span.context(),
SERVICE_REQUEST_DECORATING_SCOPE));
}

maybeAddTagsToSpan(ctx, braveReq, span);
try (SpanInScope ignored = tracer.withSpanInScope(span)) {
return unwrap().serve(ctx, req);
}
}

abstract BI braveRequest(ServiceRequestContext ctx);

abstract BO braveResponse(ServiceRequestContext ctx, RequestLog log, BI braveReq);

abstract Span handleReceive(BI braveReq);

abstract void handleSend(BO response, Span span);

void maybeAddTagsToSpan(ServiceRequestContext ctx, BI braveReq, Span span) {
if (span.isNoop()) {
// For no-op spans, nothing special to do.
return;
}

ctx.log().whenComplete().thenAccept(log -> {
annotateWireSpan(log, span);
final BO braveRes = braveResponse(ctx, log, braveReq);
handleSend(braveRes, span);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,51 +16,26 @@

package com.linecorp.armeria.server.brave;

import com.linecorp.armeria.common.RpcRequest;
import com.linecorp.armeria.common.logging.RequestLog;
import com.linecorp.armeria.internal.common.brave.SpanTags;
import com.linecorp.armeria.server.ServiceRequestContext;

import brave.Request;
import brave.Response;
import brave.Span;
import brave.SpanCustomizer;
import brave.http.HttpRequestParser;
import brave.http.HttpResponse;
import brave.http.HttpResponseParser;
import brave.propagation.TraceContext;

/**
* Default implementation of {@link HttpRequestParser} and {@link HttpResponseParser} for servers.
* This parser adds some custom tags and overwrites the name of span if {@link RequestLog#requestContent()}
* is {@link RpcRequest}.
* The following tags become available:
* <ul>
* <li>http.url</li>
* <li>http.host</li>
* <li>http.protocol</li>
* <li>http.serfmt</li>
* <li>address.remote</li>
* <li>address.local</li>
* </ul>
*/
final class ArmeriaHttpServerParser implements HttpRequestParser, HttpResponseParser {

private static final ArmeriaHttpServerParser INSTANCE = new ArmeriaHttpServerParser();

static ArmeriaHttpServerParser get() {
return INSTANCE;
}
final class ArmeriaServerParser {

private ArmeriaHttpServerParser() {
private ArmeriaServerParser() {
}

@Override
public void parse(brave.http.HttpRequest request, TraceContext context, SpanCustomizer span) {
HttpRequestParser.DEFAULT.parse(request, context, span);

final Object unwrapped = request.unwrap();
static void parseRequest(Request req, TraceContext context, SpanCustomizer span) {
final Object unwrapped = req.unwrap();
if (!(unwrapped instanceof ServiceRequestContext)) {
return;
}

final ServiceRequestContext ctx = (ServiceRequestContext) unwrapped;
span.tag(SpanTags.TAG_HTTP_HOST, ctx.request().authority())
.tag(SpanTags.TAG_HTTP_URL, ctx.request().uri().toString())
Expand All @@ -69,16 +44,12 @@ public void parse(brave.http.HttpRequest request, TraceContext context, SpanCust
.tag(SpanTags.TAG_ADDRESS_LOCAL, ctx.localAddress().toString());
}

@Override
public void parse(HttpResponse response, TraceContext context, SpanCustomizer span) {
HttpResponseParser.DEFAULT.parse(response, context, span);

final Object res = response.unwrap();
if (!(res instanceof ServiceRequestContext)) {
static void parseResponse(Response res, TraceContext context, SpanCustomizer span) {
final Object unwrapped = res.unwrap();
if (!(unwrapped instanceof ServiceRequestContext)) {
return;
}

final ServiceRequestContext ctx = (ServiceRequestContext) res;
final ServiceRequestContext ctx = (ServiceRequestContext) unwrapped;
final RequestLog requestLog = ctx.log().ensureComplete();
final String serFmt = ServiceRequestContextAdapter.serializationFormat(requestLog);
if (serFmt != null) {
Expand All @@ -90,4 +61,18 @@ public void parse(HttpResponse response, TraceContext context, SpanCustomizer sp
span.name(name);
}
}

static void annotateWireSpan(RequestLog log, Span span) {
span.start(log.requestStartTimeMicros());
final Long wireReceiveTimeNanos = log.requestFirstBytesTransferredTimeNanos();
assert wireReceiveTimeNanos != null;
SpanTags.logWireReceive(span, wireReceiveTimeNanos, log);

final Long wireSendTimeNanos = log.responseFirstBytesTransferredTimeNanos();
if (wireSendTimeNanos != null) {
SpanTags.logWireSend(span, wireSendTimeNanos, log);
} else {
// If the client timed-out the request, we will have never sent any response data at all.
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright 2025 LY Corporation
*
* LY Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.armeria.server.brave;

import com.linecorp.armeria.common.annotation.UnstableApi;

import brave.http.HttpRequestParser;
import brave.http.HttpResponseParser;

/**
* Provides Armeria's default implementation for server-side HTTP request and response parsers.
* Users may use the default parser like the following:
* <pre>{@code
* Tracing tracing = ...
* HttpTracing httpTracing =
* HttpTracing.newBuilder(tracing)
* .serverRequestParser((req, ctx, span) -> {
* // Apply Brave's default request parser
* HttpRequestParser.DEFAULT.parse(req, ctx, span);
* // Apply Armeria's default request parser
* BraveHttpServerParsers.requestParser().parse(req, ctx, span);
* })
* .serverResponseParser((res, ctx, span) -> {
* // Apply Brave's default response parser
* HttpResponseParser.DEFAULT.parse(res, ctx, span);
* // Apply Armeria's default response parser
* BraveHttpServerParsers.responseParser().parse(res, ctx, span);
* });
* BraveService
* .newDecorator(httpTracing)
* ...
* }</pre>
* The following tags will be available by default:
* <ul>
* <li>http.url</li>
* <li>http.host</li>
* <li>http.protocol</li>
* <li>http.serfmt</li>
* <li>address.remote</li>
* <li>address.local</li>
* </ul>
*/
@UnstableApi
public final class BraveHttpServerParsers {

private static final HttpRequestParser defaultRequestParser = ArmeriaServerParser::parseRequest;

private static final HttpResponseParser defaultResponseParser = ArmeriaServerParser::parseResponse;

/**
* Returns the default {@link HttpRequestParser}.
*/
public static HttpRequestParser requestParser() {
return defaultRequestParser;
}

/**
* Returns the default {@link HttpResponseParser}.
*/
public static HttpResponseParser responseParser() {
return defaultResponseParser;
}

private BraveHttpServerParsers() {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright 2025 LY Corporation
*
* LY Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.armeria.server.brave;

import com.linecorp.armeria.common.annotation.UnstableApi;

import brave.rpc.RpcRequestParser;
import brave.rpc.RpcResponseParser;

/**
* Provides Armeria's default implementation for server-side RPC request and response parsers.
* Users may use the default parser like the following:
* <pre>{@code
* Tracing tracing = ...
* RpcTracing rpcTracing =
* RpcTracing.newBuilder(tracing)
* .serverRequestParser((req, ctx, span) -> {
* // Apply Brave's default request parser
* RpcRequestParser.DEFAULT.parse(req, ctx, span);
* // Apply Armeria's default request parser
* BraveRpcServerParsers.requestParser().parse(req, ctx, span);
* })
* .serverResponseParser((res, ctx, span) -> {
* // Apply Brave's default response parser
* RpcResponseParser.DEFAULT.parse(res, ctx, span);
* // Apply Armeria's default response parser
* BraveRpcServerParsers.responseParser().parse(res, ctx, span);
* });
* BraveRpcService
* .newDecorator(rpcTracing)
* ...
* }</pre>
* The following tags will be available by default:
* <ul>
* <li>http.url</li>
* <li>http.host</li>
* <li>http.protocol</li>
* <li>http.serfmt</li>
* <li>address.remote</li>
* <li>address.local</li>
* </ul>
*/
@UnstableApi
public final class BraveRpcServerParsers {

private static final RpcRequestParser defaultRequestParser = ArmeriaServerParser::parseRequest;

private static final RpcResponseParser defaultResponseParser = ArmeriaServerParser::parseResponse;

/**
* Returns the default {@link RpcRequestParser}.
*/
public static RpcRequestParser requestParser() {
return defaultRequestParser;
}

/**
* Returns the default {@link RpcResponseParser}.
*/
public static RpcResponseParser responseParser() {
return defaultResponseParser;
}

private BraveRpcServerParsers() {}
}
Loading
Loading