forked from onetwotrip/simple-riak-migrator
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathhttp_dump.js
81 lines (71 loc) · 2.32 KB
/
http_dump.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
#!/usr/bin/env node
const request = require('requestretry');
const fs = require('fs');
const through2 = require('through2');
const keysPath = __dirname + '/keys';
const LimitedParallelStream = require('./src/modules/limited_parallel_stream').LimitedParallelStream;
const byline = require('byline');
const program = require('commander');
const path = require('path');
/**
 * Coerces a raw command-line option value to a positive integer.
 *
 * Used as a commander option-processing callback, which is invoked with the
 * raw value and the previous (default) value.
 *
 * @param {string} value - Raw option text as typed on the command line.
 * @param {number} fallback - Value to keep when `value` is not a positive integer.
 * @returns {number} The parsed integer, or `fallback` when parsing fails.
 */
const toPositiveInt = (value, fallback) => {
  const parsed = Number.parseInt(value, 10);
  return Number.isInteger(parsed) && parsed > 0 ? parsed : fallback;
};

// Command-line interface: one positional argument (the bucket name) plus
// connection, concurrency and output options.
program
  .version('0.0.1')
  .usage('[options] bucketName')
  .option('-H, --host [host]', 'specify the host (default: http://127.0.0.1)', 'http://127.0.0.1')
  .option('-p, --port [port]', 'specify the port (default: 8098)', 8098)
  // Values given on the command line arrive as strings while the default is a
  // number; coerce so program.concurrency is always numeric.
  .option('-c, --concurrency [concurrency]', 'specify the concurrency (default: 100)', toPositiveInt, 100)
  // The value is handed to fs.createWriteStream later in the script, so it
  // names a file rather than a folder.
  .option('-f, --file [file]', 'specify the dump file (default: __dirname + /dump)', __dirname + '/dump')
  .parse(process.argv);
// The bucket name is the only required positional argument. Without it there
// is nothing to dump, so report the problem on stderr, show the usage text and
// exit with a failure status (program.help() would exit with status 0).
if (program.args.length === 0) {
  console.error('\nBucket is required!');
  program.outputHelp();
  process.exit(1);
}

const bucket = program.args[0];

// Base URL for the bucket; a key is appended to address a single object, and
// `?keys=stream` is appended to list keys.
// NOTE(review): host, port and bucket are interpolated verbatim, with no URL
// encoding — confirm whether bucket names with reserved characters must be
// supported.
const baseUrl = `${program.host}:${program.port}/riak/${bucket}/`;
/**
 * @TODO: Move this to dump.js
 */

// ASCII byte values used while scanning the raw key-listing response. None of
// them can occur inside a multi-byte UTF-8 sequence, so scanning bytes is safe.
const BYTE_QUOTE = 0x22;
const BYTE_BACKSLASH = 0x5c;
const BYTE_OPEN_BRACE = 0x7b;
const BYTE_CLOSE_BRACE = 0x7d;

/**
 * Splits raw response bytes into complete top-level JSON object texts.
 *
 * HTTP data chunks are not guaranteed to line up with JSON document
 * boundaries: one chunk may hold several objects, or only part of one. This
 * scanner tracks brace depth (ignoring braces inside string literals) so that
 * only complete objects are handed to JSON.parse.
 *
 * Non-blank bytes found outside any object are returned as documents too, so
 * the caller reports them as invalid JSON instead of dropping them silently.
 *
 * @param {Buffer} buffer - Unprocessed bytes (leftover from earlier chunks plus the new chunk).
 * @returns {{documents: string[], rest: Buffer}} Complete document texts, and
 *   the trailing bytes of an object that has not been closed yet.
 */
const splitJsonObjects = (buffer) => {
  const documents = [];
  let depth = 0;
  let inString = false;
  let isEscaped = false;
  // Start of the current object, or of the stray bytes preceding the next one.
  let segmentStart = 0;

  const pushStray = (end) => {
    const stray = buffer.subarray(segmentStart, end).toString();
    if (stray.trim() !== '') {
      documents.push(stray);
    }
  };

  for (let i = 0; i < buffer.length; i++) {
    const byte = buffer[i];

    if (depth === 0) {
      if (byte === BYTE_OPEN_BRACE) {
        pushStray(i);
        segmentStart = i;
        depth = 1;
      }
      continue;
    }

    if (inString) {
      if (isEscaped) {
        isEscaped = false;
      } else if (byte === BYTE_BACKSLASH) {
        isEscaped = true;
      } else if (byte === BYTE_QUOTE) {
        inString = false;
      }
      continue;
    }

    if (byte === BYTE_QUOTE) {
      inString = true;
    } else if (byte === BYTE_OPEN_BRACE) {
      depth++;
    } else if (byte === BYTE_CLOSE_BRACE) {
      depth--;
      if (depth === 0) {
        documents.push(buffer.subarray(segmentStart, i + 1).toString());
        segmentStart = i + 1;
      }
    }
  }

  if (depth === 0) {
    // No object is open, so whatever is left cannot be the start of one.
    pushStray(buffer.length);
    return { documents, rest: Buffer.alloc(0) };
  }
  return { documents, rest: buffer.subarray(segmentStart) };
};

/**
 * Logs a stream error and marks the process as failed.
 *
 * `.pipe()` does not forward errors downstream, so this is attached to every
 * stream in both pipelines; an 'error' event without a listener would
 * otherwise be thrown as an uncaught exception.
 *
 * @param {Error} err - Error emitted by a stream.
 */
const reportStreamError = (err) => {
  console.log(err);
  process.exitCode = 1;
};

/**
 * Second phase: reads the saved key file line by line, fetches each object
 * with at most `program.concurrency` requests in flight, and appends one
 * `key<TAB>body<TAB>content-type` line per successful fetch to the dump file.
 */
const dumpValues = () => {
  const keyFile = fs.createReadStream(keysPath);
  const keyLines = byline.createStream(keyFile);
  const fetcher = new LimitedParallelStream(program.concurrency, function (key, enc, done) {
    const url = `${baseUrl}${key}`;
    request({url: url}, (err, data) => {
      if (!err && data.statusCode === 200) {
        this.push(`${key}\t${data.body}\t${data.headers['content-type']}\n`);
      } else {
        // Without a response there is no status code to report; do not
        // pretend the request succeeded.
        console.log(url, err ? err.message : '', 'statusCode:', data ? data.statusCode : 'n/a');
      }
      done();
    });
  });
  const dumpFile = fs.createWriteStream(path.resolve(program.file));

  keyFile.on('error', reportStreamError);
  keyLines.on('error', reportStreamError);
  fetcher.on('error', reportStreamError);
  dumpFile.on('error', reportStreamError);
  dumpFile.on('finish', () => {
    console.info('\tAll data saved on disk ✔');
  });

  keyLines.pipe(fetcher).pipe(dumpFile);
};

console.info('Dump started ^_^');

// First phase: stream the bucket's key listing and write one key per line to
// the key file. The listing is presumably a sequence of concatenated
// `{"keys": [...]}` documents — only the `keys` array of each document is used.
const keyListing = request({url: `${baseUrl}?keys=stream`});

// Bytes of a JSON document that has started but not yet been closed.
let pending = Buffer.alloc(0);

const keyExtractor = through2(
  { objectMode: true, allowHalfOpen: false, highWaterMark: 50, bufferSize: 30 },
  function (chunk, enc, cb) {
    const bytes = Buffer.isBuffer(chunk) ? chunk : Buffer.from(String(chunk));
    const split = splitJsonObjects(Buffer.concat([pending, bytes]));
    pending = split.rest;

    for (const text of split.documents) {
      let data;
      try {
        data = JSON.parse(text);
      } catch (e) {
        console.log('Not valid JSON', text);
        continue;
      }
      if (data.keys && data.keys.length > 0) {
        this.push(data.keys.join('\n') + '\n');
      }
    }
    cb();
  },
  function (cb) {
    // Anything still buffered at end of input is a truncated document.
    if (pending.length > 0) {
      console.log('Not valid JSON', pending.toString());
      pending = Buffer.alloc(0);
    }
    cb();
  }
);

const keyFileWriter = fs.createWriteStream(keysPath);

keyListing.on('error', reportStreamError);
keyExtractor.on('error', reportStreamError);
keyFileWriter.on('error', reportStreamError);
keyFileWriter.on('finish', () => {
  console.info('\tKeys saved on disk ✔');
  //Start saving all data
  dumpValues();
});

keyListing.pipe(keyExtractor).pipe(keyFileWriter);