Skip to content

Commit

Permalink
Amqp self heal on acknowledge failures (#370)
Browse files Browse the repository at this point in the history
* Self healing on acknowledge failures
  • Loading branch information
dgafka authored Aug 27, 2024
1 parent fa473bc commit 3da6eb1
Show file tree
Hide file tree
Showing 15 changed files with 260 additions and 18 deletions.
62 changes: 62 additions & 0 deletions packages/Amqp/src/AmqpAcknowledgeCallbackWraper.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
<?php

declare(strict_types=1);

namespace Ecotone\Amqp;

use Ecotone\Enqueue\CachedConnectionFactory;
use Ecotone\Enqueue\EnqueueAcknowledgementCallback;
use Ecotone\Enqueue\ReconnectableConnectionFactory;
use Ecotone\Messaging\Endpoint\AcknowledgementCallback;
use Ecotone\Messaging\Handler\Logger\LoggingGateway;

final class AmqpAcknowledgeCallbackWraper implements AcknowledgementCallback
{
public function __construct(private EnqueueAcknowledgementCallback $acknowledgementCallback, private CachedConnectionFactory $connectionFactory, private LoggingGateway $loggingGateway)
{

}

public function isAutoAck(): bool
{
return $this->acknowledgementCallback->isAutoAck();
}

public function disableAutoAck(): void
{
$this->acknowledgementCallback->disableAutoAck();
}

public function accept(): void
{
try {
$this->acknowledgementCallback->accept();
}catch (\Exception $exception) {
$this->loggingGateway->info("Failed to acknowledge message, disconnecting AMQP Connection in order to self-heal. Failure happen due to: " . $exception->getMessage(), exception: $exception);

$this->connectionFactory->reconnect();
}
}

public function reject(): void
{
try {
$this->acknowledgementCallback->reject();
}catch (\Exception $exception) {
$this->loggingGateway->info("Failed to reject message, disconnecting AMQP Connection in order to self-heal. Failure happen due to: " . $exception->getMessage(), exception: $exception);

$this->connectionFactory->reconnect();
}
}

public function requeue(): void
{
try {
$this->acknowledgementCallback->requeue();
}catch (\Exception $exception) {
$this->loggingGateway->info("Failed to requeue message, disconnecting AMQP Connection in order to self-heal. Failure happen due to: " . $exception->getMessage(), exception: $exception);

$this->connectionFactory->reconnect();
}
}
}
22 changes: 17 additions & 5 deletions packages/Amqp/src/AmqpInboundChannelAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,16 @@

use AMQPConnectionException;
use Ecotone\Enqueue\CachedConnectionFactory;
use Ecotone\Enqueue\EnqueueHeader;
use Ecotone\Enqueue\EnqueueInboundChannelAdapter;
use Ecotone\Enqueue\InboundMessageConverter;
use Ecotone\Messaging\Channel\QueueChannel;
use Ecotone\Messaging\Conversion\ConversionService;
use Ecotone\Messaging\Conversion\MediaType;
use Ecotone\Messaging\Endpoint\PollingConsumer\ConnectionException;
use Ecotone\Messaging\Handler\Logger\LoggingGateway;
use Ecotone\Messaging\Message;
use Ecotone\Messaging\MessageHeaders;
use Ecotone\Messaging\Support\MessageBuilder;
use Interop\Amqp\AmqpMessage;
use Interop\Queue\Consumer;
Expand All @@ -32,13 +35,14 @@ class AmqpInboundChannelAdapter extends EnqueueInboundChannelAdapter
private QueueChannel $queueChannel;

public function __construct(
CachedConnectionFactory $cachedConnectionFactory,
private CachedConnectionFactory $cachedConnectionFactory,
private AmqpAdmin $amqpAdmin,
bool $declareOnStartup,
string $queueName,
int $receiveTimeoutInMilliseconds,
InboundMessageConverter $inboundMessageConverter,
ConversionService $conversionService
ConversionService $conversionService,
private LoggingGateway $loggingGateway,
) {
parent::__construct(
$cachedConnectionFactory,
Expand All @@ -65,10 +69,17 @@ public function enrichMessage(EnqueueMessage $sourceMessage, MessageBuilder $tar
$targetMessage = $targetMessage->setContentType(MediaType::parseMediaType($sourceMessage->getContentType()));
}

return $targetMessage;
return $targetMessage->setHeader(
EnqueueHeader::HEADER_ACKNOWLEDGE,
new AmqpAcknowledgeCallbackWraper(
$targetMessage->getHeaderWithName(EnqueueHeader::HEADER_ACKNOWLEDGE),
$this->cachedConnectionFactory,
$this->loggingGateway,
)
);
}

public function receiveMessage(int $timeout = 0): ?Message
public function receiveWithTimeout(int $timeout = 0): ?Message
{
try {
if ($this->declareOnStartup && $this->initialized === false) {
Expand All @@ -92,7 +103,8 @@ public function receiveMessage(int $timeout = 0): ?Message
$subscriptionConsumer->consume($timeout ?: $this->receiveTimeoutInMilliseconds);

return $this->queueChannel->receive();
} catch (AMQPConnectionException $exception) {
} catch (AMQPConnectionException|\AMQPChannelException $exception) {
$this->connectionFactory->reconnect();
throw new ConnectionException('Failed to connect to AMQP broker', 0, $exception);
}
}
Expand Down
2 changes: 2 additions & 0 deletions packages/Amqp/src/AmqpInboundChannelAdapterBuilder.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
use Ecotone\Messaging\Config\Container\MessagingContainerBuilder;
use Ecotone\Messaging\Config\Container\Reference;
use Ecotone\Messaging\Conversion\ConversionService;
use Ecotone\Messaging\Handler\Logger\LoggingGateway;
use Ecotone\Messaging\MessageConverter\DefaultHeaderMapper;
use Enqueue\AmqpExt\AmqpConnectionFactory;
use Ramsey\Uuid\Uuid;
Expand Down Expand Up @@ -59,6 +60,7 @@ public function compile(MessagingContainerBuilder $builder): Definition
$this->receiveTimeoutInMilliseconds,
$inboundMessageConverter,
new Reference(ConversionService::REFERENCE_NAME),
new Reference(LoggingGateway::class),
]);
}
}
16 changes: 14 additions & 2 deletions packages/Amqp/src/AmqpReconnectableConnectionFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,12 @@ public function getConnectionInstanceId(): string

/**
* No way to reliable state if amqp is connected: https://github.com/php-amqp/php-amqp/issues/306
* So to make it more reliable we check other way around, if is disconnected
* So to make it more reliable we check other way around, if is disconnected.
*
* There are situations where connection to AMQP connections becomes zombies.
* In that scenarios triggering an action on the connection will do nothing and will not throw an exception.
* It makes the feeling like anything is fine, yet in reality it is not.
* In those situations it's better to use this method.
* @param Context|AmqpContext|null $context
*/
public function isDisconnected(?Context $context): bool
Expand All @@ -64,6 +69,11 @@ public function reconnect(): void
{
$connectionProperty = $this->getConnectionProperty();

if ($this->subscriptionConsumer) {
try {
$this->subscriptionConsumer->unsubscribeAll();
}catch (\Exception) {}
}
/** @var AMQPConnection $connection */
$connection = $connectionProperty->getValue($this->connectionFactory);
if ($connection) {
Expand Down Expand Up @@ -95,8 +105,10 @@ private function getConnectionProperty(): ReflectionProperty

public function getSubscriptionConsumer(string $queueName, callable $subscriptionCallback): SubscriptionConsumer
{
$context = $this->createContext();
if ($this->subscriptionConsumer === null) {
/** @var AmqpContext $context */
$context = $this->createContext();

$this->subscriptionConsumer = $context->createSubscriptionConsumer();

/** @var AmqpConsumer $consumer */
Expand Down
15 changes: 11 additions & 4 deletions packages/Amqp/tests/AmqpMessagingTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,22 @@ abstract class AmqpMessagingTest extends TestCase
/**
* @return AmqpConnectionFactory
*/
public function getCachedConnectionFactory(): AmqpConnectionFactory
public function getCachedConnectionFactory(array $config = []): AmqpConnectionFactory
{
return self::getRabbitConnectionFactory();
return self::getRabbitConnectionFactory($config);
}

/**
* @return AmqpConnectionFactory
*/
public static function getRabbitConnectionFactory(): AmqpConnectionFactory
public static function getRabbitConnectionFactory(array $config = []): AmqpConnectionFactory
{
return new AmqpLibConnection(['dsn' => getenv('RABBIT_HOST') ? getenv('RABBIT_HOST') : 'amqp://guest:guest@localhost:5672/%2f']);
return new AmqpLibConnection(
array_merge(
['dsn' => getenv('RABBIT_HOST') ? getenv('RABBIT_HOST') : 'amqp://guest:guest@localhost:5672/%2f'],
$config,
)
);
}

public function setUp(): void
Expand All @@ -58,6 +63,8 @@ public function queueCleanUp(): void
$this->deleteQueue(new AmqpQueue('distributed_ticket_service'));
$this->deleteQueue(new AmqpQueue(AmqpDistributionModule::CHANNEL_PREFIX . TicketServiceMessagingConfiguration::SERVICE_NAME));
$this->deleteQueue(new AmqpQueue('ecotone_1_delay'));
$this->deleteQueue(new AmqpQueue('async'));
$this->deleteQueue(new AmqpQueue('notification_channel'));
}

private function deleteQueue(AmqpQueue $queue): void
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<?php

declare(strict_types=1);

namespace Test\Ecotone\Amqp\Fixture\DistributedCommandBus\Receiver\Event;

final class TicketCreated
{
public function __construct(private string $ticket)
{
}

public function getTicket(): string
{
return $this->ticket;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public function configure()
return [
AmqpDistributedBusConfiguration::createConsumer(),
PollingMetadata::create(self::SERVICE_NAME)
->setHandledMessageLimit(1)
->setHandledMessageLimit(5)
->setExecutionTimeLimitInMilliseconds(5000),
];
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,52 @@

namespace Test\Ecotone\Amqp\Fixture\DistributedCommandBus\Receiver;

use Ecotone\Messaging\Attribute\Asynchronous;
use Ecotone\Messaging\Attribute\Parameter\Header;
use Ecotone\Modelling\Attribute\CommandHandler;
use Ecotone\Modelling\Attribute\Distributed;
use Ecotone\Modelling\Attribute\EventHandler;
use Ecotone\Modelling\Attribute\QueryHandler;
use Ecotone\Modelling\EventBus;
use Test\Ecotone\Amqp\Fixture\DistributedCommandBus\Receiver\Event\TicketCreated;

/**
* licence Apache-2.0
*/
class TicketServiceReceiver
{
public const CREATE_TICKET_ENDPOINT = 'createTicket';
public const CREATE_TICKET_WITH_EVENT_ENDPOINT = 'createTicketWithEvent';
public const GET_TICKETS_COUNT = 'getTicketsCount';

private array $tickets = [];

public function __construct(private array $delays = [])
{

}

#[Distributed]
#[CommandHandler(self::CREATE_TICKET_ENDPOINT)]
public function registerTicket(string $ticket): void
{
$this->tickets[] = $ticket;
}

#[Distributed]
#[CommandHandler(self::CREATE_TICKET_WITH_EVENT_ENDPOINT)]
public function registerTicketWithEvent(string $ticket, EventBus $eventBus): void
{
$delay = array_shift($this->delays);
if ($delay) {
sleep($delay);
}

$this->tickets[] = $ticket;

$eventBus->publish(new TicketCreated($ticket));
}

#[QueryHandler(self::GET_TICKETS_COUNT)]
public function getTickets(): int
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<?php

declare(strict_types=1);

namespace Test\Ecotone\Amqp\Fixture\DistributedCommandBus\ReceiverEventHandler;

use Ecotone\Amqp\AmqpBackedMessageChannelBuilder;
use Ecotone\Messaging\Attribute\ServiceContext;

final class TicketNotificationConfig
{
#[ServiceContext]
public function channel()
{
return AmqpBackedMessageChannelBuilder::create('async');
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
<?php

declare(strict_types=1);

namespace Test\Ecotone\Amqp\Fixture\DistributedCommandBus\ReceiverEventHandler;

use Ecotone\Messaging\Attribute\Asynchronous;
use Ecotone\Modelling\Attribute\EventHandler;
use Ecotone\Modelling\Attribute\QueryHandler;
use Test\Ecotone\Amqp\Fixture\DistributedCommandBus\Receiver\Event\TicketCreated;

final class TicketNotificationEventHandler
{
public const GET_TICKETS_NOTIFICATION_COUNT = 'getTicketsNotificationCount';

private array $ticketNotifications = [];

public function __construct(private array $delays = [])
{

}

#[Asynchronous('async')]
#[EventHandler(endpointId: 'notify')]
public function notify(TicketCreated $event): void
{
$delay = array_shift($this->delays);
if ($delay) {
sleep($delay);
}

$this->ticketNotifications[] = $event->getTicket();
}

#[QueryHandler(self::GET_TICKETS_NOTIFICATION_COUNT)]
public function getTicketsNotifications(): int
{
return count($this->ticketNotifications);
}
}
Loading

0 comments on commit 3da6eb1

Please sign in to comment.