Skip to content

Commit

Permalink
Merge pull request #23 from PanDAWMS/flin
Browse files Browse the repository at this point in the history
msg processor: support multithreaded processor
  • Loading branch information
mightqxc authored Feb 10, 2023
2 parents 51dbb00 + 39ffe22 commit 204a3cc
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 17 deletions.
2 changes: 1 addition & 1 deletion PandaPkgInfo.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
release_version = "0.0.32"
release_version = "0.0.33"
46 changes: 32 additions & 14 deletions pandacommon/pandamsgbkr/msg_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,12 @@ def process(self, msg_obj):
"""
raise NotImplementedError

def get_pid(self):
"""
get generic pid, including hostname, os process id, thread id
"""
return GenericThread().get_pid(current=True)


# muti-message processor plugin Base
class MultiMsgProcPluginBase(object):
Expand All @@ -106,14 +112,15 @@ class SimpleMsgProcThread(GenericThread):
Thread of simple message processor of certain plugin
"""

def __init__(self, attr_dict, sleep_time):
def __init__(self, attr_dict, sleep_time, thread_j=0):
GenericThread.__init__(self)
self.logger = logger_utils.make_logger(base_logger, token=self.get_pid(), method_name='SimpleMsgProcThread')
self.__to_run = True
self.plugin = attr_dict['plugin']
self.in_queue = attr_dict.get('in_queue')
self.mb_sender_proxy = attr_dict.get('mb_sender_proxy')
self.sleep_time = sleep_time
self.thread_j = thread_j
self.verbose = attr_dict.get('verbose', False)

def run(self):
Expand Down Expand Up @@ -249,6 +256,7 @@ def _parse_config(self):
'enable': True,
'module': 'plugin.module',
'name': 'PluginClassName',
'n_threads': 1,
'in_queue': 'Queue_1',
'out_queue': 'Queue_2',
'verbose': True,
Expand Down Expand Up @@ -298,6 +306,7 @@ def _setup_instances(self):
plugin = plugin_factory.get_plugin(pconf)
# fill in thread attribute dict
processor_attr_map[proc] = dict()
processor_attr_map[proc]['n_threads'] = pconf.get('n_threads', 1)
processor_attr_map[proc]['in_queue'] = in_queue
processor_attr_map[proc]['out_queue'] = out_queue
processor_attr_map[proc]['plugin'] = plugin
Expand Down Expand Up @@ -327,8 +336,14 @@ def _setup_instances(self):
out_queue = processor_attr_map[proc]['out_queue']
if out_queue:
processor_attr_map[proc]['mb_sender_proxy'] = mb_sender_proxy_dict[out_queue]
# fill processor list
self.init_processor_list = []
for processor_name, attr_dict in processor_attr_map.items():
n_threads = attr_dict['n_threads']
for j in range(n_threads):
processor_id = (processor_name, j)
self.init_processor_list.append(processor_id)
# set self attributes
self.init_processor_list = list(processor_attr_map.keys())
self.init_mb_listener_proxy_list = list(mb_listener_proxy_dict.values())
self.init_mb_sender_proxy_list = list(mb_sender_proxy_dict.values())
self.processor_attr_map = dict(processor_attr_map)
Expand Down Expand Up @@ -418,18 +433,20 @@ def _spawn_processors(self, processor_list):
"""
tmp_logger = logger_utils.make_logger(base_logger, token=self.get_pid(), method_name='_spawn_processors')
tmp_logger.debug('start')
for processor_name in processor_list:
for processor_id in processor_list:
try:
processor_name, thread_j = processor_id
attr_dict = self.processor_attr_map[processor_name]
self.processor_thread_map[processor_name] = SimpleMsgProcThread(attr_dict, sleep_time=self.process_sleep_time)
mc_thread = self.processor_thread_map[processor_name]
self.processor_thread_map[processor_id] = SimpleMsgProcThread(
attr_dict, sleep_time=self.process_sleep_time, thread_j=thread_j)
mc_thread = self.processor_thread_map[processor_id]
mc_thread.start()
tmp_logger.info('spawned processors thread {0} with plugin={1} , in_q={2}, out_q={3}'.format(
processor_name, attr_dict['plugin'].__class__.__name__,
processor_id, attr_dict['plugin'].__class__.__name__,
attr_dict['in_queue'], attr_dict['out_queue']))
except Exception as e:
tmp_logger.error('failed to spawn processor thread {0} with plugin={1} , in_q={2}, out_q={3} ; {4}: {5} '.format(
processor_name, attr_dict['plugin'].__class__.__name__,
processor_id, attr_dict['plugin'].__class__.__name__,
attr_dict['in_queue'], attr_dict['out_queue'], e.__class__.__name__, e))
tmp_logger.debug('done')

Expand All @@ -439,23 +456,24 @@ def _kill_processors(self, processor_list, block=True):
"""
tmp_logger = logger_utils.make_logger(base_logger, token=self.get_pid(), method_name='_kill_processors')
tmp_logger.debug('start')
for processor_name in processor_list:
for processor_id in processor_list:
try:
mc_thread = self.processor_thread_map.get(processor_name)
processor_name, thread_j = processor_id
mc_thread = self.processor_thread_map.get(processor_id)
if mc_thread is None:
tmp_logger.debug('processor thread {0} does not exist. Skipped...'.format(processor_name))
tmp_logger.debug('processor thread {0} does not exist. Skipped...'.format(processor_id))
elif not mc_thread.is_alive():
tmp_logger.debug('processor thread {0} already stopped. Skipped...'.format(processor_name))
tmp_logger.debug('processor thread {0} already stopped. Skipped...'.format(processor_id))
else:
mc_thread.stop()
tmp_logger.info('signaled stop to processor thread {0}, block={1}'.format(processor_name, block))
tmp_logger.info('signaled stop to processor thread {0}, block={1}'.format(processor_id, block))
if block:
while mc_thread.is_alive():
time.sleep(0.125)
tmp_logger.info('processor thread {0} stopped'.format(processor_name))
tmp_logger.info('processor thread {0} stopped'.format(processor_id))
except Exception as e:
tmp_logger.error('failed to stop processor thread {0} ; {1}: {2} '.format(
processor_name, e.__class__.__name__, e))
processor_id, e.__class__.__name__, e))
tmp_logger.debug('done')

def initialize(self):
Expand Down
9 changes: 7 additions & 2 deletions pandacommon/pandautils/thread_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,16 @@ def __init__(self, **kwargs):
self.hostname = socket.gethostname()
self.os_pid = os.getpid()

def get_pid(self):
def get_pid(self, current=False):
"""
get host/process/thread identifier
"""
thread_id = self.ident if self.ident else 0
if current:
thread_id = threading.get_ident()
if not thread_id:
thread_id = 0
else:
thread_id = self.ident if self.ident else 0
return '{0}_{1}-{2}'.format(self.hostname, self.os_pid, format(thread_id, 'x'))


Expand Down

0 comments on commit 204a3cc

Please sign in to comment.