Skip to content

Commit

Permalink
Add a writer that dispatch in a bus each item as message (#64)
Browse files Browse the repository at this point in the history
* Replace MessageBusInterface mocks with test dummies

* Add an item writer that will dispatch in a bus each written item as a message

* Add DispatchEachItemAsMessageWriter in main doc

* Remove phpspec/prophecy-phpunit dev dependency from packages that are no longer using it
  • Loading branch information
yann-eugone authored May 31, 2022
1 parent 9460b82 commit 3f3acb0
Show file tree
Hide file tree
Showing 9 changed files with 153 additions and 39 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ composer require yokai/batch-symfony-messenger
This package provides:

- a [job launcher](docs/job-launcher.md) that uses messages to launch jobs
- a [writer](docs/dispatch-each-item-writer.md) that will write each item as a message


## Contribution
Expand Down
1 change: 0 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
}
},
"require-dev": {
"phpspec/prophecy-phpunit": "^2.0",
"phpunit/phpunit": "^9.5"
},
"autoload-dev": {
Expand Down
1 change: 1 addition & 0 deletions docs/dispatch-each-item-writer.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
todo \Yokai\Batch\Bridge\Symfony\Messenger\Writer\DispatchEachItemAsMessageWriter
35 changes: 35 additions & 0 deletions src/Writer/DispatchEachItemAsMessageWriter.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
<?php

declare(strict_types=1);

namespace Yokai\Batch\Bridge\Symfony\Messenger\Writer;

use Symfony\Component\Messenger\MessageBusInterface;
use Yokai\Batch\Exception\UnexpectedValueException;
use Yokai\Batch\Job\Item\ItemWriterInterface;
use Yokai\Batch\Job\JobExecutionAwareInterface;
use Yokai\Batch\Job\JobExecutionAwareTrait;

/**
* This {@see ItemWriterInterface} will consider each written item to be a message.
* Every item will be sent individually to a {@see MessageBusInterface}.
*/
final class DispatchEachItemAsMessageWriter implements ItemWriterInterface, JobExecutionAwareInterface
{
use JobExecutionAwareTrait;

public function __construct(
private MessageBusInterface $messageBus,
) {
}

public function write(iterable $items): void
{
foreach ($items as $item) {
if (!\is_object($item)) {
throw UnexpectedValueException::type('object', $item);
}
$this->messageBus->dispatch($item);
}
}
}
55 changes: 17 additions & 38 deletions tests/DispatchMessageJobLauncherTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,41 +5,25 @@
namespace Yokai\Batch\Tests\Bridge\Symfony\Messenger;

use PHPUnit\Framework\TestCase;
use Prophecy\Argument;
use Prophecy\PhpUnit\ProphecyTrait;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\TransportException;
use Symfony\Component\Messenger\MessageBusInterface;
use Yokai\Batch\BatchStatus;
use Yokai\Batch\Bridge\Symfony\Messenger\DispatchMessageJobLauncher;
use Yokai\Batch\Bridge\Symfony\Messenger\LaunchJobMessage;
use Yokai\Batch\Factory\JobExecutionFactory;
use Yokai\Batch\Factory\UniqidJobExecutionIdGenerator;
use Yokai\Batch\Test\Factory\SequenceJobExecutionIdGenerator;
use Yokai\Batch\Test\Storage\InMemoryJobExecutionStorage;
use Yokai\Batch\Tests\Bridge\Symfony\Messenger\Dummy\BufferingMessageBus;
use Yokai\Batch\Tests\Bridge\Symfony\Messenger\Dummy\FailingMessageBus;

final class DispatchMessageJobLauncherTest extends TestCase
{
use ProphecyTrait;

public function testLaunch(): void
{
$messageBus = $this->prophesize(MessageBusInterface::class);
$messageAssertions = Argument::that(
static function ($message): bool {
return $message instanceof LaunchJobMessage
&& $message->getJobName() === 'testing'
&& $message->getConfiguration() === ['_id' => '123456789', 'foo' => ['bar']];
}
);
$messageBus->dispatch($messageAssertions)
->shouldBeCalled()
->willReturn(new Envelope(new LaunchJobMessage('unused')));

$jobLauncher = new DispatchMessageJobLauncher(
new JobExecutionFactory(new UniqidJobExecutionIdGenerator()),
$storage = new InMemoryJobExecutionStorage(),
$messageBus->reveal()
$messageBus = new BufferingMessageBus()
);

$jobExecutionFromLauncher = $jobLauncher->launch('testing', ['_id' => '123456789', 'foo' => ['bar']]);
Expand All @@ -51,26 +35,15 @@ static function ($message): bool {
self::assertSame('123456789', $jobExecutionFromStorage->getId());
self::assertSame(BatchStatus::PENDING, $jobExecutionFromStorage->getStatus()->getValue());
self::assertSame(['bar'], $jobExecutionFromStorage->getParameters()->get('foo'));
self::assertJobWasTriggered($messageBus, 'testing', ['_id' => '123456789', 'foo' => ['bar']]);
}

public function testLaunchWithNoId(): void
{
$messageBus = $this->prophesize(MessageBusInterface::class);
$messageAssertions = Argument::that(
static function ($message): bool {
return $message instanceof LaunchJobMessage
&& $message->getJobName() === 'testing'
&& $message->getConfiguration() === ['_id' => '123456789'];
}
);
$messageBus->dispatch($messageAssertions)
->shouldBeCalled()
->willReturn(new Envelope(new LaunchJobMessage('unused')));

$jobLauncher = new DispatchMessageJobLauncher(
new JobExecutionFactory(new SequenceJobExecutionIdGenerator(['123456789'])),
$storage = new InMemoryJobExecutionStorage(),
$messageBus->reveal()
$messageBus = new BufferingMessageBus()
);

$jobExecutionFromLauncher = $jobLauncher->launch('testing');
Expand All @@ -81,19 +54,15 @@ static function ($message): bool {
self::assertSame('testing', $jobExecutionFromStorage->getJobName());
self::assertSame('123456789', $jobExecutionFromStorage->getId());
self::assertSame(BatchStatus::PENDING, $jobExecutionFromStorage->getStatus()->getValue());
self::assertJobWasTriggered($messageBus, 'testing', ['_id' => '123456789']);
}

public function testLaunchAndMessengerFail(): void
{
$messageBus = $this->prophesize(MessageBusInterface::class);
$messageBus->dispatch(Argument::any())
->shouldBeCalled()
->willThrow(new TransportException('This is a test'));

$jobLauncher = new DispatchMessageJobLauncher(
new JobExecutionFactory(new UniqidJobExecutionIdGenerator()),
$storage = new InMemoryJobExecutionStorage(),
$messageBus->reveal()
new FailingMessageBus(new TransportException('This is a test'))
);

$jobExecutionFromLauncher = $jobLauncher->launch('testing');
Expand All @@ -108,4 +77,14 @@ public function testLaunchAndMessengerFail(): void
self::assertSame(TransportException::class, $failure->getClass());
self::assertSame('This is a test', $failure->getMessage());
}

private static function assertJobWasTriggered(BufferingMessageBus $bus, string $jobName, array $config): void
{
$messages = $bus->getMessages();
self::assertCount(1, $messages);
$message = $messages[0];
self::assertInstanceOf(LaunchJobMessage::class, $message);
self::assertSame($jobName, $message->getJobName());
self::assertSame($config, $message->getConfiguration());
}
}
39 changes: 39 additions & 0 deletions tests/Dummy/BufferingMessageBus.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
<?php

declare(strict_types=1);

namespace Yokai\Batch\Tests\Bridge\Symfony\Messenger\Dummy;

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\MessageBusInterface;

final class BufferingMessageBus implements MessageBusInterface
{
/**
* @var Envelope[]
*/
private array $envelopes = [];

public function dispatch(object $message, array $stamps = []): Envelope
{
$this->envelopes[] = $envelope = new Envelope($message, $stamps);

return $envelope;
}

/**
* @return object[]
*/
public function getMessages(): array
{
return \array_map(fn (Envelope $envelope) => $envelope->getMessage(), $this->envelopes);
}

/**
* @return Envelope[]
*/
public function getEnvelopes(): array
{
return $this->envelopes;
}
}
9 changes: 9 additions & 0 deletions tests/Dummy/DummyMessage.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
<?php

declare(strict_types=1);

namespace Yokai\Batch\Tests\Bridge\Symfony\Messenger\Dummy;

final class DummyMessage
{
}
22 changes: 22 additions & 0 deletions tests/Dummy/FailingMessageBus.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
<?php

declare(strict_types=1);

namespace Yokai\Batch\Tests\Bridge\Symfony\Messenger\Dummy;

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\ExceptionInterface;
use Symfony\Component\Messenger\MessageBusInterface;

final class FailingMessageBus implements MessageBusInterface
{
public function __construct(
private ExceptionInterface $exception,
) {
}

public function dispatch(object $message, array $stamps = []): Envelope
{
throw $this->exception;
}
}
29 changes: 29 additions & 0 deletions tests/Writer/DispatchEachItemAsMessageWriterTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
<?php

declare(strict_types=1);

namespace Yokai\Batch\Tests\Bridge\Symfony\Messenger\Writer;

use Yokai\Batch\Bridge\Symfony\Messenger\Writer\DispatchEachItemAsMessageWriter;
use PHPUnit\Framework\TestCase;
use Yokai\Batch\Exception\UnexpectedValueException;
use Yokai\Batch\Tests\Bridge\Symfony\Messenger\Dummy\BufferingMessageBus;
use Yokai\Batch\Tests\Bridge\Symfony\Messenger\Dummy\DummyMessage;

class DispatchEachItemAsMessageWriterTest extends TestCase
{
public function test(): void
{
$writer = new DispatchEachItemAsMessageWriter($messageBus = new BufferingMessageBus());
$writer->write([$message1 = new DummyMessage(), $message2 = new DummyMessage()]);
self::assertSame([$message1, $message2], $messageBus->getMessages());
}

public function testInvalidItemType(): void
{
$this->expectException(UnexpectedValueException::class);
$this->expectExceptionMessage('Expecting argument to be object, but got int.');
$writer = new DispatchEachItemAsMessageWriter(new BufferingMessageBus());
$writer->write([1]);
}
}

0 comments on commit 3f3acb0

Please sign in to comment.