forked from codelucas/newspaper
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmthreading.py
133 lines (104 loc) · 3.98 KB
/
mthreading.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# -*- coding: utf-8 -*-
"""
Anything that has to do with threading in this library
must be abstracted in this file. If we decide to do gevent
also, it will deserve its own gevent file.
"""
__title__ = 'newspaper'
__author__ = 'Lucas Ou-Yang'
__license__ = 'MIT'
__copyright__ = 'Copyright 2014, Lucas Ou-Yang'
import logging
import queue
import traceback
from threading import Thread
from .configuration import Configuration
# Module-level logger, named after this module's import path.
log = logging.getLogger(__name__)
class ConcurrencyException(Exception):
    """Signals that the news pool was driven in the wrong order,
    e.g. ``NewsPool.join()`` was called before ``NewsPool.set()``."""
class Worker(Thread):
    """
    Daemon thread that executes ``(func, args, kargs)`` tasks taken from a
    shared queue.

    The thread starts itself as soon as it is constructed. It exits once the
    queue has stayed empty for ``timeout_seconds``. Exceptions raised by a
    task are printed and do not stop the worker.

    :param tasks: queue of ``(func, args, kargs)`` tuples (e.g. ``queue.Queue``).
    :param timeout_seconds: idle time after which the worker gives up and exits.
    """
    def __init__(self, tasks, timeout_seconds):
        # Daemon, so a worker blocked inside a task does not prevent
        # interpreter exit.
        super().__init__(daemon=True)
        self.tasks = tasks
        self.timeout = timeout_seconds
        self.start()

    def run(self):
        while True:
            try:
                func, args, kargs = self.tasks.get(timeout=self.timeout)
            except queue.Empty:
                # Extra thread allocated, no job, exit gracefully
                break
            try:
                func(*args, **kargs)
            except Exception:
                # Best effort: one failing task must not kill the worker.
                traceback.print_exc()
            finally:
                # Always account for the task. If func raised a BaseException
                # (SystemExit, KeyboardInterrupt) that escapes the handler
                # above, skipping task_done() would leave Queue.join() blocked
                # forever.
                self.tasks.task_done()
class ThreadPool:
    """
    Pool of self-starting ``Worker`` threads fed through a shared bounded queue.

    Workers exit after ``timeout_seconds`` without work, so tasks should be
    added promptly after the pool is constructed.

    :param num_threads: number of workers (at least one is always started);
        also used as the queue's ``maxsize``.
    :param timeout_seconds: idle timeout passed to each worker.
    """
    def __init__(self, num_threads, timeout_seconds):
        # queue.Queue(maxsize) treats maxsize <= 0 as unbounded.
        self.tasks = queue.Queue(num_threads)
        # Always start at least one worker: with zero workers, queued tasks
        # would never be consumed and wait_completion() would block forever.
        for _ in range(max(1, num_threads)):
            Worker(self.tasks, timeout_seconds)

    def add_task(self, func, *args, **kargs):
        """Queue ``func(*args, **kargs)``; blocks while the queue is full."""
        self.tasks.put((func, args, kargs))

    def wait_completion(self):
        """Block until every queued task has been marked done."""
        self.tasks.join()
class NewsPool(object):
    def __init__(self, config=None):
        """
        Abstraction of a threadpool. A newspool can accept any number of
        source OR article objects together in a list. It allocates one
        thread to every source and then joins.
        We allocate one thread per source to avoid rate limiting.
        5 sources = 5 threads, one per source.
        >>> import newspaper
        >>> from newspaper import news_pool
        >>> cnn_paper = newspaper.build('http://cnn.com')
        >>> tc_paper = newspaper.build('http://techcrunch.com')
        >>> espn_paper = newspaper.build('http://espn.com')
        >>> papers = [cnn_paper, tc_paper, espn_paper]
        >>> news_pool.set(papers)
        >>> news_pool.join()
        # All of your papers should have their articles' html populated now.
        >>> cnn_paper.articles[50].html
        u'<html>blahblah ... '
        """
        self.pool = None
        self.config = config or Configuration()

    def join(self):
        """
        Block until every task queued by ``set`` has finished, then reset
        the pool so ``set`` must be called again before the next ``join``.

        :raises ConcurrencyException: if ``set`` has not been called first.
        """
        if self.pool is None:
            raise ConcurrencyException('Call set(..) with a list of source objects '
                                       'before calling .join(..)')
        self.pool.wait_completion()
        self.pool = None

    def set(self, news_list, threads_per_source=1, override_threads=None):
        """
        Queue a download for every object in ``news_list``.

        ``news_list`` can be any iterable of `Article`, `Source`, or both.
        If the caller wants to decide how many threads to use, they can pass
        `override_threads`, which takes precedence over everything else.
        Otherwise, if the input is all `Source` objects, this API allocates
        ``threads_per_source`` threads per `Source` so as not to spam any
        single host. If neither condition applies, a single thread is used.
        At least one thread is always allocated.
        """
        from .source import Source
        # Materialize once: the type check and the scheduling loop below both
        # iterate news_list, so a generator would otherwise be exhausted
        # before any task was queued (and would not support len()).
        news_list = list(news_list)
        if override_threads is not None:
            num_threads = override_threads
        elif all(isinstance(n, Source) for n in news_list):
            num_threads = threads_per_source * len(news_list)
        else:
            num_threads = 1
        # A pool without workers never drains its queue, so join() would hang.
        num_threads = max(1, num_threads)
        timeout = self.config.thread_timeout_seconds
        self.pool = ThreadPool(num_threads, timeout)
        for news_object in news_list:
            if isinstance(news_object, Source):
                self.pool.add_task(news_object.download_articles)
            else:
                self.pool.add_task(news_object.download)