From 46567a7112414ae53723c15bdf9e43216deb4a07 Mon Sep 17 00:00:00 2001 From: Ryan Clark Date: Sun, 23 Aug 2015 07:45:38 -0700 Subject: [PATCH] add partition counter based on stream shards --- bin/dynamodb-partition-count.js | 10 ++++++++ index.js | 43 +++++++++++++++++++++++++++++++++ package.json | 8 +++--- 3 files changed, 58 insertions(+), 3 deletions(-) create mode 100755 bin/dynamodb-partition-count.js diff --git a/bin/dynamodb-partition-count.js b/bin/dynamodb-partition-count.js new file mode 100755 index 0000000..07fa904 --- /dev/null +++ b/bin/dynamodb-partition-count.js @@ -0,0 +1,10 @@ +#!/usr/bin/env node + +var region = process.argv[2].split('/')[0]; +var table = process.argv[2].split('/')[1]; +var throughput = require('..')(table, { region: region }); + +throughput.partitionCount(function(err, partitionCount) { + if (err) throw err; + console.log('Found %s partitions', partitionCount); +}); diff --git a/index.js b/index.js index e2ae6e5..aa2c1f6 100644 --- a/index.js +++ b/index.js @@ -1,4 +1,5 @@ var AWS = require('aws-sdk'); +var async = require('queue-async'); module.exports = function(tableName, dynamoOptions) { var cache = { @@ -66,6 +67,46 @@ module.exports = function(tableName, dynamoOptions) { }); } + function describePartitions(callback) { + var streams = new AWS.DynamoDBStreams({ region: dynamoOptions.region }); + streams.listStreams({ TableName: tableName }, function(err, data) { + if (err) return callback(err); + if (!data.Streams[0]) return callback(new Error('Must enable DynamoDB Streams to count partitions')); + + var arn = data.Streams[0].StreamArn; + streams.describeStream({ StreamArn: arn }, function(err, data) { + if (err) return callback(err); + + var shards = data.StreamDescription.Shards; + var queue = async(); + + shards.forEach(function(shard) { + queue.defer(function(next) { + streams.getShardIterator({ + ShardId: shard.ShardId, + ShardIteratorType: 'LATEST', + StreamArn: arn + }, function(err, data) { + if (err) return next(err); + + streams.getRecords({ + ShardIterator: data.ShardIterator + }, function(err, data) { + if (err) return next(err); + next(null, data.Records.length || 'NextShardIterator' in data ? 1 : 0); + }); + }); + }); + }); + + queue.awaitAll(function(err, results) { + if (err) return callback(err); + callback(null, results.reduce(function(count, num) { count += num; return count; }, 0)); + }); + }); + }); + } + var throughput = { tableInfo: function(callback) { describeTable(function(err, main, indexes) { @@ -117,6 +158,8 @@ module.exports = function(tableName, dynamoOptions) { }); }, + partitionCount: describePartitions, + setCapacity: function(capacity, callback) { if (cache.main.read && cache.main.write) return update(); diff --git a/package.json b/package.json index 2f0d7c1..d1b0761 100644 --- a/package.json +++ b/package.json @@ -9,7 +9,8 @@ }, "bin": { "dynamodb-throughput-info": "bin/dynamodb-throughput-info.js", - "dynamodb-throughput-adjustment": "bin/dynamodb-throughput-adjustment.js" + "dynamodb-throughput-adjustment": "bin/dynamodb-throughput-adjustment.js", + "dynamodb-partition-count": "bin/dynamodb-partition-count.js" }, "repository": { "type": "git", @@ -23,8 +24,9 @@ "author": "Mapbox", "license": "ISC", "dependencies": { - "aws-sdk": "^2.1.17", - "minimist": "^1.1.1" + "aws-sdk": "^2.1.46", + "minimist": "^1.1.1", + "queue-async": "^1.0.7" }, "devDependencies": { "dynalite": "^0.12.0",