diff --git a/.github/workflows/phpunit.yml b/.github/workflows/phpunit.yml index 5bc1c79..bf06439 100644 --- a/.github/workflows/phpunit.yml +++ b/.github/workflows/phpunit.yml @@ -20,7 +20,7 @@ jobs: uses: shivammathur/setup-php@v2 with: php-version: '8.3' - extensions: posix,sockets,pcntl,openssl,parallel + extensions: posix,sockets,pcntl,openssl,parallel,curl env: phpts: ts diff --git a/.gitignore b/.gitignore index 60df71b..a645c4c 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,7 @@ .idea p-ripple-core.iml +/build/ /vendor/ composer.lock diff --git a/composer.json b/composer.json index f3888ae..b5fa8a6 100644 --- a/composer.json +++ b/composer.json @@ -35,8 +35,10 @@ }, "require-dev": { "ext-curl": "*", + "ext-parallel": "*", "phpunit/phpunit": "^11.2", - "friendsofphp/php-cs-fixer": "*", - "ext-parallel": "*" - } + "friendsofphp/php-cs-fixer": "*" + }, + "minimum-stability": "dev", + "prefer-stable": true } diff --git a/example/parallel.php b/example/parallel.php index f3f585d..335c25d 100644 --- a/example/parallel.php +++ b/example/parallel.php @@ -32,21 +32,4 @@ * 由于软件或软件的使用或其他交易而引起的任何索赔、损害或其他责任承担责任。 */ -use parallel\Runtime; - include_once __DIR__ . '/../vendor/autoload.php'; - - -$channel = parallel\Channel::make('channel'); -$thread = new Runtime(); - -$thread->run(static function ($channel) { - \sleep(1); - $channel->send(true); -}, [$channel]); - - -while(1) { - \var_dump($channel->recv()); - \sleep(1); -} diff --git a/example/process.php b/example/process.php index 0df5769..b818aa0 100644 --- a/example/process.php +++ b/example/process.php @@ -32,14 +32,25 @@ * 由于软件或软件的使用或其他交易而引起的任何索赔、损害或其他责任承担责任。 */ -include_once __DIR__ .'/../vendor/autoload.php'; +use P\System; +use function P\async; +use function P\await; +use function P\defer; +use function P\tick; +include_once __DIR__ . '/../vendor/autoload.php'; -$task = \P\System::Process()->task(function () { - \P\repeat(function () {}, 1); +$code = \mt_rand(0, 255); +$async = async(function () use ($code) { + $task = System::Process()->task(function () use ($code) { + \P\sleep(1); + defer(function () use ($code) { + exit($code); + }); + }); + $runtime = $task->run(); + return await($runtime->getPromise()); }); - -$task->run(); - -\P\tick(); +\var_dump($code, $async->await()); +tick(); diff --git a/example/thread.php b/example/thread.php index a9c005c..4a1fb14 100644 --- a/example/thread.php +++ b/example/thread.php @@ -43,6 +43,10 @@ $thread = thread(static function ($context) { return \file_get_contents(__FILE__); }); + + $thread->run()->onValue(static function ($value) { + echo \strlen($value), \PHP_EOL; + }); } tick(); diff --git a/phpunit.xml b/phpunit.xml index 845fff5..e3f310d 100644 --- a/phpunit.xml +++ b/phpunit.xml @@ -32,13 +32,21 @@ ~ 由于软件或软件的使用或其他交易而引起的任何索赔、损害或其他责任承担责任。 --> - + tests - + + + + diff --git a/src/Core/Channel/Channel.php b/src/Core/Channel/Channel.php index ee86c79..febc5e1 100644 --- a/src/Core/Channel/Channel.php +++ b/src/Core/Channel/Channel.php @@ -234,7 +234,7 @@ public function close(): void $this->writeLock->close(); if ($this->owner) { - unlink($this->path); + file_exists($this->path) && unlink($this->path); } $this->closed = true; diff --git a/src/Core/Coroutine/Coroutine.php b/src/Core/Coroutine/Coroutine.php index 7212018..655170d 100644 --- a/src/Core/Coroutine/Coroutine.php +++ b/src/Core/Coroutine/Coroutine.php @@ -44,7 +44,7 @@ use function P\delay; use function P\registerForkHandler; -use function P\run; +use function P\tick; use function spl_object_hash; /** @@ -211,7 +211,6 @@ public function async(Closure $closure): Promise $this->handleEscapeException($exception); } - if ($fiber->isTerminated()) { try { $result = $fiber->getReturn(); @@ -270,7 +269,7 @@ public function sleep(float|int $second): void try { // 尝试恢复Fiber运行 $fiber->resume(); - } catch (EscapeException) { + } catch (EscapeException $exception) { // 恢复运行过程发生逃逸异常 $this->handleEscapeException($exception); } catch (Throwable $e) { @@ -332,19 +331,12 @@ public function getCoroutine(): array|null */ public function handleEscapeException(EscapeException $exception): void { - if (!Fiber::getCurrent()) { + if (!Fiber::getCurrent() || !$this->isCoroutine()) { $this->fiber2callback = array(); - - run(); + tick(); exit(0); - } - - if ($this->isCoroutine()) { - throw $exception; } else { - $this->fiber2callback = array(); - - Fiber::suspend(); + throw $exception; } } } diff --git a/src/Core/Parallel/Parallel.php b/src/Core/Parallel/Parallel.php index 0451408..b243d0d 100644 --- a/src/Core/Parallel/Parallel.php +++ b/src/Core/Parallel/Parallel.php @@ -34,6 +34,10 @@ namespace Psc\Core\Parallel; +if (!extension_loaded('parallel')) { + return; +} + use Closure; use Composer\Autoload\ClassLoader; use parallel\Events; @@ -48,6 +52,7 @@ use function array_shift; use function count; use function dirname; +use function extension_loaded; use function file_exists; use function intval; use function is_int; @@ -69,6 +74,8 @@ * 2024-08-07 * 0x00 允许保留USR2信号,以便在主线程中执行并行代码 * 0x01 用独立线程监听计数指令向主进程发送信号,原子性保留主进程events::poll的堵塞机制 + * + * PHP版本:8.3.0-8.3.8存在内存泄漏 */ class Parallel extends LibraryAbstract { @@ -84,17 +91,14 @@ class Parallel extends LibraryAbstract /*** @var Events */ private Events $events; - /*** @var Thread[] */ - private array $threads = []; - /*** @var Future[] */ private array $futures = []; /** - * 递归索引 + * 索引 * @var int */ - private int $index = 0; + private int $index; /** * 事件分发线程 @@ -112,7 +116,7 @@ class Parallel extends LibraryAbstract * 事件计数通道 * @var Channel */ - public Channel $counterChannel; + private Channel $counterChannel; /** * 事件计数标量 @@ -132,7 +136,7 @@ class Parallel extends LibraryAbstract /** * Parallel constructor. */ - public function __construct() + protected function __construct() { $this->initialize(); } @@ -164,34 +168,37 @@ private function initialize(): void // 初始化事件处理器 $this->events = new Events(); $this->events->setBlocking(true); + + $this->index = 0; + $this->registerForkHandler(); } /** * @return void */ - public function initializeCounter(): void + private function initializeCounter(): void { - if(isset($this->counterRuntime) && !$this->counterFuture->done()) { - return; - } - if(!isset($this->counterChannel)) { $this->counterChannel = $this->makeChannel('counter'); } + if (isset($this->counterFuture) && !$this->counterFuture->done()) { + return; + } + // 初始化标量同步器 $this->eventScalar = new Sync(0); // 初始化标量 $this->initScalar = new Sync(false); - $this->counterRuntime = new Runtime(); - $this->counterFuture = $this->counterRuntime->run(static function (\parallel\Channel $channel, Sync $eventScalar, Sync $initScalar) { + $this->counterFuture = $this->counterRuntime->run(static function ($channel, $eventScalar, $initScalar) { $initScalar(function () use ($initScalar) { while(!$initScalar->get()) { $initScalar->wait(); } }); + $processId = posix_getpid(); $count = 0; while($number = $channel->recv()) { @@ -203,12 +210,19 @@ public function initializeCounter(): void break; } } + return true; }, [$this->counterChannel->channel, $this->eventScalar,$this->initScalar]); - defer(function () { + //不可在信号处理器未注册前解锁 + if (EventLoop::getDriver()->isRunning()) { $this->initScalar->set(true); $this->initScalar->notify(); - }); + } else { + defer(function () { + $this->initScalar->set(true); + $this->initScalar->notify(); + }); + } } /** @@ -237,10 +251,9 @@ private function poll(): void case Events\Event\Type::Kill: case Events\Event\Type::Error: if(isset($this->futures[$event->source])) { + $this->counterChannel->send(-1); $this->futures[$event->source]->onEvent($event); unset($this->futures[$event->source]); - unset($this->threads[$event->source]); - $this->counterChannel->send(-1); } break; case Events\Event\Type::Read: @@ -248,12 +261,11 @@ private function poll(): void $name = $event->source; if($this->futures[$name] ?? null) { try { + $this->counterChannel->send(-1); $this->futures[$name]->resolve(); } catch (Throwable) { } finally { unset($this->futures[$name]); - unset($this->threads[$name]); - $this->counterChannel->send(-1); } } } @@ -261,7 +273,6 @@ private function poll(): void } } } - if (empty($this->futures)) { $this->unregisterSignalHandler(); while($callback = array_shift($this->onBusy)) { @@ -270,22 +281,10 @@ private function poll(): void } } - private array $onBusy = []; - /** - * @param Thread $thread - * @param ...$argv - * @return Future + * @var Closure[] */ - public function run(Thread $thread, ...$argv): Future - { - $this->registerSignalHandler(); - $this->initializeCounter(); - $future = $thread(...$argv); - $this->futures[$thread->name] = $future; - $this->events->addFuture($thread->name, $future->future); - return $future; - } + private array $onBusy = []; /** * @return void @@ -315,8 +314,21 @@ private function unregisterSignalHandler(): void cancel($this->signalHandlerId); unset($this->signalHandlerId); - $this->counterChannel->send(-1); + $this->counterChannel->close(); + + try { + $this->counterFuture->value(); + } catch (Throwable) { + // ignore + } + + $this->counterRuntime->kill(); + unset($this->counterChannel); + unset($this->counterFuture); + unset($this->counterRuntime); + unset($this->eventScalar); + unset($this->initScalar); } /** @@ -329,7 +341,6 @@ private function registerForkHandler(): void /*** @var int $cpuCount*/ /*** @var string $autoload*/ /*** @var Events $events*/ - /*** @var Thread[] $threads*/ /*** @var Future[] $futures*/ /*** @var int $index*/ /*** @var Runtime $counterRuntime*/ @@ -337,29 +348,26 @@ private function registerForkHandler(): void /*** @var Channel $counterChannel */ /*** @var Sync $eventScalar */ /*** @var Sync $initScalar */ - - foreach ($this->threads as $key => $thread) { - $thread->kill(); - $this->futures[$key]?->cancel(); - unset($this->threads[$key]); - unset($this->futures[$key]); - $this->events->remove($key); - } - unset($this->signalHandlerId); + unset($this->events); + $this->initialize(); }); } /** - * @param Closure $closure - * @return Thread + * @return void */ - public function thread(Closure $closure): Thread + public function wait(): void { - $name = strval($this->index++); - $thread = new Thread($closure, $name); - $this->threads[$name] = $thread; - return $thread; + if(empty($this->futures)) { + return; + } + + $suspension = EventLoop::getSuspension(); + $this->onBusy[] = static function () use ($suspension) { + $suspension->resume(); + }; + $suspension->suspend(); } /** @@ -384,18 +392,39 @@ public function makeChannel(string $name, ?int $capacity = null): Channel } /** - * @return void + * @param Closure $closure + * @return Thread */ - public function wait(): void + public function thread(Closure $closure): Thread { - if(empty($this->futures)) { - return; - } + $name = strval($this->index++); + $thread = new Thread($closure, $name); + return $thread; + } - $suspension = EventLoop::getSuspension(); - $this->onBusy[] = function () use ($suspension) { - $suspension->resume(); - }; - $suspension->suspend(); + /** + * @param Thread $thread + * @param ...$argv + * @return Future + */ + public function run(Thread $thread, ...$argv): Future + { + $this->registerSignalHandler(); + $this->initializeCounter(); + $future = $thread->__invoke(...$argv); + $this->futures[$thread->name] = $future; + $this->events->addFuture($thread->name, $future->future); + return $future; + } + + /** + * + */ + public function __destruct() + { + if (isset($this->counterChannel) && isset($this->counterFuture) && !$this->counterFuture->done()) { + $this->counterChannel->send(-1); + $this->counterChannel->close(); + } } } diff --git a/src/Core/Parallel/Thread.php b/src/Core/Parallel/Thread.php index 992ed9b..1d9cfe8 100644 --- a/src/Core/Parallel/Thread.php +++ b/src/Core/Parallel/Thread.php @@ -37,16 +37,11 @@ use Closure; use parallel\Runtime; -use function P\tick; - class Thread { /*** @var Runtime */ private readonly Runtime $runtime; - /*** @var Closure */ - private Closure $guide; - /*** @var Context */ private Context $context; @@ -59,15 +54,6 @@ public function __construct( public readonly string $name, ) { $this->runtime = new Runtime(Parallel::$autoload); - $this->guide = static function (Closure $handler, Context $context) { - $counterChannel = \parallel\Channel::open('counter'); - try { - return $handler($context); - } finally { - tick(); - $counterChannel->send(1); - } - }; $this->context = new Context(); } @@ -104,7 +90,15 @@ public function __invoke(mixed ...$argv): Future { $this->context->argv = $argv; $this->context->name = $this->name; - return new Future($this->runtime->run($this->guide, [ + + return new Future($this->runtime->run(static function (Closure $handler, Context $context) { + $counterChannel = \parallel\Channel::open('counter'); + try { + return $handler($context); + } finally { + $counterChannel->send(1); + } + }, [ $this->handler, $this->context, ])); diff --git a/src/Core/Process/Process.php b/src/Core/Process/Process.php index dd619c2..4a33af0 100644 --- a/src/Core/Process/Process.php +++ b/src/Core/Process/Process.php @@ -46,10 +46,9 @@ use Throwable; use function call_user_func; -use function count; use function P\cancel; use function P\cancelAll; -use function P\getIdentities; +use function P\defer; use function P\promise; use function P\tick; use function pcntl_fork; @@ -236,28 +235,20 @@ public function task(Closure $closure): Task|false * It is necessary to ensure that the final closure cannot be escaped by any means. */ - // Whether it belongs to the PRipple coroutine space - $isCoroutine = Coroutine::Coroutine()->isCoroutine(); - // forget all events cancelAll(); - // Handle recycling and new process mounting $this->forked(); - // call user mount - try { + // mouth user func + defer(function () use ($closure, $args) { call_user_func($closure, ...$args); - } catch (Throwable) { - exit(1); - } + }); - // Determine whether the event list is empty - if(count(getIdentities()) === 0) { - exit(0); - } - - if(!$isCoroutine) { + // Whether it belongs to the PRipple coroutine space + if(Coroutine::Coroutine()->isCoroutine()) { + throw new EscapeException('The process is abnormal.'); + } else { if(Fiber::getCurrent()) { Fiber::suspend(); } @@ -265,7 +256,19 @@ public function task(Closure $closure): Task|false exit(0); } - throw new EscapeException('The process is abnormal.'); + // // call user mount + // try { + // call_user_func($closure, ...$args); + // } catch (Throwable) { + // exit(1); + // } + // + // // Determine whether the event list is empty + // if(count(getIdentities()) === 0) { + // exit(0); + // } + // + // throw new EscapeException('The process is abnormal.'); } if(empty($this->process2runtime)) { @@ -303,24 +306,8 @@ public function onSignal(int $signalCode, Closure $handler): string /** * @return int */ - public function getProcessId(): int - { - return $this->processId; - } - - /** - * @return int - */ - public function getMainProcessId(): int + public function getRootProcessId(): int { return $this->rootProcessId; } - - /** - * @return bool - */ - public function isRootProcess(): bool - { - return $this->processId === $this->rootProcessId; - } } diff --git a/src/Kernel.php b/src/Kernel.php index 92035c7..4dab933 100644 --- a/src/Kernel.php +++ b/src/Kernel.php @@ -43,7 +43,6 @@ use Throwable; use function call_user_func; -use function count; use function extension_loaded; use function usleep; @@ -203,21 +202,25 @@ public function run(): void } } + private bool $running = false; + /** * @return bool */ public function tick(): mixed { - if (count($this->getIdentities()) === 0) { - //nothing to do - return false; + if ($this->running) { + $this->mainSuspension->resume(); } try { + $this->running = true; return $this->mainSuspension->suspend(); } catch (Throwable) { - $this->mainSuspension = EventLoop::getSuspension(); return false; + } finally { + $this->running = false; + $this->mainSuspension = EventLoop::getSuspension(); } } diff --git a/tests/CoroutineTest.php b/tests/CoroutineTest.php index 0e22a40..67c2ad7 100644 --- a/tests/CoroutineTest.php +++ b/tests/CoroutineTest.php @@ -92,10 +92,9 @@ public function test_coroutineStability(): void */ private function simulateWork(int $index): int { - // 模拟工作负载 if ($index % 100 === 0) { throw new RuntimeException("Simulated error in coroutine $index"); } - return $index * 2; // 返回模拟的计算结果 + return $index * 2; } } diff --git a/tests/HttpTest.php b/tests/HttpTest.php index a3fa097..c9cfa37 100644 --- a/tests/HttpTest.php +++ b/tests/HttpTest.php @@ -56,6 +56,7 @@ use function sys_get_temp_dir; use function tempnam; use function uniqid; +use function stream_context_create; class HttpTest extends TestCase { @@ -86,8 +87,13 @@ public function test_httpServer(): void cancelAll(); }); - - $server = Net::Http()->server('http://127.0.0.1:8008'); + $context = stream_context_create([ + 'socket' => [ + 'so_reuseport' => 1, + 'so_reuseaddr' => 1, + ], + ]); + $server = Net::Http()->server('http://127.0.0.1:8008', $context); $server->onRequest(function (Request $request, Response $response) { if($request->getRequestUri() === '/upload') { /** @@ -121,11 +127,11 @@ public function test_httpServer(): void * @return void * @throws Throwable */ - public function httpGet(): void + private function httpGet(): void { $hash = md5(uniqid()); $client = Plugin::Guzzle(); - $response = $client->get('http://127.0.0.1:8008/ss', [ + $response = $client->get('http://127.0.0.1:8008/', [ 'query' => [ 'query' => $hash, ], @@ -141,7 +147,7 @@ public function httpGet(): void * @return void * @throws Throwable */ - public function httpPost(): void + private function httpPost(): void { $hash = md5(uniqid()); $client = Plugin::Guzzle(); @@ -160,7 +166,7 @@ public function httpPost(): void * @return void * @throws GuzzleException */ - public function httpFile(): void + private function httpFile(): void { $client = Plugin::Guzzle(); $path = tempnam(sys_get_temp_dir(), 'test'); diff --git a/tests/ParallelTest.php b/tests/ParallelTest.php new file mode 100644 index 0000000..ab0d040 --- /dev/null +++ b/tests/ParallelTest.php @@ -0,0 +1,70 @@ +task(function () use ($code) { + $thread = thread(static function ($context) { + sleep($context->argv[0]); + return $context->argv[1]; + }); + $future = $thread->run(1, $code); + $future->onValue(function ($value) { + exit($value); + }); + }); + $runtime = $task->run(); + $runtime->finally(function ($exitCode) use ($code) { + $this->assertEquals($code, $exitCode, 'Process exit code'); + }); + $runtime->await(); + } +} diff --git a/tests/ProcessTest.php b/tests/ProcessTest.php new file mode 100644 index 0000000..16424ff --- /dev/null +++ b/tests/ProcessTest.php @@ -0,0 +1,111 @@ +task(function () use ($code) { + \P\sleep(1); + exit($code); + }); + + $runtime = $task->run(); + $exitCode = $runtime->await(); + $this->assertEquals($code, $exitCode, 'Process exit code'); + } + + /** + * @return void + * @throws Throwable + */ + public function test_coroutine(): void + { + $code = mt_rand(0, 255); + $async = async(function () use ($code) { + $task = System::Process()->task(function () use ($code) { + \P\sleep(1); + defer(function () use ($code) { + exit($code); + }); + }); + $runtime = $task->run(); + return await($runtime->getPromise()); + }); + $exitCode = $async->await(); + $this->assertEquals($code, $exitCode, 'Process exit code'); + } + + /** + * @return void + * @throws Throwable + */ + public function test_parallel(): void + { + $code = mt_rand(0, 255); + $task = System::Process()->task(function () use ($code) { + $thread = thread(static function ($context) { + sleep($context->argv[0]); + return $context->argv[1]; + }); + $future = $thread->run(1, $code); + $future->onValue(function ($value) { + exit($value); + }); + }); + $runtime = $task->run(); + $runtime->finally(function ($exitCode) use ($code) { + $this->assertEquals($code, $exitCode, 'Process exit code'); + }); + $runtime->await(); + } +}