Skip to content

Commit

Permalink
Fix mp zombie issue (#99)
Browse files Browse the repository at this point in the history
  • Loading branch information
tleonardi authored Jun 21, 2019
1 parent fff3d87 commit d64fe78
Showing 1 changed file with 29 additions and 17 deletions.
46 changes: 29 additions & 17 deletions nanocompore/SampComp.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,25 +236,26 @@ def __call__(self):
ps_list.append(mp.Process(target=self.__process_references, args=(in_q, out_q, error_q)))
ps_list.append(mp.Process(target=self.__write_output, args=(out_q, error_q)))

# Start processes and block until done
# Start processes and monitor error queue
try:
# Start all processes
for ps in ps_list:
ps.start()
# Monitor error queue
for tb in iter(error_q.get, None):
raise NanocomporeError(tb)
# Join processes
for ps in ps_list:
ps.join()

# Kill processes if any error
# Catch error and reraise it
except(BrokenPipeError, KeyboardInterrupt, NanocomporeError) as E:
logger.debug("An error occured. Killing all processes\n")
raise E

# Make sure all processes are killed
finally:
# Soft processes stopping
for ps in ps_list:
ps.join()

# Hard failsafe processes killing
for ps in ps_list:
if ps.exitcode == None:
ps.terminate()
Expand All @@ -274,13 +275,17 @@ def __list_refid(self, in_q, error_q):
logger.debug("Adding {} to in_q".format(ref_id))
in_q.put((ref_id, ref_dict))

# Deal 1 poison pill and close file pointer
logger.debug("Adding poison pill to in_q")
for i in range(self.__nthreads):
in_q.put(None)

# Manage exceptions and deal poison pills
except Exception:
error_q.put(traceback.format_exc())
finally:
logger.debug("Adding poison pill to in_q")
logger.debug("Error in Reader. Kill input queue")
for i in range(self.__nthreads):
in_q.put(None)
error_q.put(traceback.format_exc())

def __process_references(self, in_q, out_q, error_q):
"""
Expand Down Expand Up @@ -373,14 +378,19 @@ def __process_references(self, in_q, out_q, error_q):
logger.debug("Adding %s to out_q"%(ref_id))
out_q.put((ref_id, ref_pos_list))

# Manage exceptions, deal poison pills and close files
except Exception:
error_q.put(traceback.format_exc())
finally:
# Deal 1 poison pill and close file pointer
logger.debug("Adding poison pill to out_q")
out_q.put(None)
self.__eventalign_fn_close(fp_dict)

# Manage exceptions, deal poison pills and close files
except Exception:
logger.debug("Error in worker. Kill output queue")
for i in range(self.__nthreads):
out_q.put(None)
self.__eventalign_fn_close(fp_dict)
error_q.put(traceback.format_exc())

def __write_output(self, out_q, error_q):
# Get results out of the out queue and write in shelve
pvalue_tests = set()
Expand Down Expand Up @@ -417,13 +427,15 @@ def __write_output(self, out_q, error_q):
"min_coverage": self.__min_coverage,
"n_samples": self.__n_samples}

# Manage exceptions, deal poison pills and close files
except Exception:
error_q.put(traceback.format_exc())
finally:
# Ending process bar and deal poison pill in error queue
pbar.close()
error_q.put(None)

# Manage exceptions and add error trackback to error queue
except Exception:
pbar.close()
error_q.put(traceback.format_exc())

#~~~~~~~~~~~~~~PRIVATE HELPER METHODS~~~~~~~~~~~~~~#
def __check_eventalign_fn_dict(self, d):
""""""
Expand Down

0 comments on commit d64fe78

Please sign in to comment.