From 574a98233c64c907d2cc54a0305e777a61abce9f Mon Sep 17 00:00:00 2001 From: zeyu10 Date: Thu, 13 Feb 2025 10:38:04 +0800 Subject: [PATCH] maintain trace context after async task --- .../flow/olympicene/traversal/DAGOperations.java | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/DAGOperations.java b/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/DAGOperations.java index ac1f30ac..d8c317e7 100644 --- a/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/DAGOperations.java +++ b/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/DAGOperations.java @@ -242,8 +242,14 @@ public void finishTaskSync(String executionId, String taskCategory, NotifyInfo n } if (isTaskCompleted(executionResult)) { timeCheckRunner.remTaskFromTimeoutCheck(executionId, executionResult.getTaskInfo()); - dagTraversal.submitTraversal(executionId, executionResult.getTaskInfo().getName()); - invokeTaskCallback(executionId, executionResult.getTaskInfo(), executionResult.getContext()); + // 在恢复的 context 中触发下一个任务 + Context parentContext = span != null ? Context.current() : Context.current(); + runnerExecutor.execute(new ExecutionRunnable(executionId, () -> { + try (Scope ignored = parentContext.makeCurrent()) { + dagTraversal.submitTraversal(executionId, executionResult.getTaskInfo().getName()); + invokeTaskCallback(executionId, executionResult.getTaskInfo(), executionResult.getContext()); + } + })); } if (StringUtils.isNotBlank(executionResult.getTaskNameNeedToTraversal())) { dagTraversal.submitTraversal(executionId, executionResult.getTaskNameNeedToTraversal()); @@ -257,7 +263,7 @@ public void finishTaskSync(String executionId, String taskCategory, NotifyInfo n } finally { if (span != null && executionResult.getTaskStatus().isCompleted()) { span.setAttribute("status", executionResult.getTaskStatus().toString()); - span.end(); // 结束之前保存的 span + span.end(); } } }