diff --git a/PandaPkgInfo.py b/PandaPkgInfo.py index fb3b117..5dabf18 100644 --- a/PandaPkgInfo.py +++ b/PandaPkgInfo.py @@ -1 +1 @@ -release_version = "0.0.32" +release_version = "0.0.33" diff --git a/pandacommon/pandamsgbkr/msg_processor.py b/pandacommon/pandamsgbkr/msg_processor.py index 2fb6737..b5d8b40 100644 --- a/pandacommon/pandamsgbkr/msg_processor.py +++ b/pandacommon/pandamsgbkr/msg_processor.py @@ -89,6 +89,12 @@ def process(self, msg_obj): """ raise NotImplementedError + def get_pid(self): + """ + get generic pid, including hostname, os process id, thread id + """ + return GenericThread().get_pid(current=True) + # muti-message processor plugin Base class MultiMsgProcPluginBase(object): @@ -106,7 +112,7 @@ class SimpleMsgProcThread(GenericThread): Thread of simple message processor of certain plugin """ - def __init__(self, attr_dict, sleep_time): + def __init__(self, attr_dict, sleep_time, thread_j=0): GenericThread.__init__(self) self.logger = logger_utils.make_logger(base_logger, token=self.get_pid(), method_name='SimpleMsgProcThread') self.__to_run = True @@ -114,6 +120,7 @@ def __init__(self, attr_dict, sleep_time): self.in_queue = attr_dict.get('in_queue') self.mb_sender_proxy = attr_dict.get('mb_sender_proxy') self.sleep_time = sleep_time + self.thread_j = thread_j self.verbose = attr_dict.get('verbose', False) def run(self): @@ -249,6 +256,7 @@ def _parse_config(self): 'enable': True, 'module': 'plugin.module', 'name': 'PluginClassName', + 'n_threads': 1, 'in_queue': 'Queue_1', 'out_queue': 'Queue_2', 'verbose': True, @@ -298,6 +306,7 @@ def _setup_instances(self): plugin = plugin_factory.get_plugin(pconf) # fill in thread attribute dict processor_attr_map[proc] = dict() + processor_attr_map[proc]['n_threads'] = pconf.get('n_threads', 1) processor_attr_map[proc]['in_queue'] = in_queue processor_attr_map[proc]['out_queue'] = out_queue processor_attr_map[proc]['plugin'] = plugin @@ -327,8 +336,14 @@ def _setup_instances(self): out_queue = processor_attr_map[proc]['out_queue'] if out_queue: processor_attr_map[proc]['mb_sender_proxy'] = mb_sender_proxy_dict[out_queue] + # fill processor list + self.init_processor_list = [] + for processor_name, attr_dict in processor_attr_map.items(): + n_threads = attr_dict['n_threads'] + for j in range(n_threads): + processor_id = (processor_name, j) + self.init_processor_list.append(processor_id) # set self attributes - self.init_processor_list = list(processor_attr_map.keys()) self.init_mb_listener_proxy_list = list(mb_listener_proxy_dict.values()) self.init_mb_sender_proxy_list = list(mb_sender_proxy_dict.values()) self.processor_attr_map = dict(processor_attr_map) @@ -418,18 +433,20 @@ def _spawn_processors(self, processor_list): """ tmp_logger = logger_utils.make_logger(base_logger, token=self.get_pid(), method_name='_spawn_processors') tmp_logger.debug('start') - for processor_name in processor_list: + for processor_id in processor_list: try: + processor_name, thread_j = processor_id attr_dict = self.processor_attr_map[processor_name] - self.processor_thread_map[processor_name] = SimpleMsgProcThread(attr_dict, sleep_time=self.process_sleep_time) - mc_thread = self.processor_thread_map[processor_name] + self.processor_thread_map[processor_id] = SimpleMsgProcThread( + attr_dict, sleep_time=self.process_sleep_time, thread_j=thread_j) + mc_thread = self.processor_thread_map[processor_id] mc_thread.start() tmp_logger.info('spawned processors thread {0} with plugin={1} , in_q={2}, out_q={3}'.format( - processor_name, attr_dict['plugin'].__class__.__name__, + processor_id, attr_dict['plugin'].__class__.__name__, attr_dict['in_queue'], attr_dict['out_queue'])) except Exception as e: tmp_logger.error('failed to spawn processor thread {0} with plugin={1} , in_q={2}, out_q={3} ; {4}: {5} '.format( - processor_name, attr_dict['plugin'].__class__.__name__, + processor_id, attr_dict['plugin'].__class__.__name__, attr_dict['in_queue'], attr_dict['out_queue'], e.__class__.__name__, e)) tmp_logger.debug('done') @@ -439,23 +456,24 @@ def _kill_processors(self, processor_list, block=True): """ tmp_logger = logger_utils.make_logger(base_logger, token=self.get_pid(), method_name='_kill_processors') tmp_logger.debug('start') - for processor_name in processor_list: + for processor_id in processor_list: try: - mc_thread = self.processor_thread_map.get(processor_name) + processor_name, thread_j = processor_id + mc_thread = self.processor_thread_map.get(processor_id) if mc_thread is None: - tmp_logger.debug('processor thread {0} does not exist. Skipped...'.format(processor_name)) + tmp_logger.debug('processor thread {0} does not exist. Skipped...'.format(processor_id)) elif not mc_thread.is_alive(): - tmp_logger.debug('processor thread {0} already stopped. Skipped...'.format(processor_name)) + tmp_logger.debug('processor thread {0} already stopped. Skipped...'.format(processor_id)) else: mc_thread.stop() - tmp_logger.info('signaled stop to processor thread {0}, block={1}'.format(processor_name, block)) + tmp_logger.info('signaled stop to processor thread {0}, block={1}'.format(processor_id, block)) if block: while mc_thread.is_alive(): time.sleep(0.125) - tmp_logger.info('processor thread {0} stopped'.format(processor_name)) + tmp_logger.info('processor thread {0} stopped'.format(processor_id)) except Exception as e: tmp_logger.error('failed to stop processor thread {0} ; {1}: {2} '.format( - processor_name, e.__class__.__name__, e)) + processor_id, e.__class__.__name__, e)) tmp_logger.debug('done') def initialize(self): diff --git a/pandacommon/pandautils/thread_utils.py b/pandacommon/pandautils/thread_utils.py index 2bbef93..b9b74e1 100644 --- a/pandacommon/pandautils/thread_utils.py +++ b/pandacommon/pandautils/thread_utils.py @@ -13,11 +13,16 @@ def __init__(self, **kwargs): self.hostname = socket.gethostname() self.os_pid = os.getpid() - def get_pid(self): + def get_pid(self, current=False): """ get host/process/thread identifier """ - thread_id = self.ident if self.ident else 0 + if current: + thread_id = threading.get_ident() + if not thread_id: + thread_id = 0 + else: + thread_id = self.ident if self.ident else 0 return '{0}_{1}-{2}'.format(self.hostname, self.os_pid, format(thread_id, 'x'))