Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .eslintrc
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
"mocha": true
},
"parserOptions": {
"ecmaVersion": 8
"ecmaVersion": 13
},
"rules": {
"import/no-unresolved": ["error", { "commonjs": true, "caseSensitive": true }],
Expand Down
1 change: 1 addition & 0 deletions .nvmrc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
16.13.0
16 changes: 8 additions & 8 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
{
"name": "@quizizz/sqs",
"version": "0.1.3",
"version": "0.2.0",
"description": "A simple wrapper for aws sqs",
"main": "index.js",
"main": "src/index.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
Expand All @@ -23,15 +23,15 @@
},
"homepage": "https://github.com/akshendra/sqs-wrapper#readme",
"devDependencies": {
"eslint": "^4.18.2",
"eslint-plugin-import": "^2.9.0",
"uuid": "^3.3.2"
"eslint": "8.9.0",
"eslint-plugin-import": "2.25.4",
"uuid": "8.3.2"
},
"dependencies": {
"safely-parse-json": "^0.1.0",
"sqs-consumer": "^3.8.0"
"safely-parse-json": "0.1.0",
"sqs-consumer": "5.6.0"
},
"peerDependencies": {
"aws-sdk": "^2.205.0"
"aws-sdk": "^2.699.0"
}
}
61 changes: 61 additions & 0 deletions src/index.d.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@

type AWSConfig = {
region?: string,
};

type CreateQueueOpts = {
DelaySeconds: number,
MaximumMessageSize: number,
MessageRetentionPeriod: number,
ReceiveMessageWaitTimeSeconds: number,
FifoQueue: 'false' | 'true',
}

type Content = Record<string, unknown>
type Meta = Record<string, unknown>
type PublishOptions = {
delay: number
}

type Message = {
data: Record<string, unknown>;
ack: () => {},
nack: (err) => {},
};
type Metadata = {
id: string;
handle: string;
queueAttributes: Record<string, unknown>;
messageAttributes: Record<string, unknown>;
}

type SubscribeCallback = (message: Message, metadata: Metadata) => void
type SubscribeOptions = {
maxInProgress: number;
}

export default class SQS {
constructor(name: string, emitter: any, config: AWSConfig);
init(): Promise<SQS>;
createQueue(queueName: string, opts: CreateQueueOpts): Promise<void>;
publish(
queueName: string,
content: Content,
meta: Meta,
handle: boolean,
options: PublishOptions,
): Promise<unknown>;
publishFifo(
queueName: string,
content: Content,
meta: Meta,
group: string,
handle: boolean,
): Promise<unknown>;
getQueueUrl(queueName: string): string;
subscribe(queueName: string, cb: SubscribeCallback, opts: SubscribeOptions): Promise<void>;
deleteMessage(queueName: string, messageId: string, handle: string): Promise<void>;
returnMessage(queueName: string, messageId: string, handle: string): Promise<void>;
fetchMessages(queueName: string, number?: number): Promise<Message[]>;
fetchOne(queueName: string): Promise<Message>;
};
37 changes: 20 additions & 17 deletions index.js → src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@

const AWS = require('aws-sdk');
const safeJSON = require('safely-parse-json');
const Consumer = require('sqs-consumer');
const uuid = require('uuid/v4');
const { Consumer } = require('sqs-consumer');

class SQS {
constructor(name, emitter, config = {}) {
Expand Down Expand Up @@ -40,10 +39,10 @@ class SQS {
});
}

init() {
async init() {
this.client = new AWS.SQS(this.config);
// try to list queues
return this.client.listQueues({
await this.client.listQueues({
QueueNamePrefix: '',
}).promise()
.then((response) => {
Expand All @@ -61,6 +60,7 @@ class SQS {
this.error(err, this.config);
throw err;
});
return this;
}

/**
Expand Down Expand Up @@ -187,7 +187,7 @@ class SQS {
if (!queueUrl) {
const error = new Error(`Queue ${name} does not exists`);
this.error(error, error.cause);
return Promise.reject(error);
throw error;
}
return queueUrl;
}
Expand All @@ -201,19 +201,22 @@ class SQS {
return new Promise((resolve) => {
const sub = Consumer.create({
queueUrl,
handleMessage: (msg, done) => {
cb({
data: safeJSON(msg.Body),
ack: done,
nack: (err) => {
done(err || new Error('Unable to process message'));
},
}, {
id: msg.MessageId,
handle: msg.ReceiptHandle,
queueAttributes: msg.Attributes,
messageAttributes: msg.MessageAttributes,
handleMessage: (msg) => {
return new Promise((_resolve, reject) => {
cb({
data: safeJSON(msg.Body),
ack: _resolve,
nack: (err) => {
reject(err || new Error('Unable to process message'));
},
}, {
id: msg.MessageId,
handle: msg.ReceiptHandle,
queueAttributes: msg.Attributes,
messageAttributes: msg.MessageAttributes,
});
});

},
batchSize: opts.maxInProgress || 10,
sqs: this.client,
Expand Down
10 changes: 6 additions & 4 deletions test/push.spec.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
/* eslint no-console: 0 */

const EventEmitter = require('events').EventEmitter;
const uuid = require('uuid/v4');
const SQS = require('../index');
const { v4: uuidv4 } = require('uuid');
const SQS = require('../src/index');

const emitter = new EventEmitter();
emitter.on('error', console.error.bind(console));
Expand All @@ -12,7 +14,7 @@ const sqs = new SQS('sqs', emitter);
function getGroup() {
return {
name: 'testing',
id: uuid(),
id: uuidv4(),
};
}

Expand All @@ -34,5 +36,5 @@ async function push() {
await sqs.publishFifo('test.fifo', { message: 'seven' }, {}, getGroup());
}

push().then(() => console.log('Pusblish')).catch(console.error.bind(console));
push().then(() => console.log('Published')).catch(console.error.bind(console));

9 changes: 7 additions & 2 deletions test/sub.spec.js
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
/* eslint no-console: 0 */


const EventEmitter = require('events').EventEmitter;

const SQS = require('../index');
const SQS = require('../src/index');

const emitter = new EventEmitter();
emitter.on('error', console.error.bind(console));
emitter.on('log', console.log.bind(console));
emitter.on('success', console.log.bind(console));
process.on('unhandledRejection', console.log);
process.on('uncaughtException', console.log);
process.on('uncaughtExceptionMonitor', console.log);

const sqs = new SQS('sqs', emitter);

Expand All @@ -17,7 +22,7 @@ async function sub() {
});

await sqs.subscribe('test.fifo', (msg) => {
console.log(msg.data);
console.log(msg);
setTimeout(() => {
msg.ack();
}, 1000);
Expand Down