This repository has been archived by the owner on Mar 18, 2020. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Crawler.js
74 lines (64 loc) · 2.26 KB
/
Crawler.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
/**
* Copyright (c) 2016-present, Absolvent
* All rights reserved.
*
* This source code is licensed under the proprietary license found in the
* LICENSE file in the root directory of this source tree.
*/
'use strict';
const assert = require('chai').assert;
const DataBus = require('./EventEmitter/DataBus');
const ExtractorScheduler = require('./EventEmitter/ExtractorScheduler');
const ExtractorSession = require('./ExtractorSession');
const ExtractorSet = require('./ExtractorSet');
const isNull = require('lodash/isNull');
const Logger = require('./Logger');
const through2 = require('through2');
const UrlListDuplexStream = require('./UrlListDuplexStream');
/**
 * Reads URLs from a UrlListDuplexStream and, for every extractor that the
 * ExtractorSet matches to a URL, schedules an ExtractorSession on the
 * ExtractorScheduler.
 */
class Crawler {
  /**
   * @param {DataBus} dataBus passed to every ExtractorSession this crawler creates
   * @param {ExtractorSet} extractorSet queried for the extractors matching each URL
   * @param {Logger} [logger] only handed to the default ExtractorScheduler;
   *   not used when `extractorScheduler` is supplied
   * @param {ExtractorScheduler|null} [extractorScheduler] scheduler to use;
   *   when null a new ExtractorScheduler is created
   * @throws when `dataBus`, `extractorSet` or a supplied `extractorScheduler`
   *   fails its `assert.instanceOf` check
   */
  constructor(dataBus, extractorSet, logger = new Logger(), extractorScheduler = null) {
    if (isNull(extractorScheduler)) {
      this.extractorScheduler = new ExtractorScheduler(void 0, logger);
    } else {
      assert.instanceOf(extractorScheduler, ExtractorScheduler);
      this.extractorScheduler = extractorScheduler;
    }

    assert.instanceOf(dataBus, DataBus);
    assert.instanceOf(extractorSet, ExtractorSet);

    this.dataBus = dataBus;
    this.extractorSet = extractorSet;
  }

  /**
   * Handles one URL read from the stream: schedules an ExtractorSession per
   * matching extractor, waits for `awaitCanRunExtractor()`, then invokes
   * `callback`.
   *
   * @param {UrlListDuplexStream} urlListDuplexStream stream given to each session
   * @param {*} url value passed to `findExtractorListForUrl` and to each session
   * @param {Function} callback stream callback; called with the value resolved by
   *   `awaitCanRunExtractor()` on success, or with the rejection reason on failure
   * @returns {Promise} settles once `callback` has been invoked
   */
  onUrlListDuplexStreamData(urlListDuplexStream, url, callback) {
    return this.extractorSet.findExtractorListForUrl(url)
      .then(extractorList => {
        for (const extractor of extractorList) {
          this.extractorScheduler.schedule(new ExtractorSession(
            urlListDuplexStream,
            this.dataBus,
            extractor,
            url
          ));
        }
      })
      .then(() => this.extractorScheduler.awaitCanRunExtractor())
      .then(
        result => callback(result),
        // Without this handler a failed lookup or scheduler wait would never
        // reach the stream callback: the transform would stall and the
        // rejection would go unhandled.
        error => callback(error)
      );
  }

  /**
   * Called when the piped stream ends.
   *
   * @returns {*} whatever `extractorScheduler.flush()` returns; `run()` treats
   *   it as a promise
   */
  onUrlListDuplexStreamEnd() {
    return this.extractorScheduler.flush();
  }

  /**
   * Pipes the URL stream through an object-mode transform that feeds every URL
   * to `onUrlListDuplexStreamData`.
   *
   * @param {UrlListDuplexStream} urlListDuplexStream source of URLs
   * @returns {Promise} resolves when the scheduler emits
   *   `ExtractorScheduler.EVENT_DEPLETED`, or when the piped stream ends and the
   *   scheduler flush resolves; rejects on a stream error, a failed flush, or
   *   when `urlListDuplexStream` is not a UrlListDuplexStream
   */
  run(urlListDuplexStream) {
    return new Promise((resolve, reject) => {
      assert.instanceOf(urlListDuplexStream, UrlListDuplexStream);

      // pipe() does not forward errors from the source to the destination,
      // so the source stream needs its own listener.
      urlListDuplexStream.on('error', reject);

      urlListDuplexStream
        .pipe(through2.obj((url, encoding, callback) => (
          this.onUrlListDuplexStreamData(urlListDuplexStream, url, callback)
        )))
        // A rejected flush must settle the returned promise as well.
        .on('end', () => this.onUrlListDuplexStreamEnd().then(resolve, reject))
        .on('error', reject);

      this.extractorScheduler.once(ExtractorScheduler.EVENT_DEPLETED, resolve);
    });
  }
}
// The Crawler class is assigned as this module's export (CommonJS).
module.exports = Crawler;