-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.go
78 lines (65 loc) · 1.56 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
package main
import (
"crawler/engine"
"crawler/scheduler"
"crawler/zhenai/parser"
itemsaver "rpc-crawler/persist/client"
"fmt"
crawler_config "crawler/config"
worker "rpc-crawler/worker/client"
"net/rpc"
"rpc-crawler/rpcsupport"
"log"
"flag"
"strings"
)
// Command-line configuration for the distributed crawler front end.
var (
	// itemSaverHost is the address of the remote itemsaver RPC service.
	// Note: the flag is spelled "itemsaver_hosts" but takes a single address.
	itemSaverHost = flag.String("itemsaver_hosts", "", "itemsaver host")

	// workerHosts is a comma-separated list of worker RPC service addresses.
	workerHosts = flag.String("worker_hosts", "", "worker hosts(comma separated)")
)
func main() {
flag.Parse()
// use distributed itemsaver
itemChan, err := itemsaver.ItemSaver(fmt.Sprintf("%s", *itemSaverHost))
if err != nil{
panic(err)
}
pool := createClientPool(strings.Split(*workerHosts, ","))
processor := worker.CreateProcessor(pool)
e := engine.ConcurrentEngine{
Scheduler: &scheduler.QueueScheduler{},
WorkerCount: 100,
ItemChan: itemChan,
RequestProcessor: processor,
}
//e := engine.SimpleEngine{}
e.Run(engine.Request{
Url: "http://www.zhenai.com/zhenghun",
Parser: engine.NewFuncParser(parser.ParseCityList, crawler_config.ParseCityList),
})
//e.Run(engine.Request{
// Url: "http://www.zhenai.com/zhenghun/aba",
// ParserFunc:parser.ParseCity,
//})
}
// createClientPool creates one RPC client per address in hosts and returns an
// unbuffered channel that hands those clients out in round-robin order.
//
// Each entry is trimmed of surrounding whitespace, and empty entries (such as
// those produced by splitting an empty string or a trailing comma) are skipped.
// Hosts whose client cannot be created are logged and left out of the pool.
//
// It panics if no client could be created. Without this check the feeder
// goroutine would loop over an empty slice forever, spinning a CPU while every
// receiver blocked indefinitely.
//
// The feeder goroutine never returns; it lives for the rest of the process.
func createClientPool(hosts []string) chan *rpc.Client {
	clients := make([]*rpc.Client, 0, len(hosts))
	for _, h := range hosts {
		h = strings.TrimSpace(h)
		if h == "" {
			continue
		}
		client, err := rpcsupport.NewClient(h)
		if err != nil {
			log.Printf("Error connected to %s: %v", h, err)
			continue
		}
		clients = append(clients, client)
		log.Printf("Connected to %s", h)
	}
	if len(clients) == 0 {
		log.Panicf("createClientPool: no worker clients available; tried hosts %q", hosts)
	}

	out := make(chan *rpc.Client)
	go func() {
		// Blocks on each send until a consumer takes a client, cycling forever.
		for {
			for _, client := range clients {
				out <- client
			}
		}
	}()
	return out
}