-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathstreams.js
106 lines (99 loc) · 2.32 KB
/
streams.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
const fs = require('fs');
const { Transform,Writable } = require("stream");
const { nodeParser } = require('./parser');
const saxstream = require('./sax-stream');
/**
 * Create (but do not connect) the streams that make up the import pipeline.
 *
 * The caller is responsible for piping the returned streams together.
 *
 * @param {object} config
 * @param {object} config.dbcon - Object exposing `writeToMongo(name, nodes)`;
 *   its return value is awaited via a Promise, so it may be a promise or a
 *   plain value.
 * @param {number} config.batchSize - Number of nodes collected before a batch
 *   is emitted. If it is missing or not comparable as a number, no batch is
 *   emitted until the stream ends, when everything buffered is flushed at once.
 * @param {string} config.inputfile - Path handed to `fs.createReadStream`.
 * @returns {{inputStream: object, saxStream: object, dbStream: object,
 *   progressStream: object, groupStream: object, batchStream: object,
 *   transformStream: object}} The individual pipeline stages.
 */
function streams(config) {
  const dbcon = config.dbcon;

  // Converts each incoming raw node with nodeParser.createNode.
  const transformStream = new Transform({
    objectMode: true,
    transform(node, encoding, done) {
      let parsed;
      try {
        parsed = nodeParser.createNode(node);
      } catch (err) {
        // Report parser failures through the stream's error channel instead
        // of letting the exception escape from the stream internals.
        done(err);
        return;
      }
      // push(null) means "end of stream" to Node, so a parser result of
      // null/undefined is skipped rather than forwarded.
      if (parsed != null) {
        this.push(parsed);
      }
      done();
    },
  });

  /**
   * Collects incoming objects into arrays of `config.batchSize` items and
   * emits each full array downstream; a partial array is emitted on flush.
   */
  class BatchStream extends Transform {
    constructor(...args) {
      super(...args);
      this.batchBuffer = [];
      this.batchSize = config.batchSize;
    }

    _transform(node, encoding, done) {
      this.batchBuffer.push(node);
      if (this.batchBuffer.length >= this.batchSize) {
        this.push(this.batchBuffer);
        // Start a fresh array; the emitted one now belongs to the consumer.
        this.batchBuffer = [];
      }
      done();
    }

    _flush(done) {
      // Emit whatever is left over when the input ends.
      if (this.batchBuffer.length) {
        this.push(this.batchBuffer);
        this.batchBuffer = [];
      }
      done();
    }
  }

  const batchStream = new BatchStream({
    objectMode: true,
  });

  // Turns a batch (array of nodes) into an object keyed by node.nodetype,
  // each value being the array of nodes of that type in arrival order.
  const groupStream = new Transform({
    objectMode: true,
    transform(batch, encoding, done) {
      // A prototype-less object is used so that a nodetype such as
      // "constructor" or "toString" cannot collide with inherited members.
      const grouped = Object.create(null);
      batch.forEach((node) => {
        const key = node.nodetype;
        if (!grouped[key]) {
          grouped[key] = [];
        }
        grouped[key].push(node);
      });
      this.push(grouped);
      done();
    },
  });

  // Emits a single '.' for every chunk it receives.
  const progressStream = new Transform({
    objectMode: true,
    transform(node, encoding, done) {
      this.push('.');
      done();
    },
  });

  // Writes each group of a grouped batch via dbcon.writeToMongo(name, nodes).
  const dbStream = new Writable({
    objectMode: true,
    write(groupnodes, encoding, callback) {
      // The Promise executor runs synchronously, so writes start in key order
      // exactly as before, but a synchronous throw from writeToMongo becomes
      // a rejection instead of escaping from write().
      const writes = Object.keys(groupnodes).map(
        (dbname) =>
          new Promise((resolve) => {
            resolve(dbcon.writeToMongo(dbname, groupnodes[dbname]));
          })
      );
      // Two-argument then(): the failure handler only sees write failures,
      // so `callback` can never be invoked twice for one chunk.
      Promise.all(writes).then(
        () => {
          callback();
        },
        (err) => {
          // Best-effort policy kept from the original: a failed batch is
          // logged and the stream carries on with the next one.
          console.log(err);
          callback();
        }
      );
    },
  });

  // Options are passed through to the project's sax-stream factory as-is.
  const saxStream = saxstream({
    strict: true,
    tag: ['node', 'way', 'relation'],
  });

  const inputStream = fs.createReadStream(config.inputfile);

  return {
    inputStream,
    saxStream,
    dbStream,
    progressStream,
    groupStream,
    batchStream,
    transformStream,
  };
}
module.exports = function(config){
return streams(config);
}