From 23084a4842843bc930ca8ae9e536fcae18098173 Mon Sep 17 00:00:00 2001 From: mightqxc Date: Thu, 9 Feb 2023 17:57:05 +0100 Subject: [PATCH 1/3] msg processor: support multithreaded processor --- pandacommon/pandamsgbkr/msg_processor.py | 45 ++++++++++++++++-------- 1 file changed, 31 insertions(+), 14 deletions(-) diff --git a/pandacommon/pandamsgbkr/msg_processor.py b/pandacommon/pandamsgbkr/msg_processor.py index 2fb6737..88e9f3a 100644 --- a/pandacommon/pandamsgbkr/msg_processor.py +++ b/pandacommon/pandamsgbkr/msg_processor.py @@ -89,6 +89,12 @@ def process(self, msg_obj): """ raise NotImplementedError + def get_pid(self): + """ + get generic pid, including hostname, os process id, thread id + """ + return GenericThread().get_pid() + # muti-message processor plugin Base class MultiMsgProcPluginBase(object): @@ -106,7 +112,7 @@ class SimpleMsgProcThread(GenericThread): Thread of simple message processor of certain plugin """ - def __init__(self, attr_dict, sleep_time): + def __init__(self, attr_dict, sleep_time, thread_j=0): GenericThread.__init__(self) self.logger = logger_utils.make_logger(base_logger, token=self.get_pid(), method_name='SimpleMsgProcThread') self.__to_run = True @@ -114,6 +120,7 @@ def __init__(self, attr_dict, sleep_time): self.in_queue = attr_dict.get('in_queue') self.mb_sender_proxy = attr_dict.get('mb_sender_proxy') self.sleep_time = sleep_time + self.thread_j = thread_j self.verbose = attr_dict.get('verbose', False) def run(self): @@ -249,6 +256,7 @@ def _parse_config(self): 'enable': True, 'module': 'plugin.module', 'name': 'PluginClassName', + 'n_threads': 1, 'in_queue': 'Queue_1', 'out_queue': 'Queue_2', 'verbose': True, @@ -327,8 +335,14 @@ def _setup_instances(self): out_queue = processor_attr_map[proc]['out_queue'] if out_queue: processor_attr_map[proc]['mb_sender_proxy'] = mb_sender_proxy_dict[out_queue] + # fill processor list + self.init_processor_list = [] + for processor_name, attr_dict in processor_attr_map.items(): + n_threads = attr_dict.get('n_threads', 1) + for j in range(n_threads): + processor_id = (processor_name, j) + self.init_processor_list.append(processor_id) # set self attributes - self.init_processor_list = list(processor_attr_map.keys()) self.init_mb_listener_proxy_list = list(mb_listener_proxy_dict.values()) self.init_mb_sender_proxy_list = list(mb_sender_proxy_dict.values()) self.processor_attr_map = dict(processor_attr_map) @@ -418,18 +432,20 @@ def _spawn_processors(self, processor_list): """ tmp_logger = logger_utils.make_logger(base_logger, token=self.get_pid(), method_name='_spawn_processors') tmp_logger.debug('start') - for processor_name in processor_list: + for processor_id in processor_list: try: + processor_name, thread_j = processor_id attr_dict = self.processor_attr_map[processor_name] - self.processor_thread_map[processor_name] = SimpleMsgProcThread(attr_dict, sleep_time=self.process_sleep_time) - mc_thread = self.processor_thread_map[processor_name] + self.processor_thread_map[processor_id] = SimpleMsgProcThread( + attr_dict, sleep_time=self.process_sleep_time, thread_j=thread_j) + mc_thread = self.processor_thread_map[processor_id] mc_thread.start() tmp_logger.info('spawned processors thread {0} with plugin={1} , in_q={2}, out_q={3}'.format( - processor_name, attr_dict['plugin'].__class__.__name__, + processor_id, attr_dict['plugin'].__class__.__name__, attr_dict['in_queue'], attr_dict['out_queue'])) except Exception as e: tmp_logger.error('failed to spawn processor thread {0} with plugin={1} , in_q={2}, out_q={3} ; {4}: {5} '.format( - processor_name, attr_dict['plugin'].__class__.__name__, + processor_id, attr_dict['plugin'].__class__.__name__, attr_dict['in_queue'], attr_dict['out_queue'], e.__class__.__name__, e)) tmp_logger.debug('done') @@ -439,23 +455,24 @@ def _kill_processors(self, processor_list, block=True): """ tmp_logger = logger_utils.make_logger(base_logger, token=self.get_pid(), method_name='_kill_processors') tmp_logger.debug('start') - for processor_name in processor_list: + for processor_id in processor_list: try: - mc_thread = self.processor_thread_map.get(processor_name) + processor_name, thread_j = processor_id + mc_thread = self.processor_thread_map.get(processor_id) if mc_thread is None: - tmp_logger.debug('processor thread {0} does not exist. Skipped...'.format(processor_name)) + tmp_logger.debug('processor thread {0} does not exist. Skipped...'.format(processor_id)) elif not mc_thread.is_alive(): - tmp_logger.debug('processor thread {0} already stopped. Skipped...'.format(processor_name)) + tmp_logger.debug('processor thread {0} already stopped. Skipped...'.format(processor_id)) else: mc_thread.stop() - tmp_logger.info('signaled stop to processor thread {0}, block={1}'.format(processor_name, block)) + tmp_logger.info('signaled stop to processor thread {0}, block={1}'.format(processor_id, block)) if block: while mc_thread.is_alive(): time.sleep(0.125) - tmp_logger.info('processor thread {0} stopped'.format(processor_name)) + tmp_logger.info('processor thread {0} stopped'.format(processor_id)) except Exception as e: tmp_logger.error('failed to stop processor thread {0} ; {1}: {2} '.format( - processor_name, e.__class__.__name__, e)) + processor_id, e.__class__.__name__, e)) tmp_logger.debug('done') def initialize(self): From 5ce73a45760e580775f928a7968515e9523efa42 Mon Sep 17 00:00:00 2001 From: mightqxc Date: Thu, 9 Feb 2023 19:05:53 +0100 Subject: [PATCH 2/3] msg processor: get thread id in non-thread instance --- pandacommon/pandamsgbkr/msg_processor.py | 5 +++-- pandacommon/pandautils/thread_utils.py | 9 +++++++-- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/pandacommon/pandamsgbkr/msg_processor.py b/pandacommon/pandamsgbkr/msg_processor.py index 88e9f3a..b5d8b40 100644 --- a/pandacommon/pandamsgbkr/msg_processor.py +++ b/pandacommon/pandamsgbkr/msg_processor.py @@ -93,7 +93,7 @@ def get_pid(self): """ get generic pid, including hostname, os process id, thread id """ - return GenericThread().get_pid() + return GenericThread().get_pid(current=True) # muti-message processor plugin Base @@ -306,6 +306,7 @@ def _setup_instances(self): plugin = plugin_factory.get_plugin(pconf) # fill in thread attribute dict processor_attr_map[proc] = dict() + processor_attr_map[proc]['n_threads'] = pconf.get('n_threads', 1) processor_attr_map[proc]['in_queue'] = in_queue processor_attr_map[proc]['out_queue'] = out_queue processor_attr_map[proc]['plugin'] = plugin @@ -338,7 +339,7 @@ def _setup_instances(self): # fill processor list self.init_processor_list = [] for processor_name, attr_dict in processor_attr_map.items(): - n_threads = attr_dict.get('n_threads', 1) + n_threads = attr_dict['n_threads'] for j in range(n_threads): processor_id = (processor_name, j) self.init_processor_list.append(processor_id) diff --git a/pandacommon/pandautils/thread_utils.py b/pandacommon/pandautils/thread_utils.py index 2bbef93..b9b74e1 100644 --- a/pandacommon/pandautils/thread_utils.py +++ b/pandacommon/pandautils/thread_utils.py @@ -13,11 +13,16 @@ def __init__(self, **kwargs): self.hostname = socket.gethostname() self.os_pid = os.getpid() - def get_pid(self): + def get_pid(self, current=False): """ get host/process/thread identifier """ - thread_id = self.ident if self.ident else 0 + if current: + thread_id = threading.get_ident() + if not thread_id: + thread_id = 0 + else: + thread_id = self.ident if self.ident else 0 return '{0}_{1}-{2}'.format(self.hostname, self.os_pid, format(thread_id, 'x')) From 39ffe22b0b1a227ce6ffdfd021cc63cff788d341 Mon Sep 17 00:00:00 2001 From: mightqxc Date: Fri, 10 Feb 2023 09:49:45 +0100 Subject: [PATCH 3/3] v0.0.33 --- PandaPkgInfo.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/PandaPkgInfo.py b/PandaPkgInfo.py index fb3b117..5dabf18 100644 --- a/PandaPkgInfo.py +++ b/PandaPkgInfo.py @@ -1 +1 @@ -release_version = "0.0.32" +release_version = "0.0.33"