Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Propagate response headers with grpc error metadata #32

Merged
merged 7 commits into from
Jan 13, 2025
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions src/Internal/CallContext.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
<?php

declare(strict_types=1);

namespace Spiral\RoadRunner\GRPC\Internal;

use Spiral\RoadRunner\GRPC\ServiceInterface;

/**
* @internal
* @psalm-internal Spiral\RoadRunner\GRPC
*/
final class CallContext
{
/**
* @param class-string<ServiceInterface> $service
* @param non-empty-string $method
* @param array<string, array<string>> $context
*/
public function __construct(
public readonly string $service,
public readonly string $method,
public readonly array $context,
) {
rauanmayemir marked this conversation as resolved.
Show resolved Hide resolved
}

/**
* @throws \JsonException
*/
public static function decode(string $payload): self
{
/**
* @psalm-var array{
* service: class-string<ServiceInterface>,
* method: non-empty-string,
* context: array<string, array<string>>
* } $data
*/
$data = Json::decode($payload);

return new self(
service: $data['service'],
method: $data['method'],
context: $data['context'],
);
rauanmayemir marked this conversation as resolved.
Show resolved Hide resolved
}
}
73 changes: 73 additions & 0 deletions src/ResponseTrailers.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
<?php

declare(strict_types=1);

namespace Spiral\RoadRunner\GRPC;

use Spiral\RoadRunner\GRPC\Internal\Json;

/**
* @psalm-type THeaderKey = non-empty-string
* @psalm-type THeaderValue = string
* @implements \IteratorAggregate<THeaderKey, string>
*/
final class ResponseTrailers implements \IteratorAggregate, \Countable
{
/**
* @var array<THeaderKey, THeaderValue>
*/
private array $trailers = [];

/**
* @param iterable<THeaderKey, THeaderValue> $trailers
*/
public function __construct(iterable $trailers = [])
{
foreach ($trailers as $key => $value) {
$this->set($key, $value);
}
}

/**
* @param THeaderKey $key
* @param THeaderValue $value
*/
public function set(string $key, string $value): void
{
$this->trailers[$key] = $value;
}

/**
* @param THeaderKey $key
* @param string|null $default
* @return THeaderValue|null
*/
public function get(string $key, string $default = null): ?string
{
return $this->trailers[$key] ?? $default;
}

public function getIterator(): \Traversable
{
return new \ArrayIterator($this->trailers);
}

public function count(): int
{
return \count($this->trailers);
}

/**
* @throws \JsonException
*/
public function packTrailers(): string
{
// If an empty array is serialized, it is cast to the string "[]"
// instead of object string "{}"
if ($this->trailers === []) {
return '{}';
}

return Json::encode($this->trailers);
}
}
83 changes: 40 additions & 43 deletions src/Server.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
use Spiral\RoadRunner\GRPC\Exception\GRPCExceptionInterface;
use Spiral\RoadRunner\GRPC\Exception\NotFoundException;
use Spiral\RoadRunner\GRPC\Exception\ServiceException;
use Spiral\RoadRunner\GRPC\Internal\CallContext;
use Spiral\RoadRunner\GRPC\Internal\Json;
use Spiral\RoadRunner\Payload;
use Spiral\RoadRunner\Worker;
Expand All @@ -21,12 +22,6 @@
* @psalm-type ServerOptions = array{
* debug?: bool
* }
*
* @psalm-type ContextResponse = array{
* service: class-string<ServiceInterface>,
* method: non-empty-string,
* context: array<string, array<string>>
* }
*/
final class Server
{
Expand Down Expand Up @@ -77,15 +72,43 @@ public function serve(?WorkerInterface $worker = null, ?callable $finalize = nul
return;
}

try {
/** @var ContextResponse $context */
$context = Json::decode($request->header);
$responseHeaders = new ResponseHeaders();
$responseTrailers = new ResponseTrailers();

[$answerBody, $answerHeaders] = $this->tick($request->body, $context);

$this->workerSend($worker, $answerBody, $answerHeaders);
try {
$call = CallContext::decode($request->header);

$context = new Context(array_merge(
$call->context,
[
ResponseHeaders::class => $responseHeaders,
ResponseTrailers::class => $responseTrailers
]
));

$response = $this->invoke($call->service, $call->method, $context, $request->body);

$headers = [];
$responseHeaders->count() === 0 or $headers['headers'] = $responseHeaders->packHeaders();
$responseTrailers->count() === 0 or $headers['trailers'] = $responseTrailers->packTrailers();

$this->workerSend(
worker: $worker,
body: $response,
headers: $headers === [] ? '{}' : Json::encode($headers),
);
} catch (GRPCExceptionInterface $e) {
$this->workerGrpcError($worker, $e);
$headers = [
'error' => $this->createGrpcError($e),
];
$responseHeaders->count() === 0 or $headers['headers'] = $responseHeaders->packHeaders();
$responseTrailers->count() === 0 or $headers['trailers'] = $responseTrailers->packTrailers();

$this->workerSend(
worker: $worker,
body: '',
headers: Json::encode($headers),
);
} catch (\Throwable $e) {
$this->workerError($worker, $this->isDebugMode() ? (string) $e : $e->getMessage());
} finally {
Expand All @@ -112,24 +135,9 @@ protected function invoke(string $service, string $method, ContextInterface $con
return $this->services[$service]->invoke($method, $context, $body);
}

/**
* @param ContextResponse $data
* @return array{0: string, 1: string}
* @throws \JsonException
* @throws \Throwable
*/
private function tick(string $body, array $data): array
private function workerError(WorkerInterface $worker, string $message): void
{
$context = (new Context($data['context']))
->withValue(ResponseHeaders::class, new ResponseHeaders());

$response = $this->invoke($data['service'], $data['method'], $context, $body);

/** @var ResponseHeaders|null $responseHeaders */
$responseHeaders = $context->getValue(ResponseHeaders::class);
$responseHeadersString = $responseHeaders ? $responseHeaders->packHeaders() : '{}';

return [$response, $responseHeadersString];
$worker->error($message);
}

/**
Expand All @@ -140,12 +148,7 @@ private function workerSend(WorkerInterface $worker, string $body, string $heade
$worker->respond(new Payload($body, $headers));
}

private function workerError(WorkerInterface $worker, string $message): void
{
$worker->error($message);
}

private function workerGrpcError(WorkerInterface $worker, GRPCExceptionInterface $e): void
private function createGrpcError(GRPCExceptionInterface $e): string
{
$status = new Status([
'code' => $e->getCode(),
Expand All @@ -161,13 +164,7 @@ static function ($detail) {
),
]);

$this->workerSend(
$worker,
'',
Json::encode([
'error' => \base64_encode($status->serializeToString()),
]),
);
return \base64_encode($status->serializeToString());
}

/**
Expand Down