Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Amqp self heal on acknowledge failures #370

Merged
merged 10 commits into from
Aug 27, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions packages/Amqp/src/AmqpAcknowledgeCallbackWraper.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
<?php

declare(strict_types=1);

namespace Ecotone\Amqp;

use Ecotone\Enqueue\CachedConnectionFactory;
use Ecotone\Enqueue\EnqueueAcknowledgementCallback;
use Ecotone\Enqueue\ReconnectableConnectionFactory;
use Ecotone\Messaging\Endpoint\AcknowledgementCallback;
use Ecotone\Messaging\Handler\Logger\LoggingGateway;

final class AmqpAcknowledgeCallbackWraper implements AcknowledgementCallback
{
public function __construct(private EnqueueAcknowledgementCallback $acknowledgementCallback, private CachedConnectionFactory $connectionFactory, private LoggingGateway $loggingGateway)
{

}

public function isAutoAck(): bool
{
return $this->acknowledgementCallback->isAutoAck();
}

public function disableAutoAck(): void
{
$this->acknowledgementCallback->disableAutoAck();
}

public function accept(): void
{
try {
$this->acknowledgementCallback->accept();
}catch (\Exception $exception) {
$this->loggingGateway->info("Failed to acknowledge message, disconnecting AMQP Connection in order to self-heal. Failure happen due to: " . $exception->getMessage(), exception: $exception);

$this->connectionFactory->reconnect();
}
}

public function reject(): void
{
try {
$this->acknowledgementCallback->reject();
}catch (\Exception $exception) {
$this->loggingGateway->info("Failed to reject message, disconnecting AMQP Connection in order to self-heal. Failure happen due to: " . $exception->getMessage(), exception: $exception);

$this->connectionFactory->reconnect();
}
}

public function requeue(): void
{
try {
$this->acknowledgementCallback->requeue();
}catch (\Exception $exception) {
$this->loggingGateway->info("Failed to requeue message, disconnecting AMQP Connection in order to self-heal. Failure happen due to: " . $exception->getMessage(), exception: $exception);

$this->connectionFactory->reconnect();
}
}
}
19 changes: 15 additions & 4 deletions packages/Amqp/src/AmqpInboundChannelAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,16 @@

use AMQPConnectionException;
use Ecotone\Enqueue\CachedConnectionFactory;
use Ecotone\Enqueue\EnqueueHeader;
use Ecotone\Enqueue\EnqueueInboundChannelAdapter;
use Ecotone\Enqueue\InboundMessageConverter;
use Ecotone\Messaging\Channel\QueueChannel;
use Ecotone\Messaging\Conversion\ConversionService;
use Ecotone\Messaging\Conversion\MediaType;
use Ecotone\Messaging\Endpoint\PollingConsumer\ConnectionException;
use Ecotone\Messaging\Handler\Logger\LoggingGateway;
use Ecotone\Messaging\Message;
use Ecotone\Messaging\MessageHeaders;
use Ecotone\Messaging\Support\MessageBuilder;
use Interop\Amqp\AmqpMessage;
use Interop\Queue\Consumer;
Expand All @@ -32,13 +35,14 @@ class AmqpInboundChannelAdapter extends EnqueueInboundChannelAdapter
private QueueChannel $queueChannel;

public function __construct(
CachedConnectionFactory $cachedConnectionFactory,
private CachedConnectionFactory $cachedConnectionFactory,
private AmqpAdmin $amqpAdmin,
bool $declareOnStartup,
string $queueName,
int $receiveTimeoutInMilliseconds,
InboundMessageConverter $inboundMessageConverter,
ConversionService $conversionService
ConversionService $conversionService,
private LoggingGateway $loggingGateway,
) {
parent::__construct(
$cachedConnectionFactory,
Expand All @@ -65,10 +69,17 @@ public function enrichMessage(EnqueueMessage $sourceMessage, MessageBuilder $tar
$targetMessage = $targetMessage->setContentType(MediaType::parseMediaType($sourceMessage->getContentType()));
}

return $targetMessage;
return $targetMessage->setHeader(
EnqueueHeader::HEADER_ACKNOWLEDGE,
new AmqpAcknowledgeCallbackWraper(
$targetMessage->getHeaderWithName(EnqueueHeader::HEADER_ACKNOWLEDGE),
$this->cachedConnectionFactory,
$this->loggingGateway,
)
);
}

public function receiveMessage(int $timeout = 0): ?Message
public function receiveWithTimeout(int $timeout = 0): ?Message
dgafka marked this conversation as resolved.
Show resolved Hide resolved
{
try {
if ($this->declareOnStartup && $this->initialized === false) {
Expand Down
2 changes: 2 additions & 0 deletions packages/Amqp/src/AmqpInboundChannelAdapterBuilder.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
use Ecotone\Messaging\Config\Container\MessagingContainerBuilder;
use Ecotone\Messaging\Config\Container\Reference;
use Ecotone\Messaging\Conversion\ConversionService;
use Ecotone\Messaging\Handler\Logger\LoggingGateway;
use Ecotone\Messaging\MessageConverter\DefaultHeaderMapper;
use Enqueue\AmqpExt\AmqpConnectionFactory;
use Ramsey\Uuid\Uuid;
Expand Down Expand Up @@ -59,6 +60,7 @@ public function compile(MessagingContainerBuilder $builder): Definition
$this->receiveTimeoutInMilliseconds,
$inboundMessageConverter,
new Reference(ConversionService::REFERENCE_NAME),
new Reference(LoggingGateway::class),
]);
}
}
5 changes: 5 additions & 0 deletions packages/Amqp/src/AmqpReconnectableConnectionFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ public function reconnect(): void
{
$connectionProperty = $this->getConnectionProperty();

if ($this->subscriptionConsumer) {
try {
$this->subscriptionConsumer->unsubscribeAll();
}catch (\Exception) {}
}
/** @var AMQPConnection $connection */
$connection = $connectionProperty->getValue($this->connectionFactory);
if ($connection) {
Expand Down
14 changes: 10 additions & 4 deletions packages/Amqp/tests/AmqpMessagingTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,22 @@ abstract class AmqpMessagingTest extends TestCase
/**
* @return AmqpConnectionFactory
*/
public function getCachedConnectionFactory(): AmqpConnectionFactory
public function getCachedConnectionFactory(array $config = []): AmqpConnectionFactory
{
return self::getRabbitConnectionFactory();
return self::getRabbitConnectionFactory($config);
}

/**
* @return AmqpConnectionFactory
*/
public static function getRabbitConnectionFactory(): AmqpConnectionFactory
public static function getRabbitConnectionFactory(array $config = []): AmqpConnectionFactory
{
return new AmqpLibConnection(['dsn' => getenv('RABBIT_HOST') ? getenv('RABBIT_HOST') : 'amqp://guest:guest@localhost:5672/%2f']);
return new AmqpLibConnection(
array_merge(
['dsn' => getenv('RABBIT_HOST') ? getenv('RABBIT_HOST') : 'amqp://guest:guest@localhost:5672/%2f'],
$config,
)
);
}

public function setUp(): void
Expand All @@ -58,6 +63,7 @@ public function queueCleanUp(): void
$this->deleteQueue(new AmqpQueue('distributed_ticket_service'));
$this->deleteQueue(new AmqpQueue(AmqpDistributionModule::CHANNEL_PREFIX . TicketServiceMessagingConfiguration::SERVICE_NAME));
$this->deleteQueue(new AmqpQueue('ecotone_1_delay'));
$this->deleteQueue(new AmqpQueue('async'));
}

private function deleteQueue(AmqpQueue $queue): void
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<?php

declare(strict_types=1);

namespace Test\Ecotone\Amqp\Fixture\DistributedCommandBus\Receiver\Event;

final class TicketCreated
{
public function __construct(private string $ticket)
{
}

public function getTicket(): string
{
return $this->ticket;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,52 @@

namespace Test\Ecotone\Amqp\Fixture\DistributedCommandBus\Receiver;

use Ecotone\Messaging\Attribute\Asynchronous;
use Ecotone\Messaging\Attribute\Parameter\Header;
use Ecotone\Modelling\Attribute\CommandHandler;
use Ecotone\Modelling\Attribute\Distributed;
use Ecotone\Modelling\Attribute\EventHandler;
use Ecotone\Modelling\Attribute\QueryHandler;
use Ecotone\Modelling\EventBus;
use Test\Ecotone\Amqp\Fixture\DistributedCommandBus\Receiver\Event\TicketCreated;

/**
* licence Apache-2.0
*/
class TicketServiceReceiver
{
public const CREATE_TICKET_ENDPOINT = 'createTicket';
public const CREATE_TICKET_WITH_EVENT_ENDPOINT = 'createTicketWithEvent';
public const GET_TICKETS_COUNT = 'getTicketsCount';

private array $tickets = [];

public function __construct(private array $delays = [])
{

}

#[Distributed]
#[CommandHandler(self::CREATE_TICKET_ENDPOINT)]
public function registerTicket(string $ticket): void
{
$this->tickets[] = $ticket;
}

#[Distributed]
#[CommandHandler(self::CREATE_TICKET_WITH_EVENT_ENDPOINT)]
public function registerTicketWithEvent(string $ticket, EventBus $eventBus): void
{
$delay = array_shift($this->delays);
if ($delay) {
sleep($delay);
}

$this->tickets[] = $ticket;

$eventBus->publish(new TicketCreated($ticket));
}

#[QueryHandler(self::GET_TICKETS_COUNT)]
public function getTickets(): int
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<?php

declare(strict_types=1);

namespace Test\Ecotone\Amqp\Fixture\DistributedCommandBus\ReceiverEventHandler;

use Ecotone\Amqp\AmqpBackedMessageChannelBuilder;
use Ecotone\Messaging\Attribute\ServiceContext;

final class TicketNotificationConfig
{
#[ServiceContext]
public function channel()
{
return AmqpBackedMessageChannelBuilder::create('async');
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
<?php

declare(strict_types=1);

namespace Test\Ecotone\Amqp\Fixture\DistributedCommandBus\ReceiverEventHandler;

use Ecotone\Messaging\Attribute\Asynchronous;
use Ecotone\Modelling\Attribute\EventHandler;
use Ecotone\Modelling\Attribute\QueryHandler;
use Test\Ecotone\Amqp\Fixture\DistributedCommandBus\Receiver\Event\TicketCreated;

final class TicketNotificationEventHandler
{
public const GET_TICKETS_NOTIFICATION_COUNT = 'getTicketsNotificationCount';

private array $ticketNotifications = [];

public function __construct(private array $delays = [])
{

}

#[Asynchronous('async')]
#[EventHandler(endpointId: 'notify')]
public function notify(TicketCreated $event): void
{
$delay = array_shift($this->delays);
if ($delay) {
sleep($delay);
}

$this->ticketNotifications[] = $event->getTicket();
}

#[QueryHandler(self::GET_TICKETS_NOTIFICATION_COUNT)]
public function getTicketsNotifications(): int
{
return count($this->ticketNotifications);
}
}
40 changes: 38 additions & 2 deletions packages/Amqp/tests/Integration/DistributedCommandBusTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,15 @@
use Ecotone\Lite\Test\FlowTestSupport;
use Ecotone\Messaging\Config\ModulePackageList;
use Ecotone\Messaging\Config\ServiceConfiguration;
use Ecotone\Messaging\Endpoint\ExecutionPollingMetadata;
use Ecotone\Messaging\Handler\Logger\EchoLogger;
use Ecotone\Modelling\DistributedBus;
use Enqueue\AmqpExt\AmqpConnectionFactory;
use Test\Ecotone\Amqp\AmqpMessagingTest;
use Test\Ecotone\Amqp\Fixture\DistributedCommandBus\Publisher\UserService;
use Test\Ecotone\Amqp\Fixture\DistributedCommandBus\Receiver\TicketServiceMessagingConfiguration;
use Test\Ecotone\Amqp\Fixture\DistributedCommandBus\Receiver\TicketServiceReceiver;
use Test\Ecotone\Amqp\Fixture\DistributedCommandBus\ReceiverEventHandler\TicketNotificationEventHandler;

/**
* @internal
Expand All @@ -36,10 +41,41 @@ public function test_distributing_command_to_another_service(): void
self::assertEquals(1, $ticketService->sendQueryWithRouting(TicketServiceReceiver::GET_TICKETS_COUNT));
}

private function bootstrapEcotone(string $serviceName, array $namespaces, array $services): FlowTestSupport
public function test_distributing_command_misses_heartbeat_and_reconnects(): void
{
$executionPollingMetadata = ExecutionPollingMetadata::createWithDefaults()->withFinishWhenNoMessages(true);
$userService = $this->bootstrapEcotone('user_service', ['Test\Ecotone\Amqp\Fixture\DistributedCommandBus\Publisher'], [new UserService()], ['heartbeat' => 1]);
$ticketService = $this->bootstrapEcotone('ticket_service', ['Test\Ecotone\Amqp\Fixture\DistributedCommandBus\Receiver', 'Test\Ecotone\Amqp\Fixture\DistributedCommandBus\ReceiverEventHandler'], [new TicketServiceReceiver([0, 3]), new TicketNotificationEventHandler([0, 3]),
"logger" => new EchoLogger()
],
amqpConfig: ['heartbeat' => 1]
);

$ticketService->run('ticket_service', $executionPollingMetadata);
self::assertEquals(0, $ticketService->sendQueryWithRouting(TicketServiceReceiver::GET_TICKETS_COUNT));

$distributedBus = $userService->getGateway(DistributedBus::class);
for ($i = 1; $i <= 2; $i++) {
$distributedBus->sendCommand(
TicketServiceMessagingConfiguration::SERVICE_NAME,
TicketServiceReceiver::CREATE_TICKET_WITH_EVENT_ENDPOINT,
'User changed billing address',
);
}

$ticketService->run('ticket_service', $executionPollingMetadata);
// Message will fail on acknowledge due to lost heartbeat, yet should stay in queue and be processed after reconnect
self::assertEquals(3, $ticketService->sendQueryWithRouting(TicketServiceReceiver::GET_TICKETS_COUNT));

$ticketService->run('async', $executionPollingMetadata);
// distributed command resulted in two events being published, and first event fails on processing due to heartbeat
self::assertEquals(4, $ticketService->sendQueryWithRouting(TicketNotificationEventHandler::GET_TICKETS_NOTIFICATION_COUNT));
}

private function bootstrapEcotone(string $serviceName, array $namespaces, array $services, array $amqpConfig = []): FlowTestSupport
{
return EcotoneLite::bootstrapFlowTesting(
containerOrAvailableServices: array_merge([AmqpConnectionFactory::class => $this->getCachedConnectionFactory()], $services),
containerOrAvailableServices: array_merge([AmqpConnectionFactory::class => $this->getCachedConnectionFactory($amqpConfig)], $services),
configuration: ServiceConfiguration::createWithDefaults()
->withServiceName($serviceName)
->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::ASYNCHRONOUS_PACKAGE, ModulePackageList::AMQP_PACKAGE]))
Expand Down
Loading
Loading