1
- import copy
2
- import queue
3
- import threading
4
- import multiprocessing
5
- import concurrent .futures
6
- import time
1
+ import multiprocessing , concurrent .futures
2
+ import time , copy
7
3
4
+ # Sample callback function
8
5
def callback_sample (config , task_id : int , task_data ):
9
6
if task_id == 1 :
10
7
print ("Runing sample task function, please customize yours according to the actual usage." )
@@ -13,11 +10,13 @@ def callback_sample(config, task_id: int, task_data):
13
10
}
14
11
return result
15
12
13
+ # Sample result callback function
16
14
def result_callback_sample(config, task_id: int, task_result, log):
    """Sample result callback: hand the task result back unchanged.

    Prints a one-time reminder (only for the task whose id is 1) telling
    the user to supply their own result callback.

    Args:
        config: Unused by this sample.
        task_id: Identifier of the task whose result is being processed.
        task_result: Value to pass through.
        log: Unused by this sample.

    Returns:
        ``task_result`` exactly as received.
    """
    if task_id != 1:
        return task_result

    # Only the task with id 1 emits the reminder, so it is printed once per run.
    print("Runing sample result function, please customize yours according to the actual usage.")
    return task_result
20
18
19
+ # Default configuration
21
20
default_config = {
22
21
'debug' : False ,
23
22
'task' : {
@@ -27,22 +26,16 @@ def result_callback_sample(config, task_id: int, task_result, log):
27
26
'result_callback' : False
28
27
},
29
28
'worker' : {
30
- 'number' : 1 ,
29
+ 'number' : multiprocessing . cpu_count () ,
31
30
'per_second' : 0 ,
32
- 'multiprocessing' : False ,
33
- },
34
- 'output' : {
35
- 'enabled' : False ,
36
- 'format' : 'csv' ,
37
- 'filepath' : './' ,
38
- 'filename' : 'output.csv' ,
39
- 'header' : [],
31
+ 'multiprocessing' : False
40
32
}
41
33
}
42
34
43
35
# Module-level state: start() declares both names as `global` and returns
# `results`; get_results() / get_logs() return them to callers.
results = []
44
36
logs = []
45
37
38
+ # Start function
46
39
def start (userConfig : dict ):
47
40
48
41
global results , logs
@@ -64,10 +57,10 @@ def start(userConfig: dict):
64
57
in_child_process = (multiprocessing .current_process ().name != 'MainProcess' )
65
58
# Return False when running inside a worker process so the caller can handle it
66
59
if in_child_process :
67
- print ("Exit procedure due to the child process" )
60
+ # print("Exit procedure due to the child process")
68
61
return False
69
62
70
-
63
+ # Debug mode
71
64
if config ['debug' ]:
72
65
print ("Configuration:" )
73
66
print (config )
@@ -77,7 +70,6 @@ def start(userConfig: dict):
77
70
exit ("Callback function is invalied" )
78
71
79
72
# Task list to queue
80
- task_queue = multiprocessing .Queue () if use_multiprocessing else queue .Queue ()
81
73
task_list = []
82
74
user_task_list = config ['task' ]['list' ]
83
75
if isinstance (user_task_list , list ):
@@ -101,19 +93,22 @@ def start(userConfig: dict):
101
93
# Worker dispatch
102
94
worker_num = config ['worker' ]['number' ]
103
95
worker_num = worker_num if isinstance (worker_num , int ) else 1
96
+ worker_per_second = config ['worker' ]['per_second' ] if config ['worker' ]['per_second' ] else 0
97
+ max_workers = len (task_list ) if worker_per_second else worker_num
104
98
# result_queue = multiprocessing.Queue() if use_multiprocessing else queue.Queue()
105
99
pool_executor_class = concurrent .futures .ProcessPoolExecutor if use_multiprocessing else concurrent .futures .ThreadPoolExecutor
106
100
print ("Start to dispatch workers ---\n " )
107
101
108
102
# Pool Executor
109
- with pool_executor_class (max_workers = worker_num ) as executor :
103
+ with pool_executor_class (max_workers = max_workers ) as executor :
110
104
pool_results = []
105
+ # Task dispatch
111
106
for task in task_list :
112
107
pool_result = executor .submit (consume_task , task , config )
113
108
pool_results .append (pool_result )
114
109
# Worker per_second setting
115
- if config [ 'worker' ][ 'per_second' ] and task ['id' ] % worker_num == 0 :
116
- time .sleep (float (config [ 'worker' ][ 'per_second' ] ))
110
+ if worker_per_second and task ['id' ] % worker_num == 0 :
111
+ time .sleep (float (worker_per_second ))
117
112
# Get results from the async results
118
113
for pool_result in concurrent .futures .as_completed (pool_results ):
119
114
log = pool_result .result ()
@@ -127,20 +122,23 @@ def start(userConfig: dict):
127
122
print ("End of worker dispatch ---\n " )
128
123
return results
129
124
130
- def get_results ():
131
- return results
132
-
133
- def get_logs ():
134
- return logs
135
-
125
+ # Worker function
136
126
def consume_task(data, config):
    """Run one task through the configured callback and time the call.

    Args:
        data: Mapping with the task's ``'id'`` and its ``'task'`` payload.
        config: Mapping whose ``'task'`` entry holds the ``'callback'`` to
            invoke and the ``'config'`` object passed to it as first argument.

    Returns:
        A log dict with ``task_id``, ``started_at``, ``ended_at``,
        ``duration`` (``ended_at - started_at``) and the callback's
        return value under ``result``.
    """
    begin = time.time()
    task_settings = config['task']
    outcome = task_settings['callback'](task_settings['config'], data['id'], data['task'])
    finish = time.time()

    return {
        'task_id': data['id'],
        'started_at': begin,
        'ended_at': finish,
        'duration': finish - begin,
        'result': outcome,
    }
139
+
140
def get_results():
    """Return the module-level ``results`` list."""
    return results
142
+
143
def get_logs():
    """Return the module-level ``logs`` list."""
    return logs
0 commit comments