1
1
package us .codecraft .webmagic .scheduler ;
2
2
3
- import java .io .BufferedReader ;
4
- import java .io .Closeable ;
5
- import java .io .File ;
6
- import java .io .FileNotFoundException ;
7
- import java .io .FileReader ;
8
- import java .io .FileWriter ;
9
- import java .io .IOException ;
10
- import java .io .PrintWriter ;
11
- import java .util .LinkedHashSet ;
12
- import java .util .Set ;
13
- import java .util .concurrent .BlockingQueue ;
14
- import java .util .concurrent .Executors ;
15
- import java .util .concurrent .LinkedBlockingQueue ;
16
- import java .util .concurrent .ScheduledExecutorService ;
17
- import java .util .concurrent .TimeUnit ;
18
- import java .util .concurrent .atomic .AtomicBoolean ;
19
- import java .util .concurrent .atomic .AtomicInteger ;
20
-
21
- import org .apache .commons .io .IOUtils ;
22
3
import org .apache .commons .lang3 .math .NumberUtils ;
23
-
24
4
import us .codecraft .webmagic .Request ;
25
5
import us .codecraft .webmagic .Task ;
26
- import us .codecraft .webmagic .scheduler .component .DuplicateRemover ;
6
+
7
+ import java .io .*;
8
+ import java .util .concurrent .*;
9
+ import java .util .concurrent .atomic .AtomicBoolean ;
10
+ import java .util .concurrent .atomic .AtomicInteger ;
27
11
28
12
29
13
/**
32
16
* @author code4crafter@gmail.com <br>
33
17
* @since 0.2.0
34
18
*/
35
- public class FileCacheQueueScheduler extends DuplicateRemovedScheduler implements MonitorableScheduler ,Closeable {
19
+ public class FileCacheQueueScheduler extends DuplicateRemovedScheduler implements MonitorableScheduler , Closeable {
36
20
37
21
private String filePath = System .getProperty ("java.io.tmpdir" );
38
22
@@ -52,8 +36,6 @@ public class FileCacheQueueScheduler extends DuplicateRemovedScheduler implement
52
36
53
37
private BlockingQueue <Request > queue ;
54
38
55
- private Set <String > urls ;
56
-
57
39
private ScheduledExecutorService flushThreadPool ;
58
40
59
41
public FileCacheQueueScheduler (String filePath ) {
@@ -83,36 +65,13 @@ private void init(Task task) {
83
65
}
84
66
85
67
private void initDuplicateRemover () {
86
- setDuplicateRemover (
87
- new DuplicateRemover () {
88
- @ Override
89
- public boolean isDuplicate (Request request , Task task ) {
90
- if (!inited .get ()) {
91
- init (task );
92
- }
93
- return !urls .add (request .getUrl ());
94
- }
95
-
96
- @ Override
97
- public void resetDuplicateCheck (Task task ) {
98
- urls .clear ();
99
- }
100
-
101
- @ Override
102
- public int getTotalRequestsCount (Task task ) {
103
- return urls .size ();
104
- }
105
- });
68
+ BloomFilterDuplicateRemover bloomFilterDuplicateRemover = new BloomFilterDuplicateRemover (this .filePath .hashCode ());
69
+ setDuplicateRemover (bloomFilterDuplicateRemover );
106
70
}
107
71
108
72
private void initFlushThread () {
109
- flushThreadPool = Executors .newScheduledThreadPool (1 );
110
- flushThreadPool .scheduleAtFixedRate (new Runnable () {
111
- @ Override
112
- public void run () {
113
- flush ();
114
- }
115
- }, 10 , 10 , TimeUnit .SECONDS );
73
+ flushThreadPool = Executors .newScheduledThreadPool (1 );
74
+ flushThreadPool .scheduleAtFixedRate (this ::flush , 10 , 10 , TimeUnit .SECONDS );
116
75
}
117
76
118
77
private void initWriter () {
@@ -127,7 +86,6 @@ private void initWriter() {
127
86
private void readFile () {
128
87
try {
129
88
queue = new LinkedBlockingQueue <Request >();
130
- urls = new LinkedHashSet <String >();
131
89
readCursorFile ();
132
90
readUrlFile ();
133
91
// initDuplicateRemover();
@@ -140,46 +98,43 @@ private void readFile() {
140
98
}
141
99
142
100
private void readUrlFile () throws IOException {
143
- String line ;
144
- BufferedReader fileUrlReader = null ;
145
- try {
146
- fileUrlReader = new BufferedReader (new FileReader (getFileName (fileUrlAllName )));
101
+ try (BufferedReader fileUrlReader = new BufferedReader (new FileReader (getFileName (fileUrlAllName )))) {
102
+ String line ;
147
103
int lineReaded = 0 ;
148
104
while ((line = fileUrlReader .readLine ()) != null ) {
149
- urls .add (line .trim ());
105
+ Request request = deserializeRequest (line );
106
+ this .getDuplicateRemover ().isDuplicate (request , null );
150
107
lineReaded ++;
151
108
if (lineReaded > cursor .get ()) {
152
- queue .add (deserializeRequest ( line ) );
109
+ queue .add (request );
153
110
}
154
111
}
155
- } finally {
156
- if (fileUrlReader != null ) {
157
- IOUtils .closeQuietly (fileUrlReader );
158
- }
159
112
}
160
113
}
161
114
162
115
private void readCursorFile () throws IOException {
163
- BufferedReader fileCursorReader = null ;
164
- try {
165
- fileCursorReader = new BufferedReader (new FileReader (getFileName (fileCursor )));
116
+ String fileName = getFileName (fileCursor );
117
+ try (BufferedReader fileCursorReader = new BufferedReader (new FileReader (fileName ))) {
166
118
String line ;
119
+ String lastLine = null ;
167
120
//read the last number
168
121
while ((line = fileCursorReader .readLine ()) != null ) {
169
- cursor = new AtomicInteger (NumberUtils .toInt (line ));
122
+ line = line .trim ();
123
+ if (!line .isEmpty ()) {
124
+ lastLine = line ;
125
+ }
170
126
}
171
- } finally {
172
- if (fileCursorReader != null ) {
173
- IOUtils .closeQuietly (fileCursorReader );
127
+ if (lastLine != null ) {
128
+ cursor .set (NumberUtils .toInt (line ));
174
129
}
175
130
}
176
131
}
177
-
132
+
178
133
public void close () throws IOException {
179
- flushThreadPool .shutdown ();
180
- fileUrlWriter .close ();
181
- fileCursorWriter .close ();
182
- }
134
+ flushThreadPool .shutdown ();
135
+ fileUrlWriter .close ();
136
+ fileCursorWriter .close ();
137
+ }
183
138
184
139
private String getFileName (String filename ) {
185
140
return filePath + task .getUUID () + filename ;
0 commit comments