Skip to content

Commit

Permalink
Merge pull request #1 from PushSpring/updates3stream
Browse files Browse the repository at this point in the history
Fix memory growth issue when using --gzip option
  • Loading branch information
chriskinsman committed Sep 24, 2015
2 parents ea9cec7 + 9074961 commit ce8bc2c
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 35 deletions.
69 changes: 38 additions & 31 deletions lib/DynamoDbExportCsv.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ function DynamoDBExportCSV(awsAccessKeyId, awsSecretAccessKey, awsRegion) {
// Configure dynamoDb
AWS.config.update({accessKeyId: _awsAccessKeyId, secretAccessKey: _awsSecretAccessKey, region: _awsRegion});
var dynamodb = new AWS.DynamoDB({maxRetries: 20});
var s3 = new AWS.S3();

// Total row count
var rowCount = 0;
Expand All @@ -39,7 +40,7 @@ function DynamoDBExportCSV(awsAccessKeyId, awsSecretAccessKey, awsRegion) {
row[key] = value.S || value.N;
});

stream.write(row);
return stream.write(row);
}

// Writes out an item and ensures that every specified column
Expand All @@ -57,7 +58,7 @@ function DynamoDBExportCSV(awsAccessKeyId, awsSecretAccessKey, awsRegion) {
}
});

stream.write(row);
return stream.write(row);
}

// Does the real work of writing the table to a CSV file
Expand Down Expand Up @@ -86,20 +87,13 @@ function DynamoDBExportCSV(awsAccessKeyId, awsSecretAccessKey, awsRegion) {
var writableStream;
if(s3Bucket)
{
var s3stream = s3StreamUpload({
accessKeyId: _awsAccessKeyId,
secretAccessKey: _awsSecretAccessKey,
region: _awsRegion,
Bucket: s3Bucket
});

var filePath = '';
if(s3Path)
{
filePath += s3Path + "/";
}
filePath += table + "/" + fileName;
writableStream = s3stream({Key: filePath});
writableStream = s3StreamUpload(s3, {Bucket: s3Bucket, Key: filePath}, {concurrent: totalSegments});
self.emit(infoEvent, "Starting new file: s3://" + s3Bucket + "/" + filePath);
}
else
Expand All @@ -108,8 +102,6 @@ function DynamoDBExportCSV(awsAccessKeyId, awsSecretAccessKey, awsRegion) {
self.emit(infoEvent, "Starting new file: " + fileName);
}



// If we are compressing pipe it through gzip
if(compressed)
{
Expand All @@ -125,25 +117,8 @@ function DynamoDBExportCSV(awsAccessKeyId, awsSecretAccessKey, awsRegion) {
// Repeatedly scan dynamodb until there are no more rows
async.doWhilst(
function(done) {
var noDrainRequired = false;
dynamodb.scan(query, function(err, data) {
if(data) {
_.each(data.Items, function (item) {
if(fileRowCount===0)
{
writeItemWithHeaders(csvStream, item, columns);
}
else
{
writeItemWithoutHeaders(csvStream, item);
}
fileRowCount++;
rowCount++;
});

// Grab the key to start the next scan on
query.ExclusiveStartKey = data.LastEvaluatedKey;
}

if(err)
{
// Check for throughput exceeded
Expand All @@ -160,7 +135,39 @@ function DynamoDBExportCSV(awsAccessKeyId, awsSecretAccessKey, awsRegion) {
}
else
{
return setImmediate(function() { done(null); });
if(data) {
// Grab the key to start the next scan on
query.ExclusiveStartKey = data.LastEvaluatedKey;

async.eachSeries(data.Items, function (item, done) {
if(fileRowCount===0)
{
noDrainRequired = writeItemWithHeaders(csvStream, item, columns);
}
else
{
noDrainRequired = writeItemWithoutHeaders(csvStream, item);
}
fileRowCount++;
rowCount++;

// Check if we need to drain to avoid bloating memory
if(!noDrainRequired) {
csvStream.once('drain', function() {
return setImmediate(function() { done(null); });
})
}
else {
return setImmediate(function() { done(null); });
}

}, function(err) {
return setImmediate(function() { done(null); });
});
}
else {
return setImmediate(function() { done(null); });
}
}
});
},
Expand Down
8 changes: 4 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "dynamodbexportcsv",
"version": "0.1.4",
"version": "0.1.5",
"description": "Export DynamoDb tables to CSV",
"repository": {
"type": "git",
Expand All @@ -15,9 +15,9 @@
"dependencies": {
"fast-csv": "0.6.0",
"commander": "~2.8.1",
"async": "~0.9.0",
"aws-sdk": "~2.1.39",
"async": "~1.4.2",
"aws-sdk": "~2.2.4",
"underscore": "~1.8.3",
"s3-stream-upload": "git+ssh://git@github.com:PushSpring/s3-stream-upload.git#master"
"s3-stream-upload": "~2.0.0"
}
}

0 comments on commit ce8bc2c

Please sign in to comment.