From 92d5697631143cf1727973124b3cc240247da403 Mon Sep 17 00:00:00 2001 From: Seshathri-saravanan Date: Wed, 17 Dec 2025 17:49:52 +0530 Subject: [PATCH 1/3] using aws sdk v3 --- lib/index.d.ts | 35 +++++++------------ lib/index.js | 60 +++++++++++++++++--------------- package.json | 5 +-- src/index.ts | 89 +++++++++++++++++++++++++---------------------- test/push.spec.js | 2 +- test/sub.spec.js | 29 ++++++++------- 6 files changed, 108 insertions(+), 112 deletions(-) diff --git a/lib/index.d.ts b/lib/index.d.ts index b6df273..af12be1 100644 --- a/lib/index.d.ts +++ b/lib/index.d.ts @@ -1,8 +1,5 @@ -/// -import AWS, { AWSError } from "aws-sdk"; -import EventEmitter from "events"; -import { QueueAttributeMap, SendMessageBatchResult, SendMessageResult } from "aws-sdk/clients/sqs"; -import { PromiseResult } from "aws-sdk/lib/request"; +import { SendMessageBatchResult, SendMessageResult } from "@aws-sdk/client-sqs"; +import { EventEmitter } from "events"; export default class SQS { private name; private emitter; @@ -37,7 +34,7 @@ export default class SQS { * @param {String} opts.FifoQueue='false' [Use FIFO (true) or Standard queue (false)] * @return {Promise} */ - createQueue(name: string, opts?: QueueAttributeMap): Promise; + createQueue(name: string, opts?: Record): Promise; /** * Publish on SQS * @param {string} name @@ -48,7 +45,7 @@ export default class SQS { * @param {Number} [options.delay] [in seconds] * @return {Promise} */ - publish(name: string, content: Record, meta?: Record, handle?: boolean, options?: Record): Promise>; + publish(name: string, content: Record, meta?: Record, handle?: boolean, options?: Record): Promise; /** * Publish on SQS in batch * @param {string} name @@ -59,8 +56,8 @@ export default class SQS { * @param {Number} [options.delay] [in seconds] * @return {Promise} */ - publishBatch(name: string, contentList: Record[], meta?: Record, handle?: boolean, options?: Record): Promise>; - publishFifo(name: string, content: Record, meta: Record, group: Record, handle?: boolean): Promise>; + publishBatch(name: string, contentList: Record[], meta?: Record, handle?: boolean, options?: Record): Promise; + publishFifo(name: string, content: Record, meta: Record, group: Record, handle?: boolean): Promise; getQueueUrl(name: string): string; /** * Subscribe to a queue, using long polling @@ -68,22 +65,14 @@ export default class SQS { subscribe(name: string, cb: (arg1: Record, arg2: Record) => void, opts?: Record): Promise; deleteMessage(name: string, messageId: any, handle: string): Promise; returnMessage(name: string, messageId: any, handle: string): Promise; - fetchMessages(name: string, number?: number): Promise<({ + fetchMessages(name: string, number?: number): Promise<{ data: any; - ack: () => Promise; - nack: () => Promise; - id?: undefined; - handle?: undefined; - queueAttributes?: undefined; - messageAttributes?: undefined; - } | { id: string; handle: string; - queueAttributes: AWS.SQS.MessageSystemAttributeMap; - messageAttributes: AWS.SQS.MessageBodyAttributeMap; - data?: undefined; - ack?: undefined; - nack?: undefined; - })[][]>; + queueAttributes: Partial>; + messageAttributes: Record; + ack: () => Promise; + nack: () => Promise; + }[]>; fetchOne(name: string): Promise; } diff --git a/lib/index.js b/lib/index.js index 77b5e01..1fa9608 100644 --- a/lib/index.js +++ b/lib/index.js @@ -3,7 +3,7 @@ var __importDefault = (this && this.__importDefault) || function (mod) { return (mod && mod.__esModule) ? mod : { "default": mod }; }; Object.defineProperty(exports, "__esModule", { value: true }); -const aws_sdk_1 = __importDefault(require("aws-sdk")); +const client_sqs_1 = require("@aws-sdk/client-sqs"); const safely_parse_json_1 = __importDefault(require("safely-parse-json")); const sqs_consumer_1 = require("sqs-consumer"); const uuid_1 = require("uuid"); @@ -66,7 +66,7 @@ class SQS { ...(region && { region }), ...(accountId && { accountId }) }; - this.client = new aws_sdk_1.default.SQS(this.config); + this.client = new client_sqs_1.SQSClient(this.config); this.log(`Connected on SQS:${this.name}`, this.config); return this; } @@ -95,12 +95,11 @@ class SQS { // FifoQueue: 'false', // use standard by default }, opts); try { - const response = await this.client - .createQueue({ + const command = new client_sqs_1.CreateQueueCommand({ QueueName: name, Attributes: options, - }) - .promise(); + }); + const response = await this.client.send(command); const queueUrl = response.QueueUrl; const message = `Created queue ${name} => ${queueUrl}`; this.log(message, { name, queueUrl }); @@ -130,7 +129,8 @@ class SQS { params.DelaySeconds = options.delay; } try { - const res = await this.client.sendMessage(params).promise(); + const command = new client_sqs_1.SendMessageCommand(params); + const res = await this.client.send(command); return res; } catch (err) { @@ -169,7 +169,8 @@ class SQS { })), }; try { - const res = await this.client.sendMessageBatch(params).promise(); + const command = new client_sqs_1.SendMessageBatchCommand(params); + const res = await this.client.send(command); return res; } catch (err) { @@ -192,7 +193,8 @@ class SQS { MessageDeduplicationId: group.id, }; try { - const res = await this.client.sendMessage(params).promise(); + const command = new client_sqs_1.SendMessageCommand(params); + const res = await this.client.send(command); return res; } catch (err) { @@ -259,7 +261,8 @@ class SQS { QueueUrl: queueUrl, ReceiptHandle: handle, }; - await this.client.deleteMessage(params).promise(); + const command = new client_sqs_1.DeleteMessageCommand(params); + await this.client.send(command); } async returnMessage(name, messageId, handle) { const queueUrl = this.getQueueUrl(name); @@ -268,7 +271,8 @@ class SQS { ReceiptHandle: handle, VisibilityTimeout: 0, }; - await this.client.changeMessageVisibility(params).promise(); + const command = new client_sqs_1.ChangeMessageVisibilityCommand(params); + await this.client.send(command); } async fetchMessages(name, number = 10) { const queueUrl = this.getQueueUrl(name); @@ -276,25 +280,23 @@ class SQS { QueueUrl: queueUrl, MaxNumberOfMessages: number, }; - const res = await this.client.receiveMessage(params).promise(); - return res.Messages.map((msg) => { - return [ - { - data: (0, safely_parse_json_1.default)(msg.Body), - ack: () => { - return this.deleteMessage(name, msg.MessageId, msg.ReceiptHandle); - }, - nack: () => { - return this.returnMessage(name, msg.MessageId, msg.ReceiptHandle); - }, + const command = new client_sqs_1.ReceiveMessageCommand(params); + const res = await this.client.send(command); + const messages = res.Messages || []; + return messages.map((msg) => { + return { + data: (0, safely_parse_json_1.default)(msg.Body), + id: msg.MessageId, + handle: msg.ReceiptHandle, + queueAttributes: msg.Attributes, + messageAttributes: msg.MessageAttributes, + ack: () => { + return this.deleteMessage(name, msg.MessageId, msg.ReceiptHandle); }, - { - id: msg.MessageId, - handle: msg.ReceiptHandle, - queueAttributes: msg.Attributes, - messageAttributes: msg.MessageAttributes, + nack: () => { + return this.returnMessage(name, msg.MessageId, msg.ReceiptHandle); }, - ]; + }; }); } async fetchOne(name) { @@ -303,4 +305,4 @@ class SQS { } } exports.default = SQS; -//# sourceMappingURL=data:application/json;base64, \ No newline at end of file +//# sourceMappingURL=data:application/json;base64, \ No newline at end of file diff --git a/package.json b/package.json index 96c4231..4864604 100644 --- a/package.json +++ b/package.json @@ -33,15 +33,12 @@ "eslint-import-resolver-typescript": "^2.4.0", "eslint-plugin-es5": "^1.5.0", "eslint-plugin-import": "^2.23.4", - "typescript": "^4.4.4" + "typescript": "^5.0.0" }, "dependencies": { "@aws-sdk/client-sqs": "^3.600.0", "safely-parse-json": "0.1.0", "sqs-consumer": "5.8.0", "uuid": "^9.0.0" - }, - "peerDependencies": { - "aws-sdk": "^2.1403.0" } } \ No newline at end of file diff --git a/src/index.ts b/src/index.ts index f185371..a726501 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,19 +1,22 @@ -import AWS, { AWSError } from "aws-sdk"; -import safeJSON from "safely-parse-json"; -import { Consumer } from "sqs-consumer"; -import EventEmitter from "events"; import { - QueueAttributeMap, + SQSClient, + CreateQueueCommand, + SendMessageCommand, + SendMessageBatchCommand, + DeleteMessageCommand, + ChangeMessageVisibilityCommand, + ReceiveMessageCommand, SendMessageBatchResult, SendMessageRequest, SendMessageResult, -} from "aws-sdk/clients/sqs"; -import { PromiseResult } from "aws-sdk/lib/request"; -import { ChangeMessageVisibilityRequest, DeleteMessageRequest, SendMessageBatchRequest, + CreateQueueCommandInput, } from "@aws-sdk/client-sqs"; +import safeJSON from "safely-parse-json"; +import { Consumer } from "sqs-consumer"; +import { EventEmitter } from "events"; import { v4 as uuidv4 } from "uuid"; import { Agent } from 'https'; @@ -21,7 +24,7 @@ export default class SQS { private name: string; private emitter: EventEmitter; private config: Record; - private client: AWS.SQS; + private client: SQSClient; private queues: Record; constructor(name: string, emitter: EventEmitter, config: Record = {}) { @@ -83,7 +86,7 @@ export default class SQS { ...(region && { region }), ...(accountId && { accountId }) }; - this.client = new AWS.SQS(this.config); + this.client = new SQSClient(this.config); this.log(`Connected on SQS:${this.name}`, this.config); return this; } catch (err: any) { @@ -107,7 +110,7 @@ export default class SQS { * @param {String} opts.FifoQueue='false' [Use FIFO (true) or Standard queue (false)] * @return {Promise} */ - async createQueue(name: string, opts: QueueAttributeMap = {}): Promise { + async createQueue(name: string, opts: Record = {}): Promise { const options = Object.assign( { // FifoQueue: 'false', // use standard by default @@ -115,12 +118,11 @@ export default class SQS { opts ); try { - const response = await this.client - .createQueue({ - QueueName: name, - Attributes: options, - }) - .promise(); + const command = new CreateQueueCommand({ + QueueName: name, + Attributes: options, + }); + const response = await this.client.send(command); const queueUrl = response.QueueUrl; const message = `Created queue ${name} => ${queueUrl}`; this.log(message, { name, queueUrl }); @@ -147,7 +149,7 @@ export default class SQS { meta: Record = {}, handle: boolean = true, options: Record = {} - ): Promise> { + ): Promise { const params: SendMessageRequest = { QueueUrl: this.getQueueUrl(name), MessageBody: JSON.stringify({ content, meta }), @@ -158,7 +160,8 @@ export default class SQS { } try { - const res = await this.client.sendMessage(params).promise(); + const command = new SendMessageCommand(params); + const res = await this.client.send(command); return res; } catch (err) { this.error(err, { @@ -189,7 +192,7 @@ export default class SQS { meta: Record = {}, handle: boolean = true, options: Record = {} - ): Promise> { + ): Promise { let DelaySeconds: number | undefined; if (typeof options.delay === "number") { DelaySeconds = options.delay; @@ -205,7 +208,8 @@ export default class SQS { }; try { - const res = await this.client.sendMessageBatch(params).promise(); + const command = new SendMessageBatchCommand(params); + const res = await this.client.send(command); return res; } catch (err) { this.error(err, { @@ -226,7 +230,7 @@ export default class SQS { meta: Record = {}, group: Record, handle: boolean = true - ): Promise> { + ): Promise { const params: SendMessageRequest = { QueueUrl: this.getQueueUrl(name), MessageBody: JSON.stringify({ content, meta }), @@ -234,7 +238,8 @@ export default class SQS { MessageDeduplicationId: group.id, }; try { - const res = await this.client.sendMessage(params).promise(); + const command = new SendMessageCommand(params); + const res = await this.client.send(command); return res; } catch (err) { this.error(err, { @@ -316,7 +321,8 @@ export default class SQS { ReceiptHandle: handle, }; - await this.client.deleteMessage(params).promise(); + const command = new DeleteMessageCommand(params); + await this.client.send(command); } async returnMessage(name: string, messageId: any, handle: string) { @@ -326,7 +332,8 @@ export default class SQS { ReceiptHandle: handle, VisibilityTimeout: 0, }; - await this.client.changeMessageVisibility(params).promise(); + const command = new ChangeMessageVisibilityCommand(params); + await this.client.send(command); } async fetchMessages(name: string, number = 10) { @@ -335,25 +342,23 @@ export default class SQS { QueueUrl: queueUrl, MaxNumberOfMessages: number, }; - const res = await this.client.receiveMessage(params).promise(); - return res.Messages.map((msg) => { - return [ - { - data: safeJSON(msg.Body), - ack: () => { - return this.deleteMessage(name, msg.MessageId, msg.ReceiptHandle); - }, - nack: () => { - return this.returnMessage(name, msg.MessageId, msg.ReceiptHandle); - }, + const command = new ReceiveMessageCommand(params); + const res = await this.client.send(command); + const messages = res.Messages || []; + return messages.map((msg) => { + return { + data: safeJSON(msg.Body), + id: msg.MessageId, + handle: msg.ReceiptHandle, + queueAttributes: msg.Attributes, + messageAttributes: msg.MessageAttributes, + ack: () => { + return this.deleteMessage(name, msg.MessageId, msg.ReceiptHandle); }, - { - id: msg.MessageId, - handle: msg.ReceiptHandle, - queueAttributes: msg.Attributes, - messageAttributes: msg.MessageAttributes, + nack: () => { + return this.returnMessage(name, msg.MessageId, msg.ReceiptHandle); }, - ]; + }; }); } diff --git a/test/push.spec.js b/test/push.spec.js index 48f8c5d..8cba7c2 100644 --- a/test/push.spec.js +++ b/test/push.spec.js @@ -2,7 +2,7 @@ const EventEmitter = require('events').EventEmitter; const { v4: uuidv4 } = require('uuid'); -const SQS = require('../src/index'); +const SQS = require('../lib/index').default; const emitter = new EventEmitter(); emitter.on('error', console.error.bind(console)); diff --git a/test/sub.spec.js b/test/sub.spec.js index 4867fe0..ad563e7 100644 --- a/test/sub.spec.js +++ b/test/sub.spec.js @@ -1,9 +1,7 @@ /* eslint no-console: 0 */ - const EventEmitter = require('events').EventEmitter; - -const SQS = require('../src/index'); +const SQS = require('../lib/index').default; const emitter = new EventEmitter(); emitter.on('error', console.error.bind(console)); @@ -15,21 +13,26 @@ process.on('uncaughtExceptionMonitor', console.log); const sqs = new SQS('sqs', emitter); -async function sub() { +async function fetch() { await sqs.init(); await sqs.createQueue('test.fifo', { FifoQueue: 'true', }); - await sqs.subscribe('test.fifo', (msg) => { - console.log(msg); - setTimeout(() => { - msg.ack(); - }, 1000); - }, { - maxInProgress: 2, - }); + // Fetch messages from the queue + const messages = await sqs.fetchMessages('test.fifo', 5); + console.log(`Fetched ${messages.length} messages`); + + // Process each message + for (const msg of messages) { + console.log('Message:', msg.data); + console.log('Message ID:', msg.id); + + // Acknowledge the message after processing + await msg.ack(); + console.log('Message acknowledged'); + } } -sub().then(() => console.log('Subsciption')).catch(console.error.bind(console)); +fetch().then(() => console.log('Fetch completed')).catch(console.error.bind(console)); From f5309ba6e2c853737e9d2f28c4ad028edc9f93c8 Mon Sep 17 00:00:00 2001 From: Seshathri-saravanan Date: Wed, 17 Dec 2025 17:59:02 +0530 Subject: [PATCH 2/3] fixed types --- lib/index.d.ts | 8 ++++---- lib/index.js | 5 ++++- src/index.ts | 11 +++++------ 3 files changed, 13 insertions(+), 11 deletions(-) diff --git a/lib/index.d.ts b/lib/index.d.ts index af12be1..f21db98 100644 --- a/lib/index.d.ts +++ b/lib/index.d.ts @@ -1,4 +1,4 @@ -import { SendMessageBatchResult, SendMessageResult } from "@aws-sdk/client-sqs"; +import { SendMessageBatchCommandOutput, SendMessageCommandOutput } from "@aws-sdk/client-sqs"; import { EventEmitter } from "events"; export default class SQS { private name; @@ -45,7 +45,7 @@ export default class SQS { * @param {Number} [options.delay] [in seconds] * @return {Promise} */ - publish(name: string, content: Record, meta?: Record, handle?: boolean, options?: Record): Promise; + publish(name: string, content: Record, meta?: Record, handle?: boolean, options?: Record): Promise; /** * Publish on SQS in batch * @param {string} name @@ -56,8 +56,8 @@ export default class SQS { * @param {Number} [options.delay] [in seconds] * @return {Promise} */ - publishBatch(name: string, contentList: Record[], meta?: Record, handle?: boolean, options?: Record): Promise; - publishFifo(name: string, content: Record, meta: Record, group: Record, handle?: boolean): Promise; + publishBatch(name: string, contentList: Record[], meta?: Record, handle?: boolean, options?: Record): Promise; + publishFifo(name: string, content: Record, meta: Record, group: Record, handle?: boolean): Promise; getQueueUrl(name: string): string; /** * Subscribe to a queue, using long polling diff --git a/lib/index.js b/lib/index.js index 1fa9608..f91275c 100644 --- a/lib/index.js +++ b/lib/index.js @@ -171,6 +171,9 @@ class SQS { try { const command = new client_sqs_1.SendMessageBatchCommand(params); const res = await this.client.send(command); + if (res.Successful && res.Successful.length > 0) { + this.success(`Successfully published ${res.Successful.length} messages to ${name}`); + } return res; } catch (err) { @@ -305,4 +308,4 @@ class SQS { } } exports.default = SQS; -//# sourceMappingURL=data:application/json;base64, \ No newline at end of file +//# sourceMappingURL=data:application/json;base64, \ No newline at end of file diff --git a/src/index.ts b/src/index.ts index a726501..5ded077 100644 --- a/src/index.ts +++ b/src/index.ts @@ -6,13 +6,12 @@ import { DeleteMessageCommand, ChangeMessageVisibilityCommand, ReceiveMessageCommand, - SendMessageBatchResult, + SendMessageBatchCommandOutput, SendMessageRequest, - SendMessageResult, + SendMessageCommandOutput, ChangeMessageVisibilityRequest, DeleteMessageRequest, SendMessageBatchRequest, - CreateQueueCommandInput, } from "@aws-sdk/client-sqs"; import safeJSON from "safely-parse-json"; import { Consumer } from "sqs-consumer"; @@ -149,7 +148,7 @@ export default class SQS { meta: Record = {}, handle: boolean = true, options: Record = {} - ): Promise { + ): Promise { const params: SendMessageRequest = { QueueUrl: this.getQueueUrl(name), MessageBody: JSON.stringify({ content, meta }), @@ -192,7 +191,7 @@ export default class SQS { meta: Record = {}, handle: boolean = true, options: Record = {} - ): Promise { + ): Promise { let DelaySeconds: number | undefined; if (typeof options.delay === "number") { DelaySeconds = options.delay; @@ -230,7 +229,7 @@ export default class SQS { meta: Record = {}, group: Record, handle: boolean = true - ): Promise { + ): Promise { const params: SendMessageRequest = { QueueUrl: this.getQueueUrl(name), MessageBody: JSON.stringify({ content, meta }), From d7ced38c4825206257d9e5ac99ff443c28b51d3b Mon Sep 17 00:00:00 2001 From: Seshathri-saravanan Date: Wed, 17 Dec 2025 18:01:56 +0530 Subject: [PATCH 3/3] fixed types --- lib/index.d.ts | 6 +++--- lib/index.js | 11 +++++++---- src/index.ts | 12 +++++++++--- 3 files changed, 19 insertions(+), 10 deletions(-) diff --git a/lib/index.d.ts b/lib/index.d.ts index f21db98..993b77f 100644 --- a/lib/index.d.ts +++ b/lib/index.d.ts @@ -45,7 +45,7 @@ export default class SQS { * @param {Number} [options.delay] [in seconds] * @return {Promise} */ - publish(name: string, content: Record, meta?: Record, handle?: boolean, options?: Record): Promise; + publish(name: string, content: Record, meta?: Record, handle?: boolean, options?: Record): Promise; /** * Publish on SQS in batch * @param {string} name @@ -56,8 +56,8 @@ export default class SQS { * @param {Number} [options.delay] [in seconds] * @return {Promise} */ - publishBatch(name: string, contentList: Record[], meta?: Record, handle?: boolean, options?: Record): Promise; - publishFifo(name: string, content: Record, meta: Record, group: Record, handle?: boolean): Promise; + publishBatch(name: string, contentList: Record[], meta?: Record, handle?: boolean, options?: Record): Promise; + publishFifo(name: string, content: Record, meta: Record, group: Record, handle?: boolean): Promise; getQueueUrl(name: string): string; /** * Subscribe to a queue, using long polling diff --git a/lib/index.js b/lib/index.js index f91275c..fc483e4 100644 --- a/lib/index.js +++ b/lib/index.js @@ -143,6 +143,8 @@ class SQS { if (handle === false) { throw err; } + // Return empty response when error is handled + return {}; } } /** @@ -171,9 +173,6 @@ class SQS { try { const command = new client_sqs_1.SendMessageBatchCommand(params); const res = await this.client.send(command); - if (res.Successful && res.Successful.length > 0) { - this.success(`Successfully published ${res.Successful.length} messages to ${name}`); - } return res; } catch (err) { @@ -186,6 +185,8 @@ class SQS { if (handle === false) { throw err; } + // Return empty response when error is handled + return { Successful: [], Failed: [] }; } } async publishFifo(name, content, meta = {}, group, handle = true) { @@ -210,6 +211,8 @@ class SQS { if (handle === false) { throw err; } + // Return empty response when error is handled + return {}; } } getQueueUrl(name) { @@ -308,4 +311,4 @@ class SQS { } } exports.default = SQS; -//# sourceMappingURL=data:application/json;base64, \ No newline at end of file +//# sourceMappingURL=data:application/json;base64, \ No newline at end of file diff --git a/src/index.ts b/src/index.ts index 5ded077..2e2c148 100644 --- a/src/index.ts +++ b/src/index.ts @@ -148,7 +148,7 @@ export default class SQS { meta: Record = {}, handle: boolean = true, options: Record = {} - ): Promise { + ): Promise { const params: SendMessageRequest = { QueueUrl: this.getQueueUrl(name), MessageBody: JSON.stringify({ content, meta }), @@ -172,6 +172,8 @@ export default class SQS { if (handle === false) { throw err; } + // Return empty response when error is handled + return {} as SendMessageCommandOutput; } } @@ -191,7 +193,7 @@ export default class SQS { meta: Record = {}, handle: boolean = true, options: Record = {} - ): Promise { + ): Promise { let DelaySeconds: number | undefined; if (typeof options.delay === "number") { DelaySeconds = options.delay; @@ -220,6 +222,8 @@ export default class SQS { if (handle === false) { throw err; } + // Return empty response when error is handled + return { Successful: [], Failed: [] } as SendMessageBatchCommandOutput; } } @@ -229,7 +233,7 @@ export default class SQS { meta: Record = {}, group: Record, handle: boolean = true - ): Promise { + ): Promise { const params: SendMessageRequest = { QueueUrl: this.getQueueUrl(name), MessageBody: JSON.stringify({ content, meta }), @@ -250,6 +254,8 @@ export default class SQS { if (handle === false) { throw err; } + // Return empty response when error is handled + return {} as SendMessageCommandOutput; } }