@@ -13,7 +13,16 @@ import com.github.jaitl.crawler.models.task.Task
13
13
import com .github .jaitl .crawler .models .worker .WorkerManager .TasksBatchProcessResult
14
14
import com .github .jaitl .crawler .worker .crawler .CrawlResult
15
15
import com .github .jaitl .crawler .worker .creator .TwoArgumentActorCreator
16
- import com .github .jaitl .crawler .worker .executor .SaveCrawlResultController .{AddResults , BannedTask , FailedTask , FailureSaveResults , SaveCrawlResultControllerConfig , SaveResults , SkippedTask , SuccessAddedResults , SuccessCrawledTask , SuccessSavedResults }
16
+ import com .github .jaitl .crawler .worker .executor .SaveCrawlResultController .AddResults
17
+ import com .github .jaitl .crawler .worker .executor .SaveCrawlResultController .BannedTask
18
+ import com .github .jaitl .crawler .worker .executor .SaveCrawlResultController .FailedTask
19
+ import com .github .jaitl .crawler .worker .executor .SaveCrawlResultController .FailureSaveResults
20
+ import com .github .jaitl .crawler .worker .executor .SaveCrawlResultController .SaveCrawlResultControllerConfig
21
+ import com .github .jaitl .crawler .worker .executor .SaveCrawlResultController .SaveResults
22
+ import com .github .jaitl .crawler .worker .executor .SaveCrawlResultController .SkippedTask
23
+ import com .github .jaitl .crawler .worker .executor .SaveCrawlResultController .SuccessAddedResults
24
+ import com .github .jaitl .crawler .worker .executor .SaveCrawlResultController .SuccessCrawledTask
25
+ import com .github .jaitl .crawler .worker .executor .SaveCrawlResultController .SuccessSavedResults
17
26
import com .github .jaitl .crawler .worker .parser .ParseResult
18
27
import com .github .jaitl .crawler .worker .pipeline .Pipeline
19
28
import com .github .jaitl .crawler .worker .scheduler .Scheduler
@@ -28,13 +37,17 @@ class SaveCrawlResultController[T](
28
37
queueTaskBalancer : ActorRef ,
29
38
tasksBatchController : ActorRef ,
30
39
saveScheduler : Scheduler ,
31
- config : SaveCrawlResultControllerConfig
32
- ) extends Actor with ActorLogging with Stash {
33
- private implicit val executionContext : ExecutionContext = context.dispatcher
34
-
35
- var successTasks : mutable.Seq [SuccessCrawledTask ] = mutable.ArraySeq .empty[SuccessCrawledTask ]
40
+ config : SaveCrawlResultControllerConfig )
41
+ extends Actor
42
+ with ActorLogging
43
+ with Stash {
44
+ implicit private val executionContext : ExecutionContext = context.dispatcher
45
+
46
+ var successTasks : mutable.Seq [SuccessCrawledTask ] =
47
+ mutable.ArraySeq .empty[SuccessCrawledTask ]
36
48
var failedTasks : mutable.Seq [FailedTask ] = mutable.ArraySeq .empty[FailedTask ]
37
- var skippedTasks : mutable.Seq [SkippedTask ] = mutable.ArraySeq .empty[SkippedTask ]
49
+ var skippedTasks : mutable.Seq [SkippedTask ] =
50
+ mutable.ArraySeq .empty[SkippedTask ]
38
51
var bannedTasks : mutable.Seq [BannedTask ] = mutable.ArraySeq .empty[BannedTask ]
39
52
40
53
override def preStart (): Unit = {
@@ -43,7 +56,7 @@ class SaveCrawlResultController[T](
43
56
saveScheduler.schedule(config.saveInterval, self, SaveResults )
44
57
}
45
58
46
- override def receive : Receive = addResultHandler orElse waitSave
59
+ override def receive : Receive = addResultHandler. orElse( waitSave)
47
60
48
61
private def addResultHandler : Receive = {
49
62
case AddResults (result) =>
@@ -73,7 +86,8 @@ class SaveCrawlResultController[T](
73
86
case SaveResults =>
74
87
context.become(saveResultHandler)
75
88
76
- val parserResults = successTasks.flatMap(_.parseResult).map(_.parsedData.asInstanceOf [T ])
89
+ val parserResults =
90
+ successTasks.flatMap(_.parseResult).map(_.parsedData.asInstanceOf [T ])
77
91
val rawResult = successTasks.map(r => (r.task, r.crawlResult))
78
92
79
93
val saveFuture : Future [SaveResults ] = for {
@@ -91,7 +105,7 @@ class SaveCrawlResultController[T](
91
105
case ex : Exception => FailureSaveResults (ex)
92
106
}
93
107
94
- recoveredSaveFuture pipeTo self
108
+ recoveredSaveFuture. pipeTo( self)
95
109
}
96
110
97
111
private def saveResultHandler : Receive = {
@@ -103,8 +117,11 @@ class SaveCrawlResultController[T](
103
117
val failureIds = failedTasks.map(_.task.id)
104
118
val skippedIds = skippedTasks.map(_.task.id)
105
119
val bannedIds = bannedTasks.map(_.task.id)
106
- val newCrawlTasks = successTasks.flatMap(_.parseResult.map(_.newCrawlTasks).getOrElse(Seq .empty))
107
- val newTasks = newCrawlTasks.groupBy(_.taskType)
120
+ val newCrawlTasks = successTasks.flatMap(
121
+ _.parseResult.map(_.newCrawlTasks).getOrElse(Seq .empty)
122
+ )
123
+ val newTasks = newCrawlTasks
124
+ .groupBy(_.taskType)
108
125
.map {
109
126
case (taskType, vals) =>
110
127
val newTasks = vals.flatMap(_.tasks).distinct
@@ -150,7 +167,11 @@ object SaveCrawlResultController {
150
167
case object SuccessAddedResults
151
168
152
169
trait CrawlTaskResult
153
- case class SuccessCrawledTask (task : Task , crawlResult : CrawlResult , parseResult : Option [ParseResult [_]]) extends CrawlTaskResult
170
+ case class SuccessCrawledTask (
171
+ task : Task ,
172
+ crawlResult : CrawlResult ,
173
+ parseResult : Option [ParseResult [_]]
174
+ ) extends CrawlTaskResult
154
175
case class FailedTask (task : Task , t : Throwable ) extends CrawlTaskResult
155
176
case class SkippedTask (task : Task , t : Throwable ) extends CrawlTaskResult
156
177
case class BannedTask (task : Task , t : Throwable ) extends CrawlTaskResult
@@ -162,14 +183,16 @@ object SaveCrawlResultController {
162
183
queueTaskBalancer : ActorRef ,
163
184
tasksBatchController : ActorRef ,
164
185
saveScheduler : Scheduler ,
165
- config : SaveCrawlResultControllerConfig
166
- ): Props = Props (new SaveCrawlResultController (
167
- pipeline = pipeline,
168
- queueTaskBalancer = queueTaskBalancer,
169
- tasksBatchController = tasksBatchController,
170
- saveScheduler = saveScheduler,
171
- config = config
172
- ))
186
+ config : SaveCrawlResultControllerConfig ): Props =
187
+ Props (
188
+ new SaveCrawlResultController (
189
+ pipeline = pipeline,
190
+ queueTaskBalancer = queueTaskBalancer,
191
+ tasksBatchController = tasksBatchController,
192
+ saveScheduler = saveScheduler,
193
+ config = config
194
+ )
195
+ )
173
196
174
197
def name (): String = " saveCrawlResultController"
175
198
}
@@ -179,16 +202,17 @@ private[worker] class SaveCrawlResultControllerCreator(
179
202
saveScheduler : Scheduler ,
180
203
config : SaveCrawlResultControllerConfig
181
204
) extends TwoArgumentActorCreator [Pipeline [_], ActorRef ] {
182
- override def create (factory : ActorRefFactory , firstArg : Pipeline [_], secondArg : ActorRef ): ActorRef = {
205
+ override def create (factory : ActorRefFactory , firstArg : Pipeline [_], secondArg : ActorRef ): ActorRef =
183
206
factory.actorOf(
184
- props = SaveCrawlResultController .props(
185
- pipeline = firstArg,
186
- queueTaskBalancer = queueTaskBalancer,
187
- tasksBatchController = secondArg,
188
- saveScheduler = saveScheduler,
189
- config = config
190
- ).withDispatcher(" worker.blocking-io-dispatcher" ),
207
+ props = SaveCrawlResultController
208
+ .props(
209
+ pipeline = firstArg,
210
+ queueTaskBalancer = queueTaskBalancer,
211
+ tasksBatchController = secondArg,
212
+ saveScheduler = saveScheduler,
213
+ config = config
214
+ )
215
+ .withDispatcher(" worker.blocking-io-dispatcher" ),
191
216
name = SaveCrawlResultController .name()
192
217
)
193
- }
194
218
}
0 commit comments