tasks.py (forked from eskerda/citybikes-gyro)
# -*- coding: utf-8 -*-
from __future__ import absolute_import

from datetime import datetime
import random

from redis import Redis, ConnectionPool
from rq import Queue
from rq_scheduler import Scheduler
from pymongo import Connection

import pybikes
from gyro.configuration import db_credentials as credentials
from gyro.configuration import redis_server
from gyro.configuration import proxify as proxify_list
from gyro.models import StationDocument, SystemDocument, Stat, StatDocument

# MongoDB connection (old pymongo API, where Connection was still available)
connection = Connection(credentials['host'], credentials['port'])
db = getattr(connection, credentials['database'])

# Redis connection shared by the rq queues and the schedulers
pool = ConnectionPool(
    host=redis_server['host'],
    port=redis_server['port'],
    db=redis_server['db']
)
redis_connection = Redis(connection_pool=pool)

q_medium = Queue('medium', connection=redis_connection)
q_high = Queue('high', connection=redis_connection)
scheduler = Scheduler('medium', connection=redis_connection)
scheduler_high = Scheduler('high', connection=redis_connection)
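# NOTE: jobs pushed onto these queues only run if rq workers are listening
# on the 'high' and 'medium' queues; starting those workers is assumed to
# happen outside this module (e.g. rqworker high medium).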

scraper = pybikes.utils.PyBikesScraper()
# 8118 is the default Privoxy port, so this presumably points at a local
# Privoxy/Tor setup; the proxy is only enabled on demand (see enableProxy)
scraper.setProxies({
    "http": "http://127.0.0.1:8118",
    "https": "http://127.0.0.1:8118"
})
scraper.setUserAgent('PyBikes™ v0.2')


def syncSystem(scheme, system, key=None):
    """Save a system document and do a full (re)sync of its stations."""
    sys = pybikes.getBikeShareSystem(scheme, system, key)
    sysDoc = SystemDocument(db, connection, scheme, sys)
    sysDoc.save()
    syncStations(sys, True)


def syncStation(station_chunk, tag, resync=False):
    """Update a chunk of stations, saving each one and recording a stat."""
    print "Processing chunk"
    for station in station_chunk:
        try:
            station.update(scraper)
        except Exception:
            print "Got an error, enabling proxy just for the lulz.."
            scraper.enableProxy()
            station.update(scraper)
        sDoc = StationDocument(db, connection, station, tag)
        if resync or sDoc.find({'_id': station.get_hash()}).count() == 0:
            # Save this unsynced station
            print "Saving STATION %s.%s" % (tag, station.get_hash())
            sDoc.save()
        # Update the stat...
        print "Adding STAT for %s.%s" % (tag, station.get_hash())
        stat = Stat(station)
        statDoc = StatDocument(db, connection, stat)
        statDoc.save()


def syncStations(system, resync=False, reschedule=False, proxify=False):
    """Update a system and process its stations in chunks of ten."""
    if proxify or (
        'system' in system.meta and system.meta['system'] in proxify_list
    ):
        print "System in proxify list, proxifying!"
        scraper.enableProxy()
    try:
        system.update(scraper)
        if len(system.stations) == 0:
            raise Exception
    except Exception:
        print "Got an error updating, enabling proxy for the time being"
        scraper.enableProxy()
        system.update(scraper)
    # Async stations in parallel...
    print "Generating chunks..."
    chunks = [system.stations[i:i + 10]
              for i in range(0, len(system.stations), 10)]
    print "%d chunks!" % len(chunks)
    for station_chunk in chunks:
        if system.sync:
            # Synchronous feeds already carry station data after
            # system.update, so the chunk can be stored right away
            syncStation(station_chunk, system.tag, resync)
        elif reschedule:
            scheduler_high.schedule(
                scheduled_time=datetime.now(),
                func=syncStation,
                args=(station_chunk, system.tag, resync),
                interval=300,
                repeat=None
            )
        else:
            q_high.enqueue_call(
                func=syncStation,
                args=(station_chunk, system.tag, resync),
                timeout=240
            )


def updateSystem(scheme, system, key=None):
    """Schedule periodic updates (sync feeds) or enqueue a one-off sync."""
    instance = pybikes.getBikeShareSystem(scheme, system, key)
    if instance.sync:
        interval = 60
        if scheme == 'bcycle':
            # Randomised interval, presumably to avoid hitting every
            # bcycle feed at the same time
            interval = random.randint(180, 300)
        proxify = scheme in proxify_list
        print "Programming %s update interval at %d seconds" % (system, interval)
        scheduler.schedule(
            scheduled_time=datetime.now(),
            func=syncStations,
            args=(instance, False, False, proxify),
            interval=interval,
            repeat=None
        )
    else:
        q_medium.enqueue_call(
            func=syncStations,
            args=(instance, False, True),
            timeout=240
        )
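

# --- Usage sketch (not part of the original module) ---------------------
# A minimal, illustrative entry point: the 'bicing' scheme/tag pair is an
# assumption and must exist in the pybikes data files; rq workers and the
# rq-scheduler daemon must be running for anything queued here to execute.
if __name__ == '__main__':
    syncSystem('bicing', 'bicing')     # one-off full sync of a system
    updateSystem('bicing', 'bicing')   # recurring updates via the scheduler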