Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 30 additions & 29 deletions src/Adapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@
use Yiisoft\Queue\AMQP\Exception\NotImplementedException;
use Yiisoft\Queue\Cli\LoopInterface;
use Yiisoft\Queue\Enum\JobStatus;
use Yiisoft\Queue\Message\IdEnvelope;
use Yiisoft\Queue\Message\MessageInterface;
use Yiisoft\Queue\Message\MessageSerializerInterface;

final class Adapter implements AdapterInterface
{
Expand All @@ -23,64 +25,63 @@ public function __construct(

public function withChannel(string $channel): self
{
$instance = clone $this;
$instance->queueProvider = $this->queueProvider->withChannelName($channel);
$new = clone $this;
$new->queueProvider = $this->queueProvider->withChannelName($channel);

return $instance;
return $new;
}

/**
* @param callable(MessageInterface): bool $handlerCallback
* @param callable(MessageInterface): bool $handlerCallback
*/
public function runExisting(callable $handlerCallback): void
{
$channel = $this->queueProvider->getChannel();
(new ExistingMessagesConsumer($channel, $this->queueProvider
->getQueueSettings()
->getName(), $this->serializer))
->consume($handlerCallback);
$queueName = $this->queueProvider->getQueueSettings()->getName();
$consumer = new ExistingMessagesConsumer(
$channel,
$queueName,
$this->serializer
);

$consumer->consume($handlerCallback);
}

/**
* @return never
*/
public function status(string $id): JobStatus
public function status(string|int $id): JobStatus
{
throw new NotImplementedException('Status check is not supported by the adapter ' . self::class . '.');
}

public function push(MessageInterface $message): void
public function push(MessageInterface $message): MessageInterface
{
$payload = $this->serializer->serialize($message);
$amqpMessage = new AMQPMessage(
$payload,
array_merge(['message_id' => uniqid(more_entropy: true)], $this->queueProvider->getMessageProperties())
);
$exchangeSettings = $this->queueProvider->getExchangeSettings();
$this->queueProvider
->getChannel()
->basic_publish(
$amqpMessage,
$exchangeSettings?->getName() ?? '',
$exchangeSettings ? '' : $this->queueProvider
->getQueueSettings()
->getName()
);
$channel = $this->queueProvider->getChannel();
$channel->basic_publish(
$amqpMessage,
$exchangeSettings?->getName() ?? '',
$exchangeSettings ? '' : $this->queueProvider
->getQueueSettings()
->getName()
);
/** @var string $messageId */
$messageId = $amqpMessage->get('message_id');
$message->setId($messageId);

return new IdEnvelope($message, $messageId);
}

public function subscribe(callable $handlerCallback): void
{
$channel = $this->queueProvider->getChannel();
$queueName = $this->queueProvider->getQueueSettings()->getName();

$channel->basic_consume(
$this->queueProvider
->getQueueSettings()
->getName(),
$this->queueProvider
->getQueueSettings()
->getName(),
$queueName,
$queueName,
false,
false,
false,
Expand Down
54 changes: 0 additions & 54 deletions src/Exception/NoKeyInPayloadException.php

This file was deleted.

1 change: 1 addition & 0 deletions src/ExistingMessagesConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
use PhpAmqpLib\Message\AMQPMessage;
use Throwable;
use Yiisoft\Queue\Message\MessageInterface;
use Yiisoft\Queue\Message\MessageSerializerInterface;

/**
* @internal
Expand Down
64 changes: 0 additions & 64 deletions src/MessageSerializer.php

This file was deleted.

14 changes: 0 additions & 14 deletions src/MessageSerializerInterface.php

This file was deleted.

21 changes: 14 additions & 7 deletions src/Middleware/DelayMiddleware.php
Original file line number Diff line number Diff line change
Expand Up @@ -43,20 +43,27 @@ public function processPush(PushRequest $request, MessageHandlerPushInterface $h
{
$adapter = $request->getAdapter();
if (!$adapter instanceof Adapter) {
$type = get_debug_type($adapter);
$class = Adapter::class;
throw new InvalidArgumentException(
"This middleware works only with the $class. $type given."
sprintf(
'This middleware works only with the %s. %s given.',
Adapter::class,
get_debug_type($adapter)
)
);
}

$queueProvider = $adapter->getQueueProvider();
$exchangeSettings = $this->getExchangeSettings($queueProvider->getExchangeSettings());
$queueSettings = $this->getQueueSettings($queueProvider->getQueueSettings(), $queueProvider->getExchangeSettings());
$originalExchangeSettings = $queueProvider->getExchangeSettings();
$delayedExchangeSettings = $this->getExchangeSettings($originalExchangeSettings);
$queueSettings = $this->getQueueSettings(
$queueProvider->getQueueSettings(),
$originalExchangeSettings
);

$adapter = $adapter->withQueueProvider(
$queueProvider
->withMessageProperties($this->getMessageProperties($queueProvider))
->withExchangeSettings($exchangeSettings)
->withExchangeSettings($delayedExchangeSettings)
->withQueueSettings($queueSettings)
);

Expand Down Expand Up @@ -104,7 +111,7 @@ private function getExchangeSettings(?ExchangeSettingsInterface $exchangeSetting
/** @noinspection NullPointerExceptionInspection */
return $exchangeSettings
?->withName("{$exchangeSettings->getName()}.dlx")
->withAutoDelete(true)
->withAutoDelete(false)
->withType(AMQPExchangeType::TOPIC);
}
}
1 change: 1 addition & 0 deletions src/QueueProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public function __construct(
public function __destruct()
{
$this->channel?->close();
//unset($this->channel);
}

public function getChannel(): AMQPChannel
Expand Down
2 changes: 1 addition & 1 deletion src/Settings/Exchange.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public function __construct(
private string $type = AMQPExchangeType::DIRECT,
private bool $passive = false,
private bool $durable = false,
private bool $autoDelete = true,
private bool $autoDelete = false,
private bool $internal = false,
private bool $nowait = false,
private AMQPTable|array $arguments = [],
Expand Down
2 changes: 1 addition & 1 deletion src/Settings/Queue.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public function __construct(
private bool $passive = false,
private bool $durable = false,
private bool $exclusive = false,
private bool $autoDelete = true,
private bool $autoDelete = false,
private bool $nowait = false,
private AMQPTable|array $arguments = [],
private ?int $ticket = null
Expand Down
9 changes: 5 additions & 4 deletions tests/Integration/DelayMiddlewareTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
use Psr\Log\LoggerInterface;
use Yiisoft\Queue\Adapter\AdapterInterface;
use Yiisoft\Queue\AMQP\Adapter;
use Yiisoft\Queue\AMQP\MessageSerializer;

use Yiisoft\Queue\AMQP\Middleware\DelayMiddleware;
use Yiisoft\Queue\AMQP\QueueProvider;
use Yiisoft\Queue\AMQP\Settings\Queue as QueueSettings;
Expand All @@ -18,6 +18,7 @@
use Yiisoft\Queue\AMQP\Tests\Support\SimpleMessageHandler;
use Yiisoft\Queue\Cli\LoopInterface;
use Yiisoft\Queue\Cli\SignalLoop;
use Yiisoft\Queue\Message\JsonMessageSerializer;
use Yiisoft\Queue\Message\Message;
use Yiisoft\Queue\Middleware\CallableFactory;
use Yiisoft\Queue\Middleware\Push\MiddlewareFactoryPush;
Expand All @@ -44,7 +45,7 @@ public function testMainFlow(): void
$this->createConnection(),
new QueueSettings(),
),
new MessageSerializer(),
new JsonMessageSerializer(),
new SignalLoop(),
);
$queue = $this->makeQueue($adapter);
Expand Down Expand Up @@ -75,15 +76,15 @@ public function testMainFlowWithFakeAdapter(): void
$this->createConnection(),
new QueueSettings(),
),
new MessageSerializer(),
new JsonMessageSerializer(),
new SignalLoop(),
);
$queue = $this->makeQueue($adapter);

$this->expectException(InvalidArgumentException::class);
$this->expectExceptionMessage("This middleware works only with the $adapterClass. $fakeAdapterClass given.");
$queue->push(
new Message('simple', 'test-delay-middleware-main'),
new Message(SimpleMessageHandler::class, 'test-delay-middleware-main'),
new DelayMiddleware(3),
);
}
Expand Down
6 changes: 3 additions & 3 deletions tests/Support/FakeAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@
namespace Yiisoft\Queue\AMQP\Tests\Support;

use Yiisoft\Queue\Adapter\AdapterInterface;
use Yiisoft\Queue\AMQP\MessageSerializerInterface;
use Yiisoft\Queue\AMQP\QueueProviderInterface;
use Yiisoft\Queue\Cli\LoopInterface;
use Yiisoft\Queue\Enum\JobStatus;
use Yiisoft\Queue\Message\MessageInterface;
use Yiisoft\Queue\Message\MessageSerializerInterface;

final class FakeAdapter implements AdapterInterface
{
Expand All @@ -25,12 +25,12 @@ public function runExisting(callable $handlerCallback): void
// TODO: Implement runExisting() method.
}

public function status(string $id): JobStatus
public function status(string|int $id): JobStatus
{
// TODO: Implement status() method.
}

public function push(MessageInterface $message): void
public function push(MessageInterface $message): MessageInterface
{
// TODO: Implement push() method.
}
Expand Down
Loading