Merge pull request #442 from dPys/development
[MAINT] Make indexed_gzip a conditional import for unix-based os
dPys authored Sep 22, 2020
2 parents fc7c423 + 929fa24 commit 9a4daf9
Showing 24 changed files with 691 additions and 446 deletions.
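For orientation, the guarded-import pattern named in the commit title looks like the following minimal sketch; the comments are assumptions about intent, not part of the diff:

    import sys

    # Only attempt the import on non-Windows platforms; downstream code must not
    # assume the name is bound when running on Windows.
    if not sys.platform.startswith("win"):
        import indexed_gzip  # noqa: F401

The hunks below keep the equivalent `sys.platform.startswith('win') is False` comparison.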
2 changes: 1 addition & 1 deletion pynets/__about__.py
@@ -6,7 +6,7 @@
# from ._version import get_versions
# __version__ = get_versions()['version']
# del get_versions
__version__ = "1.0.14"
__version__ = "1.0.15"

__packagename__ = "pynets"
__copyright__ = "Copyright 2016, Derek Pisner"
169 changes: 71 additions & 98 deletions pynets/cli/pynets_collect.py
@@ -11,6 +11,9 @@
import pandas as pd
import warnings
warnings.filterwarnings("ignore")
import sys
if sys.platform.startswith('win') is False:
import indexed_gzip


def get_parser():
@@ -101,17 +104,12 @@ def load_pd_dfs(file_):
if op.isfile(file_) and not file_.endswith("_clean.csv"):
try:
df = pd.read_csv(file_, chunksize=100000, encoding="utf-8",
nrows=1, skip_blank_lines=False,
warn_bad_lines=True, error_bad_lines=False,
memory_map=True, engine='python').read()
engine='python').read()
except:
print(f"Load failed for {file_}. Trying again with c engine.")
try:
df = pd.read_csv(file_, chunksize=100000, encoding="utf-8",
nrows=1, skip_blank_lines=False,
warn_bad_lines=True,
error_bad_lines=False,
memory_map=True, engine='c').read()
engine='c').read()
except:
print(f"Cannot load {file_}")
df = pd.DataFrame()
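The read calls trimmed in this hunk follow a try-one-engine-then-the-other pattern; a self-contained sketch of that pattern, with a placeholder function name, might look like:

    import pandas as pd

    def robust_read_csv(path):
        """Try the forgiving python parser first, then fall back to the c engine."""
        for engine in ("python", "c"):
            try:
                # chunksize makes read_csv return a TextFileReader; .read() drains it
                return pd.read_csv(path, chunksize=100000, encoding="utf-8",
                                   engine=engine).read()
            except Exception:
                continue
        print(f"Cannot load {path}")
        return pd.DataFrame()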
@@ -295,7 +293,7 @@ def mergedicts(dict1, dict2):
'otherwise create an inventory of missingness...')
par_dict = rerun_dict.copy()
cache_dir = tempfile.mkdtemp()
with Parallel(n_jobs=-1, require='sharedmem', verbose=10,
with Parallel(n_jobs=-1, backend='loky', verbose=10,
temp_folder=cache_dir) as parallel:
outs = parallel(delayed(recover_missing)(bad_col, bad_cols_dict,
par_dict, modality,
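This hunk swaps joblib's shared-memory requirement for the process-based loky backend; a minimal, self-contained sketch of that usage, with a stand-in worker in place of recover_missing:

    import tempfile
    from joblib import Parallel, delayed

    def work(x):
        # stand-in worker; with loky, arguments and results are pickled across processes
        return x ** 2

    cache_dir = tempfile.mkdtemp()
    with Parallel(n_jobs=-1, backend='loky', verbose=10,
                  temp_folder=cache_dir) as parallel:
        outs = parallel(delayed(work)(i) for i in range(10))

Unlike `require='sharedmem'`, loky runs workers in separate processes, so nothing is shared in memory and inputs must be picklable.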
@@ -373,18 +371,12 @@ def recover_missing(bad_col, bad_cols_dict, rerun_dict, modality,
try:
df_tmp = pd.read_csv(
outs[0], chunksize=100000, compression="gzip",
encoding="utf-8",
nrows=1, skip_blank_lines=False,
warn_bad_lines=True, error_bad_lines=False,
memory_map=True, engine='python').read()
encoding="utf-8", engine='python').read()
except:
try:
df_tmp = pd.read_csv(
outs[0], chunksize=100000, compression="gzip",
encoding="utf-8",
nrows=1, skip_blank_lines=False,
warn_bad_lines=True, error_bad_lines=False,
memory_map=True, engine='c').read()
encoding="utf-8", engine='c').read()
except:
print(f"Cannot load {outs[0]}")
continue
@@ -396,52 +388,45 @@ def recover_missing(bad_col, bad_cols_dict, rerun_dict, modality,

if bad_col not in frame.columns:
continue
# from pynets.stats.netstats import \
# collect_pandas_df_make
# collect_pandas_df_make(
# glob.glob(f"{working_path}/{sub}/{ses}/"
# f"{modality}/{atlas}/topology/*_neat.csv"),
# f"{sub}_{ses}", None, False)
from pynets.stats.netstats import \
collect_pandas_df_make
collect_pandas_df_make(
glob.glob(f"{working_path}/{sub}/{ses}/"
f"{modality}/{atlas}/topology/*_neat.csv"),
f"{sub}_{ses}", None, False)
try:
frame.loc[lab, bad_col] = df_tmp.filter(
regex=bad_col.split('auc_')[1:][0]
).values.tolist()[0][0]
print(f"Recovered missing data from {sub}, {ses} for "
f"{bad_col}...")
except:
# from pynets.stats.netstats import \
# collect_pandas_df_make
# collect_pandas_df_make(
# glob.glob(f"{working_path}/{sub}/{ses}/"
# f"{modality}/{atlas}/topology/*_neat.csv"),
# f"{sub}_{ses}", None, False)
from pynets.stats.netstats import \
collect_pandas_df_make
collect_pandas_df_make(
glob.glob(f"{working_path}/{sub}/{ses}/"
f"{modality}/{atlas}/topology/*_neat.csv"),
f"{sub}_{ses}", None, False)
continue
del df_tmp
else:
# from pynets.stats.netstats import collect_pandas_df_make
# collect_pandas_df_make(glob.glob(f"{working_path}/{sub}/{ses}/"
# f"{modality}/{atlas}/
# topology/*_neat.csv"),
# f"{sub}_{ses}", None, False)
from pynets.stats.netstats import collect_pandas_df_make
collect_pandas_df_make(glob.glob(f"{working_path}/{sub}/{ses}/"
f"{modality}/{atlas}/topology/*_neat.csv"),
f"{sub}_{ses}", None, False)
rerun_dict[sub][ses][modality][atlas].append(bad_col)
continue
elif len(outs) > 1:
for out in outs:
try:
df_tmp = pd.read_csv(
out, chunksize=100000, compression="gzip",
encoding="utf-8",
nrows=1, skip_blank_lines=False,
warn_bad_lines=True, error_bad_lines=False,
memory_map=True, engine='python').read()
encoding="utf-8", engine='python').read()
except:
try:
df_tmp = pd.read_csv(
out, chunksize=100000, compression="gzip",
encoding="utf-8",
nrows=1, skip_blank_lines=False,
warn_bad_lines=True, error_bad_lines=False,
memory_map=True, engine='c').read()
encoding="utf-8", engine='c').read()
except:
print(f"Cannot load {out}")
continue
@@ -458,24 +443,23 @@ def recover_missing(bad_col, bad_cols_dict, rerun_dict, modality,
regex=bad_col.split('auc_')[1:][0]
).values.tolist()[0][0]
except:
# from pynets.stats.netstats import \
# collect_pandas_df_make
# collect_pandas_df_make(
# glob.glob(f"{working_path}/{sub}/{ses}/"
# f"{modality}/{atlas}/topology/
# *_neat.csv"),
# f"{sub}_{ses}", None, False)
from pynets.stats.netstats import \
collect_pandas_df_make
collect_pandas_df_make(
glob.glob(f"{working_path}/{sub}/{ses}/"
f"{modality}/{atlas}/topology/*_neat.csv"),
f"{sub}_{ses}", None, False)
continue
del df_tmp
else:
# Add to missingness inventory if not found
rerun_dict[sub][ses][modality][atlas].append(bad_col)
# from pynets.stats.netstats import \
# collect_pandas_df_make
# collect_pandas_df_make(
# glob.glob(f"{working_path}/{sub}/{ses}/"
# f"{modality}/{atlas}/topology/*_neat.csv"),
# f"{sub}_{ses}", None, False)
from pynets.stats.netstats import \
collect_pandas_df_make
collect_pandas_df_make(
glob.glob(f"{working_path}/{sub}/{ses}/"
f"{modality}/{atlas}/topology/*_neat.csv"),
f"{sub}_{ses}", None, False)
return rerun_dict, rerun
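The recovery step above repeatedly pulls a single value out of a rescued frame by regex; a toy sketch of just that step, with made-up column names and values:

    import pandas as pd

    frame = pd.DataFrame({"auc_degree_centrality": [float("nan")]}, index=["sub-01"])
    df_tmp = pd.DataFrame({"degree_centrality_auc": [0.42]})

    bad_col = "auc_degree_centrality"
    # Drop the 'auc_' prefix and match any rescued column containing the metric name
    pattern = bad_col.split('auc_')[1:][0]
    frame.loc["sub-01", bad_col] = df_tmp.filter(regex=pattern).values.tolist()[0][0]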


@@ -504,18 +488,12 @@ def load_pd_dfs_auc(atlas_name, prefix, auc_file, modality, drop_cols):
try:
df = pd.read_csv(
auc_file, chunksize=100000, compression="gzip",
encoding="utf-8",
nrows=1, skip_blank_lines=False,
warn_bad_lines=True, error_bad_lines=False,
memory_map=True, engine='c').read()
encoding="utf-8", engine='c').read()
except:
try:
df = pd.read_csv(
auc_file, chunksize=100000, compression="gzip",
encoding="utf-8",
nrows=1, skip_blank_lines=False,
warn_bad_lines=True, error_bad_lines=False,
memory_map=True, engine='python').read()
encoding="utf-8", engine='python').read()
except:
df_pref = pd.DataFrame()
return df_pref
@@ -667,7 +645,6 @@ def collect_all(working_path, modality, drop_cols):
'warnings.filterwarnings("ignore")',
"import os",
"import numpy as np",
"import indexed_gzip",
"import nibabel as nib",
"import glob",
"import pandas as pd",
@@ -896,7 +873,7 @@ def main():
import sys
import glob
from pynets.cli.pynets_collect import build_collect_workflow
# from types import SimpleNamespace
from types import SimpleNamespace
from pathlib import Path

try:
@@ -912,28 +889,28 @@
" flag.\n")
sys.exit()

args = get_parser().parse_args()
# args_dict_all = {}
# args_dict_all['plug'] = 'MultiProc'
# args_dict_all['v'] = False
# args_dict_all['pm'] = '24,57'
# #args_dict_all['basedir'] = '/working/tuning_set/outputs_shaeffer/pynets'
# #args_dict_all['basedir'] = '/scratch/04171/dpisner/HNU/HNU_outs/triple/pynets'
# args_dict_all['basedir'] = '/scratch/04171/dpisner/HNU/HNU_outs/visual/pynets'
# args_dict_all['work'] = '/tmp/work/dwi'
# args_dict_all['modality'] = 'dwi'
# args_dict_all['dc'] = ['diversity_coefficient',
# 'participation_coefficient',
# 'average_local_efficiency',
# 'average_clustering',
# 'average_local_clustering_nodewise',
# 'average_local_efficiency_nodewise',
# 'degree_centrality',
# 'weighted_transitivity',
# # "_minlength-0",
# "_minlength-20", "_minlength-30", "variance",
# "res-1000"]
# args = SimpleNamespace(**args_dict_all)
# args = get_parser().parse_args()
args_dict_all = {}
args_dict_all['plug'] = 'MultiProc'
args_dict_all['v'] = False
args_dict_all['pm'] = '48,57'
#args_dict_all['basedir'] = '/working/tuning_set/outputs_shaeffer/pynets'
#args_dict_all['basedir'] = '/scratch/04171/dpisner/HNU/HNU_outs/triple/pynets'
#args_dict_all['basedir'] = '/scratch/04171/dpisner/HNU/HNU_outs/visual/pynets'
args_dict_all['basedir'] = '/scratch/04171/dpisner/tuning_set/outputs_shaeffer/pynets'
args_dict_all['work'] = '/tmp/work/func'
args_dict_all['modality'] = 'func'
args_dict_all['dc'] = ['diversity_coefficient',
'participation_coefficient',
'average_local_efficiency',
'average_clustering',
'average_local_clustering_nodewise',
'average_local_efficiency_nodewise',
'degree_centrality',
# "_minlength-0",
"_minlength-20", "_minlength-30", "variance",
"res-1000"]
args = SimpleNamespace(**args_dict_all)

from multiprocessing import set_start_method, Process, Manager
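The hunk above trades the argparse call for a hard-coded SimpleNamespace, which mimics the parsed-args object attribute-for-attribute; a generic sketch of that stand-in pattern, with placeholder values and paths:

    from types import SimpleNamespace

    # Stand-in for get_parser().parse_args(); attribute access matches argparse.Namespace
    args = SimpleNamespace(plug='MultiProc', v=False, pm='48,57',
                           basedir='/path/to/pynets/outputs',  # placeholder path
                           work='/tmp/work/func', modality='func',
                           dc=['degree_centrality', 'average_clustering'])

    print(args.basedir, args.modality, args.dc)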

@@ -956,12 +933,12 @@
gc.collect()
mgr.shutdown()

# working_path = args_dict_all['basedir']
# modality = args_dict_all['modality']
# drop_cols = args_dict_all['dc']
working_path = args.basedir
modality = args.modality
drop_cols = args.dc
working_path = args_dict_all['basedir']
modality = args_dict_all['modality']
drop_cols = args_dict_all['dc']
# working_path = args.basedir
# modality = args.modality
# drop_cols = args.dc

all_files = glob.glob(
f"{str(Path(working_path))}/{modality}_group_topology_auc/*.csv"
@@ -974,15 +951,11 @@
for file_ in files_:
try:
df = pd.read_csv(file_, chunksize=100000, encoding="utf-8",
nrows=1, skip_blank_lines=False,
warn_bad_lines=True, error_bad_lines=False,
memory_map=True, engine='python').read()
engine='python').read()
except:
try:
df = pd.read_csv(file_, chunksize=100000, encoding="utf-8",
nrows=1, skip_blank_lines=False,
warn_bad_lines=True, error_bad_lines=False,
memory_map=True, engine='c').read()
engine='c').read()
except:
print(f"Cannot load {file_}...")
continue
8 changes: 4 additions & 4 deletions pynets/cli/pynets_run.py
@@ -290,8 +290,9 @@ def get_parser():
metavar="Graph threshold",
default=1.00,
help="Optionally specify a threshold indicating a proportion of "
"weights to preserve in the graph. Default is proportional "
"thresholding. If omitted, no thresholding will be applied. \n",
"weights to preserve in the graph. Default is no thresholding. "
"If `-mst`, `-dt`, or `-df` flags are not included, than "
"proportional thresholding will be performed\n",
)
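To make the reworded help concrete, here is what proportional thresholding of a weighted graph usually means, sketched in NumPy; this illustrates the concept only and is not the PyNets implementation:

    import numpy as np

    def proportional_threshold(W, p):
        """Keep roughly the strongest proportion p of edges in a symmetric weight matrix W."""
        W = W.copy()
        weights = W[np.triu_indices_from(W, k=1)]
        k = int(round(p * weights.size))         # number of edges to retain
        if k == 0:
            return np.zeros_like(W)
        cutoff = np.sort(weights)[::-1][k - 1]   # weight of the k-th strongest edge
        W[W < cutoff] = 0.0                      # ties at the cutoff are kept
        return W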
parser.add_argument(
"-min_thr",
@@ -2434,7 +2435,6 @@ def init_wf_single_subject(
"import os",
"import numpy as np",
"import networkx as nx",
"import indexed_gzip",
"import nibabel as nib",
"import warnings",
'warnings.filterwarnings("ignore")',
@@ -3541,7 +3541,7 @@ def main():
if len(sys.argv) < 1:
print("\nMissing command-line inputs! See help options with the -h"
" flag.\n")
sys.exit(0)
sys.exit(1)

args = get_parser().parse_args()

4 changes: 3 additions & 1 deletion pynets/core/interfaces.py
@@ -7,7 +7,9 @@
"""
import warnings
import numpy as np
import indexed_gzip
import sys
if sys.platform.startswith('win') is False:
import indexed_gzip
import nibabel as nib
from nipype.interfaces.base import (
BaseInterface,
Diffs for the remaining 20 changed files are not shown here.