Refactor cluster_view to make it usable without context manager
Signed-off-by: Matthias Kümmerer <matthias@matthias-k.org>
matthias-k authored and roryk committed May 20, 2015
1 parent a3ce4a5 commit aa49638
Showing 1 changed file with 144 additions and 107 deletions.
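For orientation, here is a minimal sketch of how the new ClusterView class introduced in this diff could be used directly, without the cluster_view context manager. The scheduler ("sge"), queue ("general"), job counts, and the square helper are illustrative assumptions, not values from this commit:

from cluster_helper.cluster import ClusterView

def square(x):
    return x * x

# Hypothetical scheduler/queue values; adjust for your site.
cluster = ClusterView(scheduler="sge", queue="general", num_jobs=4, cores_per_job=1)
try:
    # cluster.view is the same load-balanced view that cluster_view() yields
    results = cluster.view.map(square, range(10))
finally:
    # explicit cleanup takes the place of the context manager's exit handling
    cluster.stop()

Because ClusterView also implements __enter__ and __exit__, it can still be driven by a with statement when automatic cleanup is preferred.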
251 changes: 144 additions & 107 deletions cluster_helper/cluster.py
@@ -860,6 +860,145 @@ def _stop(profile, cluster_id):
args += _get_profile_args(profile)
subprocess.check_call(args)


class ClusterView(object):
"""Provide a view on an ipython cluster for processing.
- scheduler: The type of cluster to start (lsf, sge, pbs, torque).
- num_jobs: Number of jobs to start.
- cores_per_job: The number of cores to use for each job.
- start_wait: How long to wait for the cluster to start up, in minutes.
Defaults to 16 minutes. Set to a longer value for slow-starting clusters.
- retries: Number of retries to allow for failed tasks.
"""
def __init__(self, scheduler, queue, num_jobs, cores_per_job=1, profile=None,
start_wait=16, extra_params=None, retries=None, direct=False):
self.stopped = False
self.profile = profile
num_jobs = int(num_jobs)
cores_per_job = int(cores_per_job)
start_wait = int(start_wait)

if extra_params is None:
extra_params = {}
max_delay = start_wait * 60
delay = 5 if extra_params.get("run_local") else 30
max_tries = 10
_create_base_ipython_dirs()
if self.profile is None:
self.has_throwaway = True
self.profile = create_throwaway_profile()
else:
# ensure we have an .ipython directory to prevent issues
# creating it during parallel startup
cmd = [sys.executable, "-E", "-c", "from IPython import start_ipython; start_ipython()",
"profile", "create", "--parallel"] + _get_profile_args(self.profile)
subprocess.check_call(cmd)
self.has_throwaway = False
num_tries = 0

self.cluster_id = str(uuid.uuid4())
url_file = get_url_file(self.profile, self.cluster_id)

while 1:
try:
if extra_params.get("run_local"):
_start_local(num_jobs, self.profile, self.cluster_id)
else:
_start(scheduler, self.profile, queue, num_jobs, cores_per_job, self.cluster_id, extra_params)
break
except subprocess.CalledProcessError:
if num_tries > max_tries:
raise
num_tries += 1
time.sleep(delay)

try:
self.client = None
need_engines = 1 # Start using cluster when this many engines are up
slept = 0
max_up = 0
up = 0
while up < need_engines:
up = _nengines_up(url_file)
if up < max_up:
print ("Engine(s) that were up have shut down prematurely. "
"Aborting cluster startup.")
_stop(self.profile, self.cluster_id)
sys.exit(1)
max_up = up
time.sleep(delay)
slept += delay
if slept > max_delay:
raise IOError("""
The cluster startup timed out. This could be for a few reasons. The
most common reason is that the queue you are submitting jobs to is
oversubscribed. You can check if this is what is happening by trying again,
and watching to see if jobs are in a pending state or a running state when
the startup times out. If they are in the pending state, that means we just
need to wait longer for them to start, which you can specify by passing
the --timeout parameter, in minutes.
The second reason is that there is a problem with the controller and engine
jobs being submitted to the scheduler. In the directory you ran from,
you should see files that are named YourScheduler_enginesABunchOfNumbers and
YourScheduler_controllerABunchOfNumbers. If you submit one of those files
manually to your scheduler (for example bsub < YourScheduler_controllerABunchOfNumbers),
you will get a more helpful error message that might help you figure out what
is going wrong.
The third reason is that you need to submit your bcbio_nextgen.py job itself as a job;
bcbio-nextgen needs to run on a compute node, not the login node. So the
command you use to run bcbio-nextgen should be submitted as a job to
the scheduler. You can diagnose this because the controller and engine
jobs will be in the running state, but the cluster will still time out.
Finally, it may be an issue with how the cluster is configured: the controller
and engine jobs are unable to talk to each other. They need to be able to open
ports on the machines each of them are running on in order to work. You
can diagnose this as the possible issue if you have submitted the bcbio-nextgen
job to the scheduler, the bcbio-nextgen main job and the controller and
engine jobs are all in a running state, and the cluster still times out. This will
likely be something that you'll have to discuss with the administrators of the cluster
you are using.
If you need help debugging, please post an issue here and we'll try to help you
with the detective work:
https://github.com/roryk/ipython-cluster-helper/issues
""")
self.client = Client(url_file, timeout=60)
if direct:
self.view = _get_direct_view(self.client, retries)
else:
self.view = _get_balanced_blocked_view(self.client, retries)
self.view.clusterhelper = {"profile": self.profile,
"cluster_id": self.cluster_id}
if dill:
pickleutil.use_dill()
self.view.apply(pickleutil.use_dill)
except:
self.stop()
raise

def stop(self):
if not self.stopped:
if self.client:
_shutdown(self.client)
_stop(self.profile, self.cluster_id)
if self.has_throwaway:
delete_profile(self.profile)
self.stopped = True

def __enter__(self):
return self

def __exit__(self, exc_type, exc_value, traceback):
self.stop()


@contextlib.contextmanager
def cluster_view(scheduler, queue, num_jobs, cores_per_job=1, profile=None,
start_wait=16, extra_params=None, retries=None, direct=False):
@@ -872,115 +1011,13 @@ def cluster_view(scheduler, queue, num_jobs, cores_per_job=1, profile=None,
Defaults to 16 minutes. Set to a longer value for slow-starting clusters.
- retries: Number of retries to allow for failed tasks.
"""
num_jobs = int(num_jobs)
cores_per_job = int(cores_per_job)
start_wait = int(start_wait)

if extra_params is None:
extra_params = {}
max_delay = start_wait * 60
delay = 5 if extra_params.get("run_local") else 30
max_tries = 10
_create_base_ipython_dirs()
if profile is None:
has_throwaway = True
profile = create_throwaway_profile()
else:
# ensure we have an .ipython directory to prevent issues
# creating it during parallel startup
cmd = [sys.executable, "-E", "-c", "from IPython import start_ipython; start_ipython()",
"profile", "create", "--parallel"] + _get_profile_args(profile)
subprocess.check_call(cmd)
has_throwaway = False
num_tries = 0

cluster_id = str(uuid.uuid4())
url_file = get_url_file(profile, cluster_id)

while 1:
try:
if extra_params.get("run_local"):
_start_local(num_jobs, profile, cluster_id)
else:
_start(scheduler, profile, queue, num_jobs, cores_per_job, cluster_id, extra_params)
break
except subprocess.CalledProcessError:
if num_tries > max_tries:
raise
num_tries += 1
time.sleep(delay)
cluster_view = ClusterView(scheduler, queue, num_jobs, cores_per_job=cores_per_job,
profile=profile, start_wait=start_wait, extra_params=extra_params,
retries=retries, direct=direct)
try:
need_engines = 1 # Start using cluster when this many engines are up
client = None
slept = 0
max_up = 0
up = 0
while up < need_engines:
up = _nengines_up(url_file)
if up < max_up:
print ("Engine(s) that were up have shut down prematurely. "
"Aborting cluster startup.")
_stop(profile, cluster_id)
sys.exit(1)
max_up = up
time.sleep(delay)
slept += delay
if slept > max_delay:
raise IOError("""
The cluster startup timed out. This could be for a few reasons. The
most common reason is that the queue you are submitting jobs to is
oversubscribed. You can check if this is what is happening by trying again,
and watching to see if jobs are in a pending state or a running state when
the startup times out. If they are in the pending state, that means we just
need to wait longer for them to start, which you can specify by passing
the --timeout parameter, in minutes.
The second reason is that there is a problem with the controller and engine
jobs being submitted to the scheduler. In the directory you ran from,
you should see files that are named YourScheduler_enginesABunchOfNumbers and
YourScheduler_controllerABunchOfNumbers. If you submit one of those files
manually to your scheduler (for example bsub < YourScheduler_controllerABunchOfNumbers),
you will get a more helpful error message that might help you figure out what
is going wrong.
The third reason is that you need to submit your bcbio_nextgen.py job itself as a job;
bcbio-nextgen needs to run on a compute node, not the login node. So the
command you use to run bcbio-nextgen should be submitted as a job to
the scheduler. You can diagnose this because the controller and engine
jobs will be in the running state, but the cluster will still time out.
Finally, it may be an issue with how the cluster is configured: the controller
and engine jobs are unable to talk to each other. They need to be able to open
ports on the machines each of them are running on in order to work. You
can diagnose this as the possible issue if you have submitted the bcbio-nextgen
job to the scheduler, the bcbio-nextgen main job and the controller and
engine jobs are all in a running state, and the cluster still times out. This will
likely be something that you'll have to discuss with the administrators of the cluster
you are using.
If you need help debugging, please post an issue here and we'll try to help you
with the detective work:
https://github.com/roryk/ipython-cluster-helper/issues
""")
client = Client(url_file, timeout=60)
if direct:
view = _get_direct_view(client, retries)
else:
view = _get_balanced_blocked_view(client, retries)
view.clusterhelper = {"profile": profile, "cluster_id": cluster_id}
if dill:
pickleutil.use_dill()
view.apply(pickleutil.use_dill)
yield view
yield cluster_view.view
finally:
if client:
_shutdown(client)
_stop(profile, cluster_id)
if has_throwaway:
delete_profile(profile)
cluster_view.stop()

def _nengines_up(url_file):
"return the number of engines up"
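The refactored cluster_view above keeps its previous interface, so existing callers are unchanged. A brief sketch of that context-manager form, under the same illustrative scheduler/queue assumptions as the example near the top of this page:

from cluster_helper.cluster import cluster_view

def square(x):
    return x * x

# The context manager now just wraps a ClusterView and calls stop() on exit.
with cluster_view(scheduler="sge", queue="general", num_jobs=4) as view:
    results = view.map(square, range(10))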
