Skip to content

Commit

Permalink
feat: bump amp to v3
Browse files Browse the repository at this point in the history
  • Loading branch information
Yokann committed Aug 23, 2023
1 parent a9546b7 commit b33e40f
Show file tree
Hide file tree
Showing 8 changed files with 86 additions and 95 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ jobs:

strategy:
matrix:
php: [ '7.3', '7.4', '8.0' ]
php: [ '8.1' ]

steps:
- name: 'Init repository'
Expand Down
8 changes: 4 additions & 4 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,18 @@
}
},
"require": {
"php": "^7.3|^8.0",
"php": "^8.1",
"psr/http-message": "^1.0"
},
"require-dev": {
"phpunit/phpunit": "^9.4.4",
"amphp/amp": "^2.0",
"amphp/amp": "^3.0",
"guzzlehttp/guzzle": "^6.3",
"m6web/php-cs-fixer-config": "^2.0",
"ext-curl": "^7.3|^8.0",
"ext-curl": "^8.0",
"react/event-loop": "^1.0",
"react/promise": "^2.7",
"phpstan/phpstan": "^0.12",
"phpstan/phpstan": "^1.10",
"symfony/http-client": "^4.3",
"psr/http-factory": "^1.0",
"http-interop/http-factory-guzzle": "^1.0"
Expand Down
132 changes: 63 additions & 69 deletions src/Adapter/Amp/EventLoop.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

namespace M6Web\Tornado\Adapter\Amp;

use Amp\Future;
use M6Web\Tornado\Adapter\Common;
use M6Web\Tornado\Deferred;
use M6Web\Tornado\Promise;
Expand All @@ -14,25 +15,24 @@ class EventLoop implements \M6Web\Tornado\EventLoop
public function wait(Promise $promise)
{
try {
$result = \Amp\Promise\wait(
Internal\PromiseWrapper::toHandledPromise($promise, $this->unhandledFailingPromises)->getAmpPromise()
);
$result = \Amp\Future\await([Internal\PromiseWrapper::toHandledPromise($promise, $this->unhandledFailingPromises)->getAmpFuture()]);
$this->unhandledFailingPromises->throwIfWatchedFailingPromiseExists();

return $result;
return $result[0] ?? null;
} catch (\Error $error) {
// Modify exceptions sent by Amp itself
if ($error->getCode() !== 0) {
throw $error;
}
switch ($error->getMessage()) {
case 'Loop stopped without resolving the promise':
throw new \Error('Impossible to resolve the promise, no more task to execute.', 0, $error);
case 'Loop exceptionally stopped without resolving the promise':
throw $error->getPrevious() ?? $error;
default:
throw $error;

if (str_starts_with($error->getMessage(), 'Event loop terminated without resuming the current suspension')) {
throw new \Error('Impossible to resolve the promise, no more task to execute.', 0, $error);
}

throw match ($error->getMessage()) {
'Loop exceptionally stopped without resolving the promise' => $error->getPrevious() ?? $error,
default => $error,
};
}
}

Expand All @@ -41,7 +41,7 @@ public function wait(Promise $promise)
*/
public function async(\Generator $generator): Promise
{
$wrapper = function (\Generator $generator, \Amp\Deferred $deferred): \Generator {
$wrapper = function (\Generator $generator, \Amp\DeferredFuture $deferred) {
try {
while ($generator->valid()) {
$blockingPromise = $generator->current();
Expand All @@ -51,13 +51,13 @@ public function async(\Generator $generator): Promise
$blockingPromise = Internal\PromiseWrapper::toHandledPromise(
$blockingPromise,
$this->unhandledFailingPromises
)->getAmpPromise();
)->getAmpFuture();

// Forwards promise value/exception to underlying generator
$blockingPromiseValue = null;
$blockingPromiseException = null;
try {
$blockingPromiseValue = yield $blockingPromise;
$blockingPromiseValue = $blockingPromise->await();
} catch (\Throwable $throwable) {
$blockingPromiseException = $throwable;
}
Expand All @@ -68,39 +68,36 @@ public function async(\Generator $generator): Promise
}
}
} catch (\Throwable $throwable) {
$deferred->fail($throwable);
$deferred->error($throwable);

return;
}

$deferred->resolve($generator->getReturn());
$deferred->complete($generator->getReturn());
};

$deferred = new \Amp\Deferred();
\Amp\Promise\rethrow(new \Amp\Coroutine($wrapper($generator, $deferred)));
$deferred = new \Amp\DeferredFuture();
\Amp\async(fn() => $wrapper($generator, $deferred));

return Internal\PromiseWrapper::createUnhandled($deferred->promise(), $this->unhandledFailingPromises);
return Internal\PromiseWrapper::createUnhandled($deferred->getFuture(), $this->unhandledFailingPromises);
}

/**
* {@inheritdoc}
*/
public function promiseAll(Promise ...$promises): Promise
{
return Internal\PromiseWrapper::createUnhandled(
\Amp\Promise\all(
array_map(
function (Promise $promise) {
return Internal\PromiseWrapper::toHandledPromise(
$promise,
$this->unhandledFailingPromises
)->getAmpPromise();
},
$promises
)
),
$this->unhandledFailingPromises
);
$generator = function() use ($promises): \Generator {
$result = [];

foreach ($promises as $promise) {
$result[] = yield Internal\PromiseWrapper::toHandledPromise($promise, $this->unhandledFailingPromises);
}

return $result;
};

return $this->async($generator());
}

/**
Expand All @@ -125,84 +122,83 @@ public function promiseRace(Promise ...$promises): Promise
return $this->promiseFulfilled(null);
}

$deferred = new \Amp\Deferred();
$deferred = new \Amp\DeferredFuture();
$isFirstPromise = true;

$wrapPromise = function (\Amp\Promise $promise) use ($deferred, &$isFirstPromise): \Generator {
$wrapPromise = function (\Amp\Future $future) use ($deferred, &$isFirstPromise) {
try {
$result = yield $promise;
$result = $future->await();
if ($isFirstPromise) {
$isFirstPromise = false;
$deferred->resolve($result);
$deferred->complete($result);
}
} catch (\Throwable $throwable) {
if ($isFirstPromise) {
$isFirstPromise = false;
$deferred->fail($throwable);
$deferred->error($throwable);
}
}
};

$promises = array_map(
$futures = array_map(
function (Promise $promise) {
return Internal\PromiseWrapper::toHandledPromise(
$promise,
$this->unhandledFailingPromises
)->getAmpPromise();
)->getAmpFuture();
},
$promises
);

foreach ($promises as $index => $promise) {
\Amp\Promise\rethrow(new \Amp\Coroutine($wrapPromise($promise)));
foreach ($futures as $index => $future) {
\Amp\async(fn() => $wrapPromise($future));
}

return Internal\PromiseWrapper::createUnhandled($deferred->promise(), $this->unhandledFailingPromises);
return Internal\PromiseWrapper::createUnhandled($deferred->getFuture(), $this->unhandledFailingPromises);
}

/**
* {@inheritdoc}
*/
public function promiseFulfilled($value): Promise
{
return Internal\PromiseWrapper::createHandled(new \Amp\Success($value));
return Internal\PromiseWrapper::createHandled(Future::complete($value));
}

/**
* {@inheritdoc}
*/
public function promiseRejected(\Throwable $throwable): Promise
{
// Manually created promises are considered as handled.
return Internal\PromiseWrapper::createHandled(new \Amp\Failure($throwable));
return Internal\PromiseWrapper::createHandled(Future::error($throwable));
}

/**
* {@inheritdoc}
*/
public function idle(): Promise
{
$deferred = new \Amp\Deferred();
$deferred = new \Amp\DeferredFuture();

\Amp\Loop::defer(function () use ($deferred) {
$deferred->resolve();
\Revolt\EventLoop::defer(function () use ($deferred) {
$deferred->complete();
});

return Internal\PromiseWrapper::createUnhandled($deferred->promise(), $this->unhandledFailingPromises);
return Internal\PromiseWrapper::createUnhandled($deferred->getFuture(), $this->unhandledFailingPromises);
}

/**
* {@inheritdoc}
*/
public function delay(int $milliseconds): Promise
{
$deferred = new \Amp\Deferred();
$deferred = new \Amp\DeferredFuture();

\Amp\Loop::delay($milliseconds, function () use ($deferred) {
$deferred->resolve();
\Revolt\EventLoop::delay($milliseconds/1000, function () use ($deferred) {
$deferred->complete();
});

return Internal\PromiseWrapper::createUnhandled($deferred->promise(), $this->unhandledFailingPromises);
return Internal\PromiseWrapper::createUnhandled($deferred->getFuture(), $this->unhandledFailingPromises);
}

/**
Expand All @@ -211,9 +207,8 @@ public function delay(int $milliseconds): Promise
public function deferred(): Deferred
{
return new Internal\Deferred(
$deferred = new \Amp\Deferred(),
// Manually created promises are considered as handled.
Internal\PromiseWrapper::createHandled($deferred->promise())
$deferred = new \Amp\DeferredFuture(),
Internal\PromiseWrapper::createHandled($deferred->getFuture())
);
}

Expand All @@ -222,42 +217,41 @@ public function deferred(): Deferred
*/
public function readable($stream): Promise
{
$deferred = new \Amp\Deferred();
$deferred = new \Amp\DeferredFuture();

\Amp\Loop::onReadable(
\Revolt\EventLoop::onReadable(
$stream,
function ($watcherId, $stream) use ($deferred) {
\Amp\Loop::cancel($watcherId);
$deferred->resolve($stream);
\Revolt\EventLoop::cancel($watcherId);
$deferred->complete($stream);
}
);

return Internal\PromiseWrapper::createUnhandled($deferred->promise(), $this->unhandledFailingPromises);
return Internal\PromiseWrapper::createUnhandled($deferred->getFuture(), $this->unhandledFailingPromises);
}

/**
* {@inheritdoc}
*/
public function writable($stream): Promise
{
$deferred = new \Amp\Deferred();
$deferred = new \Amp\DeferredFuture();

\Amp\Loop::onWritable(
\Revolt\EventLoop::onWritable(
$stream,
function ($watcherId, $stream) use ($deferred) {
\Amp\Loop::cancel($watcherId);
$deferred->resolve($stream);
\Revolt\EventLoop::cancel($watcherId);
$deferred->complete($stream);
}
);

return Internal\PromiseWrapper::createUnhandled($deferred->promise(), $this->unhandledFailingPromises);
return Internal\PromiseWrapper::createUnhandled($deferred->getFuture(), $this->unhandledFailingPromises);
}

public function __construct()
{
$this->unhandledFailingPromises = new Common\Internal\FailingPromiseCollection();
}

/** @var Common\Internal\FailingPromiseCollection */
private $unhandledFailingPromises;
private Common\Internal\FailingPromiseCollection $unhandledFailingPromises;
}
13 changes: 5 additions & 8 deletions src/Adapter/Amp/Internal/Deferred.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,10 @@
*/
class Deferred implements \M6Web\Tornado\Deferred
{
/** @var \Amp\Deferred */
private $ampDeferred;
private \Amp\DeferredFuture $ampDeferred;
private PromiseWrapper $promise;

/** @var PromiseWrapper */
private $promise;

public function __construct(\Amp\Deferred $ampDeferred, PromiseWrapper $promise)
public function __construct(\Amp\DeferredFuture $ampDeferred, PromiseWrapper $promise)
{
$this->ampDeferred = $ampDeferred;
$this->promise = $promise;
Expand All @@ -40,14 +37,14 @@ public function getPromiseWrapper(): PromiseWrapper
*/
public function resolve($value): void
{
$this->ampDeferred->resolve($value);
$this->ampDeferred->complete($value);
}

/**
* {@inheritdoc}
*/
public function reject(\Throwable $throwable): void
{
$this->ampDeferred->fail($throwable);
$this->ampDeferred->error($throwable);
}
}
Loading

0 comments on commit b33e40f

Please sign in to comment.