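"""Scrape draft electoral rolls and supplements from the West Bengal CEO site
(wbceo.in): walk district -> assembly constituency (ac) -> polling station
(ps), download each part's pdf (trying the Bengali, Hindi, and English
variants in turn), and record one csv row per downloaded file."""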
import requests
from requests.adapters import HTTPAdapter
from scrapy import Selector
import csv
import os
import threading
# --------------------define variables-------------------
OUTPUT_FILE = 'west_bengal_final.csv'
DRAFT_ROLLS_FOLDER = 'wb_pdfs/draft_rolls/'
SUPPLEMENTS_FOLDER = 'wb_pdfs/supplements/'

class WBScraper:
    def __init__(self,
                 base_url='http://wbceo.in/'
                 ):
        # define session object; retry on both schemes, not just https
        self.session = requests.Session()
        adapter = HTTPAdapter(max_retries=4)
        self.session.mount('http://', adapter)
        self.session.mount('https://', adapter)
        # set proxy
        # self.session.proxies.update({'http': 'http://127.0.0.1:40328'})
        # define urls
        self.base_url = base_url
        # ASP.NET form state, refreshed on every polling station page fetch
        self.form_data = {
            '__EVENTVALIDATION': '',
            '__VIEWSTATE': '',
            '__VIEWSTATEENCRYPTED': ''
        }
        # make sure the download folders exist before any pdf is written
        os.makedirs(DRAFT_ROLLS_FOLDER, exist_ok=True)
        os.makedirs(SUPPLEMENTS_FOLDER, exist_ok=True)

    def GetDistrictList(self):
        # set url
        url = 'http://wbceo.in/DistrictList.aspx'
        # get request
        ret = self.session.get(url)
        if ret.status_code == 200:
            # get district list
            hrefs = Selector(text=ret.text).xpath('//table[@class="dataTable"]/tbody/tr/td/a').extract()
            district_list = []
            for href in hrefs:
                district = {
                    'href': self.base_url + Selector(text=href).xpath('//@href').extract()[0],
                    # strip() is more robust than splitting on a fixed whitespace run
                    'name': Selector(text=href).xpath('//text()').extract()[0].strip()
                }
                district_list.append(district)
            return district_list
        else:
            print('failed to get district list')
            return []

    def GetACList(self, url):
        # get request
        ret = self.session.get(url)
        if ret.status_code == 200:
            # get ac list
            trs = Selector(text=ret.text).xpath('//table[@class="dataTable"]/tbody/tr').extract()
            ac_list = []
            for tr in trs:
                ac = {
                    'no': Selector(text=tr).xpath('//td[1]/text()').extract()[0].strip(),
                    'href': self.base_url + Selector(text=tr).xpath('//td[2]/a/@href').extract()[0],
                    'name': Selector(text=tr).xpath('//td[2]/a/text()').extract()[0].strip()
                }
                ac_list.append(ac)
            return ac_list
        else:
            print('failed to get ac list')
            return []

    def GetPSList(self, url, page):
        # pages beyond the first are fetched through an ASP.NET pagination
        # postback, which needs the form state captured from the last response
        if page > 1:
            data = {
                '__EVENTARGUMENT': 'Page$%s' % page,
                '__EVENTTARGET': 'ctl00$ContentPlaceHolder1$gvPs',
                '__EVENTVALIDATION': self.form_data['__EVENTVALIDATION'],
                '__VIEWSTATE': self.form_data['__VIEWSTATE'],
                '__VIEWSTATEENCRYPTED': self.form_data['__VIEWSTATEENCRYPTED'],
                'ctl00$TopMenu$srch': 'Search'
            }
        else:
            data = {}
        # post request
        ret = self.session.post(url, data=data)
        if ret.status_code == 200:
            # capture the form state for the next pagination postback
            sel = Selector(text=ret.text)
            self.form_data = {
                '__EVENTVALIDATION': sel.xpath('//input[@id="__EVENTVALIDATION"]/@value').extract()[0],
                '__VIEWSTATE': sel.xpath('//input[@id="__VIEWSTATE"]/@value').extract()[0],
                '__VIEWSTATEENCRYPTED': sel.xpath('//input[@id="__VIEWSTATEENCRYPTED"]/@value').extract()[0]
            }
            # get ps list, skipping the header row and the trailing pager row
            trs = sel.xpath('//table[@class="mGrid"]/tr').extract()
            ps_list = []
            for tr in trs[1:-1]:
                ps = {
                    'no': Selector(text=tr).xpath('//td[1]/span/text()').extract()[0],
                    'name': Selector(text=tr).xpath('//td[2]/span/text()').extract()[0],
                    'draft_roll_href': self.base_url + Selector(text=tr).xpath('//td[3]/a/@href').extract()[0],
                    'supplement_href': self.base_url + Selector(text=tr).xpath('//td[4]/a/@href').extract()[0]
                }
                ps_list.append(ps)
            return ps_list
        else:
            print('failed to get ps list')
            return []

    def GetCaptchaTextFromImage(self, image_url):
        # solve the captcha image through the anti-captcha.com service
        from python_anticaptcha import AnticaptchaClient, ImageToTextTask
        from io import BytesIO
        api_key = 'API KEY'  # placeholder: supply a real anti-captcha.com key
        ret = self.session.get(image_url)
        captcha_fp = BytesIO(ret.content)
        client = AnticaptchaClient(api_key)
        task = ImageToTextTask(captcha_fp)
        job = client.createTask(task)
        job.join()  # block until the service returns a solution
        return job.get_captcha_text()

    def GetCaptchaImageUrl(self, url):
        # fetch the page first so the session holds the cookies that the
        # captcha handler expects, then return the fixed captcha endpoint
        ret = self.session.get(url)
        if ret.status_code == 200:
            return 'http://wbceo.in/Capcha.ashx'
        else:
            print('failed to get captcha image url')
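
    # A minimal sketch (not called by Start) of how the two captcha helpers
    # above could be composed, mirroring the commented-out flow in Start.
    # The form field name 'txtCaptcha' is a hypothetical placeholder; the
    # real field name on wbceo.in is not confirmed here.
    def DownloadPdfFileViaCaptcha(self, page_url, filename):
        # prime session cookies and locate the captcha endpoint for this page
        captcha_image_url = self.GetCaptchaImageUrl(page_url)
        # solve the image through the anti-captcha service
        captcha_text = self.GetCaptchaTextFromImage(captcha_image_url)
        # post the solved text back; the field name is an assumption (see above)
        ret = self.session.post(page_url, data={'txtCaptcha': captcha_text})
        if ret.status_code == 200:
            with open(filename, 'wb') as f:
                f.write(ret.content)
            return 'ok'
        return 'fail'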

    def WriteHeader(self):
        # set headers
        header_info = [
            'district_name',
            'ac_no',
            'ac_name',
            'part_no',
            'polling_station_name',
            'filename'  # ,
            # 'download_url'
        ]
        # write header into output csv file; the with block closes the handle
        with open(OUTPUT_FILE, 'w', encoding='utf-8', newline='') as f:
            csv.writer(f, lineterminator='\n').writerow(header_info)

    def WriteData(self, data):
        # append one row to the output csv file
        with open(OUTPUT_FILE, 'a', encoding='utf-8', newline='') as f:
            csv.writer(f, lineterminator='\n').writerow(data)

    def DownloadPdfFile(self, download_url, filename):
        print('downloading %s' % download_url)
        if os.path.isfile(filename):
            print('this file was already downloaded.')
            return 'ok'
        # get request, streaming the body to disk in chunks rather than
        # buffering the whole pdf in memory
        ret = self.session.get(download_url, stream=True)
        if ret.status_code == 200:
            with open(filename, 'wb') as f:
                for chunk in ret.iter_content(chunk_size=8192):
                    f.write(chunk)
            print('successfully downloaded %s' % filename)
            return 'ok'
        else:
            print('failed to get pdf file: %s' % download_url)
            return 'fail'

    def DownloadPdfFileThread(self, download_url_list, filename):
        # try each language variant in turn; stop at the first that downloads
        for download_url in download_url_list:
            if self.DownloadPdfFile(download_url, filename) == 'ok':
                break
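
    # A minimal sketch of the timer-based variant hinted at by the
    # commented-out threading.Timer calls in Start: schedule the download on
    # a background timer so network waits can overlap. This helper is
    # illustrative only and is not called anywhere in this file.
    def DownloadPdfFileThreadAsync(self, download_url_list, filename, delay=1):
        # fire DownloadPdfFileThread after `delay` seconds on its own thread
        threading.Timer(delay, self.DownloadPdfFileThread,
                        [download_url_list, filename]).start()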

    def Start(self,
              start_district='',
              start_ac='',
              start_ps=''):
        # on a fresh run (no resume point), write the csv header
        if start_district == '' and start_ac == '' and start_ps == '':
            self.WriteHeader()
        # get district list
        print('getting district list...')
        district_list = self.GetDistrictList()
        print(district_list)
        # the flags skip everything before the given resume point, then stay on
        district_flag = (start_district == '')
        ac_flag = (start_ac == '')
        ps_flag = (start_ps == '')
        for district in district_list:
            if start_district == district['name']:
                district_flag = True
            if not district_flag:
                continue
            # get ac list
            print('getting ac list for %s...' % district['name'])
            ac_list = self.GetACList(district['href'])
            print(ac_list)
            for ac in ac_list:
                if start_ac == ac['no']:
                    ac_flag = True
                if not ac_flag:
                    continue
                page = 1
                while True:
                    # get ps list
                    print('getting ps list for %s:%s page:%s...' % (district['name'], ac['name'], page))
                    ps_list = self.GetPSList(ac['href'], page)
                    print(ps_list)
                    if not ps_list:
                        break
                    page += 1
                    for ps in ps_list:
                        if start_ps == ps['no']:
                            ps_flag = True
                        if not ps_flag:
                            continue
                        # # get captcha image url for draft_roll
                        # print('getting captcha image url for draft_roll...')
                        # captcha_image_url = self.GetCaptchaImageUrl(ps['draft_roll_href'])
                        # print(captcha_image_url)
                        #
                        # # get captcha text from image for draft_roll
                        # print('getting captcha text from image for draft_roll...')
                        # captcha_text = self.GetCaptchaTextFromImage(captcha_image_url)
                        # print(captcha_text)
                        # process draft roll: the pdf name encodes the
                        # zero-padded ac number and part number, e.g. a1020225.pdf
                        draft_roll_pdf_name = 'a%03d%04d.pdf' % (int(ac['no']), int(ps['no']))
                        # rolls are published per language, so try Bengali, then Hindi, then English
                        draft_roll_pdf_urls = [
                            'http://wbceo.in/EROLLS/PDF/%s/A%03d/%s' % (lang, int(ac['no']), draft_roll_pdf_name)
                            for lang in ('Bengali', 'Hindi', 'English')
                        ]
                        self.DownloadPdfFileThread(draft_roll_pdf_urls, DRAFT_ROLLS_FOLDER + draft_roll_pdf_name)
                        # # run the download on a timer thread instead (see DownloadPdfFileThreadAsync)
                        # threading.Timer(1, self.DownloadPdfFileThread, [draft_roll_pdf_urls, DRAFT_ROLLS_FOLDER + draft_roll_pdf_name]).start()
                        # write data into output csv file
                        self.WriteData([
                            district['name'],
                            ac['no'],
                            ac['name'],
                            ps['no'],
                            ps['name'],
                            DRAFT_ROLLS_FOLDER + draft_roll_pdf_name
                        ])
                        # process supplements (same naming scheme, served from the sup02 folder)
                        supplements_pdf_name = draft_roll_pdf_name
                        supplements_pdf_urls = [
                            'http://wbceo.in/EROLLS/sup02/PDF/%s/A%03d/%s' % (lang, int(ac['no']), supplements_pdf_name)
                            for lang in ('Bengali', 'Hindi', 'English')
                        ]
                        self.DownloadPdfFileThread(supplements_pdf_urls, SUPPLEMENTS_FOLDER + supplements_pdf_name)
                        # write data into output csv file
                        self.WriteData([
                            district['name'],
                            ac['no'],
                            ac['name'],
                            ps['no'],
                            ps['name'],
                            SUPPLEMENTS_FOLDER + supplements_pdf_name
                        ])
                        # break
                    # break
                # break
            # break
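
# Usage: a fresh run (scraper.Start() with no arguments) writes the csv header
# and scrapes every district; passing start_district / start_ac / start_ps, as
# below, resumes a previous run from that exact polling station.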
def main():
    # create scraper object
    scraper = WBScraper()
    # start to scrape, resuming from the given district / ac / part
    scraper.Start(
        start_district='NORTH 24 PARGANAS',
        start_ac='102',
        start_ps='225'
    )


if __name__ == '__main__':
    main()