-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathfetcher.go
120 lines (103 loc) · 2.84 KB
/
fetcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
// +build fetcherExec
package main
import (
"encoding/base64"
"encoding/json"
"flag"
"github.com/fmpwizard/owlcrawler/couchdb"
log "github.com/golang/glog"
"github.com/nats-io/nats"
"io/ioutil"
"net/http"
"os/user"
"path/filepath"
"time"
)
// NATS subjects used by the fetcher.
const (
	// fetchQueue is the subject main subscribes to for URLs to download.
	fetchQueue = "fetch_url"
	// extractQueue is the subject fetchHTML publishes to once a page is stored.
	extractQueue = "extract_url"
)
// dataStore is the JSON document that fetchHTML builds for a fetched page
// and hands to couchdb.AddURLData.
type dataStore struct {
	// ID is serialized as "_id"; fetchHTML sets it to the URL-safe base64
	// encoding of the page URL.
	ID string `json:"_id"`
	// URL is the address the page was fetched from.
	URL string `json:"url"`
	// HTML is the response body exactly as it was read.
	HTML string `json:"html"`
	// FetchedOn is when the page was fetched; fetchHTML records it in UTC.
	FetchedOn time.Time `json:"fetched_on"`
}
// gnatsdCred holds the connection settings for the gnatsd server.
type gnatsdCred struct {
	// URL is the server address handed to nats.Connect.
	URL string
}

// gnatsdCredentials is populated by init from the .gnatsd.json file in the
// current user's home directory.
var gnatsdCredentials gnatsdCred
// fetchHTML downloads url, saves the page through couchdb.AddURLData and,
// when the save succeeds, publishes the returned document ID on extractQueue.
//
// A failure to connect to gnatsd is fatal. Every other failure is logged and
// aborts the fetch for this URL only.
func fetchHTML(url string) {
	log.V(2).Infof("Fetching %s\n", url)

	nc, err := nats.Connect(gnatsdCredentials.URL)
	if err != nil {
		log.Fatalf("Could not connect to gnatsd, got: %s\n", err)
	}
	// A connection is opened on every call, so it must be released on every
	// return path or connections pile up for the life of the process.
	defer nc.Close()

	// Bound the whole request (connect, headers and body) so that a stalled
	// server cannot block the fetcher indefinitely.
	client := &http.Client{Timeout: 30 * time.Second}
	req, err := http.NewRequest("GET", url, nil)
	if err != nil {
		log.Errorf("Error parsing url: %s, got: %v\n", url, err)
		// req is nil here; continuing would panic on req.Header.
		return
	}
	req.Header.Set("User-Agent", "OwlCrawler - https://github.com/fmpwizard/owlcrawler")

	resp, err := client.Do(req)
	if err != nil {
		log.Errorf("Error while fetching url: %s, got error: %v\n", url, err)
		return
	}
	defer resp.Body.Close()

	htmlData, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		log.Errorf("Error while reading html for url: %s, got error: %v\n", url, err)
		return
	}

	data := &dataStore{
		ID:        base64.URLEncoding.EncodeToString([]byte(url)),
		URL:       url,
		HTML:      string(htmlData),
		FetchedOn: time.Now().UTC(),
	}
	pageData, err := json.Marshal(data)
	if err != nil {
		log.Errorf("Error generating json to save in database, got: %v\n", err)
		// Nothing valid to store.
		return
	}

	ret, err := couchdb.AddURLData(url, pageData, false)
	if err != nil {
		log.Errorf("Error saving %s to the database, got: %v\n", url, err)
		return
	}

	// Send the fetched url's document ID to the extract queue.
	if err := nc.Publish(extractQueue, []byte(ret.ID)); err != nil {
		log.Errorf("Failed to push %s to extract queue, got: %v\n", url, err)
	} else if err := nc.Flush(); err != nil {
		// Publish only buffers the message; make sure it reached the server
		// before the deferred Close tears the connection down.
		log.Errorf("Failed to flush %s to extract queue, got: %v\n", url, err)
	}
	log.V(2).Infof("Finished getting %s", url)
}
// main connects to gnatsd, joins the "fetch-pool" queue group on fetchQueue
// and then loops forever: each received URL that couchdb.ShouldURLBeFetched
// approves is fetched after a fixed 5 second delay.
func main() {
	log.V(1).Infof("Starting Fetcher.")

	nc, err := nats.Connect(gnatsdCredentials.URL)
	if err != nil {
		// Without a connection nc is nil and the subscribe call would panic.
		log.Fatalf("Could not connect to gnatsd, got: %s\n", err)
	}
	sub, err := nc.QueueSubscribeSync(fetchQueue, "fetch-pool")
	if err != nil {
		log.Fatalf("Error while subscribing to fetch_url, got %s\n", err)
	}

	for {
		payload, err := sub.NextMsg(30 * time.Second)
		if err != nil {
			// No message was delivered (NextMsg waits at most 30 seconds);
			// poll again.
			continue
		}
		url := string(payload.Data)
		if !couchdb.ShouldURLBeFetched(url) {
			continue
		}
		//TODO implement a distributed tick, so you can have 100 fetchers
		//and you don't all go at the same time, in 5 sec intervals
		// time.Sleep gives the same 5 second pause as receiving from a fresh
		// time.Tick, without creating a new ticker for every message.
		time.Sleep(5 * time.Second)
		fetchHTML(url)
	}
}
// init parses the command-line flags and loads gnatsdCredentials from the
// .gnatsd.json file in the current user's home directory.
//
// An unreadable or malformed credentials file is fatal. If the current user
// cannot be determined the file is not loaded and gnatsdCredentials keeps its
// zero value; that case is logged rather than silently ignored.
func init() {
	flag.Parse()

	u, err := user.Current()
	if err != nil {
		log.Errorf("Could not determine current user, gnatsd credentials not loaded, got: %v\n", err)
		return
	}

	path := filepath.Join(u.HomeDir, ".gnatsd.json")
	content, err := ioutil.ReadFile(path)
	if err != nil {
		log.Fatalf("Error reading gnatds user file, got: %v\n", err)
	}
	if err := json.Unmarshal(content, &gnatsdCredentials); err != nil {
		log.Fatalf("Invalid gnatsd credentials file, got: %v\n", err)
	}
}