diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 54a5891..faf174d 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -8,6 +8,7 @@ on: branches: [master] pull_request: branches: [master] + workflow_dispatch: concurrency: group: test-${{ github.ref }} @@ -30,15 +31,21 @@ jobs: steps: - name: Checkout repository uses: actions/checkout@v4.0.0 + - name: Use Node.js ${{ matrix.node-version }} uses: actions/setup-node@v3.8.1 with: node-version: ${{ matrix.node-version }} cache: npm cache-dependency-path: ./package.json + - name: Start Redis uses: supercharge/redis-github-action@1.7.0 with: redis-version: ${{ matrix.redis-version }} + + - name: Start DynamoDB local + uses: rrainn/dynamodb-action@v3.0.0 + - run: npm install - run: npm run test diff --git a/index.js b/index.js index 1930772..3d06545 100644 --- a/index.js +++ b/index.js @@ -10,6 +10,7 @@ const RateLimiterUnion = require('./lib/RateLimiterUnion'); const RateLimiterQueue = require('./lib/RateLimiterQueue'); const BurstyRateLimiter = require('./lib/BurstyRateLimiter'); const RateLimiterRes = require('./lib/RateLimiterRes'); +const RateLimiterDynamo = require('./lib/RateLimiterDynamo'); module.exports = { RateLimiterRedis, @@ -26,4 +27,5 @@ module.exports = { RateLimiterQueue, BurstyRateLimiter, RateLimiterRes, + RateLimiterDynamo }; diff --git a/lib/RateLimiterDynamo.js b/lib/RateLimiterDynamo.js new file mode 100644 index 0000000..dca5cf8 --- /dev/null +++ b/lib/RateLimiterDynamo.js @@ -0,0 +1,388 @@ +const RateLimiterRes = require("./RateLimiterRes"); +const RateLimiterStoreAbstract = require("./RateLimiterStoreAbstract"); + +class DynamoItem { + /** + * Create a DynamoItem. + * @param {string} rlKey - The key for the rate limiter. + * @param {number} points - The number of points. + * @param {number} expire - The expiration time in seconds. + */ + constructor(rlKey, points, expire) { + this.key = rlKey; + this.points = points; + this.expire = expire; + } +} + +// Free tier DynamoDB provisioned mode params +const DEFAULT_READ_CAPACITY_UNITS = 25; +const DEFAULT_WRITE_CAPACITY_UNITS = 25; + +/** + * Implementation of RateLimiterStoreAbstract using DynamoDB. + * @class RateLimiterDynamo + * @extends RateLimiterStoreAbstract + */ +class RateLimiterDynamo extends RateLimiterStoreAbstract { + + /** + * Constructs a new instance of the class. + * The storeClient MUST be an instance of AWS.DynamoDB NOT of AWS.DynamoDBClient. + * + * @param {Object} opts - The options for the constructor. + * @param {function} cb - The callback function (optional). + * @return {void} + */ + constructor(opts, cb = null) { + super(opts); + + this.client = opts.storeClient; + this.tableName = opts.tableName; + this.tableCreated = opts.tableCreated; + + if (!this.tableCreated) { + this._createTable(opts.dynamoTableOpts) + .then((data) => { + this.tableCreated = true; + + this._setTTL() + .finally(() => { + // Callback invocation + if (typeof cb === 'function') { + cb(); + } + }); + + }) + .catch( err => { + //callback invocation + if (typeof cb === 'function') { + cb(err); + } else { + throw err; + } + }); + + } else { + + this._setTTL() + .finally(() => { + // Callback invocation + if (typeof cb === 'function') { + cb(); + } + }); + } + } + + get tableName() { + return this._tableName; + } + + set tableName(value) { + this._tableName = typeof value === 'undefined' ? 'node-rate-limiter-flexible' : value; + } + + get tableCreated() { + return this._tableCreated + } + + set tableCreated(value) { + this._tableCreated = typeof value === 'undefined' ? false : !!value; + } + + /** + * Creates a table in the database. Return null if the table already exists. + * + * @param {{readCapacityUnits: number, writeCapacityUnits: number}} tableOpts + * @return {Promise} A promise that resolves with the result of creating the table. + */ + async _createTable(tableOpts) { + + const params = { + TableName: this.tableName, + AttributeDefinitions: [ + { + AttributeName: 'key', + AttributeType: 'S' + } + ], + KeySchema: [ + { + AttributeName: 'key', + KeyType: 'HASH' + } + ], + ProvisionedThroughput: { + ReadCapacityUnits: tableOpts && tableOpts.readCapacityUnits ? tableOpts.readCapacityUnits : DEFAULT_READ_CAPACITY_UNITS, + WriteCapacityUnits: tableOpts && tableOpts.writeCapacityUnits ? tableOpts.writeCapacityUnits : DEFAULT_WRITE_CAPACITY_UNITS + } + }; + + try { + const data = await this.client.createTable(params); + return data; + } catch(err) { + if (err.__type && err.__type.includes('ResourceInUseException')) { + return null; + } else { + throw err; + } + } + } + + /** + * Retrieves an item from the table based on the provided key. + * + * @param {string} rlKey - The key used to retrieve the item. + * @throws {Error} Throws an error if the table is not created yet. + * @return {DynamoItem|null} - The retrieved item, or null if it doesn't exist. + */ + async _get(rlKey) { + + if (!this.tableCreated) { + throw new Error('Table is not created yet'); + } + + const params = { + TableName: this.tableName, + Key: { + key: {S: rlKey} + } + }; + + const data = await this.client.getItem(params); + if(data.Item) { + return new DynamoItem( + data.Item.key.S, + Number(data.Item.points.N), + Number(data.Item.expire.N) + ); + } else { + return null; + } + } + + /** + * Deletes an item from the table based on the given rlKey. + * + * @param {string} rlKey - The rlKey of the item to delete. + * @throws {Error} Throws an error if the table is not created yet. + * @return {boolean} Returns true if the item was successfully deleted, otherwise false. + */ + async _delete(rlKey) { + + if (!this.tableCreated) { + throw new Error('Table is not created yet'); + } + + const params = { + TableName: this.tableName, + Key: { + key: {S: rlKey} + }, + ConditionExpression: 'attribute_exists(#k)', + ExpressionAttributeNames: { + '#k': 'key' + } + } + + try { + const data = await this._client.deleteItem(params); + return data.$metadata.httpStatusCode === 200; + } catch(err) { + // ConditionalCheckFailed, item does not exist in table + if (err.__type && err.__type.includes('ConditionalCheckFailedException')) { + return false; + } else { + throw err; + } + } + + } + + /** + * Implemented with DynamoDB Atomic Counters. 3 calls are made to DynamoDB but each call is atomic. + * From the documentation: "UpdateItem calls are naturally serialized within DynamoDB, + * so there are no race condition concerns with making multiple simultaneous calls." + * See: https://aws.amazon.com/it/blogs/database/implement-resource-counters-with-amazon-dynamodb/ + * @param {*} rlKey + * @param {*} points + * @param {*} msDuration + * @param {*} forceExpire + * @param {*} options + * @returns + */ + async _upsert(rlKey, points, msDuration, forceExpire = false, options = {}) { + + if (!this.tableCreated) { + throw new Error('Table is not created yet'); + } + + const dateNow = Date.now(); + const dateNowSec = dateNow / 1000; + /* -1 means never expire, DynamoDb do not support null values in number fields. + DynamoDb TTL use unix timestamp in seconds. + */ + const newExpireSec = msDuration > 0 ? (dateNow + msDuration) / 1000 : -1; + + // Force expire, overwrite points. Create a new entry if not exists + if (forceExpire) { + return await this._baseUpsert({ + TableName: this.tableName, + Key: { key: {S: rlKey} }, + UpdateExpression: 'SET points = :points, expire = :expire', + ExpressionAttributeValues: { + ':points': {N: points.toString()}, + ':expire': {N: newExpireSec.toString()} + }, + ReturnValues: 'ALL_NEW' + }); + } + + try { + // First try update, success if entry NOT exists or IS expired + return await this._baseUpsert({ + TableName: this.tableName, + Key: { key: {S: rlKey} }, + UpdateExpression: 'SET points = :new_points, expire = :new_expire', + ExpressionAttributeValues: { + ':new_points': {N: points.toString()}, + ':new_expire': {N: newExpireSec.toString()}, + ':where_expire': {N: dateNowSec.toString()} + }, + ConditionExpression: 'expire <= :where_expire OR attribute_not_exists(points)', + ReturnValues: 'ALL_NEW' + }); + + } catch (err) { + // Second try update, success if entry exists and IS NOT expired + return await this._baseUpsert({ + TableName: this.tableName, + Key: { key: {S: rlKey} }, + UpdateExpression: 'SET points = points + :new_points', + ExpressionAttributeValues: { + ':new_points': {N: points.toString()}, + ':where_expire': {N: dateNowSec.toString()} + }, + ConditionExpression: 'expire > :where_expire', + ReturnValues: 'ALL_NEW' + }); + } + } + + /** + * Asynchronously upserts data into the table. params is a DynamoDB params object. + * + * @param {Object} params - The parameters for the upsert operation. + * @throws {Error} Throws an error if the table is not created yet. + * @return {DynamoItem} Returns a DynamoItem object with the updated data. + */ + async _baseUpsert(params) { + + if (!this.tableCreated) { + throw new Error('Table is not created yet'); + } + + try { + const data = await this.client.updateItem(params); + return new DynamoItem( + data.Attributes.key.S, + Number(data.Attributes.points.N), + Number(data.Attributes.expire.N) + ); + } catch (err) { + //console.log('_baseUpsert', params, err); + throw err; + } + } + + /** + * Sets the Time-to-Live (TTL) for the table. TTL use the expire field in the table. + * See: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/howitworks-ttl.html + * + * @return {Promise} A promise that resolves when the TTL is successfully set. + * @throws {Error} Throws an error if the table is not created yet. + * @returns {Promise} + */ + async _setTTL() { + + if (!this.tableCreated) { + throw new Error('Table is not created yet'); + } + + try { + + // Check if the TTL is already set + const isTTLSet = await this._isTTLSet(); + if (isTTLSet) { + return; + } + + const params = { + TableName: this.tableName, + TimeToLiveSpecification: { + AttributeName: 'expire', + Enabled: true + } + } + + const res = await this.client.updateTimeToLive(params); + return res; + + } catch (err) { + throw err; + } + + } + + /** + * Checks if the Time To Live (TTL) feature is set for the DynamoDB table. + * + * @return {boolean} Returns true if the TTL feature is enabled for the table, otherwise false. + * @throws {Error} Throws an error if the table is not created yet or if there is an error while checking the TTL status. + */ + async _isTTLSet() { + + if (!this.tableCreated) { + throw new Error('Table is not created yet'); + } + + try { + + const res = await this.client.describeTimeToLive({TableName: this.tableName}); + return ( + res.$metadata.httpStatusCode == 200 + && res.TimeToLiveDescription.TimeToLiveStatus === 'ENABLED' + && res.TimeToLiveDescription.AttributeName === 'expire' + ); + + } catch (err) { + throw err; + } + } + + /** + * Generate a RateLimiterRes object based on the provided parameters. + * + * @param {string} rlKey - The key for the rate limiter. + * @param {number} changedPoints - The number of points that have changed. + * @param {DynamoItem} result - The result object of _get() method. + * @returns {RateLimiterRes} - The generated RateLimiterRes object. + */ + _getRateLimiterRes(rlKey, changedPoints, result) { + + const res = new RateLimiterRes(); + res.isFirstInDuration = changedPoints === result.points; + res.consumedPoints = res.isFirstInDuration ? changedPoints : result.points; + res.remainingPoints = Math.max(this.points - res.consumedPoints, 0); + // Expire time saved in unix time seconds not ms + res.msBeforeNext = result.expire != -1 ? Math.max(result.expire * 1000 - Date.now(), 0) : -1; + + return res; + } + +} + +module.exports = RateLimiterDynamo; \ No newline at end of file diff --git a/lib/constants.js b/lib/constants.js index d897c8a..2da360f 100644 --- a/lib/constants.js +++ b/lib/constants.js @@ -6,6 +6,7 @@ const LIMITER_TYPES = { REDIS: 'redis', MYSQL: 'mysql', POSTGRES: 'postgres', + DYNAMO: 'dynamo' }; const ERR_UNKNOWN_LIMITER_TYPE_MESSAGE = 'Unknown limiter type. Use one of LIMITER_TYPES constants.'; diff --git a/lib/index.d.ts b/lib/index.d.ts index 7ae2336..36e2731 100644 --- a/lib/index.d.ts +++ b/lib/index.d.ts @@ -388,3 +388,14 @@ export class BurstyRateLimiter { options?: IRateLimiterMongoFunctionOptions ): Promise; } + +interface IRateLimiterDynamoOptions extends IRateLimiterStoreOptions { + dynamoTableOpts?: { + readCapacityUnits: number; + writeCapacityUnits: number; + } +} + +export class RateLimiterDynamo extends RateLimiterStoreAbstract { + constructor(opts: IRateLimiterDynamoOptions, cb?: ICallbackReady); +} diff --git a/package.json b/package.json index 0489183..ce944dd 100644 --- a/package.json +++ b/package.json @@ -41,6 +41,7 @@ "homepage": "https://github.com/animir/node-rate-limiter-flexible#readme", "types": "./lib/index.d.ts", "devDependencies": { + "@aws-sdk/client-dynamodb": "^3.431.0", "chai": "^4.1.2", "coveralls": "^3.0.1", "eslint": "^4.19.1", @@ -49,7 +50,7 @@ "eslint-plugin-node": "^6.0.1", "eslint-plugin-security": "^1.4.0", "ioredis": "^5.3.2", - "istanbul": "^0.4.5", + "istanbul": "^1.1.0-alpha.1", "memcached-mock": "^0.1.0", "mocha": "^10.2.0", "redis": "^4.6.8", diff --git a/test/RateLimiterDynamo.test.js b/test/RateLimiterDynamo.test.js new file mode 100644 index 0000000..3549eec --- /dev/null +++ b/test/RateLimiterDynamo.test.js @@ -0,0 +1,346 @@ +const {DynamoDB} = require('@aws-sdk/client-dynamodb') +const { expect } = require('chai'); +const { describe, it } = require('mocha'); +const RateLimiterDynamo = require('../lib/RateLimiterDynamo'); +const sinon = require('sinon'); + +/* + In order to perform this tests, you need to run a local instance of dynamodb: + docker run -p 8000:8000 amazon/dynamodb-local +*/ +describe('RateLimiterDynamo with fixed window', function RateLimiterDynamoTest() { + this.timeout(5000); + + const dynamoClient = new DynamoDB({ + region: 'eu-central-1', + credentials: { + accessKeyId: 'fake', + secretAccessKey: 'fake' + }, + endpoint: 'http://localhost:8000' + }); + + it('DynamoDb client connection', (done) => { + expect(dynamoClient).to.not.equal(null); + dynamoClient.listTables() + .then((data) => { + done(); + }) + .catch((err) => { + done(err); + }); + }); + + it('get item from DynamoDB', (done) => { + + const testKey = 'test'; + const rateLimiter = new RateLimiterDynamo({ + storeClient: dynamoClient + }, + () => { + rateLimiter.set(testKey, 999, 10000) + .then((data) => { + rateLimiter.get(testKey) + .then((response) => { + expect(response).to.not.equal(null); + done(); + }) + .catch((err) => { + done(err); + }); + }) + .catch((err) => { + done(err); + }) + } + ); + }); + + it('get NOT existing item from DynamoDB', (done) => { + + const testKey = 'not_existing'; + const rateLimiter = new RateLimiterDynamo({ + storeClient: dynamoClient + }, + () => { + rateLimiter.get(testKey) + .then((response) => { + expect(response).to.equal(null); + done(); + }) + .catch((err) => { + done(err); + }); + } + ); + }); + + it('delete item from DynamoDB', (done) => { + + const testKey = 'delete_test'; + const rateLimiter = new RateLimiterDynamo({ + storeClient: dynamoClient + }, + () => { + rateLimiter.set(testKey, 999, 10000) + .then((data) => { + rateLimiter.delete(testKey) + .then((response) => { + expect(response).to.equal(true); + done(); + }) + .catch((err) => { + done(err); + }); + }) + .catch((err) => { + done(err); + }) + } + ); + }); + + it('delete NOT existing item from DynamoDB return false', (done) => { + + const testKey = 'delete_test_2'; + const rateLimiter = new RateLimiterDynamo({ + storeClient: dynamoClient + }, + () => { + rateLimiter.delete(testKey) + .then((response) => { + expect(response).to.equal(false); + done(); + }) + .catch((err) => { + done(err); + }); + } + ); + }); + + it('consume 1 point', (done) => { + const testKey = 'consume1'; + + const rateLimiter = new RateLimiterDynamo({ + storeClient: dynamoClient, + points: 2, + duration: 10 + }, + () => { + rateLimiter.consume(testKey) + .then((result) => { + expect(result.consumedPoints).to.equal(1); + rateLimiter.delete(testKey); + done(); + }) + .catch((err) => { + done(err); + }); + + }); + + }); + + it('rejected when consume more than maximum points', (done) => { + const testKey = 'consumerej'; + + const rateLimiter = new RateLimiterDynamo({ + storeClient: dynamoClient, + points: 1, + duration: 5 + }, + () => { + rateLimiter.consume(testKey, 2) + .then((result) => { + expect(result.consumedPoints).to.equal(2); + done(Error('must not resolve')); + }) + .catch((err) => { + expect(err.consumedPoints).to.equal(2); + done(); + }); + + }); + }); + + it('blocks key for block duration when consumed more than points', (done) => { + const testKey = 'block'; + + const rateLimiter = new RateLimiterDynamo({ + storeClient: dynamoClient, + points: 1, + duration: 1, + blockDuration: 2 + }, + () => { + rateLimiter.consume(testKey, 2) + .then((result) => { + expect(result.consumedPoints).to.equal(2); + done(Error('must not resolve')); + }) + .catch((err) => { + expect(err.msBeforeNext > 1000).to.equal(true); + done(); + }); + + }); + + }); + + it('return correct data with _getRateLimiterRes', () => { + const testKey = 'test'; + + const rateLimiter = new RateLimiterDynamo({ + storeClient: dynamoClient, + points: 5, + }, + () => { + + const res = rateLimiter._getRateLimiterRes( + 'test', + 1, + { key: 'test', points: 3, expire: (Date.now() + 1000) / 1000} + ); + + expect(res.msBeforeNext <= 1000 && + res.consumedPoints === 3 && + res.isFirstInDuration === false && + res.remainingPoints === 2 + ).to.equal(true); + + }); + }); + + it('get points', (done) => { + const testKey = 'get'; + + const rateLimiter = new RateLimiterDynamo({ + storeClient: dynamoClient, + points: 5, + }, + () => { + + rateLimiter.set(testKey, 999, 10000) + .then((data) => { + rateLimiter.get(testKey) + .then((response) => { + expect(response.consumedPoints).to.equal(999); + rateLimiter.delete(testKey); + done(); + }) + .catch((err) => { + done(err); + }); + }) + .catch((err) => { + done(err); + }); + + }); + + }); + + it('get points return NULL if key is not set', (done) => { + const testKey = 'getnull'; + + const rateLimiter = new RateLimiterDynamo({ + storeClient: dynamoClient, + points: 5, + }, + () => { + + rateLimiter.get(testKey) + .then((response) => { + expect(response).to.equal(null); + done(); + }) + .catch((err) => { + done(err); + }); + }); + + }); + + it('delete returns false, if there is no key', (done) => { + const testKey = 'getnull3'; + + const rateLimiter = new RateLimiterDynamo({ + storeClient: dynamoClient, + points: 5, + }, + () => { + + rateLimiter.delete(testKey) + .then((response) => { + expect(response).to.equal(false); + done(); + }) + .catch((err) => { + done(err); + }); + }); + + }); + + it('delete rejects on error', (done) => { + const testKey = 'deleteerr'; + + const rateLimiter = new RateLimiterDynamo({ + storeClient: dynamoClient, + points: 5, + }, + () => { + + sinon.stub(dynamoClient, 'deleteItem').callsFake(() => { + throw new Error('stub error'); + }); + + rateLimiter.delete(testKey) + .catch(() => { + done(); + }); + + dynamoClient.deleteItem.restore(); + }); + + }); + + + it('does not expire key if duration set to 0', (done) => { + const testKey = 'neverexpire'; + + const rateLimiter = new RateLimiterDynamo({ + storeClient: dynamoClient, + points: 2, + duration: 0 + }, + () => { + + rateLimiter.set(testKey, 2, 0) + .then(() => { + rateLimiter.consume(testKey, 1) + .then(() => { + rateLimiter.get(testKey) + .then((res) => { + expect(res.consumedPoints).to.equal(1); + expect(res.msBeforeNext).to.equal(-1); + done(); + }) + .catch((err) => { + done(err); + }) + }) + .catch((err) => { + done(err); + }) + }) + .catch((err) => { + done(err); + }); + + }); + + }); + +});