From fd8ac30f07b744fb4adb91e044182443ba05b7e1 Mon Sep 17 00:00:00 2001 From: Tobias Bachert Date: Fri, 2 Dec 2022 19:31:42 +0100 Subject: [PATCH] Run exports with the `Context` the `SpanProcessor` was created in (#880) * Run exports in spanprocessor construction context * Resolve psalm issues --- .../SpanProcessor/BatchSpanProcessor.php | 19 +++++++++----- .../SpanProcessor/SimpleSpanProcessor.php | 25 +++++++++++-------- 2 files changed, 28 insertions(+), 16 deletions(-) diff --git a/src/SDK/Trace/SpanProcessor/BatchSpanProcessor.php b/src/SDK/Trace/SpanProcessor/BatchSpanProcessor.php index b7ab06c13..4d935466c 100644 --- a/src/SDK/Trace/SpanProcessor/BatchSpanProcessor.php +++ b/src/SDK/Trace/SpanProcessor/BatchSpanProcessor.php @@ -9,6 +9,7 @@ use InvalidArgumentException; use OpenTelemetry\API\Metrics\MeterProviderInterface; use OpenTelemetry\API\Metrics\ObserverInterface; +use OpenTelemetry\Context\Context; use OpenTelemetry\Context\ContextInterface; use OpenTelemetry\SDK\Behavior\LogsMessagesTrait; use OpenTelemetry\SDK\Common\Future\CancellationInterface; @@ -44,6 +45,7 @@ class BatchSpanProcessor implements SpanProcessorInterface private int $scheduledDelayNanos; private int $maxExportBatchSize; private bool $autoFlush; + private ContextInterface $exportContext; private ?int $nextScheduledRun = null; private bool $running = false; @@ -55,7 +57,7 @@ class BatchSpanProcessor implements SpanProcessorInterface private array $batch = []; /** @var SplQueue> */ private SplQueue $queue; - /** @var SplQueue */ + /** @var SplQueue */ private SplQueue $flush; private bool $closed = false; @@ -93,6 +95,7 @@ public function __construct( $this->maxExportBatchSize = $maxExportBatchSize; $this->autoFlush = $autoFlush; + $this->exportContext = Context::getCurrent(); $this->queue = new SplQueue(); $this->flush = new SplQueue(); @@ -199,7 +202,7 @@ private function flush(?string $flushMethod = null, ?CancellationInterface $canc { if ($flushMethod !== null) { $flushId = $this->batchId + $this->queue->count() + (int) (bool) $this->batch; - $this->flush->enqueue([$flushId, $flushMethod, $cancellation, !$this->running]); + $this->flush->enqueue([$flushId, $flushMethod, $cancellation, !$this->running, Context::getCurrent()]); } if ($this->running) { @@ -213,7 +216,8 @@ private function flush(?string $flushMethod = null, ?CancellationInterface $canc try { for (;;) { while (!$this->flush->isEmpty() && $this->flush->bottom()[0] <= $this->batchId) { - [, $flushMethod, $cancellation, $propagateResult] = $this->flush->dequeue(); + [, $flushMethod, $cancellation, $propagateResult, $context] = $this->flush->dequeue(); + $scope = $context->activate(); try { $result = $this->exporter->$flushMethod($cancellation); @@ -223,10 +227,11 @@ private function flush(?string $flushMethod = null, ?CancellationInterface $canc } catch (Throwable $e) { if ($propagateResult) { $exception = $e; - - continue; + } else { + self::logError(sprintf('Unhandled %s error', $flushMethod), ['exception' => $e]); } - self::logError(sprintf('Unhandled %s error', $flushMethod), ['exception' => $e]); + } finally { + $scope->detach(); } } @@ -239,6 +244,7 @@ private function flush(?string $flushMethod = null, ?CancellationInterface $canc } $batchSize = count($this->queue->bottom()); $this->batchId++; + $scope = $this->exportContext->activate(); try { $this->exporter->export($this->queue->dequeue())->await(); @@ -247,6 +253,7 @@ private function flush(?string $flushMethod = null, ?CancellationInterface $canc } finally { $this->processed += $batchSize; $this->queueSize -= $batchSize; + $scope->detach(); } } } finally { diff --git a/src/SDK/Trace/SpanProcessor/SimpleSpanProcessor.php b/src/SDK/Trace/SpanProcessor/SimpleSpanProcessor.php index 5fa102791..09221d115 100644 --- a/src/SDK/Trace/SpanProcessor/SimpleSpanProcessor.php +++ b/src/SDK/Trace/SpanProcessor/SimpleSpanProcessor.php @@ -5,6 +5,7 @@ namespace OpenTelemetry\SDK\Trace\SpanProcessor; use Closure; +use OpenTelemetry\Context\Context; use OpenTelemetry\Context\ContextInterface; use OpenTelemetry\SDK\Behavior\LogsMessagesTrait; use OpenTelemetry\SDK\Common\Future\CancellationInterface; @@ -21,9 +22,10 @@ class SimpleSpanProcessor implements SpanProcessorInterface use LogsMessagesTrait; private SpanExporterInterface $exporter; + private ContextInterface $exportContext; private bool $running = false; - /** @var SplQueue */ + /** @var SplQueue */ private SplQueue $queue; private bool $closed = false; @@ -32,6 +34,7 @@ public function __construct(SpanExporterInterface $exporter) { $this->exporter = $exporter; + $this->exportContext = Context::getCurrent(); $this->queue = new SplQueue(); } @@ -49,7 +52,7 @@ public function onEnd(ReadableSpanInterface $span): void } $spanData = $span->toSpanData(); - $this->flush(fn () => $this->exporter->export([$spanData])->await(), 'export'); + $this->flush(fn () => $this->exporter->export([$spanData])->await(), 'export', false, $this->exportContext); } public function forceFlush(?CancellationInterface $cancellation = null): bool @@ -58,7 +61,7 @@ public function forceFlush(?CancellationInterface $cancellation = null): bool return false; } - return $this->flush(fn (): bool => $this->exporter->forceFlush($cancellation), __FUNCTION__, true); + return $this->flush(fn (): bool => $this->exporter->forceFlush($cancellation), __FUNCTION__, true, Context::getCurrent()); } public function shutdown(?CancellationInterface $cancellation = null): bool @@ -69,12 +72,12 @@ public function shutdown(?CancellationInterface $cancellation = null): bool $this->closed = true; - return $this->flush(fn (): bool => $this->exporter->shutdown($cancellation), __FUNCTION__, true); + return $this->flush(fn (): bool => $this->exporter->shutdown($cancellation), __FUNCTION__, true, Context::getCurrent()); } - private function flush(Closure $task, string $taskName, bool $propagateResult = false): bool + private function flush(Closure $task, string $taskName, bool $propagateResult, ContextInterface $context): bool { - $this->queue->enqueue([$task, $taskName, $propagateResult && !$this->running]); + $this->queue->enqueue([$task, $taskName, $propagateResult && !$this->running, $context]); if ($this->running) { return false; @@ -86,7 +89,8 @@ private function flush(Closure $task, string $taskName, bool $propagateResult = try { while (!$this->queue->isEmpty()) { - [$task, $taskName, $propagateResult] = $this->queue->dequeue(); + [$task, $taskName, $propagateResult, $context] = $this->queue->dequeue(); + $scope = $context->activate(); try { $result = $task(); @@ -96,10 +100,11 @@ private function flush(Closure $task, string $taskName, bool $propagateResult = } catch (Throwable $e) { if ($propagateResult) { $exception = $e; - - continue; + } else { + self::logError(sprintf('Unhandled %s error', $taskName), ['exception' => $e]); } - self::logError(sprintf('Unhandled %s error', $taskName), ['exception' => $e]); + } finally { + $scope->detach(); } } } finally {