diff --git a/Console/Command/FeedBufferCommand.php b/Console/Command/FeedBufferCommand.php
new file mode 100644
index 0000000..d2e5edc
--- /dev/null
+++ b/Console/Command/FeedBufferCommand.php
@@ -0,0 +1,80 @@
+
+ * @copyright Copyright (C) 2020 Lizard Media (http://lizardmedia.pl)
+ */
+
+namespace LizardMedia\RabbitMqPlayground\Console\Command;
+
+use LizardMedia\RabbitMqPlayground\Model\Data\Entity;
+use Magento\Framework\Console\Cli;
+use Magento\Framework\MessageQueue\PublisherInterface;
+use Symfony\Component\Console\Command\Command;
+use Symfony\Component\Console\Input\InputInterface;
+use Symfony\Component\Console\Output\OutputInterface;
+
+/**
+ * Class MigrationCommand
+ * @package LizardMedia\ReviewsDataMigration\Console\Command
+ * @codeCoverageIgnore
+ */
+class FeedBufferCommand extends Command
+{
+ /**
+ * @var string
+ */
+ private const TOPIC = 'entity.confirm';
+
+ /**
+ * @var string
+ */
+ private const COMMAND_NAME = 'feed:buffer';
+
+ /**
+ * @var string
+ */
+ private const COMMAND_DESC = 'Feed buffer';
+
+ /**
+ * @var PublisherInterface
+ */
+ private PublisherInterface $publisher;
+
+ /**
+ * FeedBufferCommand constructor.
+ * @param PublisherInterface $publisher
+ * @param string|null $name
+ */
+ public function __construct(PublisherInterface $publisher, string $name = null)
+ {
+ parent::__construct($name);
+ $this->publisher = $publisher;
+ }
+
+ /**
+ * @return void
+ */
+ protected function configure(): void
+ {
+ parent::configure();
+ $this->setName(self::COMMAND_NAME)
+ ->setDescription(self::COMMAND_DESC);
+ }
+
+ /**
+ * @param InputInterface $input
+ * @param OutputInterface $output
+ * @return int
+ */
+ protected function execute(InputInterface $input, OutputInterface $output): int
+ {
+ $entity = new Entity(rand(1, 100));
+ $this->publisher->publish(self::TOPIC, $entity);
+ return Cli::RETURN_SUCCESS;
+ }
+}
diff --git a/Console/Command/FeedCommand.php b/Console/Command/FeedCommand.php
new file mode 100644
index 0000000..78ba7ec
--- /dev/null
+++ b/Console/Command/FeedCommand.php
@@ -0,0 +1,80 @@
+
+ * @copyright Copyright (C) 2020 Lizard Media (http://lizardmedia.pl)
+ */
+
+namespace LizardMedia\RabbitMqPlayground\Console\Command;
+
+use LizardMedia\RabbitMqPlayground\Model\Data\Entity;
+use Magento\Framework\Console\Cli;
+use Magento\Framework\MessageQueue\PublisherInterface;
+use Symfony\Component\Console\Command\Command;
+use Symfony\Component\Console\Input\InputInterface;
+use Symfony\Component\Console\Output\OutputInterface;
+
+/**
+ * Class MigrationCommand
+ * @package LizardMedia\ReviewsDataMigration\Console\Command
+ * @codeCoverageIgnore
+ */
+class FeedCommand extends Command
+{
+ /**
+ * @var string
+ */
+ private const TOPIC = 'entity.create';
+
+ /**
+ * @var string
+ */
+ private const COMMAND_NAME = 'feed';
+
+ /**
+ * @var string
+ */
+ private const COMMAND_DESC = 'Feed';
+
+ /**
+ * @var PublisherInterface
+ */
+ private PublisherInterface $publisher;
+
+ /**
+ * FeedBufferCommand constructor.
+ * @param PublisherInterface $publisher
+ * @param string|null $name
+ */
+ public function __construct(PublisherInterface $publisher, string $name = null)
+ {
+ parent::__construct($name);
+ $this->publisher = $publisher;
+ }
+
+ /**
+ * @return void
+ */
+ protected function configure(): void
+ {
+ parent::configure();
+ $this->setName(self::COMMAND_NAME)
+ ->setDescription(self::COMMAND_DESC);
+ }
+
+ /**
+ * @param InputInterface $input
+ * @param OutputInterface $output
+ * @return int
+ */
+ protected function execute(InputInterface $input, OutputInterface $output): int
+ {
+ $entity = new Entity(rand(1, 100));
+ $this->publisher->publish(self::TOPIC, $entity);
+ return Cli::RETURN_SUCCESS;
+ }
+}
diff --git a/Console/Command/MassFeedCommand.php b/Console/Command/MassFeedCommand.php
new file mode 100644
index 0000000..638de07
--- /dev/null
+++ b/Console/Command/MassFeedCommand.php
@@ -0,0 +1,88 @@
+
+ * @copyright Copyright (C) 2020 Lizard Media (http://lizardmedia.pl)
+ */
+
+namespace LizardMedia\RabbitMqPlayground\Console\Command;
+
+use LizardMedia\RabbitMqPlayground\Model\Data\Entity;
+use Magento\Framework\Console\Cli;
+use Magento\Framework\MessageQueue\PublisherInterface;
+use Symfony\Component\Console\Command\Command;
+use Symfony\Component\Console\Input\InputInterface;
+use Symfony\Component\Console\Output\OutputInterface;
+
+/**
+ * Class MigrationCommand
+ * @package LizardMedia\ReviewsDataMigration\Console\Command
+ * @codeCoverageIgnore
+ */
+class MassFeedCommand extends Command
+{
+ /**
+ * @var string
+ */
+ private const TOPIC_CREATE = 'entity.create';
+
+ /**
+ * @var string
+ */
+ private const TOPIC_CONFIRM = 'entity.confirm';
+
+ /**
+ * @var string
+ */
+ private const COMMAND_NAME = 'mass-feed';
+
+ /**
+ * @var string
+ */
+ private const COMMAND_DESC = 'Feed';
+
+ /**
+ * @var PublisherInterface
+ */
+ private PublisherInterface $publisher;
+
+ /**
+ * FeedBufferCommand constructor.
+ * @param PublisherInterface $publisher
+ * @param string|null $name
+ */
+ public function __construct(PublisherInterface $publisher, string $name = null)
+ {
+ parent::__construct($name);
+ $this->publisher = $publisher;
+ }
+
+ /**
+ * @return void
+ */
+ protected function configure(): void
+ {
+ parent::configure();
+ $this->setName(self::COMMAND_NAME)
+ ->setDescription(self::COMMAND_DESC);
+ }
+
+ /**
+ * @param InputInterface $input
+ * @param OutputInterface $output
+ * @return int
+ */
+ protected function execute(InputInterface $input, OutputInterface $output): int
+ {
+ for ($i = 0; $i <= 10000; $i++) {
+ $entity = new Entity($i);
+ $this->publisher->publish(self::TOPIC_CREATE, $entity);
+ $this->publisher->publish(self::TOPIC_CONFIRM, $entity);
+ }
+ return Cli::RETURN_SUCCESS;
+ }
+}
diff --git a/Model/Data/Entity.php b/Model/Data/Entity.php
new file mode 100644
index 0000000..56aa54b
--- /dev/null
+++ b/Model/Data/Entity.php
@@ -0,0 +1,52 @@
+
+ * @copyright Copyright (C) 2020 Lizard Media (http://lizardmedia.pl)
+ */
+
+namespace LizardMedia\RabbitMqPlayground\Model\Data;
+
+/**
+ * Class Entity
+ * @package LizardMedia\Model\Data\RabbitMqPlayground
+ * @codeCoverageIgnore
+ */
+class Entity
+{
+ /**
+ * @var int
+ */
+ private int $id;
+
+ /**
+ * Entity constructor.
+ * @param int $id
+ */
+ public function __construct(int $id)
+ {
+ $this->id = $id;
+ }
+
+ /**
+ * @return int
+ */
+ public function getId(): int
+ {
+ return $this->id;
+ }
+
+ /**
+ * @param int $id
+ * @return void
+ * @SuppressWarnings(PHPMD.ShortVariable)
+ */
+ public function setId(int $id): void
+ {
+ $this->id = $id;
+ }
+}
diff --git a/Queue/Consumer/RetryConsumerHandler.php b/Queue/Consumer/RetryConsumerHandler.php
new file mode 100644
index 0000000..415f798
--- /dev/null
+++ b/Queue/Consumer/RetryConsumerHandler.php
@@ -0,0 +1,41 @@
+
+ * @copyright Copyright (C) 2020 Lizard Media (http://lizardmedia.pl)
+ */
+
+namespace LizardMedia\RabbitMqPlayground\Queue\Consumer;
+
+use LizardMedia\MessageQueue\Api\Queue\Consumer\EnvelopeCallbackFactoryInterface;
+use LizardMedia\MessageQueue\Queue\Consumer\ConsumerWithInjectableEnvelopeCallback;
+use Magento\Framework\MessageQueue\CallbackInvokerInterface;
+use Magento\Framework\MessageQueue\ConsumerConfigurationInterface as UsedConsumerConfig;
+
+/**
+ * Class RetryConsumerHandler
+ * @package LizardMedia\RabbitMqPlayground\Queue\Consumer
+ * @codeCoverageIgnore
+ */
+class RetryConsumerHandler extends ConsumerWithInjectableEnvelopeCallback
+{
+ /**
+ * RetryConsumerHandler constructor.
+ * @param EnvelopeCallbackFactoryInterface $envelopeCallbackFactory
+ * @param CallbackInvokerInterface $invoker
+ * @param UsedConsumerConfig $configuration
+ * @param string $envelopeCallbackType
+ */
+ public function __construct(
+ EnvelopeCallbackFactoryInterface $envelopeCallbackFactory,
+ CallbackInvokerInterface $invoker,
+ UsedConsumerConfig $configuration,
+ string $envelopeCallbackType
+ ) {
+ parent::__construct($envelopeCallbackFactory, $invoker, $configuration, $envelopeCallbackType);
+ }
+}
diff --git a/Queue/ConsumerHandler/Entity/Cancellation.php b/Queue/ConsumerHandler/Entity/Cancellation.php
new file mode 100644
index 0000000..a8a8d75
--- /dev/null
+++ b/Queue/ConsumerHandler/Entity/Cancellation.php
@@ -0,0 +1,39 @@
+
+ * @copyright Copyright (C) 2020 Lizard Media (http://lizardmedia.pl)
+ */
+
+namespace LizardMedia\RabbitMqPlayground\Queue\ConsumerHandler\Entity;
+
+use LizardMedia\RabbitMqPlayground\Model\Data\Entity;
+use RuntimeException;
+
+/**
+ * Class Creation
+ * @package LizardMedia\RabbitMqPlayground\Queue\ConsumerHandler\Entity
+ * @codeCoverageIgnore
+ */
+class Cancellation
+{
+ /**
+ * @param Entity $entity
+ * @return void
+ */
+ public function execute(Entity $entity): void
+ {
+ $id = $entity->getId();
+ sleep(10);
+ if ($id > 50) {
+ echo $entity->getId(), "\n";
+ } else {
+ echo $entity->getId(), "\n", 'Error cancellation...', "\n";
+ throw new RuntimeException();
+ }
+ }
+}
diff --git a/Queue/ConsumerHandler/Entity/Confirmation.php b/Queue/ConsumerHandler/Entity/Confirmation.php
new file mode 100644
index 0000000..ed4fbb6
--- /dev/null
+++ b/Queue/ConsumerHandler/Entity/Confirmation.php
@@ -0,0 +1,39 @@
+
+ * @copyright Copyright (C) 2020 Lizard Media (http://lizardmedia.pl)
+ */
+
+namespace LizardMedia\RabbitMqPlayground\Queue\ConsumerHandler\Entity;
+
+use LizardMedia\RabbitMqPlayground\Model\Data\Entity;
+use RuntimeException;
+
+/**
+ * Class Creation
+ * @package LizardMedia\RabbitMqPlayground\Queue\ConsumerHandler\Entity
+ * @codeCoverageIgnore
+ */
+class Confirmation
+{
+ /**
+ * @param Entity $entity
+ * @return void
+ */
+ public function execute(Entity $entity): void
+ {
+ $id = $entity->getId();
+ sleep(10);
+ if ($id < 5000) {
+ echo $entity->getId(), "\n";
+ } else {
+ echo $entity->getId(), "\n", 'Error confirmation...', "\n";
+ throw new RuntimeException();
+ }
+ }
+}
diff --git a/Queue/ConsumerHandler/Entity/Creation.php b/Queue/ConsumerHandler/Entity/Creation.php
new file mode 100644
index 0000000..b3e4372
--- /dev/null
+++ b/Queue/ConsumerHandler/Entity/Creation.php
@@ -0,0 +1,39 @@
+
+ * @copyright Copyright (C) 2020 Lizard Media (http://lizardmedia.pl)
+ */
+
+namespace LizardMedia\RabbitMqPlayground\Queue\ConsumerHandler\Entity;
+
+use LizardMedia\RabbitMqPlayground\Model\Data\Entity;
+use RuntimeException;
+
+/**
+ * Class Creation
+ * @package LizardMedia\RabbitMqPlayground\Queue\ConsumerHandler\Entity
+ * @codeCoverageIgnore
+ */
+class Creation
+{
+ /**
+ * @param Entity $entity
+ * @return void
+ */
+ public function execute(Entity $entity): void
+ {
+ $id = $entity->getId();
+ sleep(10);
+ if ($id < 5000) {
+ echo $entity->getId(), "\n";
+ } else {
+ echo $entity->getId(), "\n", 'Error creation...', "\n";
+ throw new RuntimeException();
+ }
+ }
+}
diff --git a/Queue/ConsumerHandler/Entity/Failure.php b/Queue/ConsumerHandler/Entity/Failure.php
new file mode 100644
index 0000000..754c928
--- /dev/null
+++ b/Queue/ConsumerHandler/Entity/Failure.php
@@ -0,0 +1,50 @@
+
+ * @copyright Copyright (C) 2020 Lizard Media (http://lizardmedia.pl)
+ */
+
+namespace LizardMedia\RabbitMqPlayground\Queue\ConsumerHandler\Entity;
+
+use LizardMedia\RabbitMqPlayground\Model\Data\Entity;
+use Psr\Log\LoggerInterface;
+use RuntimeException;
+
+/**
+ * Class Failure
+ * @package LizardMedia\RabbitMqPlayground\Queue\ConsumerHandler\Entity
+ * @codeCoverageIgnore
+ */
+class Failure
+{
+ /**
+ * @var LoggerInterface
+ */
+ private LoggerInterface $logger;
+
+ /**
+ * Failure constructor.
+ * @param LoggerInterface $logger
+ */
+ public function __construct(LoggerInterface $logger)
+ {
+ $this->logger = $logger;
+ }
+
+ /**
+ * @param Entity $entity
+ * @return void
+ */
+ public function execute(Entity $entity): void
+ {
+ sleep(10);
+ echo $entity->getId(), "\n", 'Error...', "\n";
+ $this->logger->critical('iteration', [$entity->getId()]);
+ throw new RuntimeException();
+ }
+}
diff --git a/Queue/ConsumerHandler/Entity/Success.php b/Queue/ConsumerHandler/Entity/Success.php
new file mode 100644
index 0000000..9aeff22
--- /dev/null
+++ b/Queue/ConsumerHandler/Entity/Success.php
@@ -0,0 +1,33 @@
+
+ * @copyright Copyright (C) 2020 Lizard Media (http://lizardmedia.pl)
+ */
+
+namespace LizardMedia\RabbitMqPlayground\Queue\ConsumerHandler\Entity;
+
+use LizardMedia\RabbitMqPlayground\Model\Data\Entity;
+use RuntimeException;
+
+/**
+ * Class Success
+ * @package LizardMedia\RabbitMqPlayground\Queue\ConsumerHandler\Entity
+ * @codeCoverageIgnore
+ */
+class Success
+{
+ /**
+ * @param Entity $entity
+ * @return void
+ */
+ public function execute(Entity $entity): void
+ {
+ sleep(10);
+ echo $entity->getId(), "\n";
+ }
+}
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..a47a480
--- /dev/null
+++ b/README.md
@@ -0,0 +1,70 @@
+# Lizard Media RabbitMqPlayground #
+
+A module shows how more advanced topologies could be built, using DLX and message-ttl.
+Module shows example of topology with error fallback and retry mechanism. Additionally for entity.confirm and entity.cancel
+topics consumption of messages are delayed. In order to make it work, please apply our patches for magento modules and libraries.
+
+## Getting Started
+
+These instructions will get you a copy of the project up and running on your local machine for development and testing purposes.
+
+### Prerequisites
+
+* Magento 2.3/2.4
+* PHP 7.3/7.4
+* RabbitMQ 3.8.*
+* Apply [our patches](https://github.com/lizardmedia/magento2-mq-patches) for Magento Message Queue features.
+
+### Installing
+
+#### Download the module
+
+##### Using composer (suggested)
+
+Simply run
+
+```
+composer require lizardmedia/module-rabbitmq-playground
+```
+
+##### Downloading ZIP
+
+Download a ZIP version of the module and unpack it into your project into
+```
+app/code/LizardMedia/RabbitmqPlayground
+```
+If you use ZIP file you will need to install all dependencies of the module
+manually
+
+
+#### Install the module
+
+Run this command
+```
+bin/magento module:enable LizardMedia_RabbitmqPlayground
+bin/magento setup:upgrade
+```
+
+## Usage
+
+Just install module and investigate topology created. Play around by
+publishing messages (take a look at console commands) and observe how messages are handled. Every consumer handler
+has a sleep function inside to make sure that message processing is visible in rabbitmq admin panel
+
+## Contributing
+
+Please read [CONTRIBUTING.md](CONTRIBUTING.md) for details on our code of conduct, and the process for submitting pull requests to us.
+
+## Versioning
+
+We use [SemVer](http://semver.org/) for versioning. For the versions available, see the [tags on this repository](https://github.com/lizardmedia/varnish-warmer-magento2/tags).
+
+## Authors
+
+* **Bartosz Kubicki** - *Initial work, fixes & maintenance* - [Lizard Media](https://github.com/bartoszkubicki)
+
+See also the list of [contributors](https://github.com/lizardmedia/rabbitmq-playground/contributors) who participated in this project.
+
+## License
+
+This project is licensed under the MIT License - see the [LICENSE.md](LICENSE.md) file for details
diff --git a/composer.json b/composer.json
new file mode 100644
index 0000000..5e11f48
--- /dev/null
+++ b/composer.json
@@ -0,0 +1,20 @@
+{
+ "name": "lizardmedia/module-rabbitmq-playground",
+ "description": "Module testing rabbitmq implemention",
+ "require": {
+ "php": "~7.3.*||~7.4.*",
+ "lizardmedia/module-message-queue": "1.0.*",
+ "magento/framework-message-queue": "100.4.*",
+ "magento/framework": "103.0.*"
+ },
+ "type": "magento2-module",
+ "version": "1.0.0",
+ "autoload": {
+ "files": [
+ "registration.php"
+ ],
+ "psr-4": {
+ "LizardMedia\\RabbitMqPlayground\\": ""
+ }
+ }
+}
diff --git a/etc/communication.xml b/etc/communication.xml
new file mode 100644
index 0000000..b90a79c
--- /dev/null
+++ b/etc/communication.xml
@@ -0,0 +1,12 @@
+
+
+
+
+
+
+
diff --git a/etc/di.xml b/etc/di.xml
new file mode 100644
index 0000000..eefc5bb
--- /dev/null
+++ b/etc/di.xml
@@ -0,0 +1,29 @@
+
+
+
+
+
+
+
+ - LizardMedia\RabbitMqPlayground\Console\Command\FeedBufferCommand
+ - LizardMedia\RabbitMqPlayground\Console\Command\FeedCommand
+ - LizardMedia\RabbitMqPlayground\Console\Command\MassFeedCommand
+
+
+
+
+
+
+
+
+ LizardMedia\MessageQueue\Queue\Consumer\EnvelopeCallback\RetryLimit
+
+
+
+
diff --git a/etc/module.xml b/etc/module.xml
new file mode 100644
index 0000000..a6281c0
--- /dev/null
+++ b/etc/module.xml
@@ -0,0 +1,15 @@
+
+
+
+
+
+
+
+
+
diff --git a/etc/queue_consumer.xml b/etc/queue_consumer.xml
new file mode 100644
index 0000000..7751348
--- /dev/null
+++ b/etc/queue_consumer.xml
@@ -0,0 +1,17 @@
+
+
+
+
+
+
+
diff --git a/etc/queue_publisher.xml b/etc/queue_publisher.xml
new file mode 100644
index 0000000..4ac655a
--- /dev/null
+++ b/etc/queue_publisher.xml
@@ -0,0 +1,18 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/etc/queue_topology.xml b/etc/queue_topology.xml
new file mode 100644
index 0000000..2e5a1fd
--- /dev/null
+++ b/etc/queue_topology.xml
@@ -0,0 +1,70 @@
+
+
+
+
+
+
+
+ entity
+ 25000
+
+
+
+
+
+
+
+
+
+ entity.dead_letter
+
+
+
+
+ entity.dead_letter
+
+
+
+
+ entity.dead_letter
+
+
+
+
+
+
+
+
+
+ entity.retry
+ 15000
+
+
+
+
+
+
+
+
+
+ entity.dead_letter
+
+
+
+
+ entity.dead_letter
+
+
+
+
+ entity.dead_letter
+
+
+
+
+
diff --git a/registration.php b/registration.php
new file mode 100644
index 0000000..c9b64ce
--- /dev/null
+++ b/registration.php
@@ -0,0 +1,16 @@
+
+ * @copyright Copyright (C) 2020 Lizard Media (http://lizardmedia.pl)
+ */
+
+use Magento\Framework\Component\ComponentRegistrar;
+
+ComponentRegistrar::register(
+ ComponentRegistrar::MODULE,
+ 'LizardMedia_RabbitMqPlayground',
+ __DIR__
+);