diff --git a/composer.json b/composer.json
index 1b89bd3..301ec66 100644
--- a/composer.json
+++ b/composer.json
@@ -61,8 +61,9 @@
"phpunit/phpunit": "^5.3",
"phpunit/php-invoker": "^1.1.4",
"fabpot/php-cs-fixer": "^1.11",
- "bookdown/bookdown": "1.x-dev as 1.0.0",
- "tobiju/bookdown-bootswatch-templates": "1.0.x-dev"
+ "bookdown/bookdown": "^1.0",
+ "tobiju/bookdown-bootswatch-templates": "1.1.1",
+ "react/promise": "^2.5"
},
"suggest": {
},
diff --git a/doc/configuration_reference.md b/doc/configuration_reference.md
index da98639..5b962a8 100644
--- a/doc/configuration_reference.md
+++ b/doc/configuration_reference.md
@@ -11,6 +11,8 @@ prooph_service_bus:
# Service ID of the message factory
message_factory: 'prooph_service_bus.message_factory'
router:
+ # Service ID of the async message producer e.g. for Amazon AWS SQS
+ async_switch: 'my_async_message_producer'
# Service ID of the router
type: 'prooph_service_bus.command_bus_router'
# Routing definition constructed as
@@ -28,6 +30,8 @@ prooph_service_bus:
# Service ID of the message factory
message_factory: 'prooph_service_bus.message_factory'
router:
+ # Service ID of the async message producer e.g. for Amazon AWS SQS
+ async_switch: 'my_async_message_producer'
# Service ID of the router
type: 'prooph_service_bus.event_bus_router'
# Routing definition constructed as
@@ -46,6 +50,8 @@ prooph_service_bus:
# Service ID of the message factory
message_factory: 'prooph_service_bus.message_factory'
router:
+ # Service ID of the async message producer e.g. for Amazon AWS SQS
+ async_switch: 'my_async_message_producer'
# Service ID of the router
type: 'prooph_service_bus.query_bus_router'
# Routing definition constructed as
diff --git a/src/DependencyInjection/Configuration.php b/src/DependencyInjection/Configuration.php
index 5b2122e..34c149f 100644
--- a/src/DependencyInjection/Configuration.php
+++ b/src/DependencyInjection/Configuration.php
@@ -155,6 +155,16 @@ private function addServiceBusSection(string $type, ArrayNodeDefinition $node)
->end()
->defaultValue('prooph_service_bus.' . $type . '_bus_router')
->end()
+ ->scalarNode('async_switch')
+ ->beforeNormalization()
+ ->ifTrue(function ($v) {
+ return strpos($v, '@') === 0;
+ })
+ ->then(function ($v) {
+ return substr($v, 1);
+ })
+ ->end()
+ ->end()
->append($routesNode)
->end()
->end()
diff --git a/src/DependencyInjection/ProophServiceBusExtension.php b/src/DependencyInjection/ProophServiceBusExtension.php
index 3392ac2..92c4a92 100644
--- a/src/DependencyInjection/ProophServiceBusExtension.php
+++ b/src/DependencyInjection/ProophServiceBusExtension.php
@@ -153,11 +153,34 @@ private function loadBus(string $type, string $name, array $options, ContainerBu
if (!empty($options['router'])) {
$routerId = sprintf('prooph_service_bus.%s.router', $name);
- $routerDefinition = $container->setDefinition(
- $routerId,
- new DefinitionDecorator($options['router']['type'])
- );
- $routerDefinition->setArguments([$options['router']['routes'] ?? []]);
+ if (isset($options['router']['async_switch'])) {
+ $decoratedRouterId = sprintf('prooph_service_bus.%s.decorated_router', $name);
+
+ $routerDefinition = $container->setDefinition(
+ $decoratedRouterId,
+ new DefinitionDecorator($options['router']['type'])
+ );
+ $routerDefinition->setArguments([$options['router']['routes'] ?? []]);
+
+ $container->setDefinition($decoratedRouterId, $routerDefinition);
+
+
+ // replace router definition with async switch message router
+ $routerDefinition = new DefinitionDecorator('prooph_service_bus.async_switch_message_router');
+ $routerDefinition->setArguments([
+ new Reference($decoratedRouterId),
+ new Reference($options['router']['async_switch']),
+ ]);
+ $routerDefinition->setPublic(true);
+ $container->setDefinition($routerId, $routerDefinition);
+ } else {
+ $routerDefinition = $container->setDefinition(
+ $routerId,
+ new DefinitionDecorator($options['router']['type'])
+ );
+ $routerDefinition->setArguments([$options['router']['routes'] ?? []]);
+ }
+
$serviceBusDefinition->addMethodCall('utilize', [new Reference($routerId)]);
}
diff --git a/src/MessageBusFactory.php b/src/MessageBusFactory.php
new file mode 100644
index 0000000..8ed843b
--- /dev/null
+++ b/src/MessageBusFactory.php
@@ -0,0 +1,31 @@
+get($pluginId);
+ $plugin->attachToMessageBus($bus);
+ }
+
+ return $bus;
+ }
+}
diff --git a/src/Resources/config/service_bus.xml b/src/Resources/config/service_bus.xml
index b14d489..1a47a66 100644
--- a/src/Resources/config/service_bus.xml
+++ b/src/Resources/config/service_bus.xml
@@ -23,5 +23,6 @@
+
diff --git a/test/BundleTest.php b/test/BundleTest.php
index 8150fbe..f0e9822 100644
--- a/test/BundleTest.php
+++ b/test/BundleTest.php
@@ -12,7 +12,10 @@
namespace ProophTest\Bundle\ServiceBus;
use PHPUnit_Framework_TestCase as TestCase;
+use Prooph\Bundle\ServiceBus\DependencyInjection\Compiler\PluginsPass;
+use Prooph\Bundle\ServiceBus\DependencyInjection\Compiler\RoutePass;
use Prooph\Bundle\ServiceBus\ProophServiceBusBundle;
+use Symfony\Component\DependencyInjection\Compiler\PassConfig;
use Symfony\Component\DependencyInjection\ContainerBuilder;
class BundleTest extends TestCase
@@ -29,6 +32,23 @@ public function it_builds_compiler_pass()
$config = $container->getCompilerPassConfig();
$passes = $config->getBeforeOptimizationPasses();
- // TODO assert something
+ $this->assertInstanceOf(PassConfig::class, $config);
+
+ $hasPluginPass = false;
+ $hasRoutePass = false;
+
+ foreach ($passes as $pass) {
+ if ($pass instanceof PluginsPass) {
+ $hasPluginPass = true;
+ continue;
+ }
+ if ($pass instanceof RoutePass) {
+ $hasRoutePass = true;
+ continue;
+ }
+ }
+
+ $this->assertTrue($hasPluginPass, 'No plugin pass configured');
+ $this->assertTrue($hasRoutePass, 'No route pass configured');
}
}
diff --git a/test/DependencyInjection/AbstractServiceBusExtensionTestCase.php b/test/DependencyInjection/AbstractServiceBusExtensionTestCase.php
index 023ecb4..d9711de 100644
--- a/test/DependencyInjection/AbstractServiceBusExtensionTestCase.php
+++ b/test/DependencyInjection/AbstractServiceBusExtensionTestCase.php
@@ -19,6 +19,7 @@
use Prooph\ServiceBus\EventBus;
use Prooph\ServiceBus\Exception\CommandDispatchException;
use Prooph\ServiceBus\Exception\MessageDispatchException;
+use Prooph\ServiceBus\Plugin\Router\AsyncSwitchMessageRouter;
use Prooph\ServiceBus\Plugin\Router\CommandRouter;
use Prooph\ServiceBus\Plugin\Router\EventRouter;
use Prooph\ServiceBus\Plugin\Router\QueryRouter;
@@ -384,6 +385,27 @@ public function it_adds_event_bus_routes_based_on_tags()
self::assertSame($event, $mockListener->lastEvent());
}
+ /**
+ * @test
+ */
+ public function it_creates_a_command_bus_with_async_switch_message_router()
+ {
+ $container = $this->loadContainer('command_bus_async');
+
+ $config = $container->getDefinition('prooph_service_bus.command_bus_async');
+
+ self::assertEquals(CommandBus::class, $config->getClass());
+
+ /* @var $commandBus CommandBus */
+ $commandBus = $container->get('prooph_service_bus.command_bus_async');
+
+ self::assertInstanceOf(CommandBus::class, $commandBus);
+
+ $router = $container->get('prooph_service_bus.command_bus_async.router');
+
+ self::assertInstanceOf(AsyncSwitchMessageRouter::class, $router);
+ }
+
private function loadContainer($fixture, CompilerPassInterface $compilerPass = null)
{
$container = $this->getContainer();
diff --git a/test/DependencyInjection/Fixture/Model/AsyncMessageProducer.php b/test/DependencyInjection/Fixture/Model/AsyncMessageProducer.php
new file mode 100644
index 0000000..034a6c4
--- /dev/null
+++ b/test/DependencyInjection/Fixture/Model/AsyncMessageProducer.php
@@ -0,0 +1,16 @@
+
+
+
+
+
+
+ prooph_service_bus.handle_command_invoke_strategy
+
+ Acme\RegisterUserHandler
+
+
+
+
+
+
+
+
+
diff --git a/test/DependencyInjection/Fixture/config/yml/command_bus_async.yml b/test/DependencyInjection/Fixture/config/yml/command_bus_async.yml
new file mode 100644
index 0000000..ca211be
--- /dev/null
+++ b/test/DependencyInjection/Fixture/config/yml/command_bus_async.yml
@@ -0,0 +1,13 @@
+prooph_service_bus:
+ command_buses:
+ command_bus_async:
+ router:
+ async_switch: 'async_message_producer'
+ routes:
+ 'Acme\RegisterUser': 'Acme\RegisterUserHandler'
+
+services:
+ 'Acme\RegisterUserHandler':
+ class: ProophTest\Bundle\ServiceBus\DependencyInjection\Fixture\Model\AcmeRegisterUserHandler
+ 'async_message_producer':
+ class: ProophTest\Bundle\ServiceBus\DependencyInjection\Fixture\Model\AsyncMessageProducer