From 2cc06c1cfbf2fe19c5c0756638ff73806ce06529 Mon Sep 17 00:00:00 2001 From: Weyoss Date: Sat, 4 May 2024 10:28:56 +0200 Subject: [PATCH] feat: use more granular error classes for reporting errors --- src/common/redis-keys/errors/index.ts | 11 ++ .../errors/redis-keys-invalid-key.error.ts | 12 +++ .../{ => errors}/redis-keys.error.ts | 0 src/common/redis-keys/redis-keys.ts | 20 ++-- .../configuration-message-queue-size.error.ts | 12 +++ ...onfiguration-message-store-expire.error.ts | 12 +++ .../errors/configuration-namespace.error.ts | 12 +++ .../{ => errors}/configuration.error.ts | 0 src/config/errors/index.ts | 13 +++ src/config/index.ts | 1 + src/config/messages/store.ts | 7 +- src/config/namespace.ts | 3 +- .../_/_delete-consumer-group.ts | 16 ++- .../consumer-groups/_/_save-consumer-group.ts | 3 +- ...r-groups-consumer-group-not-empty.error.ts | 12 +++ .../consumer-groups-invalid-group-id.error.ts | 12 +++ .../consumer-groups-queue-not-found.error.ts | 12 +++ .../errors/consumer-groups.error.ts} | 2 +- src/lib/consumer-groups/errors/index.ts | 13 +++ src/lib/consumer-groups/index.ts | 1 + ...me-message-handler-already-exists.error.ts | 12 +++ ...r-consumer-group-id-not-supported.error.ts | 12 +++ ...sumer-consumer-group-id-required.error.ts} | 2 +- .../consumer-group-id-not-supported.error.ts | 18 ---- .../consumer-group-id-required.error.ts | 18 ---- ...er-message-handler-already-exists.error.ts | 21 ---- src/lib/consumer/errors/index.ts | 7 +- .../message-handler-runner.ts | 4 +- .../dequeue-message/dequeue-message.ts | 12 +-- .../consumer-message-handler-file.error.ts | 8 +- ...essage-handler-filename-extension.error.ts | 8 +- .../errors/consumer-message-handler.error.ts | 6 +- .../errors/event-bus-instance-lock.error.ts | 12 +++ src/lib/event-bus/errors/index.ts | 10 ++ src/lib/event-bus/event-bus-redis-instance.ts | 4 +- src/lib/event-bus/index.ts | 1 + ...fan-out-exchange-has-bound-queues.error.ts | 12 +++ .../exchange-invalid-fan-out-params.error.ts | 12 +++ ...=> exchange-invalid-queue-params.error.ts} | 2 +- .../exchange-invalid-topic-params.error.ts | 12 +++ ...ge-queue-is-not-bound-to-exchange.error.ts | 12 +++ src/lib/exchange/errors/index.ts | 6 +- .../_/_get-exchange-direct-transferable.ts | 7 +- .../_/_validate-exchange-direct-params.ts | 6 +- .../exchange-direct/exchange-direct.ts | 3 +- .../_/_get-exchange-fanout-transferable.ts | 7 +- .../_/_validate-exchange-fan-out-params.ts | 7 +- .../exchange-fan-out/exchange-fan-out.ts | 40 ++++--- .../_/_get-exchange-topic-transferable.ts | 7 +- .../_/_get-topic-exchange-params.ts | 8 +- .../_/_validate-exchange-topic-params.ts | 5 +- src/lib/message/_/_delete-message.ts | 23 +++- src/lib/message/_/_get-message-state.ts | 4 +- src/lib/message/_/_get-message-status.ts | 4 +- src/lib/message/_/_get-message.ts | 6 +- src/lib/message/_/_requeue-message.ts | 96 +++++++++++++++++ src/lib/message/errors/index.ts | 9 +- ...age-destination-queue-already-set.error.ts | 6 +- ...essage-destination-queue-required.error.ts | 6 +- ...ts => message-invalid-parameters.error.ts} | 6 +- ...essage-message-exchange-required.error.ts} | 6 +- .../message-message-in-process.error.ts | 12 +++ ....ts => message-message-not-found.error.ts} | 6 +- .../message-message-not-requeuable.error.ts | 12 +++ .../errors/message-message-property.error.ts | 12 +++ src/lib/message/message-envelope.ts | 4 +- src/lib/message/message.ts | 9 ++ src/lib/message/producible-message.ts | 40 +++---- src/lib/namespace/namespace.ts | 2 +- src/lib/producer/_/_schedule-message.ts | 19 ++-- src/lib/producer/errors/index.ts | 10 +- ...roducer-exchange-no-matched-queue.error.ts | 12 +++ .../producer-instance-not-running.error.ts | 8 +- ...roducer-message-priority-required.error.ts | 12 +++ ...ucer-priority-queuing-not-enabled.error.ts | 12 +++ ...er-queue-missing-consumer-groups.error.ts} | 2 +- ...r.ts => producer-queue-not-found.error.ts} | 2 +- ...ducer-schedule-invalid-parameters.error.ts | 12 +++ ...s => producer-unknown-queue-type.error.ts} | 2 +- src/lib/producer/producer.ts | 37 ++++--- src/lib/queue-messages/_/_requeue-message.ts | 101 ------------------ .../_/_validate-queue-extended-params.ts | 8 +- src/lib/queue-messages/errors/index.ts | 3 +- .../errors/message-requeue.error.ts | 16 --- ...ssage.error.ts => queue-messages.error.ts} | 2 +- .../queue-acknowledged-messages.ts | 29 +---- .../queue-dead-lettered-messages.ts | 29 +---- src/lib/queue-messages/types/index.ts | 8 -- src/lib/queue-rate-limit/errors/index.ts | 2 + ...queue-rate-limit-invalid-interval.error.ts | 12 +++ .../queue-rate-limit-invalid-limit.error.ts | 12 +++ src/lib/queue-rate-limit/queue-rate-limit.ts | 17 ++- src/lib/queue/_/_delete-queue.ts | 12 +-- src/lib/queue/_/_get-queue-properties.ts | 5 +- .../queue/_/_parse-queue-extended-params.ts | 13 ++- src/lib/queue/_/_parse-queue-params.ts | 11 +- src/lib/queue/errors/index.ts | 9 +- .../queue-has-running-consumers.error.ts | 18 ---- .../queue-invalid-queue-parameter.error.ts | 12 +++ ...s.error.ts => queue-queue-exists.error.ts} | 2 +- ...queue-queue-has-running-consumers.error.ts | 12 +++ ...rror.ts => queue-queue-not-empty.error.ts} | 2 +- ...rror.ts => queue-queue-not-found.error.ts} | 2 +- src/lib/queue/queue.ts | 4 +- .../consuming-messages/test00015.test.ts | 13 +-- .../consuming-messages/test00036.test.ts | 13 ++- .../tests/deleting-messages/test00003.test.ts | 4 +- .../tests/deleting-messages/test00004.test.ts | 4 +- .../tests/deleting-messages/test00005.test.ts | 9 +- .../direct-exchange/test00001.test.ts | 14 ++- .../fanout-exchange/test00006.test.ts | 4 +- .../fanout-exchange/test00007.test.ts | 11 +- .../topic-exchange/test00001.test.ts | 14 ++- .../topic-exchange/test00007.test.ts | 4 +- tests/tests/misc/test00003.test.ts | 2 +- tests/tests/misc/test00004.test.ts | 4 +- tests/tests/misc/test00019.test.ts | 6 +- tests/tests/misc/test00020.test.ts | 20 ++-- tests/tests/purging-queues/test00006.test.ts | 14 +-- tests/tests/purging-queues/test00008.test.ts | 18 ++-- .../queue-consumer-groups/test00002.test.ts | 4 +- .../queue-consumer-groups/test00006.test.ts | 4 +- .../queue-consumer-groups/test00007.test.ts | 7 +- .../tests/queue-rate-limit/test00028.test.ts | 8 +- .../test00010.test.ts | 13 ++- .../requeuing-messages/test00001.test.ts | 14 +-- .../requeuing-messages/test00002.test.ts | 14 +-- .../requeuing-messages/test00003.test.ts | 18 ++-- 128 files changed, 843 insertions(+), 600 deletions(-) create mode 100644 src/common/redis-keys/errors/index.ts create mode 100644 src/common/redis-keys/errors/redis-keys-invalid-key.error.ts rename src/common/redis-keys/{ => errors}/redis-keys.error.ts (100%) create mode 100644 src/config/errors/configuration-message-queue-size.error.ts create mode 100644 src/config/errors/configuration-message-store-expire.error.ts create mode 100644 src/config/errors/configuration-namespace.error.ts rename src/config/{ => errors}/configuration.error.ts (100%) create mode 100644 src/config/errors/index.ts create mode 100644 src/lib/consumer-groups/errors/consumer-groups-consumer-group-not-empty.error.ts create mode 100644 src/lib/consumer-groups/errors/consumer-groups-invalid-group-id.error.ts create mode 100644 src/lib/consumer-groups/errors/consumer-groups-queue-not-found.error.ts rename src/lib/{event-bus/errors/event-bus-lock.error.ts => consumer-groups/errors/consumer-groups.error.ts} (82%) create mode 100644 src/lib/consumer-groups/errors/index.ts create mode 100644 src/lib/consumer/errors/consumer-consume-message-handler-already-exists.error.ts create mode 100644 src/lib/consumer/errors/consumer-consumer-group-id-not-supported.error.ts rename src/lib/consumer/errors/{consumer-group-delete.error.ts => consumer-consumer-group-id-required.error.ts} (78%) delete mode 100644 src/lib/consumer/errors/consumer-group-id-not-supported.error.ts delete mode 100644 src/lib/consumer/errors/consumer-group-id-required.error.ts delete mode 100644 src/lib/consumer/errors/consumer-message-handler-already-exists.error.ts create mode 100644 src/lib/event-bus/errors/event-bus-instance-lock.error.ts create mode 100644 src/lib/event-bus/errors/index.ts create mode 100644 src/lib/exchange/errors/exchange-fan-out-exchange-has-bound-queues.error.ts create mode 100644 src/lib/exchange/errors/exchange-invalid-fan-out-params.error.ts rename src/lib/exchange/errors/{exchange-invalid-data.error.ts => exchange-invalid-queue-params.error.ts} (79%) create mode 100644 src/lib/exchange/errors/exchange-invalid-topic-params.error.ts create mode 100644 src/lib/exchange/errors/exchange-queue-is-not-bound-to-exchange.error.ts create mode 100644 src/lib/message/_/_requeue-message.ts rename src/lib/message/errors/{message-not-found.error.ts => message-invalid-parameters.error.ts} (67%) rename src/lib/message/errors/{message-delete.error.ts => message-message-exchange-required.error.ts} (66%) create mode 100644 src/lib/message/errors/message-message-in-process.error.ts rename src/lib/message/errors/{message-exchange-required.error.ts => message-message-not-found.error.ts} (66%) create mode 100644 src/lib/message/errors/message-message-not-requeuable.error.ts create mode 100644 src/lib/message/errors/message-message-property.error.ts create mode 100644 src/lib/producer/errors/producer-exchange-no-matched-queue.error.ts create mode 100644 src/lib/producer/errors/producer-message-priority-required.error.ts create mode 100644 src/lib/producer/errors/producer-priority-queuing-not-enabled.error.ts rename src/lib/producer/errors/{producer-queue-without-consumer-groups.error.ts => producer-queue-missing-consumer-groups.error.ts} (81%) rename src/lib/producer/errors/{producer-message-not-scheduled.error.ts => producer-queue-not-found.error.ts} (79%) create mode 100644 src/lib/producer/errors/producer-schedule-invalid-parameters.error.ts rename src/lib/producer/errors/{producer-message-not-published.error.ts => producer-unknown-queue-type.error.ts} (79%) delete mode 100644 src/lib/queue-messages/_/_requeue-message.ts delete mode 100644 src/lib/queue-messages/errors/message-requeue.error.ts rename src/lib/queue-messages/errors/{queue-message.error.ts => queue-messages.error.ts} (82%) create mode 100644 src/lib/queue-rate-limit/errors/queue-rate-limit-invalid-interval.error.ts create mode 100644 src/lib/queue-rate-limit/errors/queue-rate-limit-invalid-limit.error.ts delete mode 100644 src/lib/queue/errors/queue-has-running-consumers.error.ts create mode 100644 src/lib/queue/errors/queue-invalid-queue-parameter.error.ts rename src/lib/queue/errors/{queue-exists.error.ts => queue-queue-exists.error.ts} (82%) create mode 100644 src/lib/queue/errors/queue-queue-has-running-consumers.error.ts rename src/lib/queue/errors/{queue-not-empty.error.ts => queue-queue-not-empty.error.ts} (81%) rename src/lib/queue/errors/{queue-not-found.error.ts => queue-queue-not-found.error.ts} (81%) diff --git a/src/common/redis-keys/errors/index.ts b/src/common/redis-keys/errors/index.ts new file mode 100644 index 00000000..c0f2eb1d --- /dev/null +++ b/src/common/redis-keys/errors/index.ts @@ -0,0 +1,11 @@ +/* + * Copyright (c) + * Weyoss + * https://github.com/weyoss + * + * This source code is licensed under the MIT license found in the LICENSE file + * in the root directory of this source tree. + */ + +export { RedisKeysInvalidKeyError } from './redis-keys-invalid-key.error.js'; +export { RedisKeysError } from './redis-keys.error.js'; diff --git a/src/common/redis-keys/errors/redis-keys-invalid-key.error.ts b/src/common/redis-keys/errors/redis-keys-invalid-key.error.ts new file mode 100644 index 00000000..247e428e --- /dev/null +++ b/src/common/redis-keys/errors/redis-keys-invalid-key.error.ts @@ -0,0 +1,12 @@ +/* + * Copyright (c) + * Weyoss + * https://github.com/weyoss + * + * This source code is licensed under the MIT license found in the LICENSE file + * in the root directory of this source tree. + */ + +import { RedisKeysError } from './redis-keys.error.js'; + +export class RedisKeysInvalidKeyError extends RedisKeysError {} diff --git a/src/common/redis-keys/redis-keys.error.ts b/src/common/redis-keys/errors/redis-keys.error.ts similarity index 100% rename from src/common/redis-keys/redis-keys.error.ts rename to src/common/redis-keys/errors/redis-keys.error.ts diff --git a/src/common/redis-keys/redis-keys.ts b/src/common/redis-keys/redis-keys.ts index 979db0c8..d1029c6b 100644 --- a/src/common/redis-keys/redis-keys.ts +++ b/src/common/redis-keys/redis-keys.ts @@ -8,7 +8,7 @@ */ import { IQueueParams } from '../../lib/index.js'; -import { RedisKeysError } from './redis-keys.error.js'; +import { RedisKeysInvalidKeyError } from './errors/index.js'; // Key segments separator const keySegmentSeparator = ':'; @@ -156,21 +156,19 @@ export const redisKeys = { return makeNamespacedKeys(mainKeys, globalNamespace); }, - validateNamespace(ns: string): string | RedisKeysError { + validateNamespace(ns: string): string | RedisKeysInvalidKeyError { const validated = this.validateRedisKey(ns); if (validated === globalNamespace) { - return new RedisKeysError( - `Namespace [${validated}] is reserved. Use another one.`, - ); + return new RedisKeysInvalidKeyError(); } return validated; }, - validateRedisKey(key: string | null | undefined): string | RedisKeysError { + validateRedisKey( + key: string | null | undefined, + ): string | RedisKeysInvalidKeyError { if (!key || !key.length) { - return new RedisKeysError( - 'Invalid Redis key. Expected be a non empty string.', - ); + return new RedisKeysInvalidKeyError(); } const lowerCase = key.toLowerCase(); const filtered = lowerCase.replace( @@ -178,9 +176,7 @@ export const redisKeys = { '', ); if (filtered.length) { - return new RedisKeysError( - 'Invalid Redis key. Valid characters are letters (a-z) and numbers (0-9). (-_) are allowed between alphanumerics. Use a dot (.) to denote hierarchies.', - ); + return new RedisKeysInvalidKeyError(); } return lowerCase; }, diff --git a/src/config/errors/configuration-message-queue-size.error.ts b/src/config/errors/configuration-message-queue-size.error.ts new file mode 100644 index 00000000..a18806e8 --- /dev/null +++ b/src/config/errors/configuration-message-queue-size.error.ts @@ -0,0 +1,12 @@ +/* + * Copyright (c) + * Weyoss + * https://github.com/weyoss + * + * This source code is licensed under the MIT license found in the LICENSE file + * in the root directory of this source tree. + */ + +import { ConfigurationError } from './configuration.error.js'; + +export class ConfigurationMessageQueueSizeError extends ConfigurationError {} diff --git a/src/config/errors/configuration-message-store-expire.error.ts b/src/config/errors/configuration-message-store-expire.error.ts new file mode 100644 index 00000000..1597459c --- /dev/null +++ b/src/config/errors/configuration-message-store-expire.error.ts @@ -0,0 +1,12 @@ +/* + * Copyright (c) + * Weyoss + * https://github.com/weyoss + * + * This source code is licensed under the MIT license found in the LICENSE file + * in the root directory of this source tree. + */ + +import { ConfigurationError } from './configuration.error.js'; + +export class ConfigurationMessageStoreExpireError extends ConfigurationError {} diff --git a/src/config/errors/configuration-namespace.error.ts b/src/config/errors/configuration-namespace.error.ts new file mode 100644 index 00000000..97c135ef --- /dev/null +++ b/src/config/errors/configuration-namespace.error.ts @@ -0,0 +1,12 @@ +/* + * Copyright (c) + * Weyoss + * https://github.com/weyoss + * + * This source code is licensed under the MIT license found in the LICENSE file + * in the root directory of this source tree. + */ + +import { ConfigurationError } from './configuration.error.js'; + +export class ConfigurationNamespaceError extends ConfigurationError {} diff --git a/src/config/configuration.error.ts b/src/config/errors/configuration.error.ts similarity index 100% rename from src/config/configuration.error.ts rename to src/config/errors/configuration.error.ts diff --git a/src/config/errors/index.ts b/src/config/errors/index.ts new file mode 100644 index 00000000..04382271 --- /dev/null +++ b/src/config/errors/index.ts @@ -0,0 +1,13 @@ +/* + * Copyright (c) + * Weyoss + * https://github.com/weyoss + * + * This source code is licensed under the MIT license found in the LICENSE file + * in the root directory of this source tree. + */ + +export { ConfigurationError } from './configuration.error.js'; +export { ConfigurationMessageStoreExpireError } from './configuration-message-store-expire.error.js'; +export { ConfigurationMessageQueueSizeError } from './configuration-message-queue-size.error.js'; +export { ConfigurationNamespaceError } from './configuration-namespace.error.js'; diff --git a/src/config/index.ts b/src/config/index.ts index c0b613e7..dc9343da 100644 --- a/src/config/index.ts +++ b/src/config/index.ts @@ -7,5 +7,6 @@ * in the root directory of this source tree. */ +export * from './errors/index.js'; export * from './types/index.js'; export * from './configuration.js'; diff --git a/src/config/messages/store.ts b/src/config/messages/store.ts index dacc8312..ad2920da 100644 --- a/src/config/messages/store.ts +++ b/src/config/messages/store.ts @@ -13,7 +13,8 @@ import { IMessagesConfigStorageOptionsRequired, IMessagesConfigStorageRequired, } from '../../lib/index.js'; -import { ConfigurationError } from '../configuration.error.js'; +import { ConfigurationMessageQueueSizeError } from '../errors/configuration-message-queue-size.error.js'; +import { ConfigurationMessageStoreExpireError } from '../errors/configuration-message-store-expire.error.js'; import { IRedisSMQConfig } from '../types/index.js'; function getMessageStorageConfig( @@ -43,11 +44,11 @@ function getMessageStorageParams( } const queueSize = Number(params.queueSize ?? 0); if (isNaN(queueSize) || queueSize < 0) { - throw new ConfigurationError(`Parameter [queueSize] should be >= 0`); + throw new ConfigurationMessageQueueSizeError(); } const expire = Number(params.expire ?? 0); if (isNaN(expire) || expire < 0) { - throw new ConfigurationError(`Parameter [expire] should be >= 0`); + throw new ConfigurationMessageStoreExpireError(); } return { store: true, diff --git a/src/config/namespace.ts b/src/config/namespace.ts index dced29c5..583ca2db 100644 --- a/src/config/namespace.ts +++ b/src/config/namespace.ts @@ -8,6 +8,7 @@ */ import { redisKeys } from '../common/redis-keys/redis-keys.js'; +import { ConfigurationNamespaceError } from './errors/configuration-namespace.error.js'; import { IRedisSMQConfig } from './types/index.js'; const defaultNamespace = 'default'; @@ -15,6 +16,6 @@ const defaultNamespace = 'default'; export default function Namespace(userConfig: IRedisSMQConfig): string { if (!userConfig.namespace) return defaultNamespace; const ns = redisKeys.validateNamespace(userConfig.namespace); - if (ns instanceof Error) throw ns; + if (ns instanceof Error) throw new ConfigurationNamespaceError(); return ns; } diff --git a/src/lib/consumer-groups/_/_delete-consumer-group.ts b/src/lib/consumer-groups/_/_delete-consumer-group.ts index 1a838dd3..e762ae73 100644 --- a/src/lib/consumer-groups/_/_delete-consumer-group.ts +++ b/src/lib/consumer-groups/_/_delete-consumer-group.ts @@ -11,8 +11,10 @@ import { async, ICallback, IEventBus, IRedisClient } from 'redis-smq-common'; import { TRedisSMQEvent } from '../../../common/index.js'; import { ELuaScriptName } from '../../../common/redis-client/scripts/scripts.js'; import { redisKeys } from '../../../common/redis-keys/redis-keys.js'; -import { ConsumerGroupDeleteError } from '../../consumer/index.js'; import { EQueueProperty, EQueueType, IQueueParams } from '../../queue/index.js'; +import { ConsumerGroupsConsumerGroupNotEmptyError } from '../errors/consumer-groups-consumer-group-not-empty.error.js'; +import { ConsumerGroupsQueueNotFoundError } from '../errors/consumer-groups-queue-not-found.error.js'; +import { ConsumerGroupsError } from '../errors/consumer-groups.error.js'; export function _deleteConsumerGroup( redisClient: IRedisClient, @@ -47,9 +49,15 @@ export function _deleteConsumerGroup( ], (err, reply) => { if (err) cb(err); - else if (reply !== 'OK') - cb(new ConsumerGroupDeleteError(String(reply))); - else { + else if (reply !== 'OK') { + if (reply === 'QUEUE_NOT_FOUND') { + cb(new ConsumerGroupsQueueNotFoundError()); + } else if (reply === 'CONSUMER_GROUP_NOT_EMPTY') { + cb(new ConsumerGroupsConsumerGroupNotEmptyError()); + } else { + cb(new ConsumerGroupsError()); + } + } else { eventBus.emit('queue.consumerGroupDeleted', queue, groupId); cb(); } diff --git a/src/lib/consumer-groups/_/_save-consumer-group.ts b/src/lib/consumer-groups/_/_save-consumer-group.ts index e73e54e2..7db1474b 100644 --- a/src/lib/consumer-groups/_/_save-consumer-group.ts +++ b/src/lib/consumer-groups/_/_save-consumer-group.ts @@ -11,6 +11,7 @@ import { ICallback, IEventBus, IRedisClient } from 'redis-smq-common'; import { TRedisSMQEvent } from '../../../common/index.js'; import { redisKeys } from '../../../common/redis-keys/redis-keys.js'; import { IQueueParams } from '../../queue/index.js'; +import { ConsumerGroupsInvalidGroupIdError } from '../errors/consumer-groups-invalid-group-id.error.js'; export function _saveConsumerGroup( redisClient: IRedisClient, @@ -20,7 +21,7 @@ export function _saveConsumerGroup( cb: ICallback, ): void { const gid = redisKeys.validateRedisKey(groupId); - if (gid instanceof Error) cb(gid); + if (gid instanceof Error) cb(new ConsumerGroupsInvalidGroupIdError()); else { const { keyQueueConsumerGroups } = redisKeys.getQueueKeys(queue, gid); redisClient.sadd(keyQueueConsumerGroups, gid, (err, reply) => { diff --git a/src/lib/consumer-groups/errors/consumer-groups-consumer-group-not-empty.error.ts b/src/lib/consumer-groups/errors/consumer-groups-consumer-group-not-empty.error.ts new file mode 100644 index 00000000..85226ce1 --- /dev/null +++ b/src/lib/consumer-groups/errors/consumer-groups-consumer-group-not-empty.error.ts @@ -0,0 +1,12 @@ +/* + * Copyright (c) + * Weyoss + * https://github.com/weyoss + * + * This source code is licensed under the MIT license found in the LICENSE file + * in the root directory of this source tree. + */ + +import { ConsumerGroupsError } from './consumer-groups.error.js'; + +export class ConsumerGroupsConsumerGroupNotEmptyError extends ConsumerGroupsError {} diff --git a/src/lib/consumer-groups/errors/consumer-groups-invalid-group-id.error.ts b/src/lib/consumer-groups/errors/consumer-groups-invalid-group-id.error.ts new file mode 100644 index 00000000..9078fd63 --- /dev/null +++ b/src/lib/consumer-groups/errors/consumer-groups-invalid-group-id.error.ts @@ -0,0 +1,12 @@ +/* + * Copyright (c) + * Weyoss + * https://github.com/weyoss + * + * This source code is licensed under the MIT license found in the LICENSE file + * in the root directory of this source tree. + */ + +import { ConsumerGroupsError } from './consumer-groups.error.js'; + +export class ConsumerGroupsInvalidGroupIdError extends ConsumerGroupsError {} diff --git a/src/lib/consumer-groups/errors/consumer-groups-queue-not-found.error.ts b/src/lib/consumer-groups/errors/consumer-groups-queue-not-found.error.ts new file mode 100644 index 00000000..814ce642 --- /dev/null +++ b/src/lib/consumer-groups/errors/consumer-groups-queue-not-found.error.ts @@ -0,0 +1,12 @@ +/* + * Copyright (c) + * Weyoss + * https://github.com/weyoss + * + * This source code is licensed under the MIT license found in the LICENSE file + * in the root directory of this source tree. + */ + +import { ConsumerGroupsError } from './consumer-groups.error.js'; + +export class ConsumerGroupsQueueNotFoundError extends ConsumerGroupsError {} diff --git a/src/lib/event-bus/errors/event-bus-lock.error.ts b/src/lib/consumer-groups/errors/consumer-groups.error.ts similarity index 82% rename from src/lib/event-bus/errors/event-bus-lock.error.ts rename to src/lib/consumer-groups/errors/consumer-groups.error.ts index ea382ae4..4654546c 100644 --- a/src/lib/event-bus/errors/event-bus-lock.error.ts +++ b/src/lib/consumer-groups/errors/consumer-groups.error.ts @@ -9,4 +9,4 @@ import { RedisSMQError } from 'redis-smq-common'; -export class EventBusLockError extends RedisSMQError {} +export class ConsumerGroupsError extends RedisSMQError {} diff --git a/src/lib/consumer-groups/errors/index.ts b/src/lib/consumer-groups/errors/index.ts new file mode 100644 index 00000000..538b67ca --- /dev/null +++ b/src/lib/consumer-groups/errors/index.ts @@ -0,0 +1,13 @@ +/* + * Copyright (c) + * Weyoss + * https://github.com/weyoss + * + * This source code is licensed under the MIT license found in the LICENSE file + * in the root directory of this source tree. + */ + +export { ConsumerGroupsError } from './consumer-groups.error.js'; +export { ConsumerGroupsQueueNotFoundError } from './consumer-groups-queue-not-found.error.js'; +export { ConsumerGroupsConsumerGroupNotEmptyError } from './consumer-groups-consumer-group-not-empty.error.js'; +export { ConsumerGroupsInvalidGroupIdError } from './consumer-groups-invalid-group-id.error.js'; diff --git a/src/lib/consumer-groups/index.ts b/src/lib/consumer-groups/index.ts index 4763e9c0..08c9cb1a 100644 --- a/src/lib/consumer-groups/index.ts +++ b/src/lib/consumer-groups/index.ts @@ -8,3 +8,4 @@ */ export * from './consumer-groups.js'; +export * from './errors/index.js'; diff --git a/src/lib/consumer/errors/consumer-consume-message-handler-already-exists.error.ts b/src/lib/consumer/errors/consumer-consume-message-handler-already-exists.error.ts new file mode 100644 index 00000000..b5055117 --- /dev/null +++ b/src/lib/consumer/errors/consumer-consume-message-handler-already-exists.error.ts @@ -0,0 +1,12 @@ +/* + * Copyright (c) + * Weyoss + * https://github.com/weyoss + * + * This source code is licensed under the MIT license found in the LICENSE file + * in the root directory of this source tree. + */ + +import { ConsumerError } from './consumer.error.js'; + +export class ConsumerConsumeMessageHandlerAlreadyExistsError extends ConsumerError {} diff --git a/src/lib/consumer/errors/consumer-consumer-group-id-not-supported.error.ts b/src/lib/consumer/errors/consumer-consumer-group-id-not-supported.error.ts new file mode 100644 index 00000000..3dd0156b --- /dev/null +++ b/src/lib/consumer/errors/consumer-consumer-group-id-not-supported.error.ts @@ -0,0 +1,12 @@ +/* + * Copyright (c) + * Weyoss + * https://github.com/weyoss + * + * This source code is licensed under the MIT license found in the LICENSE file + * in the root directory of this source tree. + */ + +import { ConsumerError } from './consumer.error.js'; + +export class ConsumerConsumerGroupIdNotSupportedError extends ConsumerError {} diff --git a/src/lib/consumer/errors/consumer-group-delete.error.ts b/src/lib/consumer/errors/consumer-consumer-group-id-required.error.ts similarity index 78% rename from src/lib/consumer/errors/consumer-group-delete.error.ts rename to src/lib/consumer/errors/consumer-consumer-group-id-required.error.ts index 31b130e4..9ee2c3ae 100644 --- a/src/lib/consumer/errors/consumer-group-delete.error.ts +++ b/src/lib/consumer/errors/consumer-consumer-group-id-required.error.ts @@ -9,4 +9,4 @@ import { ConsumerError } from './consumer.error.js'; -export class ConsumerGroupDeleteError extends ConsumerError {} +export class ConsumerConsumerGroupIdRequiredError extends ConsumerError {} diff --git a/src/lib/consumer/errors/consumer-group-id-not-supported.error.ts b/src/lib/consumer/errors/consumer-group-id-not-supported.error.ts deleted file mode 100644 index 1b012134..00000000 --- a/src/lib/consumer/errors/consumer-group-id-not-supported.error.ts +++ /dev/null @@ -1,18 +0,0 @@ -/* - * Copyright (c) - * Weyoss - * https://github.com/weyoss - * - * This source code is licensed under the MIT license found in the LICENSE file - * in the root directory of this source tree. - */ - -import { ConsumerError } from './consumer.error.js'; - -export class ConsumerGroupIdNotSupportedError extends ConsumerError { - constructor() { - super( - `Consumer groups are only supported for queues of a PubSub delivery model.`, - ); - } -} diff --git a/src/lib/consumer/errors/consumer-group-id-required.error.ts b/src/lib/consumer/errors/consumer-group-id-required.error.ts deleted file mode 100644 index f05223f0..00000000 --- a/src/lib/consumer/errors/consumer-group-id-required.error.ts +++ /dev/null @@ -1,18 +0,0 @@ -/* - * Copyright (c) - * Weyoss - * https://github.com/weyoss - * - * This source code is licensed under the MIT license found in the LICENSE file - * in the root directory of this source tree. - */ - -import { ConsumerError } from './consumer.error.js'; - -export class ConsumerGroupIdRequiredError extends ConsumerError { - constructor() { - super( - `A Consumer group is required for queues of a PubSub delivery model.`, - ); - } -} diff --git a/src/lib/consumer/errors/consumer-message-handler-already-exists.error.ts b/src/lib/consumer/errors/consumer-message-handler-already-exists.error.ts deleted file mode 100644 index bd86a41c..00000000 --- a/src/lib/consumer/errors/consumer-message-handler-already-exists.error.ts +++ /dev/null @@ -1,21 +0,0 @@ -/* - * Copyright (c) - * Weyoss - * https://github.com/weyoss - * - * This source code is licensed under the MIT license found in the LICENSE file - * in the root directory of this source tree. - */ - -import { IQueueParsedParams } from '../../queue/index.js'; -import { ConsumerError } from './consumer.error.js'; - -export class ConsumerMessageHandlerAlreadyExistsError extends ConsumerError { - constructor(queue: IQueueParsedParams) { - super( - `A message handler for queue [${queue.queueParams.name}@${ - queue.queueParams.ns - }${queue.groupId ? `/${queue.groupId}` : ''}] already exists`, - ); - } -} diff --git a/src/lib/consumer/errors/index.ts b/src/lib/consumer/errors/index.ts index ad7b9867..15cd7171 100644 --- a/src/lib/consumer/errors/index.ts +++ b/src/lib/consumer/errors/index.ts @@ -8,7 +8,6 @@ */ export { ConsumerError } from './consumer.error.js'; -export { ConsumerMessageHandlerAlreadyExistsError } from './consumer-message-handler-already-exists.error.js'; -export { ConsumerGroupDeleteError } from './consumer-group-delete.error.js'; -export { ConsumerGroupIdNotSupportedError } from './consumer-group-id-not-supported.error.js'; -export { ConsumerGroupIdRequiredError } from './consumer-group-id-required.error.js'; +export { ConsumerConsumeMessageHandlerAlreadyExistsError } from './consumer-consume-message-handler-already-exists.error.js'; +export { ConsumerConsumerGroupIdNotSupportedError } from './consumer-consumer-group-id-not-supported.error.js'; +export { ConsumerConsumerGroupIdRequiredError } from './consumer-consumer-group-id-required.error.js'; diff --git a/src/lib/consumer/message-handler-runner/message-handler-runner.ts b/src/lib/consumer/message-handler-runner/message-handler-runner.ts index d6ee0397..360ff5d3 100644 --- a/src/lib/consumer/message-handler-runner/message-handler-runner.ts +++ b/src/lib/consumer/message-handler-runner/message-handler-runner.ts @@ -12,7 +12,7 @@ import { TConsumerMessageHandlerRunnerEvent } from '../../../common/index.js'; import { Configuration } from '../../../config/index.js'; import { IQueueParsedParams } from '../../queue/index.js'; import { Consumer } from '../consumer/consumer.js'; -import { ConsumerMessageHandlerAlreadyExistsError } from '../errors/index.js'; +import { ConsumerConsumeMessageHandlerAlreadyExistsError } from '../errors/index.js'; import { MessageHandler } from '../message-handler/message-handler/message-handler.js'; import { IConsumerMessageHandlerArgs, @@ -198,7 +198,7 @@ export class MessageHandlerRunner extends Runnable, ): void { const handler = this.getMessageHandler(queue); - if (handler) cb(new ConsumerMessageHandlerAlreadyExistsError(queue)); + if (handler) cb(new ConsumerConsumeMessageHandlerAlreadyExistsError()); else { const handlerParams = { queue, diff --git a/src/lib/consumer/message-handler/dequeue-message/dequeue-message.ts b/src/lib/consumer/message-handler/dequeue-message/dequeue-message.ts index 21af42c9..c89a5af2 100644 --- a/src/lib/consumer/message-handler/dequeue-message/dequeue-message.ts +++ b/src/lib/consumer/message-handler/dequeue-message/dequeue-message.ts @@ -32,12 +32,12 @@ import { EQueueType, IQueueParsedParams, IQueueRateLimit, - QueueNotFoundError, + QueueQueueNotFoundError, TQueueConsumer, } from '../../../queue/index.js'; import { - ConsumerGroupIdNotSupportedError, - ConsumerGroupIdRequiredError, + ConsumerConsumerGroupIdNotSupportedError, + ConsumerConsumerGroupIdRequiredError, } from '../../errors/index.js'; import { eventBusPublisher } from './event-bus-publisher.js'; @@ -172,7 +172,7 @@ export class DequeueMessage extends Runnable { ], (err, reply) => { if (err) cb(err); - else if (!reply) cb(new QueueNotFoundError()); + else if (!reply) cb(new QueueQueueNotFoundError()); else cb(); }, ); @@ -198,14 +198,14 @@ export class DequeueMessage extends Runnable { queueProperties.deliveryModel === EQueueDeliveryModel.POINT_TO_POINT ) { - if (groupId) cb(new ConsumerGroupIdNotSupportedError()); + if (groupId) cb(new ConsumerConsumerGroupIdNotSupportedError()); else cb(); } // PubSub delivery model else if ( queueProperties.deliveryModel === EQueueDeliveryModel.PUB_SUB ) { - if (!groupId) cb(new ConsumerGroupIdRequiredError()); + if (!groupId) cb(new ConsumerConsumerGroupIdRequiredError()); else { const eventBus = this.eventBus.getInstance(); if (eventBus instanceof Error) cb(eventBus); diff --git a/src/lib/consumer/message-handler/errors/consumer-message-handler-file.error.ts b/src/lib/consumer/message-handler/errors/consumer-message-handler-file.error.ts index 99ec629d..977b7fdd 100644 --- a/src/lib/consumer/message-handler/errors/consumer-message-handler-file.error.ts +++ b/src/lib/consumer/message-handler/errors/consumer-message-handler-file.error.ts @@ -9,10 +9,4 @@ import { ConsumerMessageHandlerError } from './consumer-message-handler.error.js'; -export class ConsumerMessageHandlerFileError extends ConsumerMessageHandlerError { - constructor() { - super( - `Make sure the message handler filename is an absolute file path pointing to an existing file in your project. `, - ); - } -} +export class ConsumerMessageHandlerFileError extends ConsumerMessageHandlerError {} diff --git a/src/lib/consumer/message-handler/errors/consumer-message-handler-filename-extension.error.ts b/src/lib/consumer/message-handler/errors/consumer-message-handler-filename-extension.error.ts index 40b1bbfb..2821b60b 100644 --- a/src/lib/consumer/message-handler/errors/consumer-message-handler-filename-extension.error.ts +++ b/src/lib/consumer/message-handler/errors/consumer-message-handler-filename-extension.error.ts @@ -9,10 +9,4 @@ import { ConsumerMessageHandlerError } from './consumer-message-handler.error.js'; -export class ConsumerMessageHandlerFilenameExtensionError extends ConsumerMessageHandlerError { - constructor() { - super( - `Message handler filename must ends with a '.js' or '.cjs' extension depending on your project settings.`, - ); - } -} +export class ConsumerMessageHandlerFilenameExtensionError extends ConsumerMessageHandlerError {} diff --git a/src/lib/consumer/message-handler/errors/consumer-message-handler.error.ts b/src/lib/consumer/message-handler/errors/consumer-message-handler.error.ts index ae4fcc93..8daa0e42 100644 --- a/src/lib/consumer/message-handler/errors/consumer-message-handler.error.ts +++ b/src/lib/consumer/message-handler/errors/consumer-message-handler.error.ts @@ -9,8 +9,4 @@ import { ConsumerError } from '../../errors/index.js'; -export class ConsumerMessageHandlerError extends ConsumerError { - constructor(msg?: string) { - super(msg); - } -} +export class ConsumerMessageHandlerError extends ConsumerError {} diff --git a/src/lib/event-bus/errors/event-bus-instance-lock.error.ts b/src/lib/event-bus/errors/event-bus-instance-lock.error.ts new file mode 100644 index 00000000..f754b8a2 --- /dev/null +++ b/src/lib/event-bus/errors/event-bus-instance-lock.error.ts @@ -0,0 +1,12 @@ +/* + * Copyright (c) + * Weyoss + * https://github.com/weyoss + * + * This source code is licensed under the MIT license found in the LICENSE file + * in the root directory of this source tree. + */ + +import { EventBusError } from 'redis-smq-common'; + +export class EventBusInstanceLockError extends EventBusError {} diff --git a/src/lib/event-bus/errors/index.ts b/src/lib/event-bus/errors/index.ts new file mode 100644 index 00000000..b2d961b8 --- /dev/null +++ b/src/lib/event-bus/errors/index.ts @@ -0,0 +1,10 @@ +/* + * Copyright (c) + * Weyoss + * https://github.com/weyoss + * + * This source code is licensed under the MIT license found in the LICENSE file + * in the root directory of this source tree. + */ + +export { EventBusInstanceLockError } from './event-bus-instance-lock.error.js'; diff --git a/src/lib/event-bus/event-bus-redis-instance.ts b/src/lib/event-bus/event-bus-redis-instance.ts index b8b941c2..085fcd9d 100644 --- a/src/lib/event-bus/event-bus-redis-instance.ts +++ b/src/lib/event-bus/event-bus-redis-instance.ts @@ -18,7 +18,7 @@ import { } from 'redis-smq-common'; import { TRedisSMQEvent } from '../../common/index.js'; import { Configuration } from '../../config/index.js'; -import { EventBusLockError } from './errors/event-bus-lock.error.js'; +import { EventBusInstanceLockError } from './errors/event-bus-instance-lock.error.js'; export class EventBusRedisInstance extends EventEmitter< Pick @@ -59,7 +59,7 @@ export class EventBusRedisInstance extends EventEmitter< }, ); } else cb(null, this.instance); - } else cb(new EventBusLockError()); + } else cb(new EventBusInstanceLockError()); } shutdown = (cb: ICallback): void => { diff --git a/src/lib/event-bus/index.ts b/src/lib/event-bus/index.ts index 7885800f..19e07916 100644 --- a/src/lib/event-bus/index.ts +++ b/src/lib/event-bus/index.ts @@ -7,5 +7,6 @@ * in the root directory of this source tree. */ +export * from './errors/index.js'; export * from './event-bus-redis-instance.js'; export * from './types/index.js'; diff --git a/src/lib/exchange/errors/exchange-fan-out-exchange-has-bound-queues.error.ts b/src/lib/exchange/errors/exchange-fan-out-exchange-has-bound-queues.error.ts new file mode 100644 index 00000000..765c489b --- /dev/null +++ b/src/lib/exchange/errors/exchange-fan-out-exchange-has-bound-queues.error.ts @@ -0,0 +1,12 @@ +/* + * Copyright (c) + * Weyoss + * https://github.com/weyoss + * + * This source code is licensed under the MIT license found in the LICENSE file + * in the root directory of this source tree. + */ + +import { ExchangeError } from './exchange.error.js'; + +export class ExchangeFanOutExchangeHasBoundQueuesError extends ExchangeError {} diff --git a/src/lib/exchange/errors/exchange-invalid-fan-out-params.error.ts b/src/lib/exchange/errors/exchange-invalid-fan-out-params.error.ts new file mode 100644 index 00000000..1a15dacb --- /dev/null +++ b/src/lib/exchange/errors/exchange-invalid-fan-out-params.error.ts @@ -0,0 +1,12 @@ +/* + * Copyright (c) + * Weyoss + * https://github.com/weyoss + * + * This source code is licensed under the MIT license found in the LICENSE file + * in the root directory of this source tree. + */ + +import { ExchangeError } from './exchange.error.js'; + +export class ExchangeInvalidFanOutParamsError extends ExchangeError {} diff --git a/src/lib/exchange/errors/exchange-invalid-data.error.ts b/src/lib/exchange/errors/exchange-invalid-queue-params.error.ts similarity index 79% rename from src/lib/exchange/errors/exchange-invalid-data.error.ts rename to src/lib/exchange/errors/exchange-invalid-queue-params.error.ts index a3b0a761..e248658a 100644 --- a/src/lib/exchange/errors/exchange-invalid-data.error.ts +++ b/src/lib/exchange/errors/exchange-invalid-queue-params.error.ts @@ -9,4 +9,4 @@ import { ExchangeError } from './exchange.error.js'; -export class ExchangeInvalidDataError extends ExchangeError {} +export class ExchangeInvalidQueueParamsError extends ExchangeError {} diff --git a/src/lib/exchange/errors/exchange-invalid-topic-params.error.ts b/src/lib/exchange/errors/exchange-invalid-topic-params.error.ts new file mode 100644 index 00000000..773b2c6b --- /dev/null +++ b/src/lib/exchange/errors/exchange-invalid-topic-params.error.ts @@ -0,0 +1,12 @@ +/* + * Copyright (c) + * Weyoss + * https://github.com/weyoss + * + * This source code is licensed under the MIT license found in the LICENSE file + * in the root directory of this source tree. + */ + +import { ExchangeError } from './exchange.error.js'; + +export class ExchangeInvalidTopicParamsError extends ExchangeError {} diff --git a/src/lib/exchange/errors/exchange-queue-is-not-bound-to-exchange.error.ts b/src/lib/exchange/errors/exchange-queue-is-not-bound-to-exchange.error.ts new file mode 100644 index 00000000..1b3e5e30 --- /dev/null +++ b/src/lib/exchange/errors/exchange-queue-is-not-bound-to-exchange.error.ts @@ -0,0 +1,12 @@ +/* + * Copyright (c) + * Weyoss + * https://github.com/weyoss + * + * This source code is licensed under the MIT license found in the LICENSE file + * in the root directory of this source tree. + */ + +import { ExchangeError } from './exchange.error.js'; + +export class ExchangeQueueIsNotBoundToExchangeError extends ExchangeError {} diff --git a/src/lib/exchange/errors/index.ts b/src/lib/exchange/errors/index.ts index 6f75d398..e60269da 100644 --- a/src/lib/exchange/errors/index.ts +++ b/src/lib/exchange/errors/index.ts @@ -9,4 +9,8 @@ export { ExchangeError } from './exchange.error.js'; export { ExchangeFanOutError } from './exchange-fan-out.error.js'; -export { ExchangeInvalidDataError } from './exchange-invalid-data.error.js'; +export { ExchangeInvalidTopicParamsError } from './exchange-invalid-topic-params.error.js'; +export { ExchangeInvalidFanOutParamsError } from './exchange-invalid-fan-out-params.error.js'; +export { ExchangeInvalidQueueParamsError } from './exchange-invalid-queue-params.error.js'; +export { ExchangeFanOutExchangeHasBoundQueuesError } from './exchange-fan-out-exchange-has-bound-queues.error.js'; +export { ExchangeQueueIsNotBoundToExchangeError } from './exchange-queue-is-not-bound-to-exchange.error.js'; diff --git a/src/lib/exchange/exchange-direct/_/_get-exchange-direct-transferable.ts b/src/lib/exchange/exchange-direct/_/_get-exchange-direct-transferable.ts index 3eda8968..26863981 100644 --- a/src/lib/exchange/exchange-direct/_/_get-exchange-direct-transferable.ts +++ b/src/lib/exchange/exchange-direct/_/_get-exchange-direct-transferable.ts @@ -9,6 +9,7 @@ import { v4 } from 'uuid'; import { IQueueParams } from '../../../queue/index.js'; +import { ExchangeInvalidQueueParamsError } from '../../errors/exchange-invalid-queue-params.error.js'; import { EExchangeType, TExchangeDirectTransferable, @@ -17,9 +18,11 @@ import { _validateExchangeDirectParams } from './_validate-exchange-direct-param export function _getExchangeDirectTransferable( queue: string | IQueueParams, -): TExchangeDirectTransferable { +): TExchangeDirectTransferable | ExchangeInvalidQueueParamsError { + const params = _validateExchangeDirectParams(queue); + if (params instanceof Error) return params; return { - params: _validateExchangeDirectParams(queue), + params, type: EExchangeType.DIRECT, exchangeTag: v4(), }; diff --git a/src/lib/exchange/exchange-direct/_/_validate-exchange-direct-params.ts b/src/lib/exchange/exchange-direct/_/_validate-exchange-direct-params.ts index dc90fced..8c9241f1 100644 --- a/src/lib/exchange/exchange-direct/_/_validate-exchange-direct-params.ts +++ b/src/lib/exchange/exchange-direct/_/_validate-exchange-direct-params.ts @@ -9,11 +9,13 @@ import { _parseQueueParams } from '../../../queue/_/_parse-queue-params.js'; import { IQueueParams } from '../../../queue/index.js'; +import { ExchangeInvalidQueueParamsError } from '../../errors/exchange-invalid-queue-params.error.js'; export function _validateExchangeDirectParams( queue: string | IQueueParams, -): IQueueParams { +): IQueueParams | ExchangeInvalidQueueParamsError { const queueParams = _parseQueueParams(queue); - if (queueParams instanceof Error) throw queueParams; + if (queueParams instanceof Error) + return new ExchangeInvalidQueueParamsError(); return queueParams; } diff --git a/src/lib/exchange/exchange-direct/exchange-direct.ts b/src/lib/exchange/exchange-direct/exchange-direct.ts index 442e77a4..9430af46 100644 --- a/src/lib/exchange/exchange-direct/exchange-direct.ts +++ b/src/lib/exchange/exchange-direct/exchange-direct.ts @@ -18,6 +18,7 @@ export class ExchangeDirect extends ExchangeAbstract { cb: ICallback, ): void { const queue = _validateExchangeDirectParams(exchangeParams); - cb(null, [queue]); + if (queue instanceof Error) cb(queue); + else cb(null, [queue]); } } diff --git a/src/lib/exchange/exchange-fan-out/_/_get-exchange-fanout-transferable.ts b/src/lib/exchange/exchange-fan-out/_/_get-exchange-fanout-transferable.ts index aca67e91..0155224d 100644 --- a/src/lib/exchange/exchange-fan-out/_/_get-exchange-fanout-transferable.ts +++ b/src/lib/exchange/exchange-fan-out/_/_get-exchange-fanout-transferable.ts @@ -8,6 +8,7 @@ */ import { v4 } from 'uuid'; +import { ExchangeInvalidFanOutParamsError } from '../../errors/exchange-invalid-fan-out-params.error.js'; import { EExchangeType, TExchangeFanOutTransferable, @@ -16,9 +17,11 @@ import { _validateExchangeFanOutParams } from './_validate-exchange-fan-out-para export function _getExchangeFanOutTransferable( fanOutName: string, -): TExchangeFanOutTransferable { +): TExchangeFanOutTransferable | ExchangeInvalidFanOutParamsError { + const params = _validateExchangeFanOutParams(fanOutName); + if (params instanceof Error) return params; return { - params: _validateExchangeFanOutParams(fanOutName), + params, type: EExchangeType.FANOUT, exchangeTag: v4(), }; diff --git a/src/lib/exchange/exchange-fan-out/_/_validate-exchange-fan-out-params.ts b/src/lib/exchange/exchange-fan-out/_/_validate-exchange-fan-out-params.ts index ccf9bc8e..3c7699ad 100644 --- a/src/lib/exchange/exchange-fan-out/_/_validate-exchange-fan-out-params.ts +++ b/src/lib/exchange/exchange-fan-out/_/_validate-exchange-fan-out-params.ts @@ -8,9 +8,12 @@ */ import { redisKeys } from '../../../../common/redis-keys/redis-keys.js'; +import { ExchangeInvalidFanOutParamsError } from '../../errors/index.js'; -export function _validateExchangeFanOutParams(fanOutName: string): string { +export function _validateExchangeFanOutParams( + fanOutName: string, +): string | ExchangeInvalidFanOutParamsError { const name = redisKeys.validateRedisKey(fanOutName); - if (name instanceof Error) throw name; + if (name instanceof Error) return new ExchangeInvalidFanOutParamsError(); return name; } diff --git a/src/lib/exchange/exchange-fan-out/exchange-fan-out.ts b/src/lib/exchange/exchange-fan-out/exchange-fan-out.ts index 5795ec6e..68280d90 100644 --- a/src/lib/exchange/exchange-fan-out/exchange-fan-out.ts +++ b/src/lib/exchange/exchange-fan-out/exchange-fan-out.ts @@ -16,8 +16,11 @@ import { IQueueParams, IQueueProperties, } from '../../queue/index.js'; -import { ExchangeFanOutError } from '../errors/index.js'; -import { ExchangeError } from '../errors/index.js'; +import { + ExchangeFanOutError, + ExchangeFanOutExchangeHasBoundQueuesError, + ExchangeQueueIsNotBoundToExchangeError, +} from '../errors/index.js'; import { ExchangeAbstract } from '../exchange-abstract.js'; import { _getFanOutExchangeQueues } from './_/_get-fan-out-exchange-queues.js'; import { _getQueueFanOutExchange } from './_/_get-queue-fan-out-exchange.js'; @@ -26,15 +29,18 @@ import { _validateExchangeFanOutParams } from './_/_validate-exchange-fan-out-pa export class ExchangeFanOut extends ExchangeAbstract { getQueues(exchangeParams: string, cb: ICallback): void { const fanOutName = _validateExchangeFanOutParams(exchangeParams); - this.redisClient.getSetInstance((err, client) => { - if (err) cb(err); - else if (!client) cb(new CallbackEmptyReplyError()); - else _getFanOutExchangeQueues(client, fanOutName, cb); - }); + if (fanOutName instanceof Error) cb(fanOutName); + else { + this.redisClient.getSetInstance((err, client) => { + if (err) cb(err); + else if (!client) cb(new CallbackEmptyReplyError()); + else _getFanOutExchangeQueues(client, fanOutName, cb); + }); + } } saveExchange(exchangeParams: string, cb: ICallback): void { - const fanOutName = redisKeys.validateRedisKey(exchangeParams); + const fanOutName = _validateExchangeFanOutParams(exchangeParams); if (fanOutName instanceof Error) cb(fanOutName); else { const { keyFanOutExchanges } = redisKeys.getMainKeys(); @@ -48,7 +54,7 @@ export class ExchangeFanOut extends ExchangeAbstract { } deleteExchange(exchangeParams: string, cb: ICallback): void { - const fanOutName = redisKeys.validateRedisKey(exchangeParams); + const fanOutName = _validateExchangeFanOutParams(exchangeParams); if (fanOutName instanceof Error) cb(fanOutName); else { const { keyExchangeBindings } = @@ -66,11 +72,7 @@ export class ExchangeFanOut extends ExchangeAbstract { (err, reply = []) => { if (err) cb(err); else if (reply.length) - cb( - new ExchangeError( - `Exchange has ${reply.length} bound queue(s). Unbind all queues before deleting the exchange.`, - ), - ); + cb(new ExchangeFanOutExchangeHasBoundQueuesError()); else { const multi = client.multi(); multi.srem(keyFanOutExchanges, fanOutName); @@ -92,7 +94,7 @@ export class ExchangeFanOut extends ExchangeAbstract { cb: ICallback, ): void { const queueParams = _parseQueueParams(queue); - const fanOutName = redisKeys.validateRedisKey(exchangeParams); + const fanOutName = _validateExchangeFanOutParams(exchangeParams); if (queueParams instanceof Error) cb(queueParams); else if (fanOutName instanceof Error) cb(fanOutName); else { @@ -182,7 +184,7 @@ export class ExchangeFanOut extends ExchangeAbstract { cb: ICallback, ): void { const queueParams = _parseQueueParams(queue); - const fanOutName = redisKeys.validateRedisKey(exchangeParams); + const fanOutName = _validateExchangeFanOutParams(exchangeParams); if (queueParams instanceof Error) cb(queueParams); else if (fanOutName instanceof Error) cb(fanOutName); else { @@ -206,11 +208,7 @@ export class ExchangeFanOut extends ExchangeAbstract { if (err) cb(err); else if (!properties) cb(new CallbackEmptyReplyError()); else if (properties.exchange !== fanOutName) - cb( - new ExchangeFanOutError( - `Queue ${queueParams.name}@${queueParams.ns} is not bound to [${fanOutName}] exchange.`, - ), - ); + cb(new ExchangeQueueIsNotBoundToExchangeError()); else cb(); }), (cb: ICallback) => { diff --git a/src/lib/exchange/exchange-topic/_/_get-exchange-topic-transferable.ts b/src/lib/exchange/exchange-topic/_/_get-exchange-topic-transferable.ts index a3cbf00a..58be109d 100644 --- a/src/lib/exchange/exchange-topic/_/_get-exchange-topic-transferable.ts +++ b/src/lib/exchange/exchange-topic/_/_get-exchange-topic-transferable.ts @@ -8,6 +8,7 @@ */ import { v4 } from 'uuid'; +import { ExchangeInvalidTopicParamsError } from '../../errors/exchange-invalid-topic-params.error.js'; import { EExchangeType, ITopicParams, @@ -17,9 +18,11 @@ import { _validateExchangeTopicParams } from './_validate-exchange-topic-params. export function _getExchangeTopicTransferable( topic: string | ITopicParams, -): TExchangeTopicTransferable { +): TExchangeTopicTransferable | ExchangeInvalidTopicParamsError { + const params = _validateExchangeTopicParams(topic); + if (params instanceof Error) return params; return { - params: _validateExchangeTopicParams(topic), + params, type: EExchangeType.TOPIC, exchangeTag: v4(), }; diff --git a/src/lib/exchange/exchange-topic/_/_get-topic-exchange-params.ts b/src/lib/exchange/exchange-topic/_/_get-topic-exchange-params.ts index 54429ac4..20962361 100644 --- a/src/lib/exchange/exchange-topic/_/_get-topic-exchange-params.ts +++ b/src/lib/exchange/exchange-topic/_/_get-topic-exchange-params.ts @@ -7,14 +7,14 @@ * in the root directory of this source tree. */ -import { RedisKeysError } from '../../../../common/redis-keys/redis-keys.error.js'; import { redisKeys } from '../../../../common/redis-keys/redis-keys.js'; import { Configuration } from '../../../../config/index.js'; +import { ExchangeInvalidTopicParamsError } from '../../errors/exchange-invalid-topic-params.error.js'; import { ITopicParams } from '../../types/index.js'; export function _getTopicExchangeParams( topic: ITopicParams | string, -): ITopicParams | RedisKeysError { +): ITopicParams | ExchangeInvalidTopicParamsError { const config = Configuration.getSetConfig(); const topicParams = typeof topic === 'string' @@ -24,9 +24,9 @@ export function _getTopicExchangeParams( } : topic; const vTopic = redisKeys.validateRedisKey(topicParams.topic); - if (vTopic instanceof Error) return vTopic; + if (vTopic instanceof Error) return new ExchangeInvalidTopicParamsError(); const vNamespace = redisKeys.validateNamespace(topicParams.ns); - if (vNamespace instanceof Error) return vNamespace; + if (vNamespace instanceof Error) return new ExchangeInvalidTopicParamsError(); return { topic: vTopic, ns: vNamespace, diff --git a/src/lib/exchange/exchange-topic/_/_validate-exchange-topic-params.ts b/src/lib/exchange/exchange-topic/_/_validate-exchange-topic-params.ts index 1370e69e..dcf76d7f 100644 --- a/src/lib/exchange/exchange-topic/_/_validate-exchange-topic-params.ts +++ b/src/lib/exchange/exchange-topic/_/_validate-exchange-topic-params.ts @@ -7,13 +7,14 @@ * in the root directory of this source tree. */ +import { ExchangeInvalidTopicParamsError } from '../../errors/exchange-invalid-topic-params.error.js'; import { ITopicParams } from '../../types/index.js'; import { _getTopicExchangeParams } from './_get-topic-exchange-params.js'; export function _validateExchangeTopicParams( topicParams: string | ITopicParams, -): ITopicParams { +): ITopicParams | ExchangeInvalidTopicParamsError { const params = _getTopicExchangeParams(topicParams); - if (params instanceof Error) throw params; + if (params instanceof Error) return new ExchangeInvalidTopicParamsError(); return params; } diff --git a/src/lib/message/_/_delete-message.ts b/src/lib/message/_/_delete-message.ts index 2b4bd22d..a325e9a2 100644 --- a/src/lib/message/_/_delete-message.ts +++ b/src/lib/message/_/_delete-message.ts @@ -16,7 +16,12 @@ import { import { ELuaScriptName } from '../../../common/redis-client/scripts/scripts.js'; import { redisKeys } from '../../../common/redis-keys/redis-keys.js'; import { EQueueProperty, EQueueType } from '../../queue/index.js'; -import { MessageDeleteError } from '../errors/index.js'; +import { + MessageError, + MessageInvalidParametersError, + MessageMessageInProcessError, + MessageMessageNotFoundError, +} from '../errors/index.js'; import { EMessageProperty, EMessagePropertyStatus } from '../types/index.js'; import { _getMessage } from './_get-message.js'; @@ -88,9 +93,19 @@ export function _deleteMessage( argv, (err, reply) => { if (err) cb(err); - else if (reply !== 'OK') - cb(new MessageDeleteError(reply ? String(reply) : undefined)); - else cb(); + else if (reply !== 'OK') { + if (reply === 'MESSAGE_NOT_FOUND') { + cb(new MessageMessageNotFoundError()); + } else if (reply === 'MESSAGE_IN_PROCESS') { + cb(new MessageMessageInProcessError()); + } else if (reply === 'MESSAGE_NOT_DELETED') { + cb(new MessageMessageNotFoundError()); + } else if (reply === 'INVALID_PARAMETERS') { + cb(new MessageInvalidParametersError()); + } else { + cb(new MessageError()); + } + } else cb(); }, ); } else cb(); diff --git a/src/lib/message/_/_get-message-state.ts b/src/lib/message/_/_get-message-state.ts index cdaaa8cb..7d70a65e 100644 --- a/src/lib/message/_/_get-message-state.ts +++ b/src/lib/message/_/_get-message-state.ts @@ -9,7 +9,7 @@ import { ICallback, IRedisClient } from 'redis-smq-common'; import { redisKeys } from '../../../common/redis-keys/redis-keys.js'; -import { MessageNotFoundError } from '../errors/index.js'; +import { MessageMessageNotFoundError } from '../errors/index.js'; import { EMessageProperty, IMessageStateTransferable } from '../types/index.js'; export function _getMessageState( @@ -20,7 +20,7 @@ export function _getMessageState( const { keyMessage } = redisKeys.getMessageKeys(messageId); redisClient.hget(keyMessage, String(EMessageProperty.STATE), (err, reply) => { if (err) cb(err); - else if (!reply) cb(new MessageNotFoundError()); + else if (!reply) cb(new MessageMessageNotFoundError()); else cb(null, JSON.parse(reply)); }); } diff --git a/src/lib/message/_/_get-message-status.ts b/src/lib/message/_/_get-message-status.ts index e93faef2..46a60ba4 100644 --- a/src/lib/message/_/_get-message-status.ts +++ b/src/lib/message/_/_get-message-status.ts @@ -9,7 +9,7 @@ import { ICallback, IRedisClient } from 'redis-smq-common'; import { redisKeys } from '../../../common/redis-keys/redis-keys.js'; -import { MessageNotFoundError } from '../errors/index.js'; +import { MessageMessageNotFoundError } from '../errors/index.js'; import { EMessageProperty, EMessagePropertyStatus } from '../types/index.js'; export function _getMessageStatus( @@ -23,7 +23,7 @@ export function _getMessageStatus( String(EMessageProperty.STATUS), (err, reply) => { if (err) cb(err); - else if (!reply) cb(new MessageNotFoundError()); + else if (!reply) cb(new MessageMessageNotFoundError()); else cb(null, Number(reply)); }, ); diff --git a/src/lib/message/_/_get-message.ts b/src/lib/message/_/_get-message.ts index e4e9f4be..20bd28ca 100644 --- a/src/lib/message/_/_get-message.ts +++ b/src/lib/message/_/_get-message.ts @@ -9,7 +9,7 @@ import { async, ICallback, IRedisClient } from 'redis-smq-common'; import { redisKeys } from '../../../common/redis-keys/redis-keys.js'; -import { MessageNotFoundError } from '../errors/index.js'; +import { MessageMessageNotFoundError } from '../errors/index.js'; import { MessageEnvelope } from '../message-envelope.js'; import { EMessageProperty } from '../types/index.js'; import { _fromMessage } from './_from-message.js'; @@ -23,7 +23,7 @@ export function _getMessage( redisClient.hgetall(keyMessage, (err, reply) => { if (err) cb(err); else if (!reply || !Object.keys(reply).length) - cb(new MessageNotFoundError()); + cb(new MessageMessageNotFoundError()); else cb( null, @@ -49,7 +49,7 @@ export function _getMessages( redisClient.hgetall(keyMessage, (err, reply) => { if (err) done(err); else if (!reply || !Object.keys(reply).length) { - done(new MessageNotFoundError()); + done(new MessageMessageNotFoundError()); } else { const msg = _fromMessage( reply[EMessageProperty.MESSAGE], diff --git a/src/lib/message/_/_requeue-message.ts b/src/lib/message/_/_requeue-message.ts new file mode 100644 index 00000000..d9a3c179 --- /dev/null +++ b/src/lib/message/_/_requeue-message.ts @@ -0,0 +1,96 @@ +/* + * Copyright (c) + * Weyoss + * https://github.com/weyoss + * + * This source code is licensed under the MIT license found in the LICENSE file + * in the root directory of this source tree. + */ + +import { + CallbackEmptyReplyError, + ICallback, + IRedisClient, +} from 'redis-smq-common'; +import { ELuaScriptName } from '../../../common/redis-client/scripts/scripts.js'; +import { redisKeys } from '../../../common/redis-keys/redis-keys.js'; +import { _getMessage } from './_get-message.js'; +import { + EMessageProperty, + EMessagePropertyStatus, + MessageError, + MessageMessageNotRequeuableError, + MessageInvalidParametersError, + MessageMessageNotFoundError, +} from '../index.js'; +import { EQueueProperty, EQueueType } from '../../queue/index.js'; + +export function _requeueMessage( + redisClient: IRedisClient, + messageId: string, + cb: ICallback, +): void { + _getMessage(redisClient, messageId, (err, message) => { + if (err) cb(err); + else if (!message) cb(new CallbackEmptyReplyError()); + else if ( + ![ + EMessagePropertyStatus.ACKNOWLEDGED, + EMessagePropertyStatus.DEAD_LETTERED, + ].includes(message.getStatus()) + ) + cb(new MessageMessageNotRequeuableError()); + else { + message.getMessageState().reset(); // resetting all system parameters + const { + keyQueueProperties, + keyQueuePending, + keyQueuePriorityPending, + keyQueueAcknowledged, + keyQueueDL, + } = redisKeys.getQueueKeys( + message.getDestinationQueue(), + message.getConsumerGroupId(), + ); + const messageId = message.getId(); + const { keyMessage } = redisKeys.getMessageKeys(messageId); + const status = message.getStatus(); + const sourceKey = + status === EMessagePropertyStatus.DEAD_LETTERED + ? keyQueueDL + : keyQueueAcknowledged; + redisClient.runScript( + ELuaScriptName.REQUEUE_MESSAGE, + [ + sourceKey, + keyQueueProperties, + keyQueuePriorityPending, + keyQueuePending, + keyMessage, + ], + [ + EQueueProperty.QUEUE_TYPE, + EQueueType.PRIORITY_QUEUE, + EQueueType.LIFO_QUEUE, + EQueueType.FIFO_QUEUE, + EMessageProperty.STATUS, + EMessagePropertyStatus.PENDING, + EMessageProperty.STATE, + messageId, + message.producibleMessage.getPriority() ?? '', + JSON.stringify(message.getMessageState()), + ], + (err, reply) => { + if (err) cb(err); + else if (reply !== 'OK') { + if (reply === 'MESSAGE_NOT_FOUND') + cb(new MessageMessageNotFoundError()); + else if (reply === 'INVALID_PARAMETERS') + cb(new MessageInvalidParametersError()); + else cb(new MessageError()); + } else cb(); + }, + ); + } + }); +} diff --git a/src/lib/message/errors/index.ts b/src/lib/message/errors/index.ts index 874129ab..d9b61a64 100644 --- a/src/lib/message/errors/index.ts +++ b/src/lib/message/errors/index.ts @@ -10,6 +10,9 @@ export { MessageDestinationQueueAlreadySetError } from './message-destination-queue-already-set.error.js'; export { MessageDestinationQueueRequiredError } from './message-destination-queue-required.error.js'; export { MessageError } from './message.error.js'; -export { MessageExchangeRequiredError } from './message-exchange-required.error.js'; -export { MessageNotFoundError } from './message-not-found.error.js'; -export { MessageDeleteError } from './message-delete.error.js'; +export { MessageMessageExchangeRequiredError } from './message-message-exchange-required.error.js'; +export { MessageMessageNotFoundError } from './message-message-not-found.error.js'; +export { MessageMessagePropertyError } from './message-message-property.error.js'; +export { MessageMessageInProcessError } from './message-message-in-process.error.js'; +export { MessageInvalidParametersError } from './message-invalid-parameters.error.js'; +export { MessageMessageNotRequeuableError } from './message-message-not-requeuable.error.js'; diff --git a/src/lib/message/errors/message-destination-queue-already-set.error.ts b/src/lib/message/errors/message-destination-queue-already-set.error.ts index a20b6506..4edceed8 100644 --- a/src/lib/message/errors/message-destination-queue-already-set.error.ts +++ b/src/lib/message/errors/message-destination-queue-already-set.error.ts @@ -9,8 +9,4 @@ import { MessageError } from './message.error.js'; -export class MessageDestinationQueueAlreadySetError extends MessageError { - constructor() { - super(`Destination queue is already set`); - } -} +export class MessageDestinationQueueAlreadySetError extends MessageError {} diff --git a/src/lib/message/errors/message-destination-queue-required.error.ts b/src/lib/message/errors/message-destination-queue-required.error.ts index 5223be35..8592e585 100644 --- a/src/lib/message/errors/message-destination-queue-required.error.ts +++ b/src/lib/message/errors/message-destination-queue-required.error.ts @@ -9,8 +9,4 @@ import { MessageError } from './message.error.js'; -export class MessageDestinationQueueRequiredError extends MessageError { - constructor() { - super(`Destination queue is required`); - } -} +export class MessageDestinationQueueRequiredError extends MessageError {} diff --git a/src/lib/message/errors/message-not-found.error.ts b/src/lib/message/errors/message-invalid-parameters.error.ts similarity index 67% rename from src/lib/message/errors/message-not-found.error.ts rename to src/lib/message/errors/message-invalid-parameters.error.ts index 6a2d65db..a3f74c26 100644 --- a/src/lib/message/errors/message-not-found.error.ts +++ b/src/lib/message/errors/message-invalid-parameters.error.ts @@ -9,8 +9,4 @@ import { MessageError } from './message.error.js'; -export class MessageNotFoundError extends MessageError { - constructor(msg?: string) { - super(msg ?? `MESSAGE_NOT_FOUND`); - } -} +export class MessageInvalidParametersError extends MessageError {} diff --git a/src/lib/message/errors/message-delete.error.ts b/src/lib/message/errors/message-message-exchange-required.error.ts similarity index 66% rename from src/lib/message/errors/message-delete.error.ts rename to src/lib/message/errors/message-message-exchange-required.error.ts index dc80daff..3c6461cb 100644 --- a/src/lib/message/errors/message-delete.error.ts +++ b/src/lib/message/errors/message-message-exchange-required.error.ts @@ -9,8 +9,4 @@ import { MessageError } from './message.error.js'; -export class MessageDeleteError extends MessageError { - constructor(msg?: string) { - super(msg ?? `MESSAGE_DELETE_ERROR`); - } -} +export class MessageMessageExchangeRequiredError extends MessageError {} diff --git a/src/lib/message/errors/message-message-in-process.error.ts b/src/lib/message/errors/message-message-in-process.error.ts new file mode 100644 index 00000000..66e412a6 --- /dev/null +++ b/src/lib/message/errors/message-message-in-process.error.ts @@ -0,0 +1,12 @@ +/* + * Copyright (c) + * Weyoss + * https://github.com/weyoss + * + * This source code is licensed under the MIT license found in the LICENSE file + * in the root directory of this source tree. + */ + +import { RedisSMQError } from 'redis-smq-common'; + +export class MessageMessageInProcessError extends RedisSMQError {} diff --git a/src/lib/message/errors/message-exchange-required.error.ts b/src/lib/message/errors/message-message-not-found.error.ts similarity index 66% rename from src/lib/message/errors/message-exchange-required.error.ts rename to src/lib/message/errors/message-message-not-found.error.ts index 3d792781..3a162017 100644 --- a/src/lib/message/errors/message-exchange-required.error.ts +++ b/src/lib/message/errors/message-message-not-found.error.ts @@ -9,8 +9,4 @@ import { MessageError } from './message.error.js'; -export class MessageExchangeRequiredError extends MessageError { - constructor() { - super(`A message exchange is required`); - } -} +export class MessageMessageNotFoundError extends MessageError {} diff --git a/src/lib/message/errors/message-message-not-requeuable.error.ts b/src/lib/message/errors/message-message-not-requeuable.error.ts new file mode 100644 index 00000000..b178a1a1 --- /dev/null +++ b/src/lib/message/errors/message-message-not-requeuable.error.ts @@ -0,0 +1,12 @@ +/* + * Copyright (c) + * Weyoss + * https://github.com/weyoss + * + * This source code is licensed under the MIT license found in the LICENSE file + * in the root directory of this source tree. + */ + +import { MessageError } from './message.error.js'; + +export class MessageMessageNotRequeuableError extends MessageError {} diff --git a/src/lib/message/errors/message-message-property.error.ts b/src/lib/message/errors/message-message-property.error.ts new file mode 100644 index 00000000..75254d0c --- /dev/null +++ b/src/lib/message/errors/message-message-property.error.ts @@ -0,0 +1,12 @@ +/* + * Copyright (c) + * Weyoss + * https://github.com/weyoss + * + * This source code is licensed under the MIT license found in the LICENSE file + * in the root directory of this source tree. + */ + +import { MessageError } from './message.error.js'; + +export class MessageMessagePropertyError extends MessageError {} diff --git a/src/lib/message/message-envelope.ts b/src/lib/message/message-envelope.ts index 714c0d72..a2b35cc3 100644 --- a/src/lib/message/message-envelope.ts +++ b/src/lib/message/message-envelope.ts @@ -13,7 +13,7 @@ import { IQueueParams } from '../queue/index.js'; import { MessageDestinationQueueAlreadySetError, MessageDestinationQueueRequiredError, - MessageExchangeRequiredError, + MessageMessageExchangeRequiredError, } from './errors/index.js'; import { MessageState } from './message-state.js'; import { ProducibleMessage } from './producible-message.js'; @@ -150,7 +150,7 @@ export class MessageEnvelope { getExchange(): TExchangeTransferable { const exchange = this.producibleMessage.getExchange(); if (!exchange) { - throw new MessageExchangeRequiredError(); + throw new MessageMessageExchangeRequiredError(); } return exchange; } diff --git a/src/lib/message/message.ts b/src/lib/message/message.ts index 744988fb..c8a81b81 100644 --- a/src/lib/message/message.ts +++ b/src/lib/message/message.ts @@ -14,6 +14,7 @@ import { _deleteMessage } from './_/_delete-message.js'; import { _getMessageState } from './_/_get-message-state.js'; import { _getMessageStatus } from './_/_get-message-status.js'; import { _getMessage, _getMessages } from './_/_get-message.js'; +import { _requeueMessage } from './_/_requeue-message.js'; import { EMessagePropertyStatus, IMessageStateTransferable, @@ -101,6 +102,14 @@ export class Message { this.deleteMessagesByIds([id], cb); } + requeueMessageById(messageId: string, cb: ICallback): void { + this.redisClient.getSetInstance((err, client) => { + if (err) cb(err); + else if (!client) cb(new CallbackEmptyReplyError()); + else _requeueMessage(client, messageId, cb); + }); + } + shutdown = (cb: ICallback): void => { this.redisClient.shutdown(cb); }; diff --git a/src/lib/message/producible-message.ts b/src/lib/message/producible-message.ts index b5cb7356..2dca86ae 100644 --- a/src/lib/message/producible-message.ts +++ b/src/lib/message/producible-message.ts @@ -17,7 +17,7 @@ import { TExchangeTransferable, } from '../exchange/index.js'; import { IQueueParams } from '../queue/index.js'; -import { MessageError } from './errors/index.js'; +import { MessageMessagePropertyError } from './errors/message-message-property.error.js'; import { EMessagePriority, TMessageConsumeOptions } from './types/index.js'; export class ProducibleMessage { @@ -92,9 +92,7 @@ export class ProducibleMessage { protected static validateRetryDelay(delay: number): number { const value = Number(delay); if (isNaN(value) || value < 0) { - throw new MessageError( - 'Expected a positive integer in milliseconds >= 0', - ); + throw new MessageMessagePropertyError(); } return value; } @@ -102,9 +100,7 @@ export class ProducibleMessage { protected static validateTTL(ttl: unknown): number { const value = Number(ttl); if (isNaN(value) || value < 0) { - throw new MessageError( - 'Expected a positive integer value in milliseconds >= 0', - ); + throw new MessageMessagePropertyError(); } return value; } @@ -112,9 +108,7 @@ export class ProducibleMessage { protected static validateConsumeTimeout(timeout: unknown): number { const value = Number(timeout); if (isNaN(value) || value < 0) { - throw new MessageError( - 'Expected a positive integer value in milliseconds >= 0', - ); + throw new MessageMessagePropertyError(); } return value; } @@ -122,9 +116,7 @@ export class ProducibleMessage { protected static validateRetryThreshold(threshold: unknown): number { const value = Number(threshold); if (isNaN(value) || value < 0) { - throw new MessageError( - 'Retry threshold should be a positive integer >= 0', - ); + throw new MessageMessagePropertyError(); } return value; } @@ -141,9 +133,7 @@ export class ProducibleMessage { // So just make sure that we have an integer value const value = Number(period); if (isNaN(value) || value < 0) { - throw new MessageError( - 'Expected a positive integer value in milliseconds', - ); + throw new MessageMessagePropertyError(); } this.scheduledRepeatPeriod = value; return this; @@ -157,9 +147,7 @@ export class ProducibleMessage { // So just make sure that we have an integer value const value = Number(delay); if (isNaN(value) || value < 0) { - throw new MessageError( - 'Expected a positive integer value in milliseconds', - ); + throw new MessageMessagePropertyError(); } this.scheduledDelay = value; return this; @@ -181,7 +169,7 @@ export class ProducibleMessage { // So just make sure that we have an integer value const value = Number(repeat); if (isNaN(value) || value < 0) { - throw new MessageError('Expected a positive integer value >= 0'); + throw new MessageMessagePropertyError(); } this.scheduledRepeat = value; return this; @@ -244,17 +232,23 @@ export class ProducibleMessage { } setFanOut(fanOutName: string): ProducibleMessage { - this.exchange = _getExchangeFanOutTransferable(fanOutName); + const exchange = _getExchangeFanOutTransferable(fanOutName); + if (exchange instanceof Error) throw exchange; + this.exchange = exchange; return this; } setTopic(topicParams: string | ITopicParams): ProducibleMessage { - this.exchange = _getExchangeTopicTransferable(topicParams); + const exchange = _getExchangeTopicTransferable(topicParams); + if (exchange instanceof Error) throw exchange; + this.exchange = exchange; return this; } setQueue(queueParams: string | IQueueParams): ProducibleMessage { - this.exchange = _getExchangeDirectTransferable(queueParams); + const exchange = _getExchangeDirectTransferable(queueParams); + if (exchange instanceof Error) throw exchange; + this.exchange = exchange; return this; } diff --git a/src/lib/namespace/namespace.ts b/src/lib/namespace/namespace.ts index 8d711e8c..5116ee0c 100644 --- a/src/lib/namespace/namespace.ts +++ b/src/lib/namespace/namespace.ts @@ -102,7 +102,7 @@ export class Namespace { (cb: ICallback) => { client.sismember(keyNamespaces, ns, (err, isMember) => { if (err) cb(err); - else if (!isMember) cb(new NamespaceNotFoundError(ns)); + else if (!isMember) cb(new NamespaceNotFoundError()); else cb(); }); }, diff --git a/src/lib/producer/_/_schedule-message.ts b/src/lib/producer/_/_schedule-message.ts index 4bfab2aa..ea40e36e 100644 --- a/src/lib/producer/_/_schedule-message.ts +++ b/src/lib/producer/_/_schedule-message.ts @@ -16,7 +16,9 @@ import { } from '../../message/index.js'; import { MessageEnvelope } from '../../message/message-envelope.js'; import { EQueueProperty } from '../../queue/index.js'; -import { ProducerMessageNotScheduledError } from '../errors/index.js'; +import { ProducerQueueNotFoundError } from '../errors/producer-queue-not-found.error.js'; +import { ProducerScheduleInvalidParametersError } from '../errors/producer-schedule-invalid-parameters.error.js'; +import { ProducerError } from '../errors/producer.error.js'; export function _scheduleMessage( mixed: IRedisClient, @@ -62,11 +64,16 @@ export function _scheduleMessage( ], (err, reply) => { if (err) cb(err); - else if (reply !== 'OK') - cb(new ProducerMessageNotScheduledError(String(reply))); - else cb(); + else if (reply !== 'OK') { + if (reply === 'QUEUE_NOT_FOUND') { + cb(new ProducerQueueNotFoundError()); + } else if (reply !== 'INVALID_PARAMETERS') { + cb(new ProducerScheduleInvalidParametersError()); + } else { + cb(new ProducerError()); + } + } else cb(); }, ); - } else - cb(new ProducerMessageNotScheduledError('INVALID_SCHEDULING_PARAMETERS')); + } else cb(new ProducerScheduleInvalidParametersError()); } diff --git a/src/lib/producer/errors/index.ts b/src/lib/producer/errors/index.ts index 87b25581..a0adb8e1 100644 --- a/src/lib/producer/errors/index.ts +++ b/src/lib/producer/errors/index.ts @@ -8,8 +8,12 @@ */ export { ProducerError } from './producer.error.js'; -export { ProducerMessageNotPublishedError } from './producer-message-not-published.error.js'; -export { ProducerMessageNotScheduledError } from './producer-message-not-scheduled.error.js'; export { ProducerInstanceNotRunningError } from './producer-instance-not-running.error.js'; -export { ProducerQueueWithoutConsumerGroupsError } from './producer-queue-without-consumer-groups.error.js'; +export { ProducerQueueMissingConsumerGroupsError } from './producer-queue-missing-consumer-groups.error.js'; export { ProducerMessageExchangeRequiredError } from './producer-message-exchange-required.error.js'; +export { ProducerQueueNotFoundError } from './producer-queue-not-found.error.js'; +export { ProducerMessagePriorityRequiredError } from './producer-message-priority-required.error.js'; +export { ProducerPriorityQueuingNotEnabledError } from './producer-priority-queuing-not-enabled.error.js'; +export { ProducerUnknownQueueTypeError } from './producer-unknown-queue-type.error.js'; +export { ProducerExchangeNoMatchedQueueError } from './producer-exchange-no-matched-queue.error.js'; +export { ProducerScheduleInvalidParametersError } from './producer-schedule-invalid-parameters.error.js'; diff --git a/src/lib/producer/errors/producer-exchange-no-matched-queue.error.ts b/src/lib/producer/errors/producer-exchange-no-matched-queue.error.ts new file mode 100644 index 00000000..e74c02c3 --- /dev/null +++ b/src/lib/producer/errors/producer-exchange-no-matched-queue.error.ts @@ -0,0 +1,12 @@ +/* + * Copyright (c) + * Weyoss + * https://github.com/weyoss + * + * This source code is licensed under the MIT license found in the LICENSE file + * in the root directory of this source tree. + */ + +import { ProducerError } from './producer.error.js'; + +export class ProducerExchangeNoMatchedQueueError extends ProducerError {} diff --git a/src/lib/producer/errors/producer-instance-not-running.error.ts b/src/lib/producer/errors/producer-instance-not-running.error.ts index f1363d41..750d61ff 100644 --- a/src/lib/producer/errors/producer-instance-not-running.error.ts +++ b/src/lib/producer/errors/producer-instance-not-running.error.ts @@ -9,10 +9,4 @@ import { ProducerError } from './producer.error.js'; -export class ProducerInstanceNotRunningError extends ProducerError { - constructor( - msg = `Producer instance is not running. Before producing messages you need to run your producer instance.`, - ) { - super(msg); - } -} +export class ProducerInstanceNotRunningError extends ProducerError {} diff --git a/src/lib/producer/errors/producer-message-priority-required.error.ts b/src/lib/producer/errors/producer-message-priority-required.error.ts new file mode 100644 index 00000000..9ffa99fd --- /dev/null +++ b/src/lib/producer/errors/producer-message-priority-required.error.ts @@ -0,0 +1,12 @@ +/* + * Copyright (c) + * Weyoss + * https://github.com/weyoss + * + * This source code is licensed under the MIT license found in the LICENSE file + * in the root directory of this source tree. + */ + +import { ProducerError } from './producer.error.js'; + +export class ProducerMessagePriorityRequiredError extends ProducerError {} diff --git a/src/lib/producer/errors/producer-priority-queuing-not-enabled.error.ts b/src/lib/producer/errors/producer-priority-queuing-not-enabled.error.ts new file mode 100644 index 00000000..356528f9 --- /dev/null +++ b/src/lib/producer/errors/producer-priority-queuing-not-enabled.error.ts @@ -0,0 +1,12 @@ +/* + * Copyright (c) + * Weyoss + * https://github.com/weyoss + * + * This source code is licensed under the MIT license found in the LICENSE file + * in the root directory of this source tree. + */ + +import { ProducerError } from './producer.error.js'; + +export class ProducerPriorityQueuingNotEnabledError extends ProducerError {} diff --git a/src/lib/producer/errors/producer-queue-without-consumer-groups.error.ts b/src/lib/producer/errors/producer-queue-missing-consumer-groups.error.ts similarity index 81% rename from src/lib/producer/errors/producer-queue-without-consumer-groups.error.ts rename to src/lib/producer/errors/producer-queue-missing-consumer-groups.error.ts index da444377..c489c0a4 100644 --- a/src/lib/producer/errors/producer-queue-without-consumer-groups.error.ts +++ b/src/lib/producer/errors/producer-queue-missing-consumer-groups.error.ts @@ -9,4 +9,4 @@ import { ProducerError } from './producer.error.js'; -export class ProducerQueueWithoutConsumerGroupsError extends ProducerError {} +export class ProducerQueueMissingConsumerGroupsError extends ProducerError {} diff --git a/src/lib/producer/errors/producer-message-not-scheduled.error.ts b/src/lib/producer/errors/producer-queue-not-found.error.ts similarity index 79% rename from src/lib/producer/errors/producer-message-not-scheduled.error.ts rename to src/lib/producer/errors/producer-queue-not-found.error.ts index 71a230b6..cf4f5230 100644 --- a/src/lib/producer/errors/producer-message-not-scheduled.error.ts +++ b/src/lib/producer/errors/producer-queue-not-found.error.ts @@ -9,4 +9,4 @@ import { ProducerError } from './producer.error.js'; -export class ProducerMessageNotScheduledError extends ProducerError {} +export class ProducerQueueNotFoundError extends ProducerError {} diff --git a/src/lib/producer/errors/producer-schedule-invalid-parameters.error.ts b/src/lib/producer/errors/producer-schedule-invalid-parameters.error.ts new file mode 100644 index 00000000..66568c66 --- /dev/null +++ b/src/lib/producer/errors/producer-schedule-invalid-parameters.error.ts @@ -0,0 +1,12 @@ +/* + * Copyright (c) + * Weyoss + * https://github.com/weyoss + * + * This source code is licensed under the MIT license found in the LICENSE file + * in the root directory of this source tree. + */ + +import { ProducerError } from './producer.error.js'; + +export class ProducerScheduleInvalidParametersError extends ProducerError {} diff --git a/src/lib/producer/errors/producer-message-not-published.error.ts b/src/lib/producer/errors/producer-unknown-queue-type.error.ts similarity index 79% rename from src/lib/producer/errors/producer-message-not-published.error.ts rename to src/lib/producer/errors/producer-unknown-queue-type.error.ts index fbc880a3..8a87f0a0 100644 --- a/src/lib/producer/errors/producer-message-not-published.error.ts +++ b/src/lib/producer/errors/producer-unknown-queue-type.error.ts @@ -9,4 +9,4 @@ import { ProducerError } from './producer.error.js'; -export class ProducerMessageNotPublishedError extends ProducerError {} +export class ProducerUnknownQueueTypeError extends ProducerError {} diff --git a/src/lib/producer/producer.ts b/src/lib/producer/producer.ts index ec091bf1..ca74ba90 100644 --- a/src/lib/producer/producer.ts +++ b/src/lib/producer/producer.ts @@ -35,10 +35,15 @@ import { MessageEnvelope } from '../message/message-envelope.js'; import { EQueueProperty, EQueueType, IQueueParams } from '../queue/index.js'; import { _scheduleMessage } from './_/_schedule-message.js'; import { + ProducerError, + ProducerExchangeNoMatchedQueueError, ProducerInstanceNotRunningError, ProducerMessageExchangeRequiredError, - ProducerMessageNotPublishedError, - ProducerQueueWithoutConsumerGroupsError, + ProducerMessagePriorityRequiredError, + ProducerPriorityQueuingNotEnabledError, + ProducerQueueMissingConsumerGroupsError, + ProducerQueueNotFoundError, + ProducerUnknownQueueTypeError, } from './errors/index.js'; import { eventBusPublisher } from './event-bus-publisher.js'; import { QueueConsumerGroupsCache } from './queue-consumer-groups-cache.js'; @@ -182,13 +187,19 @@ export class Producer extends Runnable { ], (err, reply) => { if (err) cb(err); - else if (reply !== 'OK') - cb( - new ProducerMessageNotPublishedError( - reply ? String(reply) : undefined, - ), - ); - else cb(); + else if (reply !== 'OK') { + if (reply === 'QUEUE_NOT_FOUND') { + cb(new ProducerQueueNotFoundError()); + } else if (reply === 'MESSAGE_PRIORITY_REQUIRED') { + cb(new ProducerMessagePriorityRequiredError()); + } else if (reply === 'PRIORITY_QUEUING_NOT_ENABLED') { + cb(new ProducerPriorityQueuingNotEnabledError()); + } else if (reply === 'UNKNOWN_QUEUE_TYPE') { + cb(new ProducerUnknownQueueTypeError()); + } else { + cb(new ProducerError()); + } + } else cb(); }, ); } @@ -237,7 +248,7 @@ export class Producer extends Runnable { this.getQueueConsumerGroupsHandler().getConsumerGroups(queue); if (exists) { if (!consumerGroups.length) { - cb(new ProducerQueueWithoutConsumerGroupsError()); + cb(new ProducerQueueMissingConsumerGroupsError()); } const ids: string[] = []; async.eachOf( @@ -283,11 +294,7 @@ export class Producer extends Runnable { _getExchangeQueues(redisClient, exchangeParams, (err, queues) => { if (err) cb(err); else if (!queues?.length) - cb( - new ProducerMessageNotPublishedError( - `The exchange does not match any queue.`, - ), - ); + cb(new ProducerExchangeNoMatchedQueueError()); else { const messages: string[] = []; async.eachOf( diff --git a/src/lib/queue-messages/_/_requeue-message.ts b/src/lib/queue-messages/_/_requeue-message.ts deleted file mode 100644 index faa2bdf1..00000000 --- a/src/lib/queue-messages/_/_requeue-message.ts +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Copyright (c) - * Weyoss - * https://github.com/weyoss - * - * This source code is licensed under the MIT license found in the LICENSE file - * in the root directory of this source tree. - */ - -import { - CallbackEmptyReplyError, - ICallback, - IRedisClient, -} from 'redis-smq-common'; -import { ELuaScriptName } from '../../../common/redis-client/scripts/scripts.js'; -import { redisKeys } from '../../../common/redis-keys/redis-keys.js'; -import { _getMessage } from '../../message/_/_get-message.js'; -import { - EMessageProperty, - EMessagePropertyStatus, -} from '../../message/index.js'; -import { _parseQueueParams } from '../../queue/_/_parse-queue-params.js'; -import { EQueueProperty, EQueueType, IQueueParams } from '../../queue/index.js'; -import { MessageRequeueError } from '../errors/index.js'; - -export function _requeueMessage( - redisClient: IRedisClient, - queue: string | IQueueParams, - messageId: string, - messageStatus: - | EMessagePropertyStatus.ACKNOWLEDGED - | EMessagePropertyStatus.DEAD_LETTERED, - cb: ICallback, -): void { - const queueParams = _parseQueueParams(queue); - if (queueParams instanceof Error) cb(queueParams); - else { - _getMessage(redisClient, messageId, (err, message) => { - if (err) cb(err); - else if (!message) cb(new CallbackEmptyReplyError()); - else if (messageStatus !== message.getStatus()) - cb(new MessageRequeueError('INVALID_OPERATION')); - else { - const destinationQueue = message.getDestinationQueue(); - if ( - queueParams.name !== destinationQueue.name || - queueParams.ns !== destinationQueue.ns - ) { - cb(new MessageRequeueError('INVALID_OPERATION')); - } else { - message.getMessageState().reset(); // resetting all system parameters - const { - keyQueueProperties, - keyQueuePending, - keyQueuePriorityPending, - keyQueueAcknowledged, - keyQueueDL, - } = redisKeys.getQueueKeys( - message.getDestinationQueue(), - message.getConsumerGroupId(), - ); - const messageId = message.getId(); - const { keyMessage } = redisKeys.getMessageKeys(messageId); - const status = message.getStatus(); - const sourceKey = - status === EMessagePropertyStatus.DEAD_LETTERED - ? keyQueueDL - : keyQueueAcknowledged; - redisClient.runScript( - ELuaScriptName.REQUEUE_MESSAGE, - [ - sourceKey, - keyQueueProperties, - keyQueuePriorityPending, - keyQueuePending, - keyMessage, - ], - [ - EQueueProperty.QUEUE_TYPE, - EQueueType.PRIORITY_QUEUE, - EQueueType.LIFO_QUEUE, - EQueueType.FIFO_QUEUE, - EMessageProperty.STATUS, - EMessagePropertyStatus.PENDING, - EMessageProperty.STATE, - messageId, - message.producibleMessage.getPriority() ?? '', - JSON.stringify(message.getMessageState()), - ], - (err, reply) => { - if (err) cb(err); - else if (reply !== 'OK') - cb(new MessageRequeueError(String(reply))); - else cb(); - }, - ); - } - } - }); - } -} diff --git a/src/lib/queue-messages/_/_validate-queue-extended-params.ts b/src/lib/queue-messages/_/_validate-queue-extended-params.ts index d7b9fbea..cf3a3436 100644 --- a/src/lib/queue-messages/_/_validate-queue-extended-params.ts +++ b/src/lib/queue-messages/_/_validate-queue-extended-params.ts @@ -9,8 +9,8 @@ import { ICallback, IRedisClient } from 'redis-smq-common'; import { - ConsumerGroupIdNotSupportedError, - ConsumerGroupIdRequiredError, + ConsumerConsumerGroupIdNotSupportedError, + ConsumerConsumerGroupIdRequiredError, } from '../../consumer/index.js'; import { _getQueueProperties } from '../../queue/_/_get-queue-properties.js'; import { EQueueDeliveryModel, IQueueParsedParams } from '../../queue/index.js'; @@ -30,12 +30,12 @@ export function _validateQueueExtendedParams( properties?.deliveryModel === EQueueDeliveryModel.PUB_SUB && !groupId ) { - cb(new ConsumerGroupIdRequiredError()); + cb(new ConsumerConsumerGroupIdRequiredError()); } else if ( properties?.deliveryModel === EQueueDeliveryModel.POINT_TO_POINT && groupId ) { - cb(new ConsumerGroupIdNotSupportedError()); + cb(new ConsumerConsumerGroupIdNotSupportedError()); } else cb(); } }); diff --git a/src/lib/queue-messages/errors/index.ts b/src/lib/queue-messages/errors/index.ts index bc5c094c..cfd2f133 100644 --- a/src/lib/queue-messages/errors/index.ts +++ b/src/lib/queue-messages/errors/index.ts @@ -7,5 +7,4 @@ * in the root directory of this source tree. */ -export { MessageRequeueError } from './message-requeue.error.js'; -export { QueueMessageError } from './queue-message.error.js'; +export { QueueMessagesError } from './queue-messages.error.js'; diff --git a/src/lib/queue-messages/errors/message-requeue.error.ts b/src/lib/queue-messages/errors/message-requeue.error.ts deleted file mode 100644 index 63eaf2d5..00000000 --- a/src/lib/queue-messages/errors/message-requeue.error.ts +++ /dev/null @@ -1,16 +0,0 @@ -/* - * Copyright (c) - * Weyoss - * https://github.com/weyoss - * - * This source code is licensed under the MIT license found in the LICENSE file - * in the root directory of this source tree. - */ - -import { QueueMessageError } from './queue-message.error.js'; - -export class MessageRequeueError extends QueueMessageError { - constructor(msg = 'MESSAGE_REQUEUE_ERROR') { - super(msg); - } -} diff --git a/src/lib/queue-messages/errors/queue-message.error.ts b/src/lib/queue-messages/errors/queue-messages.error.ts similarity index 82% rename from src/lib/queue-messages/errors/queue-message.error.ts rename to src/lib/queue-messages/errors/queue-messages.error.ts index 1ecf42d6..fe39576f 100644 --- a/src/lib/queue-messages/errors/queue-message.error.ts +++ b/src/lib/queue-messages/errors/queue-messages.error.ts @@ -9,4 +9,4 @@ import { RedisSMQError } from 'redis-smq-common'; -export class QueueMessageError extends RedisSMQError {} +export class QueueMessagesError extends RedisSMQError {} diff --git a/src/lib/queue-messages/queue-acknowledged-messages.ts b/src/lib/queue-messages/queue-acknowledged-messages.ts index 1b62b81f..eab53875 100644 --- a/src/lib/queue-messages/queue-acknowledged-messages.ts +++ b/src/lib/queue-messages/queue-acknowledged-messages.ts @@ -7,37 +7,10 @@ * in the root directory of this source tree. */ -import { CallbackEmptyReplyError, ICallback } from 'redis-smq-common'; import { redisKeys } from '../../common/redis-keys/redis-keys.js'; -import { EMessagePropertyStatus } from '../message/index.js'; -import { IQueueParams } from '../queue/index.js'; -import { _requeueMessage } from './_/_requeue-message.js'; import { QueueMessagesPaginatorList } from './queue-messages-paginator/queue-messages-paginator-list.js'; -import { IQueueMessagesRequeuable } from './types/index.js'; -export class QueueAcknowledgedMessages - extends QueueMessagesPaginatorList - implements IQueueMessagesRequeuable -{ +export class QueueAcknowledgedMessages extends QueueMessagesPaginatorList { protected redisKey: keyof ReturnType = 'keyQueueAcknowledged'; - - requeueMessage( - queue: string | IQueueParams, - messageId: string, - cb: ICallback, - ): void { - this.redisClient.getSetInstance((err, client) => { - if (err) cb(err); - else if (!client) cb(new CallbackEmptyReplyError()); - else - _requeueMessage( - client, - queue, - messageId, - EMessagePropertyStatus.ACKNOWLEDGED, - cb, - ); - }); - } } diff --git a/src/lib/queue-messages/queue-dead-lettered-messages.ts b/src/lib/queue-messages/queue-dead-lettered-messages.ts index 4546fd59..5433af28 100644 --- a/src/lib/queue-messages/queue-dead-lettered-messages.ts +++ b/src/lib/queue-messages/queue-dead-lettered-messages.ts @@ -7,37 +7,10 @@ * in the root directory of this source tree. */ -import { CallbackEmptyReplyError, ICallback } from 'redis-smq-common'; import { redisKeys } from '../../common/redis-keys/redis-keys.js'; -import { EMessagePropertyStatus } from '../message/index.js'; -import { IQueueParams } from '../queue/index.js'; -import { _requeueMessage } from './_/_requeue-message.js'; import { QueueMessagesPaginatorList } from './queue-messages-paginator/queue-messages-paginator-list.js'; -import { IQueueMessagesRequeuable } from './types/index.js'; -export class QueueDeadLetteredMessages - extends QueueMessagesPaginatorList - implements IQueueMessagesRequeuable -{ +export class QueueDeadLetteredMessages extends QueueMessagesPaginatorList { protected redisKey: keyof ReturnType = 'keyQueueDL'; - - requeueMessage( - queue: string | IQueueParams, - messageId: string, - cb: ICallback, - ): void { - this.redisClient.getSetInstance((err, client) => { - if (err) cb(err); - else if (!client) cb(new CallbackEmptyReplyError()); - else - _requeueMessage( - client, - queue, - messageId, - EMessagePropertyStatus.DEAD_LETTERED, - cb, - ); - }); - } } diff --git a/src/lib/queue-messages/types/index.ts b/src/lib/queue-messages/types/index.ts index f1dcf106..de60c79f 100644 --- a/src/lib/queue-messages/types/index.ts +++ b/src/lib/queue-messages/types/index.ts @@ -57,11 +57,3 @@ export type TQueueMessagesPaginationParams = { pageSize: number; consumerGroupId?: string | null; }; - -export interface IQueueMessagesRequeuable { - requeueMessage( - queue: string | IQueueParams, - messageId: string, - cb: ICallback, - ): void; -} diff --git a/src/lib/queue-rate-limit/errors/index.ts b/src/lib/queue-rate-limit/errors/index.ts index 9aeefb44..a116d7e4 100644 --- a/src/lib/queue-rate-limit/errors/index.ts +++ b/src/lib/queue-rate-limit/errors/index.ts @@ -8,3 +8,5 @@ */ export { QueueRateLimitError } from './queue-rate-limit.error.js'; +export { QueueRateLimitInvalidIntervalError } from './queue-rate-limit-invalid-interval.error.js'; +export { QueueRateLimitInvalidLimitError } from './queue-rate-limit-invalid-limit.error.js'; diff --git a/src/lib/queue-rate-limit/errors/queue-rate-limit-invalid-interval.error.ts b/src/lib/queue-rate-limit/errors/queue-rate-limit-invalid-interval.error.ts new file mode 100644 index 00000000..16c43cef --- /dev/null +++ b/src/lib/queue-rate-limit/errors/queue-rate-limit-invalid-interval.error.ts @@ -0,0 +1,12 @@ +/* + * Copyright (c) + * Weyoss + * https://github.com/weyoss + * + * This source code is licensed under the MIT license found in the LICENSE file + * in the root directory of this source tree. + */ + +import { QueueRateLimitError } from './queue-rate-limit.error.js'; + +export class QueueRateLimitInvalidIntervalError extends QueueRateLimitError {} diff --git a/src/lib/queue-rate-limit/errors/queue-rate-limit-invalid-limit.error.ts b/src/lib/queue-rate-limit/errors/queue-rate-limit-invalid-limit.error.ts new file mode 100644 index 00000000..2cd1dc48 --- /dev/null +++ b/src/lib/queue-rate-limit/errors/queue-rate-limit-invalid-limit.error.ts @@ -0,0 +1,12 @@ +/* + * Copyright (c) + * Weyoss + * https://github.com/weyoss + * + * This source code is licensed under the MIT license found in the LICENSE file + * in the root directory of this source tree. + */ + +import { QueueRateLimitError } from './queue-rate-limit.error.js'; + +export class QueueRateLimitInvalidLimitError extends QueueRateLimitError {} diff --git a/src/lib/queue-rate-limit/queue-rate-limit.ts b/src/lib/queue-rate-limit/queue-rate-limit.ts index b1f11f5a..621c82f5 100644 --- a/src/lib/queue-rate-limit/queue-rate-limit.ts +++ b/src/lib/queue-rate-limit/queue-rate-limit.ts @@ -24,7 +24,10 @@ import { } from '../queue/index.js'; import { _parseQueueParams } from '../queue/_/_parse-queue-params.js'; import { _hasRateLimitExceeded } from './_/_has-rate-limit-exceeded.js'; -import { QueueRateLimitError } from './errors/index.js'; +import { + QueueRateLimitInvalidLimitError, + QueueRateLimitInvalidIntervalError, +} from './errors/index.js'; export class QueueRateLimit { protected redisClient; @@ -75,19 +78,11 @@ export class QueueRateLimit { // validating rateLimit params from a javascript client const limit = Number(rateLimit.limit); if (isNaN(limit) || limit <= 0) { - cb( - new QueueRateLimitError( - `Invalid rateLimit.limit. Expected a positive integer > 0`, - ), - ); + cb(new QueueRateLimitInvalidLimitError()); } const interval = Number(rateLimit.interval); if (isNaN(interval) || interval < 1000) { - cb( - new QueueRateLimitError( - `Invalid rateLimit.interval. Expected a positive integer >= 1000`, - ), - ); + cb(new QueueRateLimitInvalidIntervalError()); } const validatedRateLimit: IQueueRateLimit = { interval, limit }; const { keyQueueProperties } = redisKeys.getQueueKeys( diff --git a/src/lib/queue/_/_delete-queue.ts b/src/lib/queue/_/_delete-queue.ts index 1f7295ee..e4e26cd8 100644 --- a/src/lib/queue/_/_delete-queue.ts +++ b/src/lib/queue/_/_delete-queue.ts @@ -19,9 +19,9 @@ import { ConsumerHeartbeat } from '../../consumer/consumer-heartbeat/consumer-he import { consumerQueues } from '../../consumer/consumer-queues.js'; import { processingQueue } from '../../consumer/message-handler/processing-queue.js'; import { - QueueHasRunningConsumersError, - QueueNotEmptyError, - QueueNotFoundError, + QueueQueueHasRunningConsumersError, + QueueQueueNotEmptyError, + QueueQueueNotFoundError, } from '../errors/index.js'; import { EQueueDeliveryModel, IQueueParams } from '../types/index.js'; import { _getQueueProperties } from './_get-queue-properties.js'; @@ -41,7 +41,7 @@ function checkOnlineConsumers( else { const r = reply ?? {}; const onlineArr = Object.keys(r).filter((id) => r[id]); - if (onlineArr.length) cb(new QueueHasRunningConsumersError()); + if (onlineArr.length) cb(new QueueQueueHasRunningConsumersError()); else cb(); } }, @@ -106,10 +106,10 @@ export function _deleteQueue( (cb: ICallback): void => _getQueueProperties(redisClient, queueParams, (err, reply) => { if (err) cb(err); - else if (!reply) cb(new QueueNotFoundError()); + else if (!reply) cb(new QueueQueueNotFoundError()); else { const messagesCount = reply.messagesCount; - if (messagesCount) cb(new QueueNotEmptyError()); + if (messagesCount) cb(new QueueQueueNotEmptyError()); else { exchange = reply.exchange ?? null; pubSubDelivery = diff --git a/src/lib/queue/_/_get-queue-properties.ts b/src/lib/queue/_/_get-queue-properties.ts index f47e551b..33cb5087 100644 --- a/src/lib/queue/_/_get-queue-properties.ts +++ b/src/lib/queue/_/_get-queue-properties.ts @@ -9,7 +9,7 @@ import { ICallback, IRedisClient, PanicError } from 'redis-smq-common'; import { redisKeys } from '../../../common/redis-keys/redis-keys.js'; -import { QueueNotFoundError } from '../errors/index.js'; +import { QueueQueueNotFoundError } from '../errors/index.js'; import { EQueueDeliveryModel, EQueueProperty, @@ -55,7 +55,8 @@ export function _getQueueProperties( const { keyQueueProperties } = redisKeys.getQueueKeys(queueParams, null); redisClient.hgetall(keyQueueProperties, (err, reply) => { if (err) cb(err); - else if (!reply || !Object.keys(reply).length) cb(new QueueNotFoundError()); + else if (!reply || !Object.keys(reply).length) + cb(new QueueQueueNotFoundError()); else { const queueProperties = parseProperties(reply); if (queueProperties instanceof Error) cb(queueProperties); diff --git a/src/lib/queue/_/_parse-queue-extended-params.ts b/src/lib/queue/_/_parse-queue-extended-params.ts index e41563b9..67738561 100644 --- a/src/lib/queue/_/_parse-queue-extended-params.ts +++ b/src/lib/queue/_/_parse-queue-extended-params.ts @@ -7,8 +7,9 @@ * in the root directory of this source tree. */ -import { RedisKeysError } from '../../../common/redis-keys/redis-keys.error.js'; +import { RedisKeysError } from '../../../common/redis-keys/errors/redis-keys.error.js'; import { redisKeys } from '../../../common/redis-keys/redis-keys.js'; +import { QueueInvalidQueueParameterError } from '../errors/queue-invalid-queue-parameter.error.js'; import { IQueueParams, IQueueParsedParams, @@ -27,10 +28,11 @@ function isQueueParams(args: unknown): args is IQueueParams { export function _parseQueueExtendedParams( args: TQueueExtendedParams, -): IQueueParsedParams | RedisKeysError { +): IQueueParsedParams | QueueInvalidQueueParameterError { if (typeof args === 'string') { const queueParams = _parseQueueParams(args); - if (queueParams instanceof Error) return queueParams; + if (queueParams instanceof Error) + return new QueueInvalidQueueParameterError(); return { queueParams, groupId: null, @@ -38,7 +40,8 @@ export function _parseQueueExtendedParams( } if (isQueueParams(args)) { const queueParams = _parseQueueParams(args); - if (queueParams instanceof Error) return queueParams; + if (queueParams instanceof Error) + return new QueueInvalidQueueParameterError(); return { queueParams, groupId: null, @@ -49,7 +52,7 @@ export function _parseQueueExtendedParams( let groupId: string | RedisKeysError | null = null; if (args.groupId) { groupId = redisKeys.validateRedisKey(args.groupId); - if (groupId instanceof Error) return groupId; + if (groupId instanceof Error) return new QueueInvalidQueueParameterError(); } return { queueParams, diff --git a/src/lib/queue/_/_parse-queue-params.ts b/src/lib/queue/_/_parse-queue-params.ts index 3be1cffb..a4a30442 100644 --- a/src/lib/queue/_/_parse-queue-params.ts +++ b/src/lib/queue/_/_parse-queue-params.ts @@ -7,22 +7,25 @@ * in the root directory of this source tree. */ -import { RedisKeysError } from '../../../common/redis-keys/redis-keys.error.js'; +import { RedisKeysError } from '../../../common/redis-keys/errors/redis-keys.error.js'; import { redisKeys } from '../../../common/redis-keys/redis-keys.js'; import { Configuration } from '../../../config/index.js'; +import { QueueInvalidQueueParameterError } from '../errors/queue-invalid-queue-parameter.error.js'; import { IQueueParams } from '../types/index.js'; export function _parseQueueParams( queue: string | IQueueParams, -): IQueueParams | RedisKeysError { +): IQueueParams | QueueInvalidQueueParameterError { const queueParams: { name: string; ns?: string } = typeof queue === 'string' ? { name: queue } : queue; const name = redisKeys.validateRedisKey(queueParams.name); - if (name instanceof RedisKeysError) return name; + if (name instanceof RedisKeysError) + return new QueueInvalidQueueParameterError(); const ns = queueParams.ns ? redisKeys.validateNamespace(queueParams.ns) : Configuration.getSetConfig().namespace; - if (ns instanceof RedisKeysError) return ns; + if (ns instanceof RedisKeysError) + return new QueueInvalidQueueParameterError(); return { name, ns, diff --git a/src/lib/queue/errors/index.ts b/src/lib/queue/errors/index.ts index 0259f07e..5b55dd62 100644 --- a/src/lib/queue/errors/index.ts +++ b/src/lib/queue/errors/index.ts @@ -7,8 +7,9 @@ * in the root directory of this source tree. */ -export { QueueExistsError } from './queue-exists.error.js'; -export { QueueHasRunningConsumersError } from './queue-has-running-consumers.error.js'; -export { QueueNotEmptyError } from './queue-not-empty.error.js'; -export { QueueNotFoundError } from './queue-not-found.error.js'; +export { QueueQueueExistsError } from './queue-queue-exists.error.js'; +export { QueueQueueHasRunningConsumersError } from './queue-queue-has-running-consumers.error.js'; +export { QueueQueueNotEmptyError } from './queue-queue-not-empty.error.js'; +export { QueueQueueNotFoundError } from './queue-queue-not-found.error.js'; export { QueueError } from './queue.error.js'; +export { QueueInvalidQueueParameterError } from './queue-invalid-queue-parameter.error.js'; diff --git a/src/lib/queue/errors/queue-has-running-consumers.error.ts b/src/lib/queue/errors/queue-has-running-consumers.error.ts deleted file mode 100644 index f17065a4..00000000 --- a/src/lib/queue/errors/queue-has-running-consumers.error.ts +++ /dev/null @@ -1,18 +0,0 @@ -/* - * Copyright (c) - * Weyoss - * https://github.com/weyoss - * - * This source code is licensed under the MIT license found in the LICENSE file - * in the root directory of this source tree. - */ - -import { QueueError } from './queue.error.js'; - -export class QueueHasRunningConsumersError extends QueueError { - constructor() { - super( - `Before deleting a queue/namespace, make sure it is not used by a message handler. After shutting down all message handlers, wait a few seconds and try again.`, - ); - } -} diff --git a/src/lib/queue/errors/queue-invalid-queue-parameter.error.ts b/src/lib/queue/errors/queue-invalid-queue-parameter.error.ts new file mode 100644 index 00000000..7234c128 --- /dev/null +++ b/src/lib/queue/errors/queue-invalid-queue-parameter.error.ts @@ -0,0 +1,12 @@ +/* + * Copyright (c) + * Weyoss + * https://github.com/weyoss + * + * This source code is licensed under the MIT license found in the LICENSE file + * in the root directory of this source tree. + */ + +import { QueueError } from './queue.error.js'; + +export class QueueInvalidQueueParameterError extends QueueError {} diff --git a/src/lib/queue/errors/queue-exists.error.ts b/src/lib/queue/errors/queue-queue-exists.error.ts similarity index 82% rename from src/lib/queue/errors/queue-exists.error.ts rename to src/lib/queue/errors/queue-queue-exists.error.ts index ef4fdf05..1da95f5b 100644 --- a/src/lib/queue/errors/queue-exists.error.ts +++ b/src/lib/queue/errors/queue-queue-exists.error.ts @@ -9,4 +9,4 @@ import { QueueError } from './queue.error.js'; -export class QueueExistsError extends QueueError {} +export class QueueQueueExistsError extends QueueError {} diff --git a/src/lib/queue/errors/queue-queue-has-running-consumers.error.ts b/src/lib/queue/errors/queue-queue-has-running-consumers.error.ts new file mode 100644 index 00000000..5521a039 --- /dev/null +++ b/src/lib/queue/errors/queue-queue-has-running-consumers.error.ts @@ -0,0 +1,12 @@ +/* + * Copyright (c) + * Weyoss + * https://github.com/weyoss + * + * This source code is licensed under the MIT license found in the LICENSE file + * in the root directory of this source tree. + */ + +import { QueueError } from './queue.error.js'; + +export class QueueQueueHasRunningConsumersError extends QueueError {} diff --git a/src/lib/queue/errors/queue-not-empty.error.ts b/src/lib/queue/errors/queue-queue-not-empty.error.ts similarity index 81% rename from src/lib/queue/errors/queue-not-empty.error.ts rename to src/lib/queue/errors/queue-queue-not-empty.error.ts index ca1b851b..53e03f1d 100644 --- a/src/lib/queue/errors/queue-not-empty.error.ts +++ b/src/lib/queue/errors/queue-queue-not-empty.error.ts @@ -9,4 +9,4 @@ import { QueueError } from './queue.error.js'; -export class QueueNotEmptyError extends QueueError {} +export class QueueQueueNotEmptyError extends QueueError {} diff --git a/src/lib/queue/errors/queue-not-found.error.ts b/src/lib/queue/errors/queue-queue-not-found.error.ts similarity index 81% rename from src/lib/queue/errors/queue-not-found.error.ts rename to src/lib/queue/errors/queue-queue-not-found.error.ts index b25ac1aa..69ea2ad1 100644 --- a/src/lib/queue/errors/queue-not-found.error.ts +++ b/src/lib/queue/errors/queue-queue-not-found.error.ts @@ -9,4 +9,4 @@ import { QueueError } from './queue.error.js'; -export class QueueNotFoundError extends QueueError {} +export class QueueQueueNotFoundError extends QueueError {} diff --git a/src/lib/queue/queue.ts b/src/lib/queue/queue.ts index 36e122e6..ddad329b 100644 --- a/src/lib/queue/queue.ts +++ b/src/lib/queue/queue.ts @@ -22,7 +22,7 @@ import { _deleteQueue } from './_/_delete-queue.js'; import { _getQueueProperties } from './_/_get-queue-properties.js'; import { _getQueues } from './_/_get-queues.js'; import { _parseQueueParams } from './_/_parse-queue-params.js'; -import { QueueExistsError } from './errors/index.js'; +import { QueueQueueExistsError } from './errors/index.js'; import { EQueueDeliveryModel, EQueueProperty, @@ -84,7 +84,7 @@ export class Queue { (err, reply) => { if (err) cb(err); else if (!reply) cb(new CallbackEmptyReplyError()); - else if (reply !== 'OK') cb(new QueueExistsError()); + else if (reply !== 'OK') cb(new QueueQueueExistsError()); else this.getProperties(queueParams, (err, properties) => { if (err) cb(err); diff --git a/tests/tests/consuming-messages/test00015.test.ts b/tests/tests/consuming-messages/test00015.test.ts index e5ab4fe2..0f1080fa 100644 --- a/tests/tests/consuming-messages/test00015.test.ts +++ b/tests/tests/consuming-messages/test00015.test.ts @@ -12,6 +12,7 @@ import bluebird from 'bluebird'; import { ICallback } from 'redis-smq-common'; import { Consumer, + ConsumerConsumeMessageHandlerAlreadyExistsError, EQueueDeliveryModel, EQueueType, IMessageTransferable, @@ -50,9 +51,7 @@ test('Consume message from different queues using a single consumer instance: ca 'another_queue', (msg: IMessageTransferable, cb: ICallback) => cb(), ), - ).rejects.toThrow( - `A message handler for queue [another_queue@testing] already exists`, - ); + ).rejects.toThrow(ConsumerConsumeMessageHandlerAlreadyExistsError); expect(consumer.getQueues()).toEqual([ { queueParams: { name: 'test_queue', ns: 'testing' }, groupId: null }, @@ -83,9 +82,7 @@ test('Consume message from different queues using a single consumer instance: ca 'another_queue', (msg: IMessageTransferable, cb: ICallback) => cb(), ), - ).rejects.toThrow( - `A message handler for queue [another_queue@testing] already exists`, - ); + ).rejects.toThrow(ConsumerConsumeMessageHandlerAlreadyExistsError); await consumer.cancelAsync('another_queue'); @@ -121,9 +118,7 @@ test('Consume message from different queues using a single consumer instance: ca 'queue_a', (msg: IMessageTransferable, cb: ICallback) => cb(), ), - ).rejects.toThrow( - `A message handler for queue [queue_a@testing] already exists`, - ); + ).rejects.toThrow(ConsumerConsumeMessageHandlerAlreadyExistsError); expect(consumer.getQueues()).toEqual([ { queueParams: { name: 'test_queue', ns: 'testing' }, groupId: null }, diff --git a/tests/tests/consuming-messages/test00036.test.ts b/tests/tests/consuming-messages/test00036.test.ts index 80eedb6d..e44277f4 100644 --- a/tests/tests/consuming-messages/test00036.test.ts +++ b/tests/tests/consuming-messages/test00036.test.ts @@ -12,7 +12,9 @@ import { EMessagePriority, EQueueDeliveryModel, EQueueType, - ProducerMessageNotPublishedError, + ProducerMessagePriorityRequiredError, + ProducerPriorityQueuingNotEnabledError, + ProducerQueueNotFoundError, ProducibleMessage, } from '../../../src/lib/index.js'; import { getProducer } from '../../common/producer.js'; @@ -41,23 +43,20 @@ test('Producing a message and expecting different kind of failures', async () => .setPriority(EMessagePriority.LOW); await producer.produceAsync(msg); } catch (e: unknown) { - const m = e instanceof ProducerMessageNotPublishedError ? e.message : ''; - expect(m).toBe('PRIORITY_QUEUING_NOT_ENABLED'); + expect(e instanceof ProducerPriorityQueuingNotEnabledError).toBe(true); } try { const msg1 = new ProducibleMessage().setQueue('test1').setBody('body'); await producer.produceAsync(msg1); } catch (e: unknown) { - const m = e instanceof ProducerMessageNotPublishedError ? e.message : ''; - expect(m).toBe('MESSAGE_PRIORITY_REQUIRED'); + expect(e instanceof ProducerMessagePriorityRequiredError).toBe(true); } try { const msg2 = new ProducibleMessage().setQueue('test2').setBody('body'); await producer.produceAsync(msg2); } catch (e: unknown) { - const m = e instanceof ProducerMessageNotPublishedError ? e.message : ''; - expect(m).toBe('QUEUE_NOT_FOUND'); + expect(e instanceof ProducerQueueNotFoundError).toBe(true); } }); diff --git a/tests/tests/deleting-messages/test00003.test.ts b/tests/tests/deleting-messages/test00003.test.ts index c580b931..aae58c13 100644 --- a/tests/tests/deleting-messages/test00003.test.ts +++ b/tests/tests/deleting-messages/test00003.test.ts @@ -8,7 +8,7 @@ */ import { test, expect } from '@jest/globals'; -import { MessageNotFoundError } from '../../../src/lib/index.js'; +import { MessageMessageNotFoundError } from '../../../src/lib/index.js'; import { getMessage } from '../../common/message.js'; import { createQueue, @@ -67,6 +67,6 @@ test('Combined test: Delete an acknowledged message. Check pending, acknowledged expect(count1.deadLettered).toBe(0); await expect(message.deleteMessageByIdAsync(messageId)).rejects.toThrow( - MessageNotFoundError, + MessageMessageNotFoundError, ); }); diff --git a/tests/tests/deleting-messages/test00004.test.ts b/tests/tests/deleting-messages/test00004.test.ts index f5cddf5f..56285097 100644 --- a/tests/tests/deleting-messages/test00004.test.ts +++ b/tests/tests/deleting-messages/test00004.test.ts @@ -8,7 +8,7 @@ */ import { test, expect } from '@jest/globals'; -import { MessageNotFoundError } from '../../../src/lib/index.js'; +import { MessageMessageNotFoundError } from '../../../src/lib/index.js'; import { getMessage } from '../../common/message.js'; import { createQueue, @@ -69,6 +69,6 @@ test('Combined test: Delete a dead-letter message. Check pending, acknowledged, expect(count1.deadLettered).toBe(0); await expect(message.deleteMessageByIdAsync(messageId)).rejects.toThrow( - MessageNotFoundError, + MessageMessageNotFoundError, ); }); diff --git a/tests/tests/deleting-messages/test00005.test.ts b/tests/tests/deleting-messages/test00005.test.ts index 2ef5ae82..b8b3481b 100644 --- a/tests/tests/deleting-messages/test00005.test.ts +++ b/tests/tests/deleting-messages/test00005.test.ts @@ -9,7 +9,10 @@ import { test, expect } from '@jest/globals'; import bluebird from 'bluebird'; -import { MessageNotFoundError } from '../../../src/lib/index.js'; +import { + MessageMessageInProcessError, + MessageMessageNotFoundError, +} from '../../../src/lib/index.js'; import { getConsumer } from '../../common/consumer.js'; import { getMessage } from '../../common/message.js'; import { @@ -40,7 +43,7 @@ test('Combined test: Delete a message being in process. Check pending, acknowled const message = await getMessage(); await expect(message.deleteMessageByIdAsync(messageId)).rejects.toThrow( - 'MESSAGE_IN_PROCESS', + MessageMessageInProcessError, ); await bluebird.delay(20000); @@ -56,7 +59,7 @@ test('Combined test: Delete a message being in process. Check pending, acknowled expect(count3.acknowledged).toBe(0); await expect(message.deleteMessageByIdAsync(messageId)).rejects.toThrow( - MessageNotFoundError, + MessageMessageNotFoundError, ); await message.shutdownAsync(); diff --git a/tests/tests/exchanges/direct-exchange/test00001.test.ts b/tests/tests/exchanges/direct-exchange/test00001.test.ts index 7d8e95c6..ed08510f 100644 --- a/tests/tests/exchanges/direct-exchange/test00001.test.ts +++ b/tests/tests/exchanges/direct-exchange/test00001.test.ts @@ -8,14 +8,20 @@ */ import { test, expect } from '@jest/globals'; -import { RedisKeysError } from '../../../../src/common/redis-keys/redis-keys.error.js'; +import { ExchangeInvalidQueueParamsError } from '../../../../src/lib/exchange/errors/exchange-invalid-queue-params.error.js'; import { getDirectExchange } from '../../../common/exchange.js'; test('DirectExchange', async () => { const e = getDirectExchange(); - await expect(e.getQueuesAsync('!@223333')).rejects.toThrow(RedisKeysError); - await expect(e.getQueuesAsync('223333.')).rejects.toThrow(RedisKeysError); - await expect(e.getQueuesAsync('223333.w')).rejects.toThrow(RedisKeysError); + await expect(e.getQueuesAsync('!@223333')).rejects.toThrow( + ExchangeInvalidQueueParamsError, + ); + await expect(e.getQueuesAsync('223333.')).rejects.toThrow( + ExchangeInvalidQueueParamsError, + ); + await expect(e.getQueuesAsync('223333.w')).rejects.toThrow( + ExchangeInvalidQueueParamsError, + ); await expect(e.getQueuesAsync('a223333.w')).resolves.not.toThrow(); await expect(e.getQueuesAsync('a223333.w_e')).resolves.not.toThrow(); await expect(e.getQueuesAsync('a223333.w-e')).resolves.not.toThrow(); diff --git a/tests/tests/exchanges/fanout-exchange/test00006.test.ts b/tests/tests/exchanges/fanout-exchange/test00006.test.ts index e784a317..49d4e932 100644 --- a/tests/tests/exchanges/fanout-exchange/test00006.test.ts +++ b/tests/tests/exchanges/fanout-exchange/test00006.test.ts @@ -9,7 +9,7 @@ import { test, expect } from '@jest/globals'; import { - ProducerMessageNotPublishedError, + ProducerExchangeNoMatchedQueueError, ProducibleMessage, } from '../../../../src/lib/index.js'; import { getProducer } from '../../../common/producer.js'; @@ -21,6 +21,6 @@ test('ExchangeFanOut: producing message having an exchange without matched queue const msg = new ProducibleMessage().setFanOut('fanout_a').setBody('hello'); await expect(producer.produceAsync(msg)).rejects.toThrow( - ProducerMessageNotPublishedError, + ProducerExchangeNoMatchedQueueError, ); }); diff --git a/tests/tests/exchanges/fanout-exchange/test00007.test.ts b/tests/tests/exchanges/fanout-exchange/test00007.test.ts index 9ce8d4dc..992e7cea 100644 --- a/tests/tests/exchanges/fanout-exchange/test00007.test.ts +++ b/tests/tests/exchanges/fanout-exchange/test00007.test.ts @@ -8,7 +8,12 @@ */ import { test, expect } from '@jest/globals'; -import { EQueueDeliveryModel, EQueueType } from '../../../../src/lib/index.js'; +import { + EQueueDeliveryModel, + EQueueType, + ExchangeFanOutExchangeHasBoundQueuesError, + ExchangeQueueIsNotBoundToExchangeError, +} from '../../../../src/lib/index.js'; import { getFanOutExchange } from '../../../common/exchange.js'; import { getQueue } from '../../../common/queue.js'; @@ -43,12 +48,12 @@ test('ExchangeFanOut: creating and deleting an exchange', async () => { expect(r6).toEqual([q1]); await expect(fanOutExchange.deleteExchangeAsync('e2')).rejects.toThrow( - `Exchange has 1 bound queue(s). Unbind all queues before deleting the exchange.`, + ExchangeFanOutExchangeHasBoundQueuesError, ); await fanOutExchange.unbindQueueAsync(q1, 'e2'); await expect(fanOutExchange.unbindQueueAsync(q1, 'e2')).rejects.toThrow( - `Queue ${q1.name}@${q1.ns} is not bound to [e2] exchange.`, + ExchangeQueueIsNotBoundToExchangeError, ); await fanOutExchange.deleteExchangeAsync('e2'); diff --git a/tests/tests/exchanges/topic-exchange/test00001.test.ts b/tests/tests/exchanges/topic-exchange/test00001.test.ts index 6c0cf57f..506f7e93 100644 --- a/tests/tests/exchanges/topic-exchange/test00001.test.ts +++ b/tests/tests/exchanges/topic-exchange/test00001.test.ts @@ -8,14 +8,20 @@ */ import { test, expect } from '@jest/globals'; -import { RedisKeysError } from '../../../../src/common/redis-keys/redis-keys.error.js'; +import { ExchangeInvalidTopicParamsError } from '../../../../src/lib/exchange/errors/exchange-invalid-topic-params.error.js'; import { getTopicExchange } from '../../../common/exchange.js'; test('ExchangeTopic: topic validation', async () => { const e = getTopicExchange(); - await expect(e.getQueuesAsync('!@223333')).rejects.toThrow(RedisKeysError); - await expect(e.getQueuesAsync('223333.')).rejects.toThrow(RedisKeysError); - await expect(e.getQueuesAsync('223333.w')).rejects.toThrow(RedisKeysError); + await expect(e.getQueuesAsync('!@223333')).rejects.toThrow( + ExchangeInvalidTopicParamsError, + ); + await expect(e.getQueuesAsync('223333.')).rejects.toThrow( + ExchangeInvalidTopicParamsError, + ); + await expect(e.getQueuesAsync('223333.w')).rejects.toThrow( + ExchangeInvalidTopicParamsError, + ); await expect(e.getQueuesAsync('a223333.w')).resolves.not.toThrow(); await expect(e.getQueuesAsync('a223333.w_e')).resolves.not.toThrow(); await expect(e.getQueuesAsync('a223333.w-e')).resolves.not.toThrow(); diff --git a/tests/tests/exchanges/topic-exchange/test00007.test.ts b/tests/tests/exchanges/topic-exchange/test00007.test.ts index 73d8f23f..cf4b95c2 100644 --- a/tests/tests/exchanges/topic-exchange/test00007.test.ts +++ b/tests/tests/exchanges/topic-exchange/test00007.test.ts @@ -9,7 +9,7 @@ import { test, expect } from '@jest/globals'; import { - ProducerMessageNotPublishedError, + ProducerExchangeNoMatchedQueueError, ProducibleMessage, } from '../../../../src/lib/index.js'; import { getProducer } from '../../../common/producer.js'; @@ -21,6 +21,6 @@ test('ExchangeTopic: producing message having an exchange without matched queues const msg = new ProducibleMessage().setTopic('a.b.c.d').setBody('hello'); await expect(producer.produceAsync(msg)).rejects.toThrow( - ProducerMessageNotPublishedError, + ProducerExchangeNoMatchedQueueError, ); }); diff --git a/tests/tests/misc/test00003.test.ts b/tests/tests/misc/test00003.test.ts index 79b69107..1e18f42d 100644 --- a/tests/tests/misc/test00003.test.ts +++ b/tests/tests/misc/test00003.test.ts @@ -9,7 +9,7 @@ import { test, expect } from '@jest/globals'; import { redisKeys } from '../../../src/common/redis-keys/redis-keys.js'; -import { RedisKeysError } from '../../../src/common/redis-keys/redis-keys.error.js'; +import { RedisKeysError } from '../../../src/common/redis-keys/errors/redis-keys.error.js'; test('redisKeys', async () => { expect(redisKeys.validateNamespace('global')).toBeInstanceOf(RedisKeysError); diff --git a/tests/tests/misc/test00004.test.ts b/tests/tests/misc/test00004.test.ts index c8b61b94..90038322 100644 --- a/tests/tests/misc/test00004.test.ts +++ b/tests/tests/misc/test00004.test.ts @@ -11,7 +11,7 @@ import { test, expect } from '@jest/globals'; import { MessageDestinationQueueAlreadySetError, MessageDestinationQueueRequiredError, - MessageExchangeRequiredError, + MessageMessageExchangeRequiredError, ProducibleMessage, } from '../../../src/lib/index.js'; import { MessageEnvelope } from '../../../src/lib/message/message-envelope.js'; @@ -26,5 +26,5 @@ test('MessageEnvelope: additional checks', async () => { expect(() => env.setDestinationQueue({ ns: 'ns1', name: 'queue2' })).toThrow( MessageDestinationQueueAlreadySetError, ); - expect(() => env.getExchange()).toThrow(MessageExchangeRequiredError); + expect(() => env.getExchange()).toThrow(MessageMessageExchangeRequiredError); }); diff --git a/tests/tests/misc/test00019.test.ts b/tests/tests/misc/test00019.test.ts index a10c6f4c..e94b8365 100644 --- a/tests/tests/misc/test00019.test.ts +++ b/tests/tests/misc/test00019.test.ts @@ -8,6 +8,8 @@ */ import { test, expect } from '@jest/globals'; +import { ConfigurationMessageQueueSizeError } from '../../../src/config/errors/configuration-message-queue-size.error.js'; +import { ConfigurationMessageStoreExpireError } from '../../../src/config/errors/configuration-message-store-expire.error.js'; import Store from '../../../src/config/messages/store.js'; test('Configuration: storeMessages', async () => { @@ -21,7 +23,7 @@ test('Configuration: storeMessages', async () => { }, }, }); - }).toThrow(`Parameter [queueSize] should be >= 0`); + }).toThrow(ConfigurationMessageQueueSizeError); expect(() => { Store({ @@ -33,7 +35,7 @@ test('Configuration: storeMessages', async () => { }, }, }); - }).toThrow(`Parameter [expire] should be >= 0`); + }).toThrow(ConfigurationMessageStoreExpireError); const config = Store({}); expect(config.deadLettered.store).toEqual(false); diff --git a/tests/tests/misc/test00020.test.ts b/tests/tests/misc/test00020.test.ts index dc072a93..5184fd81 100644 --- a/tests/tests/misc/test00020.test.ts +++ b/tests/tests/misc/test00020.test.ts @@ -8,31 +8,35 @@ */ import { test, expect } from '@jest/globals'; -import { EMessagePriority, ProducibleMessage } from '../../../src/lib/index.js'; +import { + EMessagePriority, + MessageMessagePropertyError, + ProducibleMessage, +} from '../../../src/lib/index.js'; test('ProducibleMessage', async () => { const msg = new ProducibleMessage(); expect(() => { msg.setScheduledRepeatPeriod(-1); - }).toThrow('Expected a positive integer value in milliseconds'); + }).toThrow(MessageMessagePropertyError); expect(() => { msg.setScheduledDelay(-1); - }).toThrow('Expected a positive integer value in milliseconds'); + }).toThrow(MessageMessagePropertyError); expect(() => { msg.setScheduledRepeat(-1); - }).toThrow('Expected a positive integer value >= 0'); + }).toThrow(MessageMessagePropertyError); expect(() => { msg.setTTL(-1); - }).toThrow('Expected a positive integer value in milliseconds >= 0'); + }).toThrow(MessageMessagePropertyError); expect(() => { msg.setConsumeTimeout(-1); - }).toThrow('Expected a positive integer value in milliseconds >= 0'); + }).toThrow(MessageMessagePropertyError); expect(() => { msg.setRetryThreshold(-1); - }).toThrow('Retry threshold should be a positive integer >= 0'); + }).toThrow(MessageMessagePropertyError); expect(() => { msg.setRetryDelay(-1); - }).toThrow('Expected a positive integer in milliseconds >= 0'); + }).toThrow(MessageMessagePropertyError); msg.setPriority(EMessagePriority.HIGHEST); expect(msg.getPriority()).toBe(EMessagePriority.HIGHEST); diff --git a/tests/tests/purging-queues/test00006.test.ts b/tests/tests/purging-queues/test00006.test.ts index 3d465f16..b37937a7 100644 --- a/tests/tests/purging-queues/test00006.test.ts +++ b/tests/tests/purging-queues/test00006.test.ts @@ -9,9 +9,9 @@ import { test, expect } from '@jest/globals'; import { - QueueHasRunningConsumersError, - QueueNotEmptyError, - QueueNotFoundError, + QueueQueueHasRunningConsumersError, + QueueQueueNotEmptyError, + QueueQueueNotFoundError, } from '../../../src/lib/index.js'; import { shutDownBaseInstance } from '../../common/base-instance.js'; import { @@ -33,20 +33,20 @@ test('Deleting a message queue with all of its data', async () => { const q = await getQueue(); - await expect(q.deleteAsync(queue)).rejects.toThrow(QueueNotEmptyError); + await expect(q.deleteAsync(queue)).rejects.toThrow(QueueQueueNotEmptyError); await queueMessages.purgeAsync(defaultQueue); await expect(q.deleteAsync(queue)).rejects.toThrow( - QueueHasRunningConsumersError, + QueueQueueHasRunningConsumersError, ); await shutDownBaseInstance(consumer); await q.deleteAsync(queue); await expect(queueMessages.countMessagesByStatusAsync(queue)).rejects.toThrow( - QueueNotFoundError, + QueueQueueNotFoundError, ); - await expect(q.deleteAsync(queue)).rejects.toThrow(QueueNotFoundError); + await expect(q.deleteAsync(queue)).rejects.toThrow(QueueQueueNotFoundError); }); diff --git a/tests/tests/purging-queues/test00008.test.ts b/tests/tests/purging-queues/test00008.test.ts index 064d0ed1..6c286853 100644 --- a/tests/tests/purging-queues/test00008.test.ts +++ b/tests/tests/purging-queues/test00008.test.ts @@ -11,9 +11,9 @@ import { test, expect } from '@jest/globals'; import { IQueueParams, NamespaceNotFoundError, - QueueHasRunningConsumersError, - QueueNotEmptyError, - QueueNotFoundError, + QueueQueueHasRunningConsumersError, + QueueQueueNotEmptyError, + QueueQueueNotFoundError, } from '../../../src/lib/index.js'; import { createQueue, @@ -52,35 +52,35 @@ test('Combined: Fetching namespaces, deleting a namespace with its message queue await c1.shutdownAsync(); await c2.shutdownAsync(); - await expect(ns.deleteAsync('ns1')).rejects.toThrow(QueueNotEmptyError); + await expect(ns.deleteAsync('ns1')).rejects.toThrow(QueueQueueNotEmptyError); await qm.purgeAsync(queueA); - await expect(ns.deleteAsync('ns1')).rejects.toThrow(QueueNotEmptyError); + await expect(ns.deleteAsync('ns1')).rejects.toThrow(QueueQueueNotEmptyError); await qm.purgeAsync(queueB); await c1.runAsync(); await c2.runAsync(); await expect(ns.deleteAsync('ns1')).rejects.toThrow( - QueueHasRunningConsumersError, + QueueQueueHasRunningConsumersError, ); await c1.shutdownAsync(); await expect(ns.deleteAsync('ns1')).rejects.toThrow( - QueueHasRunningConsumersError, + QueueQueueHasRunningConsumersError, ); await c2.shutdownAsync(); await ns.deleteAsync('ns1'); await expect(qm.countMessagesByStatusAsync(queueA)).rejects.toThrow( - QueueNotFoundError, + QueueQueueNotFoundError, ); await expect(qm.countMessagesByStatusAsync(queueB)).rejects.toThrow( - QueueNotFoundError, + QueueQueueNotFoundError, ); const m5 = await ns.getNamespacesAsync(); diff --git a/tests/tests/queue-consumer-groups/test00002.test.ts b/tests/tests/queue-consumer-groups/test00002.test.ts index 4b49d17d..11869ba1 100644 --- a/tests/tests/queue-consumer-groups/test00002.test.ts +++ b/tests/tests/queue-consumer-groups/test00002.test.ts @@ -11,7 +11,7 @@ import { test, expect } from '@jest/globals'; import bluebird from 'bluebird'; import { Consumer, - ConsumerGroupIdRequiredError, + ConsumerConsumerGroupIdRequiredError, ConsumerGroups, EMessagePriority, EQueueDeliveryModel, @@ -69,7 +69,7 @@ test('Publish and consume a message to/from a consumer group', async () => { const pendingMessages = await getQueuePendingMessages(); await expect(pendingMessages.getMessagesAsync(queue1, 1, 10)).rejects.toThrow( - ConsumerGroupIdRequiredError, + ConsumerConsumerGroupIdRequiredError, ); const messages = await pendingMessages.getMessagesAsync( diff --git a/tests/tests/queue-consumer-groups/test00006.test.ts b/tests/tests/queue-consumer-groups/test00006.test.ts index 6eda16e3..3a3325d7 100644 --- a/tests/tests/queue-consumer-groups/test00006.test.ts +++ b/tests/tests/queue-consumer-groups/test00006.test.ts @@ -11,7 +11,7 @@ import { expect, test } from '@jest/globals'; import bluebird from 'bluebird'; import { Consumer, - ConsumerGroupIdNotSupportedError, + ConsumerConsumerGroupIdNotSupportedError, EQueueDeliveryModel, EQueueType, IQueueParams, @@ -38,7 +38,7 @@ test('ConsumerGroupIdNotSupportedError', async () => { ); await expect(consumer1.runAsync()).rejects.toThrow( - ConsumerGroupIdNotSupportedError, + ConsumerConsumerGroupIdNotSupportedError, ); await consumer1.shutdownAsync(); }); diff --git a/tests/tests/queue-consumer-groups/test00007.test.ts b/tests/tests/queue-consumer-groups/test00007.test.ts index e250355c..70ce4b43 100644 --- a/tests/tests/queue-consumer-groups/test00007.test.ts +++ b/tests/tests/queue-consumer-groups/test00007.test.ts @@ -9,13 +9,14 @@ import { expect, test } from '@jest/globals'; import bluebird from 'bluebird'; -import { RedisKeysError } from '../../../src/common/redis-keys/redis-keys.error.js'; import { Consumer, ConsumerGroups, + ConsumerGroupsInvalidGroupIdError, EQueueDeliveryModel, EQueueType, IQueueParams, + QueueInvalidQueueParameterError, } from '../../../src/lib/index.js'; import { getQueue } from '../../common/queue.js'; @@ -38,12 +39,12 @@ test('Consumer group ID validation', async () => { { queue: queue1, groupId: 'my-group-1!' }, (msg, cb) => cb(), ), - ).rejects.toThrow(RedisKeysError); + ).rejects.toThrow(QueueInvalidQueueParameterError); const consumerGroups = bluebird.promisifyAll(new ConsumerGroups()); await expect( consumerGroups.saveConsumerGroupAsync(queue1, 'my-group-1!'), - ).rejects.toThrow(RedisKeysError); + ).rejects.toThrow(ConsumerGroupsInvalidGroupIdError); await consumerGroups.shutdownAsync(); await consumer1.shutdownAsync(); diff --git a/tests/tests/queue-rate-limit/test00028.test.ts b/tests/tests/queue-rate-limit/test00028.test.ts index 618b1dfe..b74627eb 100644 --- a/tests/tests/queue-rate-limit/test00028.test.ts +++ b/tests/tests/queue-rate-limit/test00028.test.ts @@ -8,6 +8,8 @@ */ import { test, expect } from '@jest/globals'; +import { QueueRateLimitInvalidIntervalError } from '../../../src/lib/queue-rate-limit/errors/queue-rate-limit-invalid-interval.error.js'; +import { QueueRateLimitInvalidLimitError } from '../../../src/lib/queue-rate-limit/errors/queue-rate-limit-invalid-limit.error.js'; import { defaultQueue } from '../../common/message-producing-consuming.js'; import { getQueueRateLimit } from '../../common/queue-rate-limit.js'; @@ -31,14 +33,12 @@ test('SetQueueRateLimit()/GetQueueRateLimit()/ClearQueueRateLimit()', async () = limit: 0, interval: 1000, }), - ).rejects.toThrow(`Invalid rateLimit.limit. Expected a positive integer > 0`); + ).rejects.toThrow(QueueRateLimitInvalidLimitError); await expect( queueRateLimit.setAsync(defaultQueue, { limit: 4, interval: 0, }), - ).rejects.toThrow( - `Invalid rateLimit.interval. Expected a positive integer >= 1000`, - ); + ).rejects.toThrow(QueueRateLimitInvalidIntervalError); }); diff --git a/tests/tests/queue-scheduled-messages/test00010.test.ts b/tests/tests/queue-scheduled-messages/test00010.test.ts index 06fdde69..88404bb9 100644 --- a/tests/tests/queue-scheduled-messages/test00010.test.ts +++ b/tests/tests/queue-scheduled-messages/test00010.test.ts @@ -12,7 +12,9 @@ import { EMessagePriority, EQueueDeliveryModel, EQueueType, - ProducerMessageNotScheduledError, + ProducerMessagePriorityRequiredError, + ProducerPriorityQueuingNotEnabledError, + ProducerQueueNotFoundError, ProducibleMessage, } from '../../../src/lib/index.js'; import { getProducer } from '../../common/producer.js'; @@ -42,8 +44,7 @@ test('Scheduling a message and expecting different kind of failures', async () = .setScheduledCRON('* * * * * *'); await producer.produceAsync(msg); } catch (e: unknown) { - const m = e instanceof ProducerMessageNotScheduledError ? e.message : ''; - expect(m).toBe('PRIORITY_QUEUING_NOT_ENABLED'); + expect(e instanceof ProducerPriorityQueuingNotEnabledError).toBe(true); } try { @@ -53,8 +54,7 @@ test('Scheduling a message and expecting different kind of failures', async () = .setScheduledCRON('* * * * * *'); await producer.produceAsync(msg1); } catch (e: unknown) { - const m = e instanceof ProducerMessageNotScheduledError ? e.message : ''; - expect(m).toBe('MESSAGE_PRIORITY_REQUIRED'); + expect(e instanceof ProducerMessagePriorityRequiredError).toBe(true); } try { @@ -64,7 +64,6 @@ test('Scheduling a message and expecting different kind of failures', async () = .setScheduledCRON('* * * * * *'); await producer.produceAsync(msg2); } catch (e: unknown) { - const m = e instanceof ProducerMessageNotScheduledError ? e.message : ''; - expect(m).toBe('QUEUE_NOT_FOUND'); + expect(e instanceof ProducerQueueNotFoundError).toBe(true); } }); diff --git a/tests/tests/requeuing-messages/test00001.test.ts b/tests/tests/requeuing-messages/test00001.test.ts index d0312a20..b653ed94 100644 --- a/tests/tests/requeuing-messages/test00001.test.ts +++ b/tests/tests/requeuing-messages/test00001.test.ts @@ -8,13 +8,14 @@ */ import { test, expect } from '@jest/globals'; -import { MessageRequeueError } from '../../../src/lib/index.js'; +import { MessageMessageNotRequeuableError } from '../../../src/lib/message/errors/message-message-not-requeuable.error.js'; import { shutDownBaseInstance } from '../../common/base-instance.js'; import { createQueue, defaultQueue, produceAndDeadLetterMessage, } from '../../common/message-producing-consuming.js'; +import { getMessage } from '../../common/message.js'; import { getQueueDeadLetteredMessages } from '../../common/queue-dead-lettered-messages.js'; import { getQueueMessages } from '../../common/queue-messages.js'; import { getQueuePendingMessages } from '../../common/queue-pending-messages.js'; @@ -24,8 +25,8 @@ test('Combined test: Requeue a message from dead-letter queue. Check queue metri const { messageId, queue, consumer } = await produceAndDeadLetterMessage(); await shutDownBaseInstance(consumer); - const deadLetteredMessages = await getQueueDeadLetteredMessages(); - await deadLetteredMessages.requeueMessageAsync(queue, messageId); + const message = await getMessage(); + await message.requeueMessageByIdAsync(messageId); const pendingMessages = await getQueuePendingMessages(); const res2 = await pendingMessages.getMessagesAsync(queue, 0, 100); @@ -33,6 +34,7 @@ test('Combined test: Requeue a message from dead-letter queue. Check queue metri expect(res2.items.length).toBe(1); expect(res2.items[0].id).toEqual(messageId); + const deadLetteredMessages = await getQueueDeadLetteredMessages(); const res3 = await deadLetteredMessages.getMessagesAsync(queue, 0, 100); expect(res3.totalItems).toBe(0); expect(res3.items.length).toBe(0); @@ -42,7 +44,7 @@ test('Combined test: Requeue a message from dead-letter queue. Check queue metri expect(count.deadLettered).toBe(0); expect(count.pending).toBe(1); - await expect( - deadLetteredMessages.requeueMessageAsync(queue, messageId), - ).rejects.toThrow(MessageRequeueError); + await expect(message.requeueMessageByIdAsync(messageId)).rejects.toThrow( + MessageMessageNotRequeuableError, + ); }); diff --git a/tests/tests/requeuing-messages/test00002.test.ts b/tests/tests/requeuing-messages/test00002.test.ts index 02efc282..25f0b0ae 100644 --- a/tests/tests/requeuing-messages/test00002.test.ts +++ b/tests/tests/requeuing-messages/test00002.test.ts @@ -8,13 +8,14 @@ */ import { test, expect } from '@jest/globals'; -import { MessageRequeueError } from '../../../src/lib/index.js'; +import { MessageMessageNotRequeuableError } from '../../../src/lib/message/errors/message-message-not-requeuable.error.js'; import { shutDownBaseInstance } from '../../common/base-instance.js'; import { createQueue, defaultQueue, produceAndAcknowledgeMessage, } from '../../common/message-producing-consuming.js'; +import { getMessage } from '../../common/message.js'; import { getQueueAcknowledgedMessages } from '../../common/queue-acknowledged-messages.js'; import { getQueueMessages } from '../../common/queue-messages.js'; import { getQueuePendingMessages } from '../../common/queue-pending-messages.js'; @@ -24,9 +25,10 @@ test('Combined test. Requeue a message from acknowledged queue. Check queue metr const { messageId, queue, consumer } = await produceAndAcknowledgeMessage(); await shutDownBaseInstance(consumer); - const acknowledgedMessages = await getQueueAcknowledgedMessages(); - await acknowledgedMessages.requeueMessageAsync(queue, messageId); + const message = await getMessage(); + await message.requeueMessageByIdAsync(messageId); + const acknowledgedMessages = await getQueueAcknowledgedMessages(); const pendingMessages = await getQueuePendingMessages(); const res2 = await pendingMessages.getMessagesAsync(queue, 0, 100); expect(res2.totalItems).toBe(1); @@ -42,7 +44,7 @@ test('Combined test. Requeue a message from acknowledged queue. Check queue metr expect(count.acknowledged).toBe(0); expect(count.pending).toBe(1); - await expect( - acknowledgedMessages.requeueMessageAsync(queue, messageId), - ).rejects.toThrow(MessageRequeueError); + await expect(message.requeueMessageByIdAsync(messageId)).rejects.toThrow( + MessageMessageNotRequeuableError, + ); }); diff --git a/tests/tests/requeuing-messages/test00003.test.ts b/tests/tests/requeuing-messages/test00003.test.ts index c5c76031..d0257b4d 100644 --- a/tests/tests/requeuing-messages/test00003.test.ts +++ b/tests/tests/requeuing-messages/test00003.test.ts @@ -14,12 +14,13 @@ import { EQueueDeliveryModel, EQueueType, IMessageTransferable, - MessageRequeueError, + MessageMessageNotRequeuableError, ProducibleMessage, } from '../../../src/lib/index.js'; import { getConsumer } from '../../common/consumer.js'; import { untilMessageAcknowledged } from '../../common/events.js'; import { defaultQueue } from '../../common/message-producing-consuming.js'; +import { getMessage } from '../../common/message.js'; import { getProducer } from '../../common/producer.js'; import { getQueue } from '../../common/queue.js'; import { getQueueAcknowledgedMessages } from '../../common/queue-acknowledged-messages.js'; @@ -43,8 +44,8 @@ test('Combined test. Requeue a priority message from acknowledged queue. Check q ), }); - const message = new ProducibleMessage(); - message + const msg = new ProducibleMessage(); + msg .setBody({ hello: 'world' }) .setQueue(defaultQueue) .setPriority(EMessagePriority.ABOVE_NORMAL); @@ -52,7 +53,7 @@ test('Combined test. Requeue a priority message from acknowledged queue. Check q const producer = getProducer(); await producer.runAsync(); - const [id] = await producer.produceAsync(message); + const [id] = await producer.produceAsync(msg); consumer.run(() => void 0); await untilMessageAcknowledged(consumer); @@ -71,7 +72,8 @@ test('Combined test. Requeue a priority message from acknowledged queue. Check q expect(count.pending).toBe(0); expect(count.acknowledged).toBe(1); - await acknowledgedMessages.requeueMessageAsync(defaultQueue, id); + const message = await getMessage(); + await message.requeueMessageByIdAsync(id); const count2 = await queueMessages.countMessagesByStatusAsync(defaultQueue); expect(count2.pending).toBe(1); @@ -91,7 +93,7 @@ test('Combined test. Requeue a priority message from acknowledged queue. Check q expect(res7.items.length).toBe(1); expect(res7.items[0].id).toEqual(id); - await expect( - acknowledgedMessages.requeueMessageAsync(defaultQueue, id), - ).rejects.toThrow(MessageRequeueError); + await expect(message.requeueMessageByIdAsync(id)).rejects.toThrow( + MessageMessageNotRequeuableError, + ); });