-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathcoroutines03.py
106 lines (83 loc) · 2.9 KB
/
coroutines03.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
"""重构coroutines02"""
import socket
import logging
from selectors import EVENT_READ, EVENT_WRITE
from coroutines02 import Future, selector, Task
# Emit DEBUG and above so each task's connect/read steps show up in the log.
logging.basicConfig(
    format='levelname: %(levelname)s output msg: %(message)s',
    level=logging.DEBUG,
)
# Number of crawler tasks still running; the event loop runs while it is non-zero.
left_tasks = 10
def connect(sock, address):
    """Start a non-blocking connect and suspend until it has finished.

    Generator-based coroutine: it yields a ``Future`` (imported from
    ``coroutines02``) that a selector callback resolves once ``sock`` becomes
    writable, which is how a non-blocking connect signals completion.

    :param sock: socket to connect; it is switched to non-blocking mode.
    :param address: address accepted by ``sock.connect``, e.g. ``(host, port)``.
    :raises OSError: if the asynchronous connect completed with an error
        (e.g. connection refused), as reported by ``SO_ERROR``.
    """
    import os

    f = Future()
    sock.setblocking(False)
    try:
        sock.connect(address)
    except BlockingIOError:
        # Expected on a non-blocking socket: the connect is still in progress.
        pass

    def on_connected():
        f.set_result(None)

    selector.register(sock.fileno(), EVENT_WRITE, on_connected)
    yield f
    selector.unregister(sock.fileno())
    # Writability only means the connect attempt is over, not that it
    # succeeded; the outcome must be read from SO_ERROR. Without this check a
    # failed connect surfaces later as a confusing error in send()/recv().
    err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
    if err:
        raise OSError(err, os.strerror(err))
def read(sock):
    """Suspend until ``sock`` is readable, then return one received chunk.

    Generator-based coroutine. When the selector reports ``sock`` readable,
    the callback calls ``sock.recv(4096)`` and stores the bytes in a
    ``Future``. The value the driver sends back into this generator
    (presumably that future's result — see ``Task`` in ``coroutines02``) is
    returned. An empty ``bytes`` from ``recv`` means the peer closed the
    connection.

    :param sock: connected, non-blocking socket.
    :return: the chunk sent back into the generator (up to 4096 bytes).
    """
    pending = Future()

    def on_readable():
        pending.set_result(sock.recv(4096))

    selector.register(sock.fileno(), EVENT_READ, on_readable)
    data = yield pending
    # Unregister so select() stops reporting this fd for the finished read.
    selector.unregister(sock.fileno())
    return data
def read_all(sock, id):
    """Read from ``sock`` until the peer closes the connection.

    Generator-based coroutine: each chunk comes from ``yield from read(sock)``.
    A debug line is logged after every read, including the final empty one,
    and reading stops at the first empty chunk.

    :param sock: connected socket to drain.
    :param id: task id, used only in the debug log messages (the name shadows
        the builtin but is kept for backward compatibility with callers).
    :return: all received chunks concatenated, as ``bytes``.
    """
    parts = []
    while True:
        data = yield from read(sock)
        logging.debug('执行任务{}的写入操作'.format(id))
        if not data:
            break
        parts.append(data)
    return b''.join(parts)
class Crawler:
    """Fetch ``http://example.com/`` over a raw socket as a coroutine.

    Attributes:
        id: identifier used in log messages.
        response: raw HTTP response bytes; ``b''`` until ``fetch`` completes.
    """

    def __init__(self, crawler_id):
        self.id = crawler_id
        self.response = b''

    def fetch(self):
        """Connect, send a minimal HTTP/1.0 GET, and store the full response.

        Generator-based coroutine intended to be driven by a ``Task``. The
        request uses HTTP/1.0, so the server closes the connection after
        responding, which is what ends ``read_all``. On completion this
        decrements the module-level ``left_tasks`` counter that the event
        loop watches.
        """
        global left_tasks
        # ``with`` closes the socket once the response has been read, or if
        # the coroutine fails or is discarded. Previously every fetched
        # socket was leaked.
        with socket.socket() as sock:
            yield from connect(sock, ('example.com', 80))
            logging.debug('执行任务{}的连接操作'.format(self.id))
            get = 'GET / HTTP/1.0\r\nHost: example.com\r\n\r\n'
            sock.send(get.encode('ascii'))
            self.response = yield from read_all(sock, self.id)
        left_tasks -= 1
def loop():
    """Dispatch selector events until no crawler tasks remain.

    Each pass blocks in ``selector.select()`` (no timeout, so it waits until at
    least one registered file object is ready). It then calls the callback
    stored as each ready registration's ``data``. ``select()`` keeps reporting
    a ready fd on every call until it is unregistered, which is why
    ``connect``/``read`` unregister their fd after resuming.
    """
    while left_tasks:
        ready = selector.select()
        # Each entry is (SelectorKey, events). ``events`` is a bitmask of
        # EVENT_READ / EVENT_WRITE telling which readiness occurred. It is
        # unused here because every fd is registered for a single event type.
        for key, _events in ready:
            handler = key.data
            handler()
if __name__ == '__main__':
    import time

    # perf_counter is monotonic and high-resolution, so the measured duration
    # is not skewed by system clock adjustments the way time.time() can be.
    start = time.perf_counter()
    # One crawler per outstanding task; ids are 1-based for the log output.
    # Task (from coroutines02) presumably starts driving the coroutine on
    # construction — confirm there.
    for i in range(left_tasks):
        crawler = Crawler(i + 1)
        Task(crawler.fetch())
    loop()
    # Total elapsed seconds for all concurrent fetches.
    print('{:.2f}'.format(time.perf_counter() - start))