Skip to content

Commit

Permalink
Merge pull request #17 from patchlevel/job-can-stop
Browse files Browse the repository at this point in the history
job can now stop the worker
  • Loading branch information
DavidBadura authored Mar 12, 2024
2 parents 81874e1 + 39d864e commit bd0828a
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 17 deletions.
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,12 @@ final class WorkerCommand extends Command
$logger = new ConsoleLogger($output);

$worker = DefaultWorker::create(
function (): void {
function ($stop): void {
// do something

if (/* some condition */) {
$stop();
}
},
[
'runLimit' => $input->getOption('run-limit'),
Expand Down
9 changes: 0 additions & 9 deletions baseline.xml
Original file line number Diff line number Diff line change
@@ -1,15 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<files psalm-version="5.21.1@8c473e2437be8b6a8fd8f630f0f11a16b114c494">
<file src="src/DefaultWorker.php">
<PossiblyNullArgument>
<code><![CDATA[$options['runLimit']]]></code>
<code><![CDATA[$options['timeLimit']]]></code>
</PossiblyNullArgument>
<PossiblyUndefinedArrayOffset>
<code><![CDATA[$options['memoryLimit']]]></code>
<code><![CDATA[$options['runLimit']]]></code>
<code><![CDATA[$options['timeLimit']]]></code>
</PossiblyUndefinedArrayOffset>
<TypeDoesNotContainType>
<code><![CDATA[$this->shouldStop]]></code>
<code><![CDATA[$this->shouldStop]]></code>
Expand Down
18 changes: 11 additions & 7 deletions src/DefaultWorker.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,15 @@ final class DefaultWorker implements Worker
{
private bool $shouldStop = false;

/** @param Closure(Closure):void $job */
public function __construct(
private readonly Closure $job,
private readonly EventDispatcherInterface $eventDispatcher,
private readonly LoggerInterface|null $logger = null,
) {
}

/** @param int $sleepTimer in milliseconds */
/** @param positive-int|0 $sleepTimer in milliseconds */
public function run(int $sleepTimer = 1000): void
{
$this->logger?->debug('Worker starting');
Expand All @@ -45,7 +46,7 @@ public function run(int $sleepTimer = 1000): void

$startTime = (int)round(microtime(true) * 1000);

($this->job)();
($this->job)($this->stop(...));

$endTime = (int)round(microtime(true) * 1000);
$ranTime = $endTime - $startTime;
Expand Down Expand Up @@ -81,28 +82,31 @@ public function stop(): void
$this->shouldStop = true;
}

/** @param array{runLimit?: (positive-int|null), memoryLimit?: (string|null), timeLimit?: (positive-int|null)} $options */
/**
* @param Closure(Closure):void $job
* @param array{runLimit?: (positive-int|null), memoryLimit?: (string|null), timeLimit?: (positive-int|null)} $options
*/
public static function create(
Closure $job,
array $options,
array $options = [],
LoggerInterface $logger = new NullLogger(),
): self {
$eventDispatcher = new EventDispatcher();
$eventDispatcher->addSubscriber(new StopWorkerOnSigtermSignalListener($logger));

if ($options['runLimit'] ?? null) {
if (isset($options['runLimit'])) {
$eventDispatcher->addSubscriber(
new StopWorkerOnIterationLimitListener($options['runLimit'], $logger),
);
}

if ($options['memoryLimit'] !== null) {
if (isset($options['memoryLimit'])) {
$eventDispatcher->addSubscriber(
new StopWorkerOnMemoryLimitListener(Bytes::parseFromString($options['memoryLimit']), $logger),
);
}

if ($options['timeLimit'] ?? null) {
if (isset($options['timeLimit'])) {
$eventDispatcher->addSubscriber(
new StopWorkerOnTimeLimitListener($options['timeLimit'], $logger),
);
Expand Down
26 changes: 26 additions & 0 deletions tests/Unit/DefaultWorkerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,30 @@ static function (array $args) {
$worker = new DefaultWorker(static fn () => null, $evenDispatcher->reveal(), $logger->reveal());
$worker->run(200);
}

public function testJobStopWorker(): void
{
$evenDispatcher = $this->prophesize(EventDispatcherInterface::class);
$evenDispatcher->dispatch(Argument::type(WorkerStartedEvent::class))->shouldBeCalledTimes(1);
$evenDispatcher->dispatch(Argument::type(WorkerRunningEvent::class))->shouldBeCalledTimes(1);
$evenDispatcher->dispatch(Argument::type(WorkerStoppedEvent::class))->shouldBeCalledTimes(1);

$logger = $this->prophesize(LoggerInterface::class);
$logger->debug('Worker starting')->shouldBeCalledTimes(1);
$logger->debug('Worker starting job run')->shouldBeCalledTimes(1);
$logger->debug('Worker finished job run ({ranTime}ms)', Argument::any())->shouldBeCalledTimes(1);
$logger->debug('Worker received stop signal')->shouldBeCalledTimes(1);
$logger->debug('Worker stopped')->shouldBeCalledTimes(1);
$logger->debug('Worker terminated')->shouldBeCalledTimes(1);

$worker = new DefaultWorker(
static function ($stop): void {
$stop();
},
$evenDispatcher->reveal(),
$logger->reveal(),
);

$worker->run(0);
}
}

0 comments on commit bd0828a

Please sign in to comment.