From 72e2ef5edf46a17e44b05d2fe950bfc788980516 Mon Sep 17 00:00:00 2001 From: Dariusz Gafka Date: Mon, 26 Aug 2024 22:19:08 +0200 Subject: [PATCH 01/10] Self healing on acknowledge failures --- .../src/AmqpAcknowledgeCallbackWraper.php | 62 +++++++++++++++++++ .../Amqp/src/AmqpInboundChannelAdapter.php | 17 ++++- .../src/AmqpInboundChannelAdapterBuilder.php | 2 + .../AmqpReconnectableConnectionFactory.php | 2 +- packages/Amqp/tests/AmqpMessagingTest.php | 14 +++-- .../Receiver/Event/TicketCreated.php | 17 +++++ .../Receiver/TicketServiceReceiver.php | 25 ++++++++ .../TicketNotificationConfig.php | 17 +++++ .../TicketNotificationEventHandler.php | 40 ++++++++++++ .../Integration/DistributedCommandBusTest.php | 39 +++++++++++- .../Enqueue/src/CachedConnectionFactory.php | 5 ++ 11 files changed, 230 insertions(+), 10 deletions(-) create mode 100644 packages/Amqp/src/AmqpAcknowledgeCallbackWraper.php create mode 100644 packages/Amqp/tests/Fixture/DistributedCommandBus/Receiver/Event/TicketCreated.php create mode 100644 packages/Amqp/tests/Fixture/DistributedCommandBus/ReceiverEventHandler/TicketNotificationConfig.php create mode 100644 packages/Amqp/tests/Fixture/DistributedCommandBus/ReceiverEventHandler/TicketNotificationEventHandler.php diff --git a/packages/Amqp/src/AmqpAcknowledgeCallbackWraper.php b/packages/Amqp/src/AmqpAcknowledgeCallbackWraper.php new file mode 100644 index 000000000..d50e3d06d --- /dev/null +++ b/packages/Amqp/src/AmqpAcknowledgeCallbackWraper.php @@ -0,0 +1,62 @@ +acknowledgementCallback->isAutoAck(); + } + + public function disableAutoAck(): void + { + $this->acknowledgementCallback->disableAutoAck(); + } + + public function accept(): void + { + try { + $this->acknowledgementCallback->accept(); + }catch (\Exception $exception) { + $this->loggingGateway->info("Failed to acknowledge message, disconnecting AMQP Connection in order to self-heal. Failure happen due to: " . $exception->getMessage(), exception: $exception); + + $this->connectionFactory->reconnect(); + } + } + + public function reject(): void + { + try { + $this->acknowledgementCallback->reject(); + }catch (\Exception $exception) { + $this->loggingGateway->info("Failed to reject message, disconnecting AMQP Connection in order to self-heal. Failure happen due to: " . $exception->getMessage(), exception: $exception); + + $this->connectionFactory->reconnect(); + } + } + + public function requeue(): void + { + try { + $this->acknowledgementCallback->requeue(); + }catch (\Exception $exception) { + $this->loggingGateway->info("Failed to requeue message, disconnecting AMQP Connection in order to self-heal. Failure happen due to: " . $exception->getMessage(), exception: $exception); + + $this->connectionFactory->reconnect(); + } + } +} \ No newline at end of file diff --git a/packages/Amqp/src/AmqpInboundChannelAdapter.php b/packages/Amqp/src/AmqpInboundChannelAdapter.php index 7e3b222ec..c17cb79d3 100644 --- a/packages/Amqp/src/AmqpInboundChannelAdapter.php +++ b/packages/Amqp/src/AmqpInboundChannelAdapter.php @@ -6,13 +6,16 @@ use AMQPConnectionException; use Ecotone\Enqueue\CachedConnectionFactory; +use Ecotone\Enqueue\EnqueueHeader; use Ecotone\Enqueue\EnqueueInboundChannelAdapter; use Ecotone\Enqueue\InboundMessageConverter; use Ecotone\Messaging\Channel\QueueChannel; use Ecotone\Messaging\Conversion\ConversionService; use Ecotone\Messaging\Conversion\MediaType; use Ecotone\Messaging\Endpoint\PollingConsumer\ConnectionException; +use Ecotone\Messaging\Handler\Logger\LoggingGateway; use Ecotone\Messaging\Message; +use Ecotone\Messaging\MessageHeaders; use Ecotone\Messaging\Support\MessageBuilder; use Interop\Amqp\AmqpMessage; use Interop\Queue\Consumer; @@ -32,13 +35,14 @@ class AmqpInboundChannelAdapter extends EnqueueInboundChannelAdapter private QueueChannel $queueChannel; public function __construct( - CachedConnectionFactory $cachedConnectionFactory, + private CachedConnectionFactory $cachedConnectionFactory, private AmqpAdmin $amqpAdmin, bool $declareOnStartup, string $queueName, int $receiveTimeoutInMilliseconds, InboundMessageConverter $inboundMessageConverter, - ConversionService $conversionService + ConversionService $conversionService, + private LoggingGateway $loggingGateway, ) { parent::__construct( $cachedConnectionFactory, @@ -65,7 +69,14 @@ public function enrichMessage(EnqueueMessage $sourceMessage, MessageBuilder $tar $targetMessage = $targetMessage->setContentType(MediaType::parseMediaType($sourceMessage->getContentType())); } - return $targetMessage; + return $targetMessage->setHeader( + EnqueueHeader::HEADER_ACKNOWLEDGE, + new AmqpAcknowledgeCallbackWraper( + $targetMessage->getHeaderWithName(EnqueueHeader::HEADER_ACKNOWLEDGE), + $this->cachedConnectionFactory, + $this->loggingGateway, + ) + ); } public function receiveMessage(int $timeout = 0): ?Message diff --git a/packages/Amqp/src/AmqpInboundChannelAdapterBuilder.php b/packages/Amqp/src/AmqpInboundChannelAdapterBuilder.php index 6bb492887..e03727dfc 100644 --- a/packages/Amqp/src/AmqpInboundChannelAdapterBuilder.php +++ b/packages/Amqp/src/AmqpInboundChannelAdapterBuilder.php @@ -12,6 +12,7 @@ use Ecotone\Messaging\Config\Container\MessagingContainerBuilder; use Ecotone\Messaging\Config\Container\Reference; use Ecotone\Messaging\Conversion\ConversionService; +use Ecotone\Messaging\Handler\Logger\LoggingGateway; use Ecotone\Messaging\MessageConverter\DefaultHeaderMapper; use Enqueue\AmqpExt\AmqpConnectionFactory; use Ramsey\Uuid\Uuid; @@ -59,6 +60,7 @@ public function compile(MessagingContainerBuilder $builder): Definition $this->receiveTimeoutInMilliseconds, $inboundMessageConverter, new Reference(ConversionService::REFERENCE_NAME), + new Reference(LoggingGateway::class), ]); } } diff --git a/packages/Amqp/src/AmqpReconnectableConnectionFactory.php b/packages/Amqp/src/AmqpReconnectableConnectionFactory.php index 9d3ba4366..e6be68a23 100644 --- a/packages/Amqp/src/AmqpReconnectableConnectionFactory.php +++ b/packages/Amqp/src/AmqpReconnectableConnectionFactory.php @@ -63,7 +63,7 @@ public function isDisconnected(?Context $context): bool public function reconnect(): void { $connectionProperty = $this->getConnectionProperty(); - + /** @var AMQPConnection $connection */ $connection = $connectionProperty->getValue($this->connectionFactory); if ($connection) { diff --git a/packages/Amqp/tests/AmqpMessagingTest.php b/packages/Amqp/tests/AmqpMessagingTest.php index 156a563e5..c8e9f9732 100644 --- a/packages/Amqp/tests/AmqpMessagingTest.php +++ b/packages/Amqp/tests/AmqpMessagingTest.php @@ -27,17 +27,22 @@ abstract class AmqpMessagingTest extends TestCase /** * @return AmqpConnectionFactory */ - public function getCachedConnectionFactory(): AmqpConnectionFactory + public function getCachedConnectionFactory(array $config = []): AmqpConnectionFactory { - return self::getRabbitConnectionFactory(); + return self::getRabbitConnectionFactory($config); } /** * @return AmqpConnectionFactory */ - public static function getRabbitConnectionFactory(): AmqpConnectionFactory + public static function getRabbitConnectionFactory(array $config = []): AmqpConnectionFactory { - return new AmqpLibConnection(['dsn' => getenv('RABBIT_HOST') ? getenv('RABBIT_HOST') : 'amqp://guest:guest@localhost:5672/%2f']); + return new AmqpLibConnection( + array_merge( + ['dsn' => getenv('RABBIT_HOST') ? getenv('RABBIT_HOST') : 'amqp://guest:guest@localhost:5672/%2f'], + $config, + ) + ); } public function setUp(): void @@ -58,6 +63,7 @@ public function queueCleanUp(): void $this->deleteQueue(new AmqpQueue('distributed_ticket_service')); $this->deleteQueue(new AmqpQueue(AmqpDistributionModule::CHANNEL_PREFIX . TicketServiceMessagingConfiguration::SERVICE_NAME)); $this->deleteQueue(new AmqpQueue('ecotone_1_delay')); + $this->deleteQueue(new AmqpQueue('async')); } private function deleteQueue(AmqpQueue $queue): void diff --git a/packages/Amqp/tests/Fixture/DistributedCommandBus/Receiver/Event/TicketCreated.php b/packages/Amqp/tests/Fixture/DistributedCommandBus/Receiver/Event/TicketCreated.php new file mode 100644 index 000000000..986fb4012 --- /dev/null +++ b/packages/Amqp/tests/Fixture/DistributedCommandBus/Receiver/Event/TicketCreated.php @@ -0,0 +1,17 @@ +ticket; + } +} \ No newline at end of file diff --git a/packages/Amqp/tests/Fixture/DistributedCommandBus/Receiver/TicketServiceReceiver.php b/packages/Amqp/tests/Fixture/DistributedCommandBus/Receiver/TicketServiceReceiver.php index c93cdb08c..cd9694aec 100644 --- a/packages/Amqp/tests/Fixture/DistributedCommandBus/Receiver/TicketServiceReceiver.php +++ b/packages/Amqp/tests/Fixture/DistributedCommandBus/Receiver/TicketServiceReceiver.php @@ -2,9 +2,14 @@ namespace Test\Ecotone\Amqp\Fixture\DistributedCommandBus\Receiver; +use Ecotone\Messaging\Attribute\Asynchronous; +use Ecotone\Messaging\Attribute\Parameter\Header; use Ecotone\Modelling\Attribute\CommandHandler; use Ecotone\Modelling\Attribute\Distributed; +use Ecotone\Modelling\Attribute\EventHandler; use Ecotone\Modelling\Attribute\QueryHandler; +use Ecotone\Modelling\EventBus; +use Test\Ecotone\Amqp\Fixture\DistributedCommandBus\Receiver\Event\TicketCreated; /** * licence Apache-2.0 @@ -12,10 +17,16 @@ class TicketServiceReceiver { public const CREATE_TICKET_ENDPOINT = 'createTicket'; + public const CREATE_TICKET_WITH_EVENT_ENDPOINT = 'createTicketWithEvent'; public const GET_TICKETS_COUNT = 'getTicketsCount'; private array $tickets = []; + public function __construct(private array $delays = []) + { + + } + #[Distributed] #[CommandHandler(self::CREATE_TICKET_ENDPOINT)] public function registerTicket(string $ticket): void @@ -23,6 +34,20 @@ public function registerTicket(string $ticket): void $this->tickets[] = $ticket; } + #[Distributed] + #[CommandHandler(self::CREATE_TICKET_WITH_EVENT_ENDPOINT)] + public function registerTicketWithEvent(string $ticket, EventBus $eventBus): void + { + $delay = array_shift($this->delays); + if ($delay) { + sleep($delay); + } + + $this->tickets[] = $ticket; + + $eventBus->publish(new TicketCreated($ticket)); + } + #[QueryHandler(self::GET_TICKETS_COUNT)] public function getTickets(): int { diff --git a/packages/Amqp/tests/Fixture/DistributedCommandBus/ReceiverEventHandler/TicketNotificationConfig.php b/packages/Amqp/tests/Fixture/DistributedCommandBus/ReceiverEventHandler/TicketNotificationConfig.php new file mode 100644 index 000000000..39a716980 --- /dev/null +++ b/packages/Amqp/tests/Fixture/DistributedCommandBus/ReceiverEventHandler/TicketNotificationConfig.php @@ -0,0 +1,17 @@ +delays); + if ($delay) { + sleep($delay); + } + + $this->ticketNotifications[] = $event->getTicket(); + } + + #[QueryHandler(self::GET_TICKETS_NOTIFICATION_COUNT)] + public function getTicketsNotifications(): int + { + return count($this->ticketNotifications); + } +} \ No newline at end of file diff --git a/packages/Amqp/tests/Integration/DistributedCommandBusTest.php b/packages/Amqp/tests/Integration/DistributedCommandBusTest.php index 9d8ed5687..cf993f721 100644 --- a/packages/Amqp/tests/Integration/DistributedCommandBusTest.php +++ b/packages/Amqp/tests/Integration/DistributedCommandBusTest.php @@ -8,10 +8,15 @@ use Ecotone\Lite\Test\FlowTestSupport; use Ecotone\Messaging\Config\ModulePackageList; use Ecotone\Messaging\Config\ServiceConfiguration; +use Ecotone\Messaging\Endpoint\ExecutionPollingMetadata; +use Ecotone\Messaging\Handler\Logger\EchoLogger; +use Ecotone\Modelling\DistributedBus; use Enqueue\AmqpExt\AmqpConnectionFactory; use Test\Ecotone\Amqp\AmqpMessagingTest; use Test\Ecotone\Amqp\Fixture\DistributedCommandBus\Publisher\UserService; +use Test\Ecotone\Amqp\Fixture\DistributedCommandBus\Receiver\TicketServiceMessagingConfiguration; use Test\Ecotone\Amqp\Fixture\DistributedCommandBus\Receiver\TicketServiceReceiver; +use Test\Ecotone\Amqp\Fixture\DistributedCommandBus\ReceiverEventHandler\TicketNotificationEventHandler; /** * @internal @@ -36,10 +41,40 @@ public function test_distributing_command_to_another_service(): void self::assertEquals(1, $ticketService->sendQueryWithRouting(TicketServiceReceiver::GET_TICKETS_COUNT)); } - private function bootstrapEcotone(string $serviceName, array $namespaces, array $services): FlowTestSupport + public function test_distributing_command_misses_heartbeat_and_reconnects(): void + { + $executionPollingMetadata = ExecutionPollingMetadata::createWithDefaults()->withFinishWhenNoMessages(true); + $userService = $this->bootstrapEcotone('user_service', ['Test\Ecotone\Amqp\Fixture\DistributedCommandBus\Publisher'], [new UserService()], ['heartbeat' => 1]); + $delays = [3]; + $ticketService = $this->bootstrapEcotone('ticket_service', ['Test\Ecotone\Amqp\Fixture\DistributedCommandBus\Receiver', 'Test\Ecotone\Amqp\Fixture\DistributedCommandBus\ReceiverEventHandler'], [new TicketServiceReceiver($delays), new TicketNotificationEventHandler($delays), + //"logger" => new EchoLogger() + ], + amqpConfig: ['heartbeat' => 1] + ); + + $ticketService->run('ticket_service', $executionPollingMetadata); + self::assertEquals(0, $ticketService->sendQueryWithRouting(TicketServiceReceiver::GET_TICKETS_COUNT)); + + $distributedBus = $userService->getGateway(DistributedBus::class); + $distributedBus->sendCommand( + TicketServiceMessagingConfiguration::SERVICE_NAME, + TicketServiceReceiver::CREATE_TICKET_WITH_EVENT_ENDPOINT, + 'User changed billing address', + ); + + $ticketService->run('ticket_service', $executionPollingMetadata); + // Message will fail on acknowledge due to lost heartbeat, yet should stay in queue and be processed after reconnect + self::assertEquals(2, $ticketService->sendQueryWithRouting(TicketServiceReceiver::GET_TICKETS_COUNT)); + + $ticketService->run('async', $executionPollingMetadata); + // distributed command resulted in two events being published, and first event fails on processing due to heartbeat + self::assertEquals(3, $ticketService->sendQueryWithRouting(TicketNotificationEventHandler::GET_TICKETS_NOTIFICATION_COUNT)); + } + + private function bootstrapEcotone(string $serviceName, array $namespaces, array $services, array $amqpConfig = []): FlowTestSupport { return EcotoneLite::bootstrapFlowTesting( - containerOrAvailableServices: array_merge([AmqpConnectionFactory::class => $this->getCachedConnectionFactory()], $services), + containerOrAvailableServices: array_merge([AmqpConnectionFactory::class => $this->getCachedConnectionFactory($amqpConfig)], $services), configuration: ServiceConfiguration::createWithDefaults() ->withServiceName($serviceName) ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::ASYNCHRONOUS_PACKAGE, ModulePackageList::AMQP_PACKAGE])) diff --git a/packages/Enqueue/src/CachedConnectionFactory.php b/packages/Enqueue/src/CachedConnectionFactory.php index 9598ae981..5af2a3f87 100644 --- a/packages/Enqueue/src/CachedConnectionFactory.php +++ b/packages/Enqueue/src/CachedConnectionFactory.php @@ -45,6 +45,11 @@ public function createContext(): Context return $this->cachedContext[$relatedTo]; } + public function reconnect(): void + { + $this->connectionFactory->reconnect(); + } + public function getConsumer(Destination $destination): Consumer { return $this->createContext()->createConsumer($destination); From 8071c60962f895d1783ded4b7d38c7245491bfcb Mon Sep 17 00:00:00 2001 From: Dariusz Gafka Date: Tue, 27 Aug 2024 07:59:02 +0200 Subject: [PATCH 02/10] revert subscription based connection --- .../Amqp/src/AmqpInboundChannelAdapter.php | 2 +- .../AmqpReconnectableConnectionFactory.php | 7 ++++++- .../Integration/DistributedCommandBusTest.php | 21 ++++++++++--------- .../AcknowledgeConfirmationInterceptor.php | 6 +++--- .../src/EnqueueInboundChannelAdapter.php | 3 ++- 5 files changed, 23 insertions(+), 16 deletions(-) diff --git a/packages/Amqp/src/AmqpInboundChannelAdapter.php b/packages/Amqp/src/AmqpInboundChannelAdapter.php index c17cb79d3..7a6aa3574 100644 --- a/packages/Amqp/src/AmqpInboundChannelAdapter.php +++ b/packages/Amqp/src/AmqpInboundChannelAdapter.php @@ -79,7 +79,7 @@ public function enrichMessage(EnqueueMessage $sourceMessage, MessageBuilder $tar ); } - public function receiveMessage(int $timeout = 0): ?Message + public function receiveWithTimeout(int $timeout = 0): ?Message { try { if ($this->declareOnStartup && $this->initialized === false) { diff --git a/packages/Amqp/src/AmqpReconnectableConnectionFactory.php b/packages/Amqp/src/AmqpReconnectableConnectionFactory.php index e6be68a23..72fcf09a2 100644 --- a/packages/Amqp/src/AmqpReconnectableConnectionFactory.php +++ b/packages/Amqp/src/AmqpReconnectableConnectionFactory.php @@ -63,7 +63,12 @@ public function isDisconnected(?Context $context): bool public function reconnect(): void { $connectionProperty = $this->getConnectionProperty(); - + + if ($this->subscriptionConsumer) { + try { + $this->subscriptionConsumer->unsubscribeAll(); + }catch (\Exception) {} + } /** @var AMQPConnection $connection */ $connection = $connectionProperty->getValue($this->connectionFactory); if ($connection) { diff --git a/packages/Amqp/tests/Integration/DistributedCommandBusTest.php b/packages/Amqp/tests/Integration/DistributedCommandBusTest.php index cf993f721..9f5a3546e 100644 --- a/packages/Amqp/tests/Integration/DistributedCommandBusTest.php +++ b/packages/Amqp/tests/Integration/DistributedCommandBusTest.php @@ -45,9 +45,8 @@ public function test_distributing_command_misses_heartbeat_and_reconnects(): voi { $executionPollingMetadata = ExecutionPollingMetadata::createWithDefaults()->withFinishWhenNoMessages(true); $userService = $this->bootstrapEcotone('user_service', ['Test\Ecotone\Amqp\Fixture\DistributedCommandBus\Publisher'], [new UserService()], ['heartbeat' => 1]); - $delays = [3]; - $ticketService = $this->bootstrapEcotone('ticket_service', ['Test\Ecotone\Amqp\Fixture\DistributedCommandBus\Receiver', 'Test\Ecotone\Amqp\Fixture\DistributedCommandBus\ReceiverEventHandler'], [new TicketServiceReceiver($delays), new TicketNotificationEventHandler($delays), - //"logger" => new EchoLogger() + $ticketService = $this->bootstrapEcotone('ticket_service', ['Test\Ecotone\Amqp\Fixture\DistributedCommandBus\Receiver', 'Test\Ecotone\Amqp\Fixture\DistributedCommandBus\ReceiverEventHandler'], [new TicketServiceReceiver([0, 3]), new TicketNotificationEventHandler([0, 3]), + "logger" => new EchoLogger() ], amqpConfig: ['heartbeat' => 1] ); @@ -56,19 +55,21 @@ public function test_distributing_command_misses_heartbeat_and_reconnects(): voi self::assertEquals(0, $ticketService->sendQueryWithRouting(TicketServiceReceiver::GET_TICKETS_COUNT)); $distributedBus = $userService->getGateway(DistributedBus::class); - $distributedBus->sendCommand( - TicketServiceMessagingConfiguration::SERVICE_NAME, - TicketServiceReceiver::CREATE_TICKET_WITH_EVENT_ENDPOINT, - 'User changed billing address', - ); + for ($i = 1; $i <= 2; $i++) { + $distributedBus->sendCommand( + TicketServiceMessagingConfiguration::SERVICE_NAME, + TicketServiceReceiver::CREATE_TICKET_WITH_EVENT_ENDPOINT, + 'User changed billing address', + ); + } $ticketService->run('ticket_service', $executionPollingMetadata); // Message will fail on acknowledge due to lost heartbeat, yet should stay in queue and be processed after reconnect - self::assertEquals(2, $ticketService->sendQueryWithRouting(TicketServiceReceiver::GET_TICKETS_COUNT)); + self::assertEquals(3, $ticketService->sendQueryWithRouting(TicketServiceReceiver::GET_TICKETS_COUNT)); $ticketService->run('async', $executionPollingMetadata); // distributed command resulted in two events being published, and first event fails on processing due to heartbeat - self::assertEquals(3, $ticketService->sendQueryWithRouting(TicketNotificationEventHandler::GET_TICKETS_NOTIFICATION_COUNT)); + self::assertEquals(4, $ticketService->sendQueryWithRouting(TicketNotificationEventHandler::GET_TICKETS_NOTIFICATION_COUNT)); } private function bootstrapEcotone(string $serviceName, array $namespaces, array $services, array $amqpConfig = []): FlowTestSupport diff --git a/packages/Ecotone/src/Messaging/Endpoint/AcknowledgeConfirmationInterceptor.php b/packages/Ecotone/src/Messaging/Endpoint/AcknowledgeConfirmationInterceptor.php index 1119903fa..1730f4277 100644 --- a/packages/Ecotone/src/Messaging/Endpoint/AcknowledgeConfirmationInterceptor.php +++ b/packages/Ecotone/src/Messaging/Endpoint/AcknowledgeConfirmationInterceptor.php @@ -62,23 +62,22 @@ public function ack(MethodInvocation $methodInvocation, Message $message, #[Refe $result = $methodInvocation->proceed(); if ($amqpAcknowledgementCallback->isAutoAck()) { - $amqpAcknowledgementCallback->accept(); $logger->info( sprintf('Message with id `%s` acknowledged in Message Channel', $message->getHeaders()->getMessageId()), $message ); + $amqpAcknowledgementCallback->accept(); } } catch (RejectMessageException $exception) { if ($amqpAcknowledgementCallback->isAutoAck()) { - $amqpAcknowledgementCallback->reject(); $logger->info( sprintf('Message with id `%s` rejected in Message Channel', $message->getHeaders()->getMessageId()), $message ); + $amqpAcknowledgementCallback->reject(); } } catch (Throwable $exception) { if ($amqpAcknowledgementCallback->isAutoAck()) { - $amqpAcknowledgementCallback->requeue(); $logger->info( sprintf( 'Message with id `%s` requeued in Message Channel. Due to %s', @@ -87,6 +86,7 @@ public function ack(MethodInvocation $methodInvocation, Message $message, #[Refe ), $message ); + $amqpAcknowledgementCallback->requeue(); } } diff --git a/packages/Enqueue/src/EnqueueInboundChannelAdapter.php b/packages/Enqueue/src/EnqueueInboundChannelAdapter.php index a48349bc4..937734349 100644 --- a/packages/Enqueue/src/EnqueueInboundChannelAdapter.php +++ b/packages/Enqueue/src/EnqueueInboundChannelAdapter.php @@ -68,6 +68,7 @@ public function receiveWithTimeout(int $timeoutInMilliseconds = 0): ?Message return $convertedMessage->build(); } catch (Exception $exception) { if ($this->isConnectionException($exception) || ($exception->getPrevious() && $this->isConnectionException($exception->getPrevious()))) { + $this->connectionFactory->reconnect(); throw new ConnectionException('There was a problem while polling message channel', 0, $exception); } @@ -79,7 +80,7 @@ abstract public function connectionException(): string; private function isConnectionException(Exception $exception): bool { - return is_subclass_of($exception, $this->connectionException()) || $exception::class === $this->connectionException(); + return is_subclass_of($exception, $this->connectionException()) || $exception::class === $this->connectionException() || $exception instanceof ConnectionException; } public function getQueueName(): string From 2559d886c4c30fe4df655c0ad4ea746dec3f6514 Mon Sep 17 00:00:00 2001 From: Dariusz Gafka Date: Tue, 27 Aug 2024 08:54:53 +0200 Subject: [PATCH 03/10] test fixes --- packages/Amqp/tests/AmqpMessagingTest.php | 1 + .../Integration/DistributedCommandBusTest.php | 14 +++++++------- .../tests/Integration/DistributedEventBusTest.php | 7 ++++++- .../Enqueue/src/EnqueueInboundChannelAdapter.php | 5 ++++- 4 files changed, 18 insertions(+), 9 deletions(-) diff --git a/packages/Amqp/tests/AmqpMessagingTest.php b/packages/Amqp/tests/AmqpMessagingTest.php index c8e9f9732..9e1350e97 100644 --- a/packages/Amqp/tests/AmqpMessagingTest.php +++ b/packages/Amqp/tests/AmqpMessagingTest.php @@ -64,6 +64,7 @@ public function queueCleanUp(): void $this->deleteQueue(new AmqpQueue(AmqpDistributionModule::CHANNEL_PREFIX . TicketServiceMessagingConfiguration::SERVICE_NAME)); $this->deleteQueue(new AmqpQueue('ecotone_1_delay')); $this->deleteQueue(new AmqpQueue('async')); + $this->deleteQueue(new AmqpQueue('notification_channel')); } private function deleteQueue(AmqpQueue $queue): void diff --git a/packages/Amqp/tests/Integration/DistributedCommandBusTest.php b/packages/Amqp/tests/Integration/DistributedCommandBusTest.php index 9f5a3546e..f07eabfb1 100644 --- a/packages/Amqp/tests/Integration/DistributedCommandBusTest.php +++ b/packages/Amqp/tests/Integration/DistributedCommandBusTest.php @@ -45,8 +45,8 @@ public function test_distributing_command_misses_heartbeat_and_reconnects(): voi { $executionPollingMetadata = ExecutionPollingMetadata::createWithDefaults()->withFinishWhenNoMessages(true); $userService = $this->bootstrapEcotone('user_service', ['Test\Ecotone\Amqp\Fixture\DistributedCommandBus\Publisher'], [new UserService()], ['heartbeat' => 1]); - $ticketService = $this->bootstrapEcotone('ticket_service', ['Test\Ecotone\Amqp\Fixture\DistributedCommandBus\Receiver', 'Test\Ecotone\Amqp\Fixture\DistributedCommandBus\ReceiverEventHandler'], [new TicketServiceReceiver([0, 3]), new TicketNotificationEventHandler([0, 3]), - "logger" => new EchoLogger() + $ticketService = $this->bootstrapEcotone('ticket_service', ['Test\Ecotone\Amqp\Fixture\DistributedCommandBus\Receiver', 'Test\Ecotone\Amqp\Fixture\DistributedCommandBus\ReceiverEventHandler'], [new TicketServiceReceiver([0, 3, 0]), new TicketNotificationEventHandler([0, 3, 0]), +// "logger" => new EchoLogger() ], amqpConfig: ['heartbeat' => 1] ); @@ -55,7 +55,7 @@ public function test_distributing_command_misses_heartbeat_and_reconnects(): voi self::assertEquals(0, $ticketService->sendQueryWithRouting(TicketServiceReceiver::GET_TICKETS_COUNT)); $distributedBus = $userService->getGateway(DistributedBus::class); - for ($i = 1; $i <= 2; $i++) { + for ($i = 1; $i <= 3; $i++) { $distributedBus->sendCommand( TicketServiceMessagingConfiguration::SERVICE_NAME, TicketServiceReceiver::CREATE_TICKET_WITH_EVENT_ENDPOINT, @@ -64,12 +64,12 @@ public function test_distributing_command_misses_heartbeat_and_reconnects(): voi } $ticketService->run('ticket_service', $executionPollingMetadata); - // Message will fail on acknowledge due to lost heartbeat, yet should stay in queue and be processed after reconnect - self::assertEquals(3, $ticketService->sendQueryWithRouting(TicketServiceReceiver::GET_TICKETS_COUNT)); + // Message may fail on acknowledgment, but it will be redelivered + self::assertGreaterThanOrEqual(3, $ticketService->sendQueryWithRouting(TicketServiceReceiver::GET_TICKETS_COUNT)); $ticketService->run('async', $executionPollingMetadata); - // distributed command resulted in two events being published, and first event fails on processing due to heartbeat - self::assertEquals(4, $ticketService->sendQueryWithRouting(TicketNotificationEventHandler::GET_TICKETS_NOTIFICATION_COUNT)); + // When message failed on acknowledgment, it will re-publish on re-deliver + self::assertGreaterThanOrEqual(3, $ticketService->sendQueryWithRouting(TicketNotificationEventHandler::GET_TICKETS_NOTIFICATION_COUNT)); } private function bootstrapEcotone(string $serviceName, array $namespaces, array $services, array $amqpConfig = []): FlowTestSupport diff --git a/packages/Amqp/tests/Integration/DistributedEventBusTest.php b/packages/Amqp/tests/Integration/DistributedEventBusTest.php index d9191dd61..d7bcf1cdf 100644 --- a/packages/Amqp/tests/Integration/DistributedEventBusTest.php +++ b/packages/Amqp/tests/Integration/DistributedEventBusTest.php @@ -7,8 +7,10 @@ use Ecotone\Amqp\Configuration\AmqpConfiguration; use Ecotone\Lite\EcotoneLite; use Ecotone\Lite\Test\FlowTestSupport; +use Ecotone\Messaging\Channel\PollableChannel\GlobalPollableChannelConfiguration; use Ecotone\Messaging\Config\ModulePackageList; use Ecotone\Messaging\Config\ServiceConfiguration; +use Ecotone\Messaging\Endpoint\ExecutionPollingMetadata; use Enqueue\AmqpExt\AmqpConnectionFactory; use Test\Ecotone\Amqp\AmqpMessagingTest; use Test\Ecotone\Amqp\Fixture\DistributedEventBus\AsynchronousEventHandler\TicketNotificationSubscriber; @@ -63,7 +65,10 @@ public function test_distributing_event_and_publish_async_private_event(): void public function test_distributing_event_and_publish_async_without_amqp_transactions(): void { - $userService = $this->bootstrapEcotone('user_service', ['Test\Ecotone\Amqp\Fixture\DistributedEventBus\Publisher'], [new UserService()]); + $userService = $this->bootstrapEcotone('user_service', ['Test\Ecotone\Amqp\Fixture\DistributedEventBus\Publisher'], [new UserService()], extensionObjects: [ + GlobalPollableChannelConfiguration::createWithDefaults() + ->withCollector(false) + ]); $ticketService = $this->bootstrapEcotone( 'ticket_service', [ diff --git a/packages/Enqueue/src/EnqueueInboundChannelAdapter.php b/packages/Enqueue/src/EnqueueInboundChannelAdapter.php index 937734349..09843a7f0 100644 --- a/packages/Enqueue/src/EnqueueInboundChannelAdapter.php +++ b/packages/Enqueue/src/EnqueueInboundChannelAdapter.php @@ -68,7 +68,10 @@ public function receiveWithTimeout(int $timeoutInMilliseconds = 0): ?Message return $convertedMessage->build(); } catch (Exception $exception) { if ($this->isConnectionException($exception) || ($exception->getPrevious() && $this->isConnectionException($exception->getPrevious()))) { - $this->connectionFactory->reconnect(); + try { + $this->connectionFactory->reconnect(); + }catch (\Exception) {} + throw new ConnectionException('There was a problem while polling message channel', 0, $exception); } From 83141f822d4732064cddd8903c5edc41ec5a9a07 Mon Sep 17 00:00:00 2001 From: Dariusz Gafka Date: Tue, 27 Aug 2024 09:03:23 +0200 Subject: [PATCH 04/10] fixes --- packages/Amqp/tests/Integration/DistributedCommandBusTest.php | 2 +- packages/Enqueue/src/EnqueueInboundChannelAdapter.php | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/Amqp/tests/Integration/DistributedCommandBusTest.php b/packages/Amqp/tests/Integration/DistributedCommandBusTest.php index f07eabfb1..bd66ba864 100644 --- a/packages/Amqp/tests/Integration/DistributedCommandBusTest.php +++ b/packages/Amqp/tests/Integration/DistributedCommandBusTest.php @@ -44,7 +44,7 @@ public function test_distributing_command_to_another_service(): void public function test_distributing_command_misses_heartbeat_and_reconnects(): void { $executionPollingMetadata = ExecutionPollingMetadata::createWithDefaults()->withFinishWhenNoMessages(true); - $userService = $this->bootstrapEcotone('user_service', ['Test\Ecotone\Amqp\Fixture\DistributedCommandBus\Publisher'], [new UserService()], ['heartbeat' => 1]); + $userService = $this->bootstrapEcotone('user_service', ['Test\Ecotone\Amqp\Fixture\DistributedCommandBus\Publisher'], [new UserService()]); $ticketService = $this->bootstrapEcotone('ticket_service', ['Test\Ecotone\Amqp\Fixture\DistributedCommandBus\Receiver', 'Test\Ecotone\Amqp\Fixture\DistributedCommandBus\ReceiverEventHandler'], [new TicketServiceReceiver([0, 3, 0]), new TicketNotificationEventHandler([0, 3, 0]), // "logger" => new EchoLogger() ], diff --git a/packages/Enqueue/src/EnqueueInboundChannelAdapter.php b/packages/Enqueue/src/EnqueueInboundChannelAdapter.php index 09843a7f0..0f7813877 100644 --- a/packages/Enqueue/src/EnqueueInboundChannelAdapter.php +++ b/packages/Enqueue/src/EnqueueInboundChannelAdapter.php @@ -71,7 +71,7 @@ public function receiveWithTimeout(int $timeoutInMilliseconds = 0): ?Message try { $this->connectionFactory->reconnect(); }catch (\Exception) {} - + throw new ConnectionException('There was a problem while polling message channel', 0, $exception); } From 9cd0e06f71497195601c44051a3d7002bb99c502 Mon Sep 17 00:00:00 2001 From: Dariusz Gafka Date: Tue, 27 Aug 2024 18:28:19 +0200 Subject: [PATCH 05/10] fixes --- .../Amqp/src/AmqpReconnectableConnectionFactory.php | 7 ++++++- .../tests/Integration/DistributedCommandBusTest.php | 10 ++++------ 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/packages/Amqp/src/AmqpReconnectableConnectionFactory.php b/packages/Amqp/src/AmqpReconnectableConnectionFactory.php index 72fcf09a2..2cabfef60 100644 --- a/packages/Amqp/src/AmqpReconnectableConnectionFactory.php +++ b/packages/Amqp/src/AmqpReconnectableConnectionFactory.php @@ -46,7 +46,12 @@ public function getConnectionInstanceId(): string /** * No way to reliable state if amqp is connected: https://github.com/php-amqp/php-amqp/issues/306 - * So to make it more reliable we check other way around, if is disconnected + * So to make it more reliable we check other way around, if is disconnected. + * + * There are situations where connection to AMQP connections becomes zombies. + * In that scenarios triggering an action on the connection will do nothing and will not throw an exception. + * It makes the feeling like anything is fine, yet in reality it is not. + * In those situations it's better to use this method. * @param Context|AmqpContext|null $context */ public function isDisconnected(?Context $context): bool diff --git a/packages/Amqp/tests/Integration/DistributedCommandBusTest.php b/packages/Amqp/tests/Integration/DistributedCommandBusTest.php index bd66ba864..5f7f3b3b0 100644 --- a/packages/Amqp/tests/Integration/DistributedCommandBusTest.php +++ b/packages/Amqp/tests/Integration/DistributedCommandBusTest.php @@ -44,12 +44,10 @@ public function test_distributing_command_to_another_service(): void public function test_distributing_command_misses_heartbeat_and_reconnects(): void { $executionPollingMetadata = ExecutionPollingMetadata::createWithDefaults()->withFinishWhenNoMessages(true); - $userService = $this->bootstrapEcotone('user_service', ['Test\Ecotone\Amqp\Fixture\DistributedCommandBus\Publisher'], [new UserService()]); - $ticketService = $this->bootstrapEcotone('ticket_service', ['Test\Ecotone\Amqp\Fixture\DistributedCommandBus\Receiver', 'Test\Ecotone\Amqp\Fixture\DistributedCommandBus\ReceiverEventHandler'], [new TicketServiceReceiver([0, 3, 0]), new TicketNotificationEventHandler([0, 3, 0]), -// "logger" => new EchoLogger() - ], - amqpConfig: ['heartbeat' => 1] - ); + $userService = $this->bootstrapEcotone('user_service', ['Test\Ecotone\Amqp\Fixture\DistributedCommandBus\Publisher'], [new UserService()], amqpConfig: ['heartbeat' => 1]); + $ticketService = $this->bootstrapEcotone('ticket_service', ['Test\Ecotone\Amqp\Fixture\DistributedCommandBus\Receiver', 'Test\Ecotone\Amqp\Fixture\DistributedCommandBus\ReceiverEventHandler'], [new TicketServiceReceiver([0, 6, 0]), new TicketNotificationEventHandler([0, 6, 0]), + "logger" => new EchoLogger() + ], amqpConfig: ['heartbeat' => 1]); $ticketService->run('ticket_service', $executionPollingMetadata); self::assertEquals(0, $ticketService->sendQueryWithRouting(TicketServiceReceiver::GET_TICKETS_COUNT)); From 3ecbd9ee600fcf3230031be41b659cc4a59b13de Mon Sep 17 00:00:00 2001 From: Dariusz Gafka Date: Tue, 27 Aug 2024 18:50:46 +0200 Subject: [PATCH 06/10] test --- packages/Amqp/src/AmqpReconnectableConnectionFactory.php | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/packages/Amqp/src/AmqpReconnectableConnectionFactory.php b/packages/Amqp/src/AmqpReconnectableConnectionFactory.php index 2cabfef60..82e8d6c42 100644 --- a/packages/Amqp/src/AmqpReconnectableConnectionFactory.php +++ b/packages/Amqp/src/AmqpReconnectableConnectionFactory.php @@ -105,8 +105,12 @@ private function getConnectionProperty(): ReflectionProperty public function getSubscriptionConsumer(string $queueName, callable $subscriptionCallback): SubscriptionConsumer { + /** @var AmqpContext $context */ $context = $this->createContext(); if ($this->subscriptionConsumer === null) { + $channel = $context->getExtChannel(); + $channel->setGlobalPrefetchCount(0); + $this->subscriptionConsumer = $context->createSubscriptionConsumer(); /** @var AmqpConsumer $consumer */ From 9dd620357547863122421f17de7aca58d19dc0b4 Mon Sep 17 00:00:00 2001 From: Dariusz Gafka Date: Tue, 27 Aug 2024 20:06:38 +0200 Subject: [PATCH 07/10] fix tests --- packages/Amqp/src/AmqpReconnectableConnectionFactory.php | 6 ++---- .../Receiver/TicketServiceMessagingConfiguration.php | 2 +- .../Amqp/tests/Integration/DistributedCommandBusTest.php | 2 +- 3 files changed, 4 insertions(+), 6 deletions(-) diff --git a/packages/Amqp/src/AmqpReconnectableConnectionFactory.php b/packages/Amqp/src/AmqpReconnectableConnectionFactory.php index 82e8d6c42..3064af38c 100644 --- a/packages/Amqp/src/AmqpReconnectableConnectionFactory.php +++ b/packages/Amqp/src/AmqpReconnectableConnectionFactory.php @@ -105,11 +105,9 @@ private function getConnectionProperty(): ReflectionProperty public function getSubscriptionConsumer(string $queueName, callable $subscriptionCallback): SubscriptionConsumer { - /** @var AmqpContext $context */ - $context = $this->createContext(); if ($this->subscriptionConsumer === null) { - $channel = $context->getExtChannel(); - $channel->setGlobalPrefetchCount(0); + /** @var AmqpContext $context */ + $context = $this->createContext(); $this->subscriptionConsumer = $context->createSubscriptionConsumer(); diff --git a/packages/Amqp/tests/Fixture/DistributedCommandBus/Receiver/TicketServiceMessagingConfiguration.php b/packages/Amqp/tests/Fixture/DistributedCommandBus/Receiver/TicketServiceMessagingConfiguration.php index d78e2eafe..b6b65bf2a 100644 --- a/packages/Amqp/tests/Fixture/DistributedCommandBus/Receiver/TicketServiceMessagingConfiguration.php +++ b/packages/Amqp/tests/Fixture/DistributedCommandBus/Receiver/TicketServiceMessagingConfiguration.php @@ -19,7 +19,7 @@ public function configure() return [ AmqpDistributedBusConfiguration::createConsumer(), PollingMetadata::create(self::SERVICE_NAME) - ->setHandledMessageLimit(1) + ->setHandledMessageLimit(5) ->setExecutionTimeLimitInMilliseconds(5000), ]; } diff --git a/packages/Amqp/tests/Integration/DistributedCommandBusTest.php b/packages/Amqp/tests/Integration/DistributedCommandBusTest.php index 5f7f3b3b0..98f579a91 100644 --- a/packages/Amqp/tests/Integration/DistributedCommandBusTest.php +++ b/packages/Amqp/tests/Integration/DistributedCommandBusTest.php @@ -43,7 +43,7 @@ public function test_distributing_command_to_another_service(): void public function test_distributing_command_misses_heartbeat_and_reconnects(): void { - $executionPollingMetadata = ExecutionPollingMetadata::createWithDefaults()->withFinishWhenNoMessages(true); + $executionPollingMetadata = ExecutionPollingMetadata::createWithDefaults()->withExecutionTimeLimitInMilliseconds(10000); $userService = $this->bootstrapEcotone('user_service', ['Test\Ecotone\Amqp\Fixture\DistributedCommandBus\Publisher'], [new UserService()], amqpConfig: ['heartbeat' => 1]); $ticketService = $this->bootstrapEcotone('ticket_service', ['Test\Ecotone\Amqp\Fixture\DistributedCommandBus\Receiver', 'Test\Ecotone\Amqp\Fixture\DistributedCommandBus\ReceiverEventHandler'], [new TicketServiceReceiver([0, 6, 0]), new TicketNotificationEventHandler([0, 6, 0]), "logger" => new EchoLogger() From c6a93b2327f7526e7b7f88df05f2a0dce9c885f4 Mon Sep 17 00:00:00 2001 From: Dariusz Gafka Date: Tue, 27 Aug 2024 20:12:03 +0200 Subject: [PATCH 08/10] force --- packages/Amqp/tests/Integration/DistributedCommandBusTest.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/Amqp/tests/Integration/DistributedCommandBusTest.php b/packages/Amqp/tests/Integration/DistributedCommandBusTest.php index 98f579a91..35c5e3de0 100644 --- a/packages/Amqp/tests/Integration/DistributedCommandBusTest.php +++ b/packages/Amqp/tests/Integration/DistributedCommandBusTest.php @@ -43,7 +43,7 @@ public function test_distributing_command_to_another_service(): void public function test_distributing_command_misses_heartbeat_and_reconnects(): void { - $executionPollingMetadata = ExecutionPollingMetadata::createWithDefaults()->withExecutionTimeLimitInMilliseconds(10000); + $executionPollingMetadata = ExecutionPollingMetadata::createWithDefaults()->withExecutionTimeLimitInMilliseconds(10000)->withStopOnError(false); $userService = $this->bootstrapEcotone('user_service', ['Test\Ecotone\Amqp\Fixture\DistributedCommandBus\Publisher'], [new UserService()], amqpConfig: ['heartbeat' => 1]); $ticketService = $this->bootstrapEcotone('ticket_service', ['Test\Ecotone\Amqp\Fixture\DistributedCommandBus\Receiver', 'Test\Ecotone\Amqp\Fixture\DistributedCommandBus\ReceiverEventHandler'], [new TicketServiceReceiver([0, 6, 0]), new TicketNotificationEventHandler([0, 6, 0]), "logger" => new EchoLogger() From a156a3f73b21f60f59befe16cef522b8d1f2e688 Mon Sep 17 00:00:00 2001 From: Dariusz Gafka Date: Tue, 27 Aug 2024 20:32:13 +0200 Subject: [PATCH 09/10] initialize --- packages/Amqp/src/AmqpInboundChannelAdapter.php | 2 +- packages/Enqueue/src/EnqueueInboundChannelAdapter.php | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/Amqp/src/AmqpInboundChannelAdapter.php b/packages/Amqp/src/AmqpInboundChannelAdapter.php index 7a6aa3574..9b46b0d8e 100644 --- a/packages/Amqp/src/AmqpInboundChannelAdapter.php +++ b/packages/Amqp/src/AmqpInboundChannelAdapter.php @@ -103,7 +103,7 @@ public function receiveWithTimeout(int $timeout = 0): ?Message $subscriptionConsumer->consume($timeout ?: $this->receiveTimeoutInMilliseconds); return $this->queueChannel->receive(); - } catch (AMQPConnectionException $exception) { + } catch (AMQPConnectionException|\AMQPChannelException $exception) { throw new ConnectionException('Failed to connect to AMQP broker', 0, $exception); } } diff --git a/packages/Enqueue/src/EnqueueInboundChannelAdapter.php b/packages/Enqueue/src/EnqueueInboundChannelAdapter.php index 0f7813877..287f0f4e4 100644 --- a/packages/Enqueue/src/EnqueueInboundChannelAdapter.php +++ b/packages/Enqueue/src/EnqueueInboundChannelAdapter.php @@ -83,7 +83,7 @@ abstract public function connectionException(): string; private function isConnectionException(Exception $exception): bool { - return is_subclass_of($exception, $this->connectionException()) || $exception::class === $this->connectionException() || $exception instanceof ConnectionException; + return is_subclass_of($exception, $this->connectionException()) || $exception::class === $this->connectionException(); } public function getQueueName(): string From 732ba9dcd8e9393f61f6a7723a8ea72885d5d465 Mon Sep 17 00:00:00 2001 From: Dariusz Gafka Date: Tue, 27 Aug 2024 20:40:18 +0200 Subject: [PATCH 10/10] reconnect --- packages/Amqp/src/AmqpInboundChannelAdapter.php | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/Amqp/src/AmqpInboundChannelAdapter.php b/packages/Amqp/src/AmqpInboundChannelAdapter.php index 9b46b0d8e..44ffa233a 100644 --- a/packages/Amqp/src/AmqpInboundChannelAdapter.php +++ b/packages/Amqp/src/AmqpInboundChannelAdapter.php @@ -104,6 +104,7 @@ public function receiveWithTimeout(int $timeout = 0): ?Message return $this->queueChannel->receive(); } catch (AMQPConnectionException|\AMQPChannelException $exception) { + $this->connectionFactory->reconnect(); throw new ConnectionException('Failed to connect to AMQP broker', 0, $exception); } }