Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 12 additions & 23 deletions lib/index.d.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
/// <reference types="node" />
import AWS, { AWSError } from "aws-sdk";
import EventEmitter from "events";
import { QueueAttributeMap, SendMessageBatchResult, SendMessageResult } from "aws-sdk/clients/sqs";
import { PromiseResult } from "aws-sdk/lib/request";
import { SendMessageBatchCommandOutput, SendMessageCommandOutput } from "@aws-sdk/client-sqs";
import { EventEmitter } from "events";
export default class SQS {
private name;
private emitter;
Expand Down Expand Up @@ -37,7 +34,7 @@ export default class SQS {
* @param {String} opts.FifoQueue='false' [Use FIFO (true) or Standard queue (false)]
* @return {Promise}
*/
createQueue(name: string, opts?: QueueAttributeMap): Promise<void>;
createQueue(name: string, opts?: Record<string, string>): Promise<void>;
/**
* Publish on SQS
* @param {string} name
Expand All @@ -48,7 +45,7 @@ export default class SQS {
* @param {Number} [options.delay] [in seconds]
* @return {Promise}
*/
publish(name: string, content: Record<string, any>, meta?: Record<string, any>, handle?: boolean, options?: Record<string, any>): Promise<PromiseResult<SendMessageResult, AWSError>>;
publish(name: string, content: Record<string, any>, meta?: Record<string, any>, handle?: boolean, options?: Record<string, any>): Promise<SendMessageCommandOutput>;
/**
* Publish on SQS in batch
* @param {string} name
Expand All @@ -59,31 +56,23 @@ export default class SQS {
* @param {Number} [options.delay] [in seconds]
* @return {Promise}
*/
publishBatch(name: string, contentList: Record<string, any>[], meta?: Record<string, any>, handle?: boolean, options?: Record<string, any>): Promise<PromiseResult<SendMessageBatchResult, AWSError>>;
publishFifo(name: string, content: Record<string, any>, meta: Record<string, any>, group: Record<string, any>, handle?: boolean): Promise<PromiseResult<SendMessageResult, AWSError>>;
publishBatch(name: string, contentList: Record<string, any>[], meta?: Record<string, any>, handle?: boolean, options?: Record<string, any>): Promise<SendMessageBatchCommandOutput>;
publishFifo(name: string, content: Record<string, any>, meta: Record<string, any>, group: Record<string, any>, handle?: boolean): Promise<SendMessageCommandOutput>;
getQueueUrl(name: string): string;
/**
* Subscribe to a queue, using long polling
*/
subscribe(name: string, cb: (arg1: Record<string, any>, arg2: Record<string, any>) => void, opts?: Record<string, any>): Promise<unknown>;
deleteMessage(name: string, messageId: any, handle: string): Promise<void>;
returnMessage(name: string, messageId: any, handle: string): Promise<void>;
fetchMessages(name: string, number?: number): Promise<({
fetchMessages(name: string, number?: number): Promise<{
data: any;
ack: () => Promise<void>;
nack: () => Promise<void>;
id?: undefined;
handle?: undefined;
queueAttributes?: undefined;
messageAttributes?: undefined;
} | {
id: string;
handle: string;
queueAttributes: AWS.SQS.MessageSystemAttributeMap;
messageAttributes: AWS.SQS.MessageBodyAttributeMap;
data?: undefined;
ack?: undefined;
nack?: undefined;
})[][]>;
queueAttributes: Partial<Record<import("@aws-sdk/client-sqs").MessageSystemAttributeName, string>>;
messageAttributes: Record<string, import("@aws-sdk/client-sqs").MessageAttributeValue>;
ack: () => Promise<void>;
nack: () => Promise<void>;
}[]>;
fetchOne(name: string): Promise<any>;
}
66 changes: 37 additions & 29 deletions lib/index.js

Large diffs are not rendered by default.

5 changes: 1 addition & 4 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,12 @@
"eslint-import-resolver-typescript": "^2.4.0",
"eslint-plugin-es5": "^1.5.0",
"eslint-plugin-import": "^2.23.4",
"typescript": "^4.4.4"
"typescript": "^5.0.0"
},
"dependencies": {
"@aws-sdk/client-sqs": "^3.600.0",
"safely-parse-json": "0.1.0",
"sqs-consumer": "5.8.0",
"uuid": "^9.0.0"
},
"peerDependencies": {
"aws-sdk": "^2.1403.0"
}
}
98 changes: 54 additions & 44 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,27 +1,29 @@
import AWS, { AWSError } from "aws-sdk";
import safeJSON from "safely-parse-json";
import { Consumer } from "sqs-consumer";
import EventEmitter from "events";
import {
QueueAttributeMap,
SendMessageBatchResult,
SQSClient,
CreateQueueCommand,
SendMessageCommand,
SendMessageBatchCommand,
DeleteMessageCommand,
ChangeMessageVisibilityCommand,
ReceiveMessageCommand,
SendMessageBatchCommandOutput,
SendMessageRequest,
SendMessageResult,
} from "aws-sdk/clients/sqs";
import { PromiseResult } from "aws-sdk/lib/request";
import {
SendMessageCommandOutput,
ChangeMessageVisibilityRequest,
DeleteMessageRequest,
SendMessageBatchRequest,
} from "@aws-sdk/client-sqs";
import safeJSON from "safely-parse-json";
import { Consumer } from "sqs-consumer";
import { EventEmitter } from "events";
import { v4 as uuidv4 } from "uuid";
import { Agent } from 'https';

export default class SQS {
private name: string;
private emitter: EventEmitter;
private config: Record<string, any>;
private client: AWS.SQS;
private client: SQSClient;
private queues: Record<string, string>;

constructor(name: string, emitter: EventEmitter, config: Record<string, any> = {}) {
Expand Down Expand Up @@ -83,7 +85,7 @@ export default class SQS {
...(region && { region }),
...(accountId && { accountId })
};
this.client = new AWS.SQS(this.config);
this.client = new SQSClient(this.config);
this.log(`Connected on SQS:${this.name}`, this.config);
return this;
} catch (err: any) {
Expand All @@ -107,20 +109,19 @@ export default class SQS {
* @param {String} opts.FifoQueue='false' [Use FIFO (true) or Standard queue (false)]
* @return {Promise}
*/
async createQueue(name: string, opts: QueueAttributeMap = {}): Promise<void> {
async createQueue(name: string, opts: Record<string, string> = {}): Promise<void> {
const options = Object.assign(
{
// FifoQueue: 'false', // use standard by default
},
opts
);
try {
const response = await this.client
.createQueue({
QueueName: name,
Attributes: options,
})
.promise();
const command = new CreateQueueCommand({
QueueName: name,
Attributes: options,
});
const response = await this.client.send(command);
const queueUrl = response.QueueUrl;
const message = `Created queue ${name} => ${queueUrl}`;
this.log(message, { name, queueUrl });
Expand All @@ -147,7 +148,7 @@ export default class SQS {
meta: Record<string, any> = {},
handle: boolean = true,
options: Record<string, any> = {}
): Promise<PromiseResult<SendMessageResult, AWSError>> {
): Promise<SendMessageCommandOutput> {
const params: SendMessageRequest = {
QueueUrl: this.getQueueUrl(name),
MessageBody: JSON.stringify({ content, meta }),
Expand All @@ -158,7 +159,8 @@ export default class SQS {
}

try {
const res = await this.client.sendMessage(params).promise();
const command = new SendMessageCommand(params);
const res = await this.client.send(command);
return res;
} catch (err) {
this.error(err, {
Expand All @@ -170,6 +172,8 @@ export default class SQS {
if (handle === false) {
throw err;
}
// Return empty response when error is handled
return {} as SendMessageCommandOutput;
}
}

Expand All @@ -189,7 +193,7 @@ export default class SQS {
meta: Record<string, any> = {},
handle: boolean = true,
options: Record<string, any> = {}
): Promise<PromiseResult<SendMessageBatchResult, AWSError>> {
): Promise<SendMessageBatchCommandOutput> {
let DelaySeconds: number | undefined;
if (typeof options.delay === "number") {
DelaySeconds = options.delay;
Expand All @@ -205,7 +209,8 @@ export default class SQS {
};

try {
const res = await this.client.sendMessageBatch(params).promise();
const command = new SendMessageBatchCommand(params);
const res = await this.client.send(command);
return res;
} catch (err) {
this.error(err, {
Expand All @@ -217,6 +222,8 @@ export default class SQS {
if (handle === false) {
throw err;
}
// Return empty response when error is handled
return { Successful: [], Failed: [] } as SendMessageBatchCommandOutput;
}
}

Expand All @@ -226,15 +233,16 @@ export default class SQS {
meta: Record<string, any> = {},
group: Record<string, any>,
handle: boolean = true
): Promise<PromiseResult<SendMessageResult, AWSError>> {
): Promise<SendMessageCommandOutput> {
const params: SendMessageRequest = {
QueueUrl: this.getQueueUrl(name),
MessageBody: JSON.stringify({ content, meta }),
MessageGroupId: group.name,
MessageDeduplicationId: group.id,
};
try {
const res = await this.client.sendMessage(params).promise();
const command = new SendMessageCommand(params);
const res = await this.client.send(command);
return res;
} catch (err) {
this.error(err, {
Expand All @@ -246,6 +254,8 @@ export default class SQS {
if (handle === false) {
throw err;
}
// Return empty response when error is handled
return {} as SendMessageCommandOutput;
}
}

Expand Down Expand Up @@ -316,7 +326,8 @@ export default class SQS {
ReceiptHandle: handle,
};

await this.client.deleteMessage(params).promise();
const command = new DeleteMessageCommand(params);
await this.client.send(command);
}

async returnMessage(name: string, messageId: any, handle: string) {
Expand All @@ -326,7 +337,8 @@ export default class SQS {
ReceiptHandle: handle,
VisibilityTimeout: 0,
};
await this.client.changeMessageVisibility(params).promise();
const command = new ChangeMessageVisibilityCommand(params);
await this.client.send(command);
}

async fetchMessages(name: string, number = 10) {
Expand All @@ -335,25 +347,23 @@ export default class SQS {
QueueUrl: queueUrl,
MaxNumberOfMessages: number,
};
const res = await this.client.receiveMessage(params).promise();
return res.Messages.map((msg) => {
return [
{
data: safeJSON(msg.Body),
ack: () => {
return this.deleteMessage(name, msg.MessageId, msg.ReceiptHandle);
},
nack: () => {
return this.returnMessage(name, msg.MessageId, msg.ReceiptHandle);
},
const command = new ReceiveMessageCommand(params);
const res = await this.client.send(command);
const messages = res.Messages || [];
return messages.map((msg) => {
return {
data: safeJSON(msg.Body),
id: msg.MessageId,
handle: msg.ReceiptHandle,
queueAttributes: msg.Attributes,
messageAttributes: msg.MessageAttributes,
ack: () => {
return this.deleteMessage(name, msg.MessageId, msg.ReceiptHandle);
},
{
id: msg.MessageId,
handle: msg.ReceiptHandle,
queueAttributes: msg.Attributes,
messageAttributes: msg.MessageAttributes,
nack: () => {
return this.returnMessage(name, msg.MessageId, msg.ReceiptHandle);
},
];
};
});
}

Expand Down
2 changes: 1 addition & 1 deletion test/push.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

const EventEmitter = require('events').EventEmitter;
const { v4: uuidv4 } = require('uuid');
const SQS = require('../src/index');
const SQS = require('../lib/index').default;

const emitter = new EventEmitter();
emitter.on('error', console.error.bind(console));
Expand Down
29 changes: 16 additions & 13 deletions test/sub.spec.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
/* eslint no-console: 0 */


const EventEmitter = require('events').EventEmitter;

const SQS = require('../src/index');
const SQS = require('../lib/index').default;

const emitter = new EventEmitter();
emitter.on('error', console.error.bind(console));
Expand All @@ -15,21 +13,26 @@ process.on('uncaughtExceptionMonitor', console.log);

const sqs = new SQS('sqs', emitter);

async function sub() {
async function fetch() {
await sqs.init();
await sqs.createQueue('test.fifo', {
FifoQueue: 'true',
});

await sqs.subscribe('test.fifo', (msg) => {
console.log(msg);
setTimeout(() => {
msg.ack();
}, 1000);
}, {
maxInProgress: 2,
});
// Fetch messages from the queue
const messages = await sqs.fetchMessages('test.fifo', 5);
console.log(`Fetched ${messages.length} messages`);

// Process each message
for (const msg of messages) {
console.log('Message:', msg.data);
console.log('Message ID:', msg.id);

// Acknowledge the message after processing
await msg.ack();
console.log('Message acknowledged');
}
}

sub().then(() => console.log('Subsciption')).catch(console.error.bind(console));
fetch().then(() => console.log('Fetch completed')).catch(console.error.bind(console));