From 2e977e0d7e5a59fe3299cee626e97cb8f64959dd Mon Sep 17 00:00:00 2001 From: Yohann Genre <5088279+Yokann@users.noreply.github.com> Date: Wed, 23 Aug 2023 11:35:20 +0200 Subject: [PATCH] feat: bump amp to v3 --- .github/workflows/ci.yml | 2 +- composer.json | 6 +- src/Adapter/Amp/EventLoop.php | 133 ++++++++++---------- src/Adapter/Amp/Internal/Deferred.php | 13 +- src/Adapter/Amp/Internal/PromiseWrapper.php | 18 +-- tests/Adapter/Amp/LoopReset.php | 3 +- 6 files changed, 83 insertions(+), 92 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 6ba4149..e92e009 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -13,7 +13,7 @@ jobs: strategy: matrix: - php: [ '7.3', '7.4', '8.0' ] + php: [ '8.1' ] steps: - name: 'Init repository' diff --git a/composer.json b/composer.json index 931ab32..6181fef 100644 --- a/composer.json +++ b/composer.json @@ -20,15 +20,15 @@ } }, "require": { - "php": "^7.3|^8.0", + "php": "^8.1", "psr/http-message": "^1.0" }, "require-dev": { "phpunit/phpunit": "^9.4.4", - "amphp/amp": "^2.0", + "amphp/amp": "^3.0", "guzzlehttp/guzzle": "^6.3", "m6web/php-cs-fixer-config": "^2.0", - "ext-curl": "^7.3|^8.0", + "ext-curl": "^8.0", "react/event-loop": "^1.0", "react/promise": "^2.7", "phpstan/phpstan": "^0.12", diff --git a/src/Adapter/Amp/EventLoop.php b/src/Adapter/Amp/EventLoop.php index a0a6d7b..4ba3225 100644 --- a/src/Adapter/Amp/EventLoop.php +++ b/src/Adapter/Amp/EventLoop.php @@ -2,6 +2,8 @@ namespace M6Web\Tornado\Adapter\Amp; +use Amp\DeferredFuture; +use Amp\Future; use M6Web\Tornado\Adapter\Common; use M6Web\Tornado\Deferred; use M6Web\Tornado\Promise; @@ -14,25 +16,24 @@ class EventLoop implements \M6Web\Tornado\EventLoop public function wait(Promise $promise) { try { - $result = \Amp\Promise\wait( - Internal\PromiseWrapper::toHandledPromise($promise, $this->unhandledFailingPromises)->getAmpPromise() - ); + $result = \Amp\Future\await([Internal\PromiseWrapper::toHandledPromise($promise, $this->unhandledFailingPromises)->getAmpFuture()]); $this->unhandledFailingPromises->throwIfWatchedFailingPromiseExists(); - return $result; + return $result[0]; } catch (\Error $error) { // Modify exceptions sent by Amp itself if ($error->getCode() !== 0) { throw $error; } - switch ($error->getMessage()) { - case 'Loop stopped without resolving the promise': - throw new \Error('Impossible to resolve the promise, no more task to execute.', 0, $error); - case 'Loop exceptionally stopped without resolving the promise': - throw $error->getPrevious() ?? $error; - default: - throw $error; + + if (str_starts_with($error->getMessage(), 'Event loop terminated without resuming the current suspension')) { + throw new \Error('Impossible to resolve the promise, no more task to execute.', 0, $error); } + + throw match ($error->getMessage()) { + 'Loop exceptionally stopped without resolving the promise' => $error->getPrevious() ?? $error, + default => $error, + }; } } @@ -41,7 +42,7 @@ public function wait(Promise $promise) */ public function async(\Generator $generator): Promise { - $wrapper = function (\Generator $generator, \Amp\Deferred $deferred): \Generator { + $wrapper = function (\Generator $generator, \Amp\DeferredFuture $deferred) { try { while ($generator->valid()) { $blockingPromise = $generator->current(); @@ -51,13 +52,13 @@ public function async(\Generator $generator): Promise $blockingPromise = Internal\PromiseWrapper::toHandledPromise( $blockingPromise, $this->unhandledFailingPromises - )->getAmpPromise(); + )->getAmpFuture(); // Forwards promise value/exception to underlying generator $blockingPromiseValue = null; $blockingPromiseException = null; try { - $blockingPromiseValue = yield $blockingPromise; + $blockingPromiseValue = $blockingPromise->await(); } catch (\Throwable $throwable) { $blockingPromiseException = $throwable; } @@ -68,18 +69,18 @@ public function async(\Generator $generator): Promise } } } catch (\Throwable $throwable) { - $deferred->fail($throwable); + $deferred->error($throwable); return; } - $deferred->resolve($generator->getReturn()); + $deferred->complete($generator->getReturn()); }; - $deferred = new \Amp\Deferred(); - \Amp\Promise\rethrow(new \Amp\Coroutine($wrapper($generator, $deferred))); + $deferred = new \Amp\DeferredFuture(); + \Amp\async(fn() => $wrapper($generator, $deferred)); - return Internal\PromiseWrapper::createUnhandled($deferred->promise(), $this->unhandledFailingPromises); + return Internal\PromiseWrapper::createUnhandled($deferred->getFuture(), $this->unhandledFailingPromises); } /** @@ -87,20 +88,17 @@ public function async(\Generator $generator): Promise */ public function promiseAll(Promise ...$promises): Promise { - return Internal\PromiseWrapper::createUnhandled( - \Amp\Promise\all( - array_map( - function (Promise $promise) { - return Internal\PromiseWrapper::toHandledPromise( - $promise, - $this->unhandledFailingPromises - )->getAmpPromise(); - }, - $promises - ) - ), - $this->unhandledFailingPromises - ); + $generator = function() use ($promises): \Generator { + $result = []; + + foreach ($promises as $promise) { + $result[] = yield Internal\PromiseWrapper::toHandledPromise($promise, $this->unhandledFailingPromises); + } + + return $result; + }; + + return $this->async($generator()); } /** @@ -125,39 +123,39 @@ public function promiseRace(Promise ...$promises): Promise return $this->promiseFulfilled(null); } - $deferred = new \Amp\Deferred(); + $deferred = new \Amp\DeferredFuture(); $isFirstPromise = true; - $wrapPromise = function (\Amp\Promise $promise) use ($deferred, &$isFirstPromise): \Generator { + $wrapPromise = function (\Amp\Future $future) use ($deferred, &$isFirstPromise) { try { - $result = yield $promise; + $result = $future->await(); if ($isFirstPromise) { $isFirstPromise = false; - $deferred->resolve($result); + $deferred->complete($result); } } catch (\Throwable $throwable) { if ($isFirstPromise) { $isFirstPromise = false; - $deferred->fail($throwable); + $deferred->error($throwable); } } }; - $promises = array_map( + $futures = array_map( function (Promise $promise) { return Internal\PromiseWrapper::toHandledPromise( $promise, $this->unhandledFailingPromises - )->getAmpPromise(); + )->getAmpFuture(); }, $promises ); - foreach ($promises as $index => $promise) { - \Amp\Promise\rethrow(new \Amp\Coroutine($wrapPromise($promise))); + foreach ($futures as $index => $future) { + \Amp\async(fn() => $wrapPromise($future)); } - return Internal\PromiseWrapper::createUnhandled($deferred->promise(), $this->unhandledFailingPromises); + return Internal\PromiseWrapper::createUnhandled($deferred->getFuture(), $this->unhandledFailingPromises); } /** @@ -165,7 +163,7 @@ function (Promise $promise) { */ public function promiseFulfilled($value): Promise { - return Internal\PromiseWrapper::createHandled(new \Amp\Success($value)); + return Internal\PromiseWrapper::createHandled(Future::complete($value)); } /** @@ -173,8 +171,7 @@ public function promiseFulfilled($value): Promise */ public function promiseRejected(\Throwable $throwable): Promise { - // Manually created promises are considered as handled. - return Internal\PromiseWrapper::createHandled(new \Amp\Failure($throwable)); + return Internal\PromiseWrapper::createHandled(Future::error($throwable)); } /** @@ -182,13 +179,13 @@ public function promiseRejected(\Throwable $throwable): Promise */ public function idle(): Promise { - $deferred = new \Amp\Deferred(); + $deferred = new \Amp\DeferredFuture(); - \Amp\Loop::defer(function () use ($deferred) { - $deferred->resolve(); + \Revolt\EventLoop::defer(function () use ($deferred) { + $deferred->complete(); }); - return Internal\PromiseWrapper::createUnhandled($deferred->promise(), $this->unhandledFailingPromises); + return Internal\PromiseWrapper::createUnhandled($deferred->getFuture(), $this->unhandledFailingPromises); } /** @@ -196,13 +193,13 @@ public function idle(): Promise */ public function delay(int $milliseconds): Promise { - $deferred = new \Amp\Deferred(); + $deferred = new \Amp\DeferredFuture(); - \Amp\Loop::delay($milliseconds, function () use ($deferred) { - $deferred->resolve(); + \Revolt\EventLoop::delay($milliseconds/1000, function () use ($deferred) { + $deferred->complete(); }); - return Internal\PromiseWrapper::createUnhandled($deferred->promise(), $this->unhandledFailingPromises); + return Internal\PromiseWrapper::createUnhandled($deferred->getFuture(), $this->unhandledFailingPromises); } /** @@ -211,9 +208,8 @@ public function delay(int $milliseconds): Promise public function deferred(): Deferred { return new Internal\Deferred( - $deferred = new \Amp\Deferred(), - // Manually created promises are considered as handled. - Internal\PromiseWrapper::createHandled($deferred->promise()) + $deferred = new \Amp\DeferredFuture(), + Internal\PromiseWrapper::createHandled($deferred->getFuture()) ); } @@ -222,17 +218,17 @@ public function deferred(): Deferred */ public function readable($stream): Promise { - $deferred = new \Amp\Deferred(); + $deferred = new \Amp\DeferredFuture(); - \Amp\Loop::onReadable( + \Revolt\EventLoop::onReadable( $stream, function ($watcherId, $stream) use ($deferred) { - \Amp\Loop::cancel($watcherId); - $deferred->resolve($stream); + \Revolt\EventLoop::cancel($watcherId); + $deferred->complete($stream); } ); - return Internal\PromiseWrapper::createUnhandled($deferred->promise(), $this->unhandledFailingPromises); + return Internal\PromiseWrapper::createUnhandled($deferred->getFuture(), $this->unhandledFailingPromises); } /** @@ -240,17 +236,17 @@ function ($watcherId, $stream) use ($deferred) { */ public function writable($stream): Promise { - $deferred = new \Amp\Deferred(); + $deferred = new \Amp\DeferredFuture(); - \Amp\Loop::onWritable( + \Revolt\EventLoop::onWritable( $stream, function ($watcherId, $stream) use ($deferred) { - \Amp\Loop::cancel($watcherId); - $deferred->resolve($stream); + \Revolt\EventLoop::cancel($watcherId); + $deferred->complete($stream); } ); - return Internal\PromiseWrapper::createUnhandled($deferred->promise(), $this->unhandledFailingPromises); + return Internal\PromiseWrapper::createUnhandled($deferred->getFuture(), $this->unhandledFailingPromises); } public function __construct() @@ -258,6 +254,5 @@ public function __construct() $this->unhandledFailingPromises = new Common\Internal\FailingPromiseCollection(); } - /** @var Common\Internal\FailingPromiseCollection */ - private $unhandledFailingPromises; + private Common\Internal\FailingPromiseCollection $unhandledFailingPromises; } diff --git a/src/Adapter/Amp/Internal/Deferred.php b/src/Adapter/Amp/Internal/Deferred.php index 6b412bd..186c956 100644 --- a/src/Adapter/Amp/Internal/Deferred.php +++ b/src/Adapter/Amp/Internal/Deferred.php @@ -10,13 +10,10 @@ */ class Deferred implements \M6Web\Tornado\Deferred { - /** @var \Amp\Deferred */ - private $ampDeferred; + private \Amp\DeferredFuture $ampDeferred; + private PromiseWrapper $promise; - /** @var PromiseWrapper */ - private $promise; - - public function __construct(\Amp\Deferred $ampDeferred, PromiseWrapper $promise) + public function __construct(\Amp\DeferredFuture $ampDeferred, PromiseWrapper $promise) { $this->ampDeferred = $ampDeferred; $this->promise = $promise; @@ -40,7 +37,7 @@ public function getPromiseWrapper(): PromiseWrapper */ public function resolve($value): void { - $this->ampDeferred->resolve($value); + $this->ampDeferred->complete($value); } /** @@ -48,6 +45,6 @@ public function resolve($value): void */ public function reject(\Throwable $throwable): void { - $this->ampDeferred->fail($throwable); + $this->ampDeferred->error($throwable); } } diff --git a/src/Adapter/Amp/Internal/PromiseWrapper.php b/src/Adapter/Amp/Internal/PromiseWrapper.php index b5ab182..3d177b2 100644 --- a/src/Adapter/Amp/Internal/PromiseWrapper.php +++ b/src/Adapter/Amp/Internal/PromiseWrapper.php @@ -11,11 +11,9 @@ */ class PromiseWrapper implements Promise { - /** @var \Amp\Promise */ - private $ampPromise; + private \Amp\Future $ampPromise; - /** @var bool */ - private $isHandled; + private bool $isHandled; /** * Use named (static) constructor instead @@ -24,13 +22,13 @@ private function __construct() { } - public static function createUnhandled(\Amp\Promise $ampPromise, FailingPromiseCollection $failingPromiseCollection): self + public static function createUnhandled(\Amp\Future $ampPromise, FailingPromiseCollection $failingPromiseCollection): self { $promiseWrapper = new self(); $promiseWrapper->isHandled = false; $promiseWrapper->ampPromise = $ampPromise; - $promiseWrapper->ampPromise->onResolve( - function (?\Throwable $reason, $value) use ($promiseWrapper, $failingPromiseCollection) { + $promiseWrapper->ampPromise->catch( + function (?\Throwable $reason) use ($promiseWrapper, $failingPromiseCollection) { if ($reason !== null && !$promiseWrapper->isHandled) { $failingPromiseCollection->watchFailingPromise($promiseWrapper, $reason); } @@ -40,8 +38,9 @@ function (?\Throwable $reason, $value) use ($promiseWrapper, $failingPromiseColl return $promiseWrapper; } - public static function createHandled(\Amp\Promise $ampPromise): self + public static function createHandled(\Amp\Future $ampPromise): self { + $ampPromise->ignore(); $promiseWrapper = new self(); $promiseWrapper->isHandled = true; $promiseWrapper->ampPromise = $ampPromise; @@ -49,7 +48,7 @@ public static function createHandled(\Amp\Promise $ampPromise): self return $promiseWrapper; } - public function getAmpPromise(): \Amp\Promise + public function getAmpFuture(): \Amp\Future { return $this->ampPromise; } @@ -59,6 +58,7 @@ public static function toHandledPromise(Promise $promise, FailingPromiseCollecti assert($promise instanceof self, new \Error('Input promise was not created by this adapter.')); $promise->isHandled = true; + $promise->getAmpFuture()->ignore(); $failingPromiseCollection->unwatchPromise($promise); return $promise; diff --git a/tests/Adapter/Amp/LoopReset.php b/tests/Adapter/Amp/LoopReset.php index f643f25..6e31871 100644 --- a/tests/Adapter/Amp/LoopReset.php +++ b/tests/Adapter/Amp/LoopReset.php @@ -5,6 +5,7 @@ use PHPUnit\Framework\Test; use PHPUnit\Framework\TestListener; use PHPUnit\Framework\TestListenerDefaultImplementation; +use Revolt\EventLoop\Driver\StreamSelectDriver; // From https://github.com/amphp/phpunit-util/blob/master/src/LoopReset.php class LoopReset implements TestListener @@ -13,7 +14,5 @@ class LoopReset implements TestListener public function endTest(Test $test, float $time): void { - \Amp\Loop::set((new \Amp\Loop\DriverFactory())->create()); - gc_collect_cycles(); // extensions using an event loop may otherwise leak the file descriptors to the loop } }