-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathdynamodb2mysql.js
159 lines (147 loc) · 4.19 KB
/
dynamodb2mysql.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
var program = require('commander');
var config = require('./config.json');
var aws = require('aws-sdk');
// Apply the AWS credentials and region from config.json to the SDK's global
// configuration, then construct the DynamoDB client used by the rest of the script.
var awsSettings = {
  accessKeyId: config.aws.key,
  secretAccessKey: config.aws.secret,
  region: config.aws.region
};
aws.config.update(awsSettings);

var ddb = new aws.DynamoDB();
// Command-line interface definition.
// NOTE(review): the option values are declared with square brackets
// (`[tablename]`, `[filename]`), which presumably marks them as optional in
// commander -- confirm whether required values (`<tablename>`) were intended.
program.version('0.0.1');
program.option('-s, --source [tablename]', 'The source (DynamoDB) table');
program.option('-d, --destination [tablename]', 'The destination (MySQL) table');
program.option('--destroy', 'Destroy source (DynamoDB) table on transfer complete');
program.option('--notify', 'Notify by Amazon SES on transfer complete');
program.option('-e, --execute [filename]', 'The SQL file will be executed on transfer complete');
program.parse(process.argv);

// Both table names are mandatory: print usage and exit with a failure code otherwise.
if (!(program.source && program.destination)) {
  console.log('You must specify a source and destination tables');
  program.outputHelp();
  process.exit(1);
}
var mysql = require('mysql');

// Connection settings for the destination MySQL database, taken from config.json.
var mysqlSettings = {
  host: config.mysql.host,
  user: config.mysql.user,
  password: config.mysql.password,
  database: config.mysql.database,
  multipleStatements: true
};

// Connection used for the row inserts issued during the transfer.
var connection = mysql.createConnection(mysqlSettings);
/**
 * Build the human-readable description of the current transfer, used as the
 * prefix of every notification message.
 *
 * Reads the source and destination table names from the parsed CLI options.
 *
 * @returns {string} Text of the form `Transfer from "<source>" to "<destination>"`.
 */
var getSubject = function() {
  // The source name is closed with a single quote character (the original
  // emitted a stray second `"` after it).
  return 'Transfer from "' + program.source + '" to "' + program.destination + '"';
};
/**
 * Send a notification e-mail through Amazon SES.
 *
 * Sender and recipient come from the `notify` section of config.json; the
 * message is sent as the HTML body of the mail.
 *
 * @param {string} subject - Subject line of the e-mail.
 * @param {string} message - Content placed in the HTML body.
 * @throws Rethrows (from inside the SES callback) any error reported by
 *         `sendEmail`, after dumping it to the console.
 */
var sendNotify = function (subject, message) {
  var sender = config.notify.sender;
  var recipients = [config.notify.recipient];

  var params = {
    Source: sender,
    Destination: { ToAddresses: recipients },
    Message: {
      Subject: { Data: subject },
      Body: { Html: { Data: message } }
    }
  };

  // A fresh SES client is created for every notification.
  var mailer = new aws.SES();
  mailer.sendEmail(params, function (err, data) {
    if (!err) {
      return;
    }
    console.dir(err);
    throw err;
  });
};
/**
 * Queue one MySQL INSERT for every DynamoDB item in `items`.
 *
 * Each item is flattened into a `{ column: value }` object: the column name is
 * the attribute name with all spaces removed, and the value is the attribute's
 * numeric (`N`) or string (`S`) payload. The generated SQL of every statement
 * is echoed to stdout.
 *
 * Insert failures are reported through `sendNotify` and the console but do not
 * stop the transfer.
 *
 * @param {Array<Object>} items - Items as returned by a DynamoDB scan
 *        (attribute name -> typed attribute value). A missing/empty list is a no-op.
 */
var transfer = function (items) {
  (items || []).forEach(function (item) {
    var data = {};
    Object.keys(item).forEach(function (propertyName) {
      var attribute = item[propertyName];
      // Only N and S attributes are understood; any other attribute type ends
      // up as the string "undefined" (unchanged from the original behavior).
      var value = attribute.N ? attribute.N : String(attribute.S);
      // Values are passed through untouched: the `SET ?` placeholder below lets
      // the driver escape them, so escaping quotes by hand here would store the
      // escape sequences literally and corrupt the data.
      data[propertyName.replace(/ /g, '')] = value;
    });
    var statement = connection.query('INSERT INTO ' + program.destination + ' SET ?', data, function (err, result) {
      if (err) {
        sendNotify('DynamoDB2MYSQL Notification', getSubject() + ' Exception');
        console.dir(err);
        // Not rethrown, so one failed row does not abort the remaining inserts.
      }
    });
    console.log(statement.sql + ";");
  });
};
/**
 * Finish the transfer once the last scan page has been handed to `transfer`.
 *
 * Closes the insert connection and then, inside the `end` callback:
 *   1. if `--execute` was given, reads that SQL file (relative to this
 *      script's directory) and runs it on a new MySQL connection;
 *   2. if `--destroy` was given, deletes the source DynamoDB table;
 *   3. if `--notify` was given, sends a completion e-mail.
 *
 * NOTE(review): steps 2 and 3 are started without waiting for the SQL script
 * of step 1 to finish, and nothing here checks whether any insert failed
 * before the source table is deleted -- confirm that this is acceptable.
 *
 * @throws Rethrows (from inside the respective callbacks) SQL execution and
 *         deleteTable errors after notifying and logging them.
 */
var transferComplete = function () {
  connection.end(function () {
    if (program.execute) {
      var fs = require('fs');
      var path = require('path');

      // Read the script as plain text instead of registering a '.sql' handler
      // in the deprecated, process-wide `require.extensions` table; going
      // through `require` would also run a non-.sql file as JavaScript/JSON.
      var sqlPath = path.join(__dirname, program.execute);
      if (!fs.existsSync(sqlPath) && fs.existsSync(sqlPath + '.sql')) {
        // Keep accepting a name given without its '.sql' extension.
        sqlPath += '.sql';
      }
      var sql_instructions = fs.readFileSync(sqlPath, 'utf8');

      var connection_2 = mysql.createConnection({
        host: config.mysql.host,
        user: config.mysql.user,
        password: config.mysql.password,
        database: config.mysql.database,
        multipleStatements: true
      });
      // No values argument: the script is sent verbatim, so a literal `?`
      // inside it is not treated as a placeholder.
      connection_2.query(sql_instructions, function (err, result) {
        connection_2.end();
        if (err) {
          sendNotify('DynamoDB2MYSQL Notification', getSubject() + ' SQL Execution Exception');
          console.dir(err);
          throw err;
        }
      });
    }

    if (program.destroy) {
      ddb.deleteTable({
        TableName: program.source
      }, function (err, data) {
        if (err) {
          sendNotify('DynamoDB2MYSQL Notification', 'deleteTable Exception');
          console.dir(err);
          throw err;
        } else if (program.notify) {
          sendNotify('DynamoDB2MYSQL Notification', getSubject() + ' & source destroy complete');
        }
      });
    } else if (program.notify) {
      sendNotify('DynamoDB2MYSQL Notification', getSubject() + ' complete');
    }
  });
};
/**
 * Scan the source DynamoDB table page by page, handing every page of items to
 * `transfer` and calling `transferComplete` after the last page.
 *
 * The same `query` object is reused for every page: when a page reports a
 * `LastEvaluatedKey`, it is stored as `query.ExclusiveStartKey` and the scan
 * is issued again.
 *
 * @param {Object} query - DynamoDB scan parameters; mutated between pages.
 * @throws Rethrows (from inside the scan callback) any scan error after
 *         notifying and logging it.
 */
var scan = function (query) {
  function handlePage(err, data) {
    if (err) {
      sendNotify('DynamoDB2MYSQL Notification', getSubject() + ' Scan Exception');
      console.dir(err);
      throw err;
    }

    transfer(data.Items);

    if (!data.LastEvaluatedKey) {
      // No continuation key: this was the final page.
      transferComplete();
      return;
    }

    query.ExclusiveStartKey = data.LastEvaluatedKey;
    scan(query);
  }

  ddb.scan(query, handlePage);
};
// Script entry point: look up the source table, derive the scan page size and
// start the paged scan. The same `query` object is reused for describeTable
// and for every scan request.
var query = {
  'TableName': program.source
};
ddb.describeTable(query, function (err, data) {
  if (err) {
    sendNotify('DynamoDB2MYSQL Notification', getSubject() + ' describeTable Exception');
    console.dir(err);
    throw err;
  }
  if (data == null) {
    // Throw a real Error (not a bare string) so the failure carries a stack trace.
    throw new Error('Table ' + program.source + ' not found in DynamoDB');
  }
  // Page size: the configured limit if set, otherwise the table's provisioned
  // read capacity.
  var limit = config.aws.limit ? config.aws.limit : data.Table.ProvisionedThroughput.ReadCapacityUnits;
  // Scan rejects a Limit below 1, and a table can report zero read capacity
  // units (on-demand billing); in that case leave Limit unset and let
  // DynamoDB pick the page size.
  if (limit > 0) {
    query.Limit = limit;
  }
  scan(query);
});