From aa49638a23f787dc74f3db930e77dd0a50b05576 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Matthias=20K=C3=BCmmerer?= <matthias@matthias-k.org>
Date: Mon, 18 May 2015 17:22:31 +0200
Subject: [PATCH] Refactor cluster_view to make it usable without context
 manager
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Matthias Kümmerer <matthias@matthias-k.org>
---
 cluster_helper/cluster.py | 251 ++++++++++++++++++++++----------------
 1 file changed, 144 insertions(+), 107 deletions(-)

diff --git a/cluster_helper/cluster.py b/cluster_helper/cluster.py
index cdfb661..bf683c6 100644
--- a/cluster_helper/cluster.py
+++ b/cluster_helper/cluster.py
@@ -860,6 +860,145 @@ def _stop(profile, cluster_id):
     args += _get_profile_args(profile)
     subprocess.check_call(args)
 
+
+class ClusterView(object):
+    """Provide a view on an ipython cluster for processing.
+
+      - scheduler: The type of cluster to start (lsf, sge, pbs, torque).
+      - num_jobs: Number of jobs to start.
+      - cores_per_job: The number of cores to use for each job.
+      - start_wait: How long to wait for the cluster to startup, in minutes.
+        Defaults to 16 minutes. Set to longer for slow starting clusters.
+      - retries: Number of retries to allow for failed tasks.
+    """
+    def __init__(self, scheduler, queue, num_jobs, cores_per_job=1, profile=None,
+                 start_wait=16, extra_params=None, retries=None, direct=False):
+        self.stopped = False
+        self.profile = profile
+        num_jobs = int(num_jobs)
+        cores_per_job = int(cores_per_job)
+        start_wait = int(start_wait)
+
+        if extra_params is None:
+            extra_params = {}
+        max_delay = start_wait * 60
+        delay = 5 if extra_params.get("run_local") else 30
+        max_tries = 10
+        _create_base_ipython_dirs()
+        if self.profile is None:
+            self.has_throwaway = True
+            self.profile = create_throwaway_profile()
+        else:
+            # ensure we have an .ipython directory to prevent issues
+            # creating it during parallel startup
+            cmd = [sys.executable, "-E", "-c", "from IPython import start_ipython; start_ipython()",
+                   "profile", "create", "--parallel"] + _get_profile_args(self.profile)
+            subprocess.check_call(cmd)
+            self.has_throwaway = False
+        num_tries = 0
+
+        self.cluster_id = str(uuid.uuid4())
+        url_file = get_url_file(self.profile, self.cluster_id)
+
+        while 1:
+            try:
+                if extra_params.get("run_local"):
+                    _start_local(num_jobs, self.profile, self.cluster_id)
+                else:
+                    _start(scheduler, self.profile, queue, num_jobs, cores_per_job, self.cluster_id, extra_params)
+                break
+            except subprocess.CalledProcessError:
+                if num_tries > max_tries:
+                    raise
+                num_tries += 1
+                time.sleep(delay)
+
+        try:
+            self.client = None
+            need_engines = 1  # Start using cluster when this many engines are up
+            slept = 0
+            max_up = 0
+            up = 0
+            while up < need_engines:
+                up = _nengines_up(url_file)
+                if up < max_up:
+                    print ("Engine(s) that were up have shutdown prematurely. "
+                           "Aborting cluster startup.")
+                    _stop(self.profile, self.cluster_id)
+                    sys.exit(1)
+                max_up = up
+                time.sleep(delay)
+                slept += delay
+                if slept > max_delay:
+                    raise IOError("""
+
+        The cluster startup timed out. This could be for a couple of reasons. The
+        most common reason is that the queue you are submitting jobs to is
+        oversubscribed. You can check if this is what is happening by trying again,
+        and watching to see if jobs are in a pending state or a running state when
+        the startup times out. If they are in the pending state, that means we just
+        need to wait longer for them to start, which you can specify by passing
+        the --timeout parameter, in minutes.
+
+        The second reason is that there is a problem with the controller and engine
+        jobs being submitted to the scheduler. In the directory you ran from,
+        you should see files that are named YourScheduler_enginesABunchOfNumbers and
+        YourScheduler_controllerABunchOfNumbers. If you submit one of those files
+        manually to your scheduler (for example bsub < YourScheduler_controllerABunchOfNumbers)
+        You will get a more helpful error message that might help you figure out what
+        is going wrong.
+
+        The third reason is that you need to submit your bcbio_nextgen.py job itself as a job;
+        bcbio-nextgen needs to run on a compute node, not the login node. So the
+        command you use to run bcbio-nextgen should be submitted as a job to
+        the scheduler. You can diagnose this because the controller and engine
+        jobs will be in the running state, but the cluster will still timeout.
+
+        Finally, it may be an issue with how the cluster is configured-- the controller
+        and engine jobs are unable to talk to each other. They need to be able to open
+        ports on the machines each of them are running on in order to work. You
+        can diagnose this as the possible issue by if you have submitted the bcbio-nextgen
+        job to the scheduler, the bcbio-nextgen main job and the controller and
+        engine jobs are all in a running state and the cluster still times out. This will
+        likely to be something that you'll have to talk to the administrators of the cluster
+        you are using about.
+
+        If you need help debugging, please post an issue here and we'll try to help you
+        with the detective work:
+
+        https://github.com/roryk/ipython-cluster-helper/issues
+
+                            """)
+            self.client = Client(url_file, timeout=60)
+            if direct:
+                self.view = _get_direct_view(self.client, retries)
+            else:
+                self.view = _get_balanced_blocked_view(self.client, retries)
+            self.view.clusterhelper = {"profile": self.profile,
+                                       "cluster_id": self.cluster_id}
+            if dill:
+                pickleutil.use_dill()
+                self.view.apply(pickleutil.use_dill)
+        except:
+            self.stop()
+            raise
+
+    def stop(self):
+        if not self.stopped:
+            if self.client:
+                _shutdown(self.client)
+            _stop(self.profile, self.cluster_id)
+            if self.has_throwaway:
+                delete_profile(self.profile)
+            self.stopped = True
+
+    def __enter__(self):
+        yield self
+
+    def __exit__(self):
+        self.stop()
+
+
 @contextlib.contextmanager
 def cluster_view(scheduler, queue, num_jobs, cores_per_job=1, profile=None,
                  start_wait=16, extra_params=None, retries=None, direct=False):
@@ -872,115 +1011,13 @@ def cluster_view(scheduler, queue, num_jobs, cores_per_job=1, profile=None,
         Defaults to 16 minutes. Set to longer for slow starting clusters.
       - retries: Number of retries to allow for failed tasks.
     """
-    num_jobs = int(num_jobs)
-    cores_per_job = int(cores_per_job)
-    start_wait = int(start_wait)
-
-    if extra_params is None:
-        extra_params = {}
-    max_delay = start_wait * 60
-    delay = 5 if extra_params.get("run_local") else 30
-    max_tries = 10
-    _create_base_ipython_dirs()
-    if profile is None:
-        has_throwaway = True
-        profile = create_throwaway_profile()
-    else:
-        # ensure we have an .ipython directory to prevent issues
-        # creating it during parallel startup
-        cmd = [sys.executable, "-E", "-c", "from IPython import start_ipython; start_ipython()",
-               "profile", "create", "--parallel"] + _get_profile_args(profile)
-        subprocess.check_call(cmd)
-        has_throwaway = False
-    num_tries = 0
-
-    cluster_id = str(uuid.uuid4())
-    url_file = get_url_file(profile, cluster_id)
-
-    while 1:
-        try:
-            if extra_params.get("run_local"):
-                _start_local(num_jobs, profile, cluster_id)
-            else:
-                _start(scheduler, profile, queue, num_jobs, cores_per_job, cluster_id, extra_params)
-            break
-        except subprocess.CalledProcessError:
-            if num_tries > max_tries:
-                raise
-            num_tries += 1
-            time.sleep(delay)
+    cluster_view = ClusterView(scheduler, queue, num_jobs, cores_per_job=cores_per_job,
+                               profile=profile, start_wait=start_wait, extra_params=extra_params,
+                               retries=retries, direct=False)
     try:
-        need_engines = 1  # Start using cluster when this many engines are up
-        client = None
-        slept = 0
-        max_up = 0
-        up = 0
-        while up < need_engines:
-            up = _nengines_up(url_file)
-            if up < max_up:
-                print ("Engine(s) that were up have shutdown prematurely. "
-                       "Aborting cluster startup.")
-                _stop(profile, cluster_id)
-                sys.exit(1)
-            max_up = up
-            time.sleep(delay)
-            slept += delay
-            if slept > max_delay:
-                raise IOError("""
-
-    The cluster startup timed out. This could be for a couple of reasons. The
-    most common reason is that the queue you are submitting jobs to is
-    oversubscribed. You can check if this is what is happening by trying again,
-    and watching to see if jobs are in a pending state or a running state when
-    the startup times out. If they are in the pending state, that means we just
-    need to wait longer for them to start, which you can specify by passing
-    the --timeout parameter, in minutes.
-
-    The second reason is that there is a problem with the controller and engine
-    jobs being submitted to the scheduler. In the directory you ran from,
-    you should see files that are named YourScheduler_enginesABunchOfNumbers and
-    YourScheduler_controllerABunchOfNumbers. If you submit one of those files
-    manually to your scheduler (for example bsub < YourScheduler_controllerABunchOfNumbers)
-    You will get a more helpful error message that might help you figure out what
-    is going wrong.
-
-    The third reason is that you need to submit your bcbio_nextgen.py job itself as a job;
-    bcbio-nextgen needs to run on a compute node, not the login node. So the
-    command you use to run bcbio-nextgen should be submitted as a job to
-    the scheduler. You can diagnose this because the controller and engine
-    jobs will be in the running state, but the cluster will still timeout.
-
-    Finally, it may be an issue with how the cluster is configured-- the controller
-    and engine jobs are unable to talk to each other. They need to be able to open
-    ports on the machines each of them are running on in order to work. You
-    can diagnose this as the possible issue by if you have submitted the bcbio-nextgen
-    job to the scheduler, the bcbio-nextgen main job and the controller and
-    engine jobs are all in a running state and the cluster still times out. This will
-    likely to be something that you'll have to talk to the administrators of the cluster
-    you are using about.
-
-    If you need help debugging, please post an issue here and we'll try to help you
-    with the detective work:
-
-    https://github.com/roryk/ipython-cluster-helper/issues
-
-                        """)
-        client = Client(url_file, timeout=60)
-        if direct:
-            view = _get_direct_view(client, retries)
-        else:
-            view = _get_balanced_blocked_view(client, retries)
-        view.clusterhelper = {"profile": profile, "cluster_id": cluster_id}
-        if dill:
-            pickleutil.use_dill()
-            view.apply(pickleutil.use_dill)
-        yield view
+        yield cluster_view.view
     finally:
-        if client:
-            _shutdown(client)
-        _stop(profile, cluster_id)
-        if has_throwaway:
-            delete_profile(profile)
+        cluster_view.stop()
 
 def _nengines_up(url_file):
     "return the number of engines up"