Skip to content

Commit

Permalink
Fix #39: Add temporal support
Browse files Browse the repository at this point in the history
  • Loading branch information
xepozz authored Dec 29, 2024
1 parent 7b08dcf commit bf85a0b
Show file tree
Hide file tree
Showing 12 changed files with 425 additions and 27 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@

## 3.0.0 February 22, 2024

- Enh #39: Add temporal support (@xepozz)
- Enh #35: Add composer require checker into CI (@xepozz)
- Chg #23: Rename `RoadRunnerApplicationRunner` to `RoadRunnerHttpApplicationRunner` (@s1lver)
- Chg #61: Increased minimum PHP version to 8.1 (@s1lver)
- Enh #67, #76: Added runner for gRPC requests (@s1lver)
Expand Down
80 changes: 80 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,86 @@ You can also use your own implementation of the `Spiral\RoadRunner\Http\PSR7Work
$runner = $runner->withPsr7Worker($psr7Worker);
```

## Temporal

Temporal is a distributed, scalable, durable, and highly available orchestration engine used to execute asynchronous long-running business logic in a scalable and resilient way.

Explore more about Temporal on [the official website](https://temporal.io) and in [the SDK repository](https://github.com/temporalio/sdk-php).

> If you want to add support for Temporal you need to install the SDK and configure workflows and activities as described below.

### Installation

```shell
composer require temporal/sdk
```

### Configuration

Temporal has at least two main class types: [Activity](https://docs.temporal.io/activities) and [Workflow](https://docs.temporal.io/workflows).
Any activity must have the attribute `\Temporal\Activity\ActivityInterface`:

```php
namespace App\Activity;
#[\Temporal\Activity\ActivityInterface]
class MyActivity
{
// ...
}
```

Any workflow must have the attribute `\Temporal\Workflow\WorkflowInterface`.

```php
namespace App\Workflow;
#[\Temporal\Workflow\WorkflowInterface]
class MyWorkflow
{
// ...
}
```

Configure Temporal engine to make it able to work with your activities and workflows in the `params.php`:

```php
// config/common/params.php
return [
'yiisoft/yii-runner-roadrunner' => [
'temporal' => [
'enabled' => true,
'host' => 'localhost:7233', // host of Temporal engine
'workflows' => [
\App\Workflow\MyWorkflow::class,
],
'activities' => [
\App\Activity\MyActivity::class,
],
],
],
]
```

> If you use another container instead of `yiisoft/di`, make sure that
> `\Yiisoft\Yii\Runner\RoadRunner\Temporal\TemporalDeclarationProvider`
> is registered and returns all of your workflows and activities.

The last thing is to call `withTemporalEnabled(true)` on the `\Yiisoft\Yii\Runner\RoadRunner\RoadRunnerHttpApplicationRunner` in the `public/index.php`:

```php
(new RoadRunnerHttpApplicationRunner())
->withTemporalEnabled(true)
->run();
```

## Testing

### Unit testing

The package is tested with [PHPUnit](https://phpunit.de/). To run tests:
## Documentation

- Guide: [English](docs/guide/en/README.md), [Português - Brasil](docs/guide/pt-BR/README.md), [Русский](docs/guide/ru/README.md)
Expand Down
6 changes: 6 additions & 0 deletions composer-require-checker.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"symbol-whitelist" : [
"Temporal\\Worker\\WorkerFactoryInterface",
"Temporal\\Worker\\Transport\\HostConnectionInterface"
]
}
5 changes: 5 additions & 0 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
"rector/rector": "^2.0.3",
"roave/infection-static-analysis-plugin": "^1.16",
"spatie/phpunit-watcher": "^1.23",
"temporal/sdk": "^2.11",
"vimeo/psalm": "^5.22",
"yiisoft/middleware-dispatcher": "^5.0",
"yiisoft/test-support": "^3.0",
Expand All @@ -62,6 +63,9 @@
"grpc/grpc": "Needed for gRPC support",
"ext-grpc": "Needed for gRPC support"
},
"suggest": {
"temporal/sdk": "Add Temporal support"
},
"autoload": {
"psr-4": {
"Yiisoft\\Yii\\Runner\\RoadRunner\\": "src"
Expand All @@ -82,6 +86,7 @@
"build-merge-plan": false
},
"config-plugin": {
"di": "di-*.php",
"params": "params.php"
}
},
Expand Down
57 changes: 57 additions & 0 deletions config/di-temporal.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
<?php

declare(strict_types=1);

use Temporal\Client\GRPC\ServiceClient;
use Temporal\Client\GRPC\ServiceClientInterface;
use Temporal\Client\WorkflowClient;
use Temporal\Client\WorkflowClientInterface;
use Temporal\DataConverter\DataConverter;
use Temporal\DataConverter\DataConverterInterface;
use Temporal\Worker\Transport\Goridge;
use Temporal\Worker\Transport\HostConnectionInterface;
use Temporal\Worker\Transport\RoadRunner;
use Temporal\Worker\Transport\RPCConnectionInterface;
use Temporal\Worker\WorkerFactoryInterface;
use Temporal\WorkerFactory;
use Yiisoft\Definitions\Reference;
use Yiisoft\Yii\Runner\RoadRunner\Temporal\TemporalDeclarationProvider;

/**
* @var $params array
*/

$temporalParams = $params['yiisoft/yii-runner-roadrunner']['temporal'];
if (!($temporalParams['enabled'] ?? false)) {
return [];
}

return [
DataConverterInterface::class => DataConverter::class,
DataConverter::class => fn () => DataConverter::createDefault(),

RPCConnectionInterface::class => Goridge::class,
Goridge::class => fn () => Goridge::create(),

WorkerFactoryInterface::class => WorkerFactory::class,
WorkerFactory::class => fn () => WorkerFactory::create(),

HostConnectionInterface::class => RoadRunner::class,
RoadRunner::class => fn () => RoadRunner::create(),

WorkflowClientInterface::class => WorkflowClient::class,
WorkflowClient::class => [
'class' => WorkflowClient::class,
'__construct()' => [
Reference::to(ServiceClientInterface::class),
],
],

ServiceClientInterface::class => ServiceClient::class,
ServiceClient::class => fn () => ServiceClient::create($temporalParams['host']),

TemporalDeclarationProvider::class => fn () => new TemporalDeclarationProvider(
$temporalParams['workflows'] ?? [],
$temporalParams['activities'] ?? [],
),
];
6 changes: 6 additions & 0 deletions config/params.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,11 @@
'grpc' => [
'services' => [],
],
'temporal' => [
'enabled' => false,
'host' => 'localhost:7233',
'workflows' => [],
'activities' => [],
],
],
];
142 changes: 115 additions & 27 deletions src/RoadRunnerHttpApplicationRunner.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,19 @@
namespace Yiisoft\Yii\Runner\RoadRunner;

use ErrorException;
use Exception;
use JsonException;
use Psr\Container\ContainerExceptionInterface;
use Psr\Container\ContainerInterface;
use Psr\Container\NotFoundExceptionInterface;
use Psr\Http\Message\ResponseInterface;
use ReflectionClass;
use RuntimeException;
use Spiral\RoadRunner\Environment;
use Spiral\RoadRunner\Environment\Mode;
use Spiral\RoadRunner\Http\PSR7WorkerInterface;
use Temporal\Worker\Transport\HostConnectionInterface;
use Temporal\Worker\WorkerFactoryInterface;
use Throwable;
use Yiisoft\Definitions\Exception\CircularReferenceException;
use Yiisoft\Definitions\Exception\InvalidConfigException;
Expand All @@ -23,8 +30,10 @@
use Yiisoft\Log\Target\File\FileTarget;
use Yiisoft\Yii\Http\Application;
use Yiisoft\Yii\Runner\ApplicationRunner;
use Yiisoft\Yii\Runner\RoadRunner\Temporal\TemporalDeclarationProvider;

use function gc_collect_cycles;
use function interface_exists;

/**
* `RoadRunnerHttpApplicationRunner` runs the Yii HTTP application using RoadRunner.
Expand All @@ -33,6 +42,7 @@ final class RoadRunnerHttpApplicationRunner extends ApplicationRunner
{
private ?ErrorHandler $temporaryErrorHandler = null;
private ?PSR7WorkerInterface $psr7Worker = null;
private bool $isTemporalEnabled = false;

/**
* @param string $rootPath The absolute path to the project root.
Expand Down Expand Up @@ -113,6 +123,19 @@ public function withPsr7Worker(PSR7WorkerInterface $worker): self
return $new;
}

/**
* Returns a new instance with enabled temporal support.
*/
public function withTemporalEnabled(bool $value): self
{
if (!$this->isTemporalSDKInstalled()) {
throw new Exception('Temporal SDK is not installed. To install the SDK run `composer require temporal/sdk`.');
}
$new = clone $this;
$new->isTemporalEnabled = $value;
return $new;
}

/**
* {@inheritDoc}
*
Expand All @@ -135,37 +158,27 @@ public function run(): void
$this->runBootstrap();
$this->checkEvents();

$worker = new RoadRunnerHttpWorker($container, $this->psr7Worker);

/** @var Application $application */
$application = $container->get(Application::class);
$application->start();

while (true) {
$request = $worker->waitRequest();
$response = null;

if ($request === null) {
break;
}

if ($request instanceof Throwable) {
$response = $worker->respondWithError($request);
$this->afterRespond($application, $container, $response);
continue;
}
$env = Environment::fromGlobals();

try {
$response = $application->handle($request);
$worker->respond($response);
} catch (Throwable $t) {
$response = $worker->respondWithError($t, $request);
} finally {
$this->afterRespond($application, $container, $response);
if ($env->getMode() === Mode::MODE_TEMPORAL) {
if (!$this->isTemporalEnabled) {
throw new RuntimeException(
'Temporal support is disabled. You should call `withTemporalEnabled(true)` to enable temporal support.',
);
}
$this->runTemporal($container);
return;
}
if ($env->getMode() === Mode::MODE_HTTP) {
$this->runRoadRunner($container);
return;
}

$application->shutdown();
throw new RuntimeException(sprintf(
'Unsupported mode "%s", supported modes are: "%s".',
$env->getMode(),
implode('", "', [Mode::MODE_HTTP, Mode::MODE_TEMPORAL]),
));
}

private function createTemporaryErrorHandler(): ErrorHandler
Expand Down Expand Up @@ -204,4 +217,79 @@ private function afterRespond(
->reset(); // We should reset the state of such services every request.
gc_collect_cycles();
}

private function runRoadRunner(ContainerInterface $container): void
{
$worker = new RoadRunnerHttpWorker($container, $this->psr7Worker);

/** @var Application $application */
$application = $container->get(Application::class);
$application->start();

while (true) {
$request = $worker->waitRequest();
$response = null;

if ($request === null) {
break;
}

if ($request instanceof Throwable) {
$response = $worker->respondWithError($request);
$this->afterRespond($application, $container, $response);
continue;
}

try {
$response = $application->handle($request);
$worker->respond($response);
} catch (Throwable $t) {
$response = $worker->respondWithError($t, $request);
} finally {
$this->afterRespond($application, $container, $response);
}
}

$application->shutdown();
}

private function runTemporal(ContainerInterface $container): void
{
/** @var TemporalDeclarationProvider $temporalDeclarationProvider */
$temporalDeclarationProvider = $container->get(TemporalDeclarationProvider::class);
/** @var HostConnectionInterface $host */
$host = $container->get(HostConnectionInterface::class);

/** @var WorkerFactoryInterface $factory */
$factory = $container->get(WorkerFactoryInterface::class);
$worker = $factory->newWorker('default');

$workflows = $temporalDeclarationProvider->getWorkflows();
$activities = $temporalDeclarationProvider->getActivities();

$worker->registerWorkflowTypes(...$workflows);

/** @psalm-suppress MixedReturnStatement,MixedInferredReturnType */
$activityFactory = static fn (ReflectionClass $class): object => $container->get($class->getName());
$activityFinalizer = static function () use ($container): void {
/** @psalm-suppress MixedMethodCall */
$container
->get(StateResetter::class)
->reset(); // We should reset the state of such services every request.
gc_collect_cycles();
};

foreach ($activities as $activity) {
$worker->registerActivity($activity, $activityFactory);
}
$worker->registerActivityFinalizer($activityFinalizer);

/** @psalm-suppress TooManyArguments */
$factory->run($host);
}

private function isTemporalSDKInstalled(): bool
{
return interface_exists(WorkerFactoryInterface::class);
}
}
Loading

0 comments on commit bf85a0b

Please sign in to comment.