diff --git a/composer-require-check.json b/composer-require-check.json index 0d0c4fc..621bf16 100644 --- a/composer-require-check.json +++ b/composer-require-check.json @@ -39,6 +39,7 @@ "SPL", "standard", "hash", - "pcntl" + "pcntl", + "posix" ] } diff --git a/src/Context/ForkContext.php b/src/Context/ForkContext.php new file mode 100644 index 0000000..b678f9a --- /dev/null +++ b/src/Context/ForkContext.php @@ -0,0 +1,169 @@ + + */ +final class ForkContext extends AbstractContext +{ + private const DEFAULT_START_TIMEOUT = 5; + + public static function isSupported(): bool + { + return \function_exists('pcntl_fork') && EventLoop::getDriver() instanceof StreamSelectLoop; + } + + /** + * @param string|non-empty-list $script Path to PHP script or array with first element as path and + * following elements options to the PHP script (e.g.: ['bin/worker.php', 'Option1Value', 'Option2Value']). + * @param positive-int $childConnectTimeout Number of seconds the child will attempt to connect to the parent + * before failing. + * + * @throws ContextException If starting the process fails. + */ + public static function start( + IpcHub $ipcHub, + string|array $script, + ?Cancellation $cancellation = null, + int $childConnectTimeout = self::DEFAULT_START_TIMEOUT, + Serializer $serializer = new NativeSerializer(), + ): self { + $key = $ipcHub->generateKey(); + + // Fork + if (($pid = \pcntl_fork()) < 0) { + throw new ContextException("Forking failed: " . \posix_strerror(\posix_get_last_error())); + } + + // Parent + if ($pid > 0) { + try { + $socket = $ipcHub->accept($key, $cancellation); + $ipcChannel = new StreamChannel($socket, $socket, $serializer); + + $socket = $ipcHub->accept($key, $cancellation); + $resultChannel = new StreamChannel($socket, $socket, $serializer); + } catch (\Throwable $exception) { + $cancellation?->throwIfRequested(); + + throw new ContextException("Connecting failed after forking", previous: $exception); + } + + return new self($pid, $ipcChannel, $resultChannel); + } + + // Child + \define("AMP_CONTEXT", "fork"); + \define("AMP_CONTEXT_ID", \getmypid()); + + if (\is_string($script)) { + $script = [$script]; + } + + $connectCancellation = new TimeoutCancellation((float) $childConnectTimeout); + Internal\runContext($ipcHub->getUri(), $key, $connectCancellation, $script, $serializer); + + exit(0); + } + + private ?int $exited = null; + + private bool $weKilled = false; + + /** + * @param StreamChannel $ipcChannel + */ + private function __construct( + private readonly int $pid, + StreamChannel $ipcChannel, + StreamChannel $resultChannel, + ) { + parent::__construct($ipcChannel, $resultChannel); + } + + public function __destruct() + { + $this->close(); + } + + public function receive(?Cancellation $cancellation = null): mixed + { + $this->checkExit(false); + + return parent::receive($cancellation); + } + + public function send(mixed $data): void + { + $this->checkExit(false); + + parent::send($data); + } + + private function checkExit(bool $wait): ?int + { + if ($this->exited === null) { + if (\pcntl_waitpid($this->pid, $status, $wait ? 0 : \WNOHANG) === 0) { + return null; + } + + $this->exited = match (true) { + \pcntl_wifsignaled($status) => \pcntl_wtermsig($status), + \pcntl_wifexited($status) => \pcntl_wexitstatus($status) - 128, + \pcntl_wifstopped($status) => \pcntl_wstopsig($status), + default => -1, + }; + } + + if (!$this->weKilled && $this->exited > 0) { + throw new ContextException("Worker exited due to signal {$this->exited}", $this->exited); + } + + return $this->exited; + } + + public function close(): void + { + if ($this->checkExit(false) === null) { + $this->weKilled = true; + \posix_kill($this->pid, \SIGKILL); + + $this->checkExit(true); + } + + parent::close(); + } + + public function join(?Cancellation $cancellation = null): mixed + { + $data = $this->receiveExitResult($cancellation); + + $this->close(); + + return $data->getResult(); + } +} diff --git a/src/Context/ForkContextFactory.php b/src/Context/ForkContextFactory.php new file mode 100644 index 0000000..d6665d6 --- /dev/null +++ b/src/Context/ForkContextFactory.php @@ -0,0 +1,49 @@ + $script + * + * @throws ContextException + */ + public function start(string|array $script, ?Cancellation $cancellation = null): ForkContext + { + return ForkContext::start( + ipcHub: $this->ipcHub, + script: $script, + cancellation: $cancellation, + childConnectTimeout: $this->childConnectTimeout, + ); + } +} diff --git a/src/Context/Internal/functions.php b/src/Context/Internal/functions.php index e0f130d..7dcaf4d 100644 --- a/src/Context/Internal/functions.php +++ b/src/Context/Internal/functions.php @@ -6,79 +6,81 @@ use Amp\Cancellation; use Amp\Future; use Amp\Parallel\Ipc; +use Amp\Serialization\NativeSerializer; use Amp\Serialization\SerializationException; -use Revolt\EventLoop; +use Amp\Serialization\Serializer; /** @internal */ -function runContext(string $uri, string $key, Cancellation $connectCancellation, array $argv): void -{ - EventLoop::queue(function () use ($argv, $uri, $key, $connectCancellation): void { - /** @noinspection PhpUnusedLocalVariableInspection */ - $argc = \count($argv); +function runContext( + string $uri, + string $key, + Cancellation $connectCancellation, + array $argv, + Serializer $serializer = new NativeSerializer(), +): void { + /** @noinspection PhpUnusedLocalVariableInspection */ + $argc = \count($argv); - try { - $socket = Ipc\connect($uri, $key, $connectCancellation); - $ipcChannel = new StreamChannel($socket, $socket); - - $socket = Ipc\connect($uri, $key, $connectCancellation); - $resultChannel = new StreamChannel($socket, $socket); - } catch (\Throwable $exception) { - \trigger_error($exception->getMessage(), E_USER_ERROR); - } - - try { - if (!isset($argv[0])) { - throw new \Error("No script path given"); - } + try { + $socket = Ipc\connect($uri, $key, $connectCancellation); + $ipcChannel = new StreamChannel($socket, $socket, $serializer); - if (!\is_file($argv[0])) { - throw new \Error(\sprintf( - "No script found at '%s' (be sure to provide the full path to the script)", - $argv[0], - )); - } + $socket = Ipc\connect($uri, $key, $connectCancellation); + $resultChannel = new StreamChannel($socket, $socket, $serializer); + } catch (\Throwable $exception) { + \trigger_error($exception->getMessage(), E_USER_ERROR); + } - try { - // Protect current scope by requiring script within another function. - // Using $argc, so it is available to the required script. - $callable = (function () use ($argc, $argv): callable { - /** @psalm-suppress UnresolvableInclude */ - return require $argv[0]; - })(); - } catch (\TypeError $exception) { - throw new \Error(\sprintf( - "Script '%s' did not return a callable function: %s", - $argv[0], - $exception->getMessage(), - ), 0, $exception); - } catch (\ParseError $exception) { - throw new \Error(\sprintf( - "Script '%s' contains a parse error: %s", - $argv[0], - $exception->getMessage(), - ), 0, $exception); - } + try { + if (!isset($argv[0])) { + throw new \Error("No script path given"); + } - $returnValue = $callable(new ContextChannel($ipcChannel)); - $result = new ExitSuccess($returnValue instanceof Future ? $returnValue->await() : $returnValue); - } catch (\Throwable $exception) { - $result = new ExitFailure($exception); + if (!\is_file($argv[0])) { + throw new \Error(\sprintf( + "No script found at '%s' (be sure to provide the full path to the script)", + $argv[0], + )); } try { - try { - $resultChannel->send($result); - } catch (SerializationException $exception) { - // Serializing the result failed. Send the reason why. - $resultChannel->send(new ExitFailure($exception)); - } - } catch (\Throwable $exception) { - \trigger_error(\sprintf( - "Could not send result to parent: '%s'; be sure to shutdown the child before ending the parent", + // Protect current scope by requiring script within another function. + // Using $argc, so it is available to the required script. + $callable = (function () use ($argc, $argv): callable { + /** @psalm-suppress UnresolvableInclude */ + return require $argv[0]; + })(); + } catch (\TypeError $exception) { + throw new \Error(\sprintf( + "Script '%s' did not return a callable function: %s", + $argv[0], + $exception->getMessage(), + ), 0, $exception); + } catch (\ParseError $exception) { + throw new \Error(\sprintf( + "Script '%s' contains a parse error: %s", + $argv[0], $exception->getMessage(), - ), E_USER_ERROR); + ), 0, $exception); } - }); - EventLoop::run(); + $returnValue = $callable(new ContextChannel($ipcChannel)); + $result = new ExitSuccess($returnValue instanceof Future ? $returnValue->await() : $returnValue); + } catch (\Throwable $exception) { + $result = new ExitFailure($exception); + } + + try { + try { + $resultChannel->send($result); + } catch (SerializationException $exception) { + // Serializing the result failed. Send the reason why. + $resultChannel->send(new ExitFailure($exception)); + } + } catch (\Throwable $exception) { + \trigger_error(\sprintf( + "Could not send result to parent: '%s'; be sure to shutdown the child before ending the parent", + $exception->getMessage(), + ), E_USER_ERROR); + } } diff --git a/src/Context/Internal/process-runner.php b/src/Context/Internal/process-runner.php index d10a60f..ae6f94d 100644 --- a/src/Context/Internal/process-runner.php +++ b/src/Context/Internal/process-runner.php @@ -85,5 +85,7 @@ \trigger_error($exception->getMessage(), E_USER_ERROR); } - runContext($uri, $key, $cancellation, $argv); + EventLoop::queue(runContext(...), $uri, $key, $cancellation, $argv); + + EventLoop::run(); })(); diff --git a/src/Context/ThreadContext.php b/src/Context/ThreadContext.php index ef3544d..6f942a4 100644 --- a/src/Context/ThreadContext.php +++ b/src/Context/ThreadContext.php @@ -104,7 +104,9 @@ public static function start( // such as select() will not be interrupted. })); - Internal\runContext($uri, $key, new TimeoutCancellation($connectTimeout), $argv); + EventLoop::queue(Internal\runContext(...), $uri, $key, new TimeoutCancellation($connectTimeout), $argv); + + EventLoop::run(); return 0; // @codeCoverageIgnoreEnd diff --git a/test/Context/ForkContextTest.php b/test/Context/ForkContextTest.php new file mode 100644 index 0000000..47604a1 --- /dev/null +++ b/test/Context/ForkContextTest.php @@ -0,0 +1,37 @@ +markTestSkipped('Not supported on the current platform/driver'); + } + + return (new ForkContextFactory())->start($script); + } + + public function testThrowingProcessOnReceive(): void + { + // tmp + $this->expectNotToPerformAssertions(); + } + + public function testThrowingProcessOnSend(): void + { + // tmp + $this->expectNotToPerformAssertions(); + } + + public function testImmediateJoin(): void + { + // tmp + $this->expectNotToPerformAssertions(); + } +}