Skip to content

Commit faf2994

Browse files
author
dPys
committed
[FIX] Prevent premature watchdog timeouts
1 parent 2d1bfec commit faf2994

File tree

7 files changed

+1224
-57
lines changed

7 files changed

+1224
-57
lines changed

Dockerfile

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,8 @@ RUN apt-get update -qq \
8686
&& rm -r fsl* \
8787
&& chmod 777 -R $FSLDIR/bin \
8888
&& chmod 777 -R /usr/lib/fsl/5.0 \
89-
&& echo "tmpfs /tmp tmpfs rw,nodev,nosuid,size=10G 0 0" >> /etc/fstab
89+
&& echo "tmpfs /tmp tmpfs rw,nodev,nosuid,size=10G 0 0" >> /etc/fstab \
90+
&& echo "GRUB_CMDLINE_LINUX_DEFAULT="rootflags=uquota,pquota"" >> /etc/default/grub
9091
# && wget --retry-connrefused --waitretry=5 --read-timeout=60 --timeout=60 -t 0 -q -O examples.tar.gz "https://osf.io/ye4vf/download" && tar -xvzf examples.tar.gz -C /tmp \
9192
# && rm -rf examples.tar.gz
9293

pynets/cli/pynets_collect.py

Lines changed: 1187 additions & 20 deletions
Large diffs are not rendered by default.

pynets/cli/pynets_run.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3515,6 +3515,7 @@ def main():
35153515
"PyNets not installed! Ensure that you are referencing the correct"
35163516
" site-packages and using Python3.6+"
35173517
)
3518+
sys.exit(1)
35183519

35193520
if len(sys.argv) < 1:
35203521
print("\nMissing command-line inputs! See help options with the -h"
@@ -3541,11 +3542,9 @@ def main():
35413542
run_uuid = retval.get("run_uuid", None)
35423543

35433544
retcode = retcode or int(pynets_wf is None)
3544-
if retcode != 0:
3545-
sys.exit(retcode)
3545+
if retcode == 1:
3546+
return retcode
35463547

3547-
# Clean up master process before running workflow, which may create
3548-
# forks
35493548
gc.collect()
35503549

35513550
mgr.shutdown()
@@ -3555,7 +3554,7 @@ def main():
35553554

35563555
rmtree(work_dir, ignore_errors=True)
35573556

3558-
sys.exit(0)
3557+
return 0
35593558

35603559

35613560
if __name__ == "__main__":

pynets/core/utils.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1973,16 +1973,15 @@ def run(self):
19731973
watchdog_thread.join()
19741974
return 0
19751975

1976-
def _watchdog(self):
1977-
WATCHDOG_HARD_KILL_TIMEOUT = 7200
1976+
def _watchdog(self, watchdog_timeout=10800):
19781977

19791978
self.last_progress_time = time.time()
19801979

19811980
while self.last_progress_time == time.time():
19821981
if self.shutdown.wait(timeout=5):
19831982
return
19841983
last_progress_delay = time.time() - self.last_progress_time
1985-
if last_progress_delay < WATCHDOG_HARD_KILL_TIMEOUT:
1984+
if last_progress_delay < watchdog_timeout:
19861985
continue
19871986
try:
19881987
signal.signal(signal.SIGQUIT, dumpstacks)

pynets/dmri/track.py

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ def reconstruction(conn_model, gtab, dwi_data, B0_mask):
6969
)
7070
except ValueError:
7171
import sys
72-
sys.exit(0)
72+
sys.exit(1)
7373

7474
del dwi_data
7575

@@ -191,7 +191,7 @@ def prep_tissues(
191191
raise ValueError("Tissue classifier cannot be none.")
192192
except ValueError:
193193
import sys
194-
sys.exit(0)
194+
sys.exit(1)
195195

196196
del gm_data, wm_data, vent_csf_in_dwi_data
197197

@@ -482,8 +482,7 @@ def track_ensemble(
482482
while float(stream_counter) < float(target_samples) and float(ix) < 0.50*float(len(all_combs)):
483483
with Parallel(n_jobs=nthreads, backend='loky',
484484
mmap_mode='r+', temp_folder=joblib_dir,
485-
verbose=0, max_nbytes='50000M',
486-
timeout=timeout) as parallel:
485+
verbose=0, timeout=timeout) as parallel:
487486
out_streams = parallel(
488487
delayed(run_tracking)(
489488
i, recon_path, n_seeds_per_iter, directget, maxcrossing,
@@ -502,14 +501,15 @@ def track_ensemble(
502501

503502
if len(out_streams) < min_streams:
504503
ix += 2
505-
print(f"Fewer than {min_streams} streamlines tracked on last"
506-
f" iteration. Loosening tolerance and anatomical"
507-
f" constraints. Check {tissues4d} or {recon_path}"
508-
f" for errors...")
509-
if track_type != 'particle':
510-
tiss_class = 'wb'
504+
print(f"Fewer than {min_streams} streamlines tracked "
505+
f"on last iteration with cache directory: "
506+
f"{cache_dir}. Loosening tolerance and "
507+
f"anatomical constraints. Check {tissues4d} or "
508+
f"{recon_path} for errors...")
509+
# if track_type != 'particle':
510+
# tiss_class = 'wb'
511511
roi_neighborhood_tol = float(roi_neighborhood_tol) * 1.25
512-
min_length = float(min_length) * 0.9875
512+
# min_length = float(min_length) * 0.9875
513513
continue
514514
else:
515515
ix -= 1
@@ -730,7 +730,7 @@ def run_tracking(step_curv_combinations, recon_path,
730730
"ERROR: No valid tracking method(s) specified.")
731731
except ValueError:
732732
import sys
733-
sys.exit(0)
733+
sys.exit(1)
734734

735735
# Filter resulting streamlines by those that stay entirely
736736
# inside the brain

pynets/runconfig.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -209,15 +209,15 @@ resource_dict: # Nipype workflow resource settings
209209
- 'get_fa_node':
210210
- (2, 1)
211211
- 'run_tracking_node':
212-
- (3, 8)
212+
- (3, 10)
213213
- 'thresh_diff_node':
214214
- (1, 1.5)
215215
- 'dsn_node':
216216
- (1, 2)
217217
- 'plot_all_node':
218218
- (1, 2)
219219
- 'streams2graph_node':
220-
- (3, 4)
220+
- (3, 6)
221221
- 'build_multigraphs_node':
222222
- (2, 8)
223223
- 'plot_all_struct_func_node':

pynets/stats/benchmarking.py

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -449,19 +449,19 @@ def benchmark_reproducibility(comb, modality, alg, par_dict, disc,
449449
with open(label_file, 'r+') as f:
450450
node_dict = json.load(f)
451451
indices = [i['index'] for i in
452-
node_dict.values()]
452+
node_dict]
453453
if indices == ixs:
454454
coords = [i['coord'] for i in
455-
node_dict.values()]
455+
node_dict]
456456

457457
df_coords = pd.DataFrame(
458458
[str(tuple(x)) for x in
459459
coords]).T
460460
df_coords.columns = [f"rsn-{comb_tuple[0]}_res-{comb_tuple[-2]}_{i}" for i in ixs]
461461
labels = [
462-
list(i['label'].values())[7] for i
462+
list(i['label'])[7] for i
463463
in
464-
node_dict.values()]
464+
node_dict]
465465

466466
df_labels = pd.DataFrame(
467467
labels).T
@@ -520,7 +520,6 @@ def benchmark_reproducibility(comb, modality, alg, par_dict, disc,
520520
df_summary.at[0, f"{lp}_icc"] = np.nan
521521
coord_in = np.nan
522522
label_in = np.nan
523-
del c_icc
524523

525524
dict_sum[f"{lp}_coord"] = coord_in
526525
dict_sum[f"{lp}_label"] = label_in
@@ -593,11 +592,11 @@ def benchmark_reproducibility(comb, modality, alg, par_dict, disc,
593592
if __name__ == "__main__":
594593
__spec__ = "ModuleSpec(name='builtins', loader=<class '_" \
595594
"frozen_importlib.BuiltinImporter'>)"
596-
base_dir = '/scratch/04171/dpisner/HNU/HNU_outs/triple'
597-
#base_dir = '/scratch/04171/dpisner/HNU/HNU_outs/outputs_language'
595+
#base_dir = '/scratch/04171/dpisner/HNU/HNU_outs/triple'
596+
base_dir = '/scratch/04171/dpisner/HNU/HNU_outs/outputs_language'
598597
thr_type = "MST"
599-
icc = False
600-
disc = True
598+
icc = True
599+
disc = False
601600
int_consist = False
602601
target_modality = 'dwi'
603602

@@ -606,8 +605,10 @@ def benchmark_reproducibility(comb, modality, alg, par_dict, disc,
606605
#embedding_types = ['OMNI']
607606
embedding_types = ['OMNI', 'ASE']
608607
modalities = ['func', 'dwi']
609-
rsns = ['kmeans']
610-
#rsns = ['language']
608+
#rsns = ['kmeans', 'triple']
609+
#rsns = ['triple']
610+
#rsns = ['kmeans']
611+
rsns = ['language']
611612
#template = 'CN200'
612613
template = 'MNI152_T1'
613614
mets = ["global_efficiency",
@@ -719,7 +720,7 @@ def tuple_insert(tup, pos, ele):
719720
cache_dir = tempfile.mkdtemp()
720721

721722
with Parallel(
722-
n_jobs=128, require="sharedmem", backend='threading',
723+
n_jobs=-1, require="sharedmem", backend='threading',
723724
verbose=10, max_nbytes='20000M',
724725
temp_folder=cache_dir
725726
) as parallel:
@@ -734,8 +735,8 @@ def tuple_insert(tup, pos, ele):
734735
# outs = []
735736
# for comb in grid:
736737
# outs.append(benchmark_reproducibility(
737-
# comb, modality, alg, par_dict,
738-
# disc, final_missingness_summary,
738+
# comb, modality, alg, sub_dict_clean,
739+
# disc, final_missingness_summary, icc_tmps_dir,
739740
# ))
740741
df_summary = pd.concat([i for i in outs if i is not None and not i.empty], axis=0)
741742
df_summary = df_summary.dropna(axis=0, how='all')

0 commit comments

Comments (0)