diff --git a/cluster_parameter_sweep.py b/cluster_parameter_sweep.py index 389f980..3fb7ec5 100644 --- a/cluster_parameter_sweep.py +++ b/cluster_parameter_sweep.py @@ -39,13 +39,6 @@ def run_async(self, mapper=None, aggregator=None, reducer=None, number_of_trajec store_realizations=True, add_realizations=False, realizations_storage_directory=None): """ Creates a new remote_job and deploys it on the cluster. Returns RemoteJob deployed. """ - # Verify that given parameters are not referenced from other modules, as that produces referenced cloudpickling. - calling_module = inspect.getmodule(inspect.stack()[1][0]) - logging.info("Caller module: {0}".format(calling_module)) - # calling_module_name = calling_module.__name__ if calling_module is not None else None - # ClusterParameterSweep.check_ingredients_to_be_pickled(self.model_cls, mapper, aggregator, reducer, - # module_name=calling_module_name) - # Create new remote job. job_id = create_new_id() @@ -132,12 +125,19 @@ def clean_up(self, remote_job): def get_results(self, remote_job, add_realizations=False): import time + import paramiko + while True: try: results = self.get_sweep_result(remote_job, add_realizations=add_realizations) if add_realizations is False: self.clean_up(remote_job) return results + except paramiko.SSHException as e: + logging.error(e) + logging.info("Backing off... will try to connect to {0}@{1} again in {2} minutes." + .format(remote_job.remote_host.username, remote_job.remote_host.ip_address, constants.BACK_OFF_TIME / 60)) + time.sleep(constants.BACK_OFF_TIME) except cluster_execution_exceptions.RemoteJobNotFinished: time.sleep(1) diff --git a/constants.py b/constants.py index e6ec917..ee5f6bc 100644 --- a/constants.py +++ b/constants.py @@ -16,3 +16,4 @@ RemoteJobRunning = 0 RemoteJobCompleted = 1 RemoteJobFailed = 2 +BACK_OFF_TIME = 30 * 60