
Commit

Merge pull request #3 from chriskinsman/mprocess
Improved performance, utilize EC2 IAM role or ~/.aws/credentials if available
chriskinsman authored Jan 2, 2017
2 parents ce8bc2c + f3af5ec commit 3feb974
Showing 4 changed files with 184 additions and 36 deletions.
50 changes: 46 additions & 4 deletions README.md
@@ -9,7 +9,8 @@ or streamed to S3.
## Features
* Write to local file system
* Stream to S3
* DynamoDb arallel scans to utilize provisioned throughput
* DynamoDb parallel scans to utilize provisioned throughput
* Multiple child processes to maximize usage of multiple cores

## Installation

@@ -58,9 +59,11 @@ __Arguments__
* `awsSecretAccessKey` - AWS secret
* `awsRegion` - AWS region
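
A minimal construction sketch (values are hypothetical). With this change the access key and secret may be omitted, in which case the AWS SDK falls back to an EC2 IAM role or `~/.aws/credentials`:

```js
var DynamoDbExportCsv = require('dynamodbexportcsv');

// Explicit credentials (placeholder values)
var exporterWithKeys = new DynamoDbExportCsv('AKIA_EXAMPLE', 'SECRET_EXAMPLE', 'us-west-2');

// Or rely on an EC2 IAM role / ~/.aws/credentials by passing null for the key and secret
var exporter = new DynamoDbExportCsv(null, null, 'us-west-2');
```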

### exportTable(table, columns, totalSegments, compressed, filesize, s3Bucket, s3Key, callback)
### exportTable(table, columns, totalSegments, compressed, filesize, s3Bucket, s3Path, callback)

Exports the specified columns in the dynamodb table to one or more files
Exports the specified columns in the dynamodb table to one or more files. This method spawns a child
process for each parallel scan segment, which allows it to maximize performance by utilizing multiple
cores (see the usage sketch below the argument list).

__Arguments__

@@ -75,10 +78,49 @@ higher read throughput. One file is created per segment at a minimum.
* `filesize` - Maximum size of each file in megabytes. Once a file hits this size it is closed and a new file is
created
* `s3Bucket` - Optional. If specified the files are streamed to s3 instead of the local file system.
* `s3Key` - Optional. Key prefix for files in s3. Used as a prefix with sequential numbers
* `s3Path` - Optional. Key prefix for files in s3. Used as a prefix with sequential numbers
appended for each file created
* `callback(err)` - A callback which is executed when finished and includes any errors that occurred
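
A usage sketch for `exportTable`, assuming the `exporter` constructed above (table name, columns, bucket, and key prefix are hypothetical):

```js
// Listen for progress and error events emitted by the exporter
exporter.on('info', function(message) { console.info(message); });
exporter.on('error', function(message) { console.error(message); });
exporter.on('throughputexceeded', function() { console.warn('Throughput exceeded'); });

// Export two columns using 4 parallel scans, gzipped, in 250MB files streamed to S3
exporter.exportTable('MyTable', ['userId', 'createdAt'], 4, true, 250, 'my-bucket', 'exports/MyTable', function(err) {
    if (err) {
        console.error('Export finished with errors:', err);
    }
    else {
        console.info('Done exporting table');
    }
});
```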

### exportTableWorker(table, columns, totalSegments, segment, compressed, filesize, s3Bucket, s3Path, callback)

Exports a single segment of a dynamodb table. Used when running parallel scans. If you use exportTable there
is no reason to call this directly; it is useful when you want to break a scan into chunks yourself, for example
across machines, rather than letting exportTable manage the segments for you (see the sketch below the argument list).

__Arguments__

* `table` - Name of dynamodb table
* `columns` - Array of column names. Dynamodb has no way to query the table and determine columns without scanning
the entire table. Only columns specified here are exported. Columns do not have to be present on every
record.
* `totalSegments` - Number of parallel scans to run. The dynamodb table key space is split into this many segments
and reads are done in parallel across these segments. This spreads the query load across the key space and allows
higher read throughput. One file is created per segment at a minimum.
* `segment` - Zero-based index of the parallel scan segment to export
* `compressed` - When set to true files are output in compressed gzip format
* `filesize` - Maximum size of each file in megabytes. Once a file hits this size it is closed and a new file is
created
* `s3Bucket` - Optional. If specified the files are streamed to s3 instead of the local file system.
* `s3Path` - Optional. Key prefix for files in s3. Used as a prefix with sequential numbers
appended for each file created
* `callback(err)` - A callback which is executed when finished and includes any errors that occurred
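
A sketch of running a single segment directly with `exportTableWorker`, for example to spread eight segments across several machines (names are hypothetical):

```js
// This process handles segment 3 of 8; other processes or machines handle the rest
exporter.exportTableWorker('MyTable', ['userId', 'createdAt'], 8, 3, true, 250, 'my-bucket', 'exports/MyTable', function(err) {
    if (err) {
        console.error('Segment 3 finished with errors:', err);
    }
    else {
        console.info('Segment 3 complete');
    }
});
```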


## Performance

While working on this update I ran a few performance comparisons. These were not rigorously isolated,
repeatable benchmarks. All tests ran against a DynamoDb table provisioned with 5,000 read IOPS.
The table contained 187,363,510 rows and was 98GB in size. All tests wrote the resulting CSV files to S3.

| Instance Size | Parallel Scans | Execution Time | CPU | Read IOPS |
| --- | --- | --- | --- | --- |
| c4.4xlarge | 10 | 120 m | 45% | 1450 |
| c4.4xlarge | 20 | 86 m | 90% | 2500 |
| c4.8xlarge | 20 | 45 m | 42% | 4500 |
| c4.8xlarge | 30 | 36 m | 67% | 5500 |


## People

The author is [Chris Kinsman](https://github.com/chriskinsman) from [PushSpring](http://www.pushspring.com).
59 changes: 39 additions & 20 deletions bin/DynamoDbExportCsv
@@ -5,8 +5,8 @@ var dynamoDbExport = require('../lib/DynamoDbExportCsv.js');
var commander = require('commander');

commander.version('0.0.1')
.option('-i, --awsid <awsid>', 'AWS Secret ID')
.option('-s, --awssecret <awssecret>', 'AWS Secret Access Key')
.option('-i, --awsid [awsid]', 'AWS Secret ID')
.option('-s, --awssecret [awssecret]', 'AWS Secret Access Key')
.option('-r, --awsregion <awsregion>', 'AWS Region')
.option('-t, --table <tableName>', 'Table to export')
.option('-c, --columns <columns>', 'CSV list of columns')
@@ -15,6 +15,8 @@ commander.version('0.0.1')
.option('-g, --gzip', 'Gzip output')
.option('-f, --filesize [filesize]', 'Size of each file in Mb. If omitted 250Mb', parseInt)
.option('-s, --scans [parallelscans]', 'Parallel scans to run - Defaults to 1', parseInt)
.option('-w, --worker [scanindex]', 'Which portion of a parallel scan this worker will do - If specified will only scan this portion of table', parseInt)
.option('-q, --quiet', 'Turn off progress information')
.parse(process.argv);

if (! commander.table)
@@ -24,20 +26,6 @@ if (! commander.table)
process.exit(1);
}

if(! commander.awsid)
{
console.error("You must specify an aws secret id");
commander.outputHelp();
process.exit(1);
}

if(! commander.awssecret)
{
console.error("You must specify an aws secret access key");
commander.outputHelp();
process.exit(1);
}

if(! commander.awsregion)
{
console.error("You must specify an aws region");
@@ -61,18 +49,49 @@ if (commander.scans)

var columns = commander.columns.split(',');

console.info("Export DynamoDb table: " + commander.table + ", parallel scans: " + parallelScans);
if(commander.worker!==undefined) {
console.info("Export DynamoDb table: " + commander.table + ", scan index: " + commander.worker);
}
else {
console.info("Export DynamoDb table: " + commander.table + ", parallel scans: " + parallelScans);
}

var exporter = new dynamoDbExport(commander.awsid, commander.awssecret, commander.awsregion);

if(!commander.quiet) {
exporter.on("info", function(message) {
console.info(message);
});

exporter.on("error", function(message) {
console.error(message);
});

exporter.on("throughputexceeded", function() {
console.warn("Throughput exceeded");
});
}

exporter.exportTable(commander.table, columns, parallelScans, commander.gzip, commander.filesize, commander.bucket, commander.path, function(err) {
console.info("Done exporting table");
});
var finished = function(err) {
var workerPrefix = "";
if(commander.worker!==undefined) {
workerPrefix = "Worker [" + commander.worker + "]: ";
}

if(err) {
console.error(workerPrefix + "Finished with errors: ", err);
process.exit(1);
}
else {
console.info(workerPrefix + "Done exporting table");
process.exit(0);
}

}

if(commander.worker!==undefined) {
exporter.exportTableWorker(commander.table, columns, parallelScans, commander.worker, commander.gzip, commander.filesize, commander.bucket, commander.path, finished);
}
else {
exporter.exportTable(commander.table, columns, parallelScans, commander.gzip, commander.filesize, commander.bucket, commander.path, finished);
}
99 changes: 93 additions & 6 deletions lib/DynamoDbExportCsv.js
@@ -6,6 +6,8 @@ var csv = require('fast-csv');
var fs = require('fs');
var util = require('util');
var zlib = require('zlib');
var path = require('path');
var child_process = require('child_process');
var _ = require('underscore');
var s3StreamUpload = require('s3-stream-upload');

@@ -14,10 +16,12 @@ var EventEmitter = require('events').EventEmitter;
function DynamoDBExportCSV(awsAccessKeyId, awsSecretAccessKey, awsRegion) {
// Events this emits
var infoEvent = 'info';
var errorEvent = 'error';
var throughputExceededEvent = 'throughputexceeded';
var _awsAccessKeyId = awsAccessKeyId;
var _awsSecretAccessKey = awsSecretAccessKey;
var _awsRegion = awsRegion;
var _args = [];

// Save off reference to this for later
var self = this;
@@ -26,7 +30,22 @@ function DynamoDBExportCSV(awsAccessKeyId, awsSecretAccessKey, awsRegion) {
EventEmitter.call(this);

// Configure dynamoDb
AWS.config.update({accessKeyId: _awsAccessKeyId, secretAccessKey: _awsSecretAccessKey, region: _awsRegion});
var config = { region: _awsRegion};
_args.push('--awsregion');
_args.push(_awsRegion);
if(_awsAccessKeyId) {
config.accessKeyId = _awsAccessKeyId;
_args.push('--awsid');
_args.push(_awsAccessKeyId);
}

if(_awsSecretAccessKey) {
config.secretAccessKey = _awsSecretAccessKey;
_args.push('--awssecret');
_args.push(_awsSecretAccessKey);
}
AWS.config.update(config);

var dynamodb = new AWS.DynamoDB({maxRetries: 20});
var s3 = new AWS.S3();

@@ -70,6 +89,7 @@ function DynamoDBExportCSV(awsAccessKeyId, awsSecretAccessKey, awsRegion) {
};

var csvStream;
var backoff = 1;
// Count of files used to increment number in filename for each file
var fileCount = 0;
async.doWhilst(
@@ -126,7 +146,9 @@ function DynamoDBExportCSV(awsAccessKeyId, awsSecretAccessKey, awsRegion) {
{
self.emit(throughputExceededEvent);
// Wait at least one second before the next query
setTimeout(function() { return done(null); }, 1000);
setTimeout(function() { return done(null); }, backoff * 1000);
// Increment backoff
backoff *= 2;
}
else
{
@@ -135,6 +157,8 @@ function DynamoDBExportCSV(awsAccessKeyId, awsSecretAccessKey, awsRegion) {
}
else
{
// Reset backoff
backoff = 1;
if(data) {
// Grab the key to start the next scan on
query.ExclusiveStartKey = data.LastEvaluatedKey;
@@ -172,7 +196,7 @@ function DynamoDBExportCSV(awsAccessKeyId, awsSecretAccessKey, awsRegion) {
});
},
function() {
self.emit(infoEvent, "Segment: " + segment + ", Row: " + rowCount + ", Mb: " + writableStream.bytesWritten / 1024 / 1024);
self.emit(infoEvent, "Row: " + rowCount + ", Mb: " + (writableStream.bytesWritten / 1024 / 1024).toFixed(2));
// Keep going if there is more data and we haven't exceeded the file size
return query.ExclusiveStartKey && writableStream.bytesWritten < 1024 * 1024 * filesize;
},
@@ -209,7 +233,7 @@ function DynamoDBExportCSV(awsAccessKeyId, awsSecretAccessKey, awsRegion) {
}

// Public export table function
this.exportTable = function(table, columns, totalSegments, compressed, filesize, s3Bucket, s3Path, callback) {
this.exportTable = function exportTable(table, columns, totalSegments, compressed, filesize, s3Bucket, s3Path, callback) {
if(!filesize)
{
filesize = 250;
@@ -237,14 +261,43 @@ function DynamoDBExportCSV(awsAccessKeyId, awsSecretAccessKey, awsRegion) {
// Scan the table
function(done) {
var parallelScanFunctions = [];
var cli = path.join(__dirname, '../bin/DynamoDbExportCsv');
var allWorkerArgs = _.union(_args, ['--table', table, '--columns', columns.join(','), '--scans', totalSegments]);
if(compressed) {
allWorkerArgs.push('--gzip');
}
if(filesize) {
allWorkerArgs.push('--filesize');
allWorkerArgs.push(filesize);
}
if(s3Bucket) {
allWorkerArgs.push('--bucket');
allWorkerArgs.push(s3Bucket);
}
if(s3Path) {
allWorkerArgs.push('--path');
allWorkerArgs.push(s3Path);
}
for(var i = 0; i < totalSegments; i++)
{
parallelScanFunctions.push(
function(segment) {
function(worker) {
return function(done) {
writeTableToCsv(table, columns, totalSegments, segment, compressed, filesize, s3Bucket, s3Path, done);
var args = _.union(allWorkerArgs, ['--worker', worker]);
var child = child_process.spawn(cli, args, {});
var logPrefix = 'Worker [' + worker + ']: ';
child.stdout.on('data', function(data) { self.emit(infoEvent, logPrefix + data.slice(0,-1));});
child.stderr.on('data', function(data) { self.emit(errorEvent, logPrefix + data.slice(0,-1));});
child.on('error', function(err) { self.emit(errorEvent, logPrefix + err);});
child.on('exit', function(code) { done(); });
};
}(i)

// function(segment) {
// return function(done) {
// writeTableToCsv(table, columns, totalSegments, segment, compressed, filesize, s3Bucket, s3Path, done);
// };
// }(i)
);
}

@@ -253,6 +306,40 @@ function DynamoDBExportCSV(awsAccessKeyId, awsSecretAccessKey, awsRegion) {
], callback);
};

// Public export table worker function
this.exportTableWorker = function exportTableWorker(table, columns, totalSegments, segment, compressed, filesize, s3Bucket, s3Path, callback) {
if(!filesize)
{
filesize = 250;
}

async.series([
// Create a directory based on the table name if one doesn't exist
function(done) {
// Only if we aren't uploading to s3
if(!s3Bucket) {
fs.exists(table, function (exists) {
if (!exists) {
fs.mkdir(table, done);
}
else {
return setImmediate(done);
}
});
}
else
{
return setImmediate(done);
}
},
// Scan the table
function(done) {
writeTableToCsv(table, columns, totalSegments, segment, compressed, filesize, s3Bucket, s3Path, done);
}
], callback);
};


return (this);
}

12 changes: 6 additions & 6 deletions package.json
@@ -1,10 +1,10 @@
{
"name": "dynamodbexportcsv",
"version": "0.1.5",
"version": "0.2.0",
"description": "Export DynamoDb tables to CSV",
"repository": {
"type": "git",
"url": "git@github.com:PushSpring/DynamoDbExportCsv.git"
"url": "git@github.com:chriskinsman/DynamoDbExportCsv.git"
},
"bin": {
"DynamoDbExportCsv": "./bin/DynamoDbExportCsv"
@@ -13,10 +13,10 @@
"author": "Chris Kinsman - PushSpring",
"license": "MIT",
"dependencies": {
"fast-csv": "0.6.0",
"commander": "~2.8.1",
"async": "~1.4.2",
"aws-sdk": "~2.2.4",
"fast-csv": "2.3.0",
"commander": "~2.9.0",
"async": "~2.1.4",
"aws-sdk": "~2.7.20",
"underscore": "~1.8.3",
"s3-stream-upload": "~2.0.0"
}
