diff --git a/openeo/extra/job_management/__init__.py b/openeo/extra/job_management/__init__.py
index 41b8fdfd3..464f8c4f5 100644
--- a/openeo/extra/job_management/__init__.py
+++ b/openeo/extra/job_management/__init__.py
@@ -606,27 +606,53 @@ def _launch_job(self, start_job, df, i, backend_name, stats: Optional[dict] = No
                 df.loc[i, "status"] = "skipped"
                 stats["start_job skipped"] += 1
 
+
     def on_job_done(self, job: BatchJob, row):
         """
         Handles jobs that have finished. Can be overridden to provide custom behaviour.
 
         Default implementation downloads the results into a folder containing the title.
+        Default implementation runs the download in a separate thread.
 
         :param job: The job that has finished.
         :param row: DataFrame row containing the job's metadata.
         """
-        # TODO: param `row` is never accessed in this method. Remove it? Is this intended for future use?
+        _log.info(f"Job {job.job_id} completed. Preparing to handle completion.")
         job_metadata = job.describe()
         job_dir = self.get_job_dir(job.job_id)
         metadata_path = self.get_job_metadata_path(job.job_id)
 
-        self.ensure_job_dir_exists(job.job_id)
-        job.get_results().download_files(target=job_dir)
-
+        # Ensure the job directory exists before writing metadata into it
+        # (the download below also targets this directory).
+        self.ensure_job_dir_exists(job.job_id)
+
+        # Save metadata
+        _log.info(f"Saving metadata for job {job.job_id} to {metadata_path}")
         with metadata_path.open("w", encoding="utf-8") as f:
             json.dump(job_metadata, f, ensure_ascii=False)
 
+        # Define download logic inline
+        def download_task():
+            try:
+                _log.info(f"Starting download for job {job.job_id} to directory {job_dir}")
+                job.get_results().download_files(target=job_dir)
+                _log.info(f"Successfully downloaded job {job.job_id} results to {job_dir}")
+            except Exception as e:
+                # NOTE(review): failures are only logged here; `stats` and the job
+                # database are never updated from this background thread -- confirm
+                # that losing that signal is acceptable.
+                _log.error(f"Error downloading job {job.job_id}: {e}")
+
+        # Start the download in a separate thread.
+        # NOTE(review): uses `Thread` (threading.Thread) but this diff adds no
+        # import -- confirm it is already imported at module level.
+        # NOTE(review): daemon threads are killed at interpreter exit, so a
+        # download may be cut short if the process stops before it finishes.
+        _log.info(f"Starting download thread for job {job.job_id}")
+        downloader = Thread(target=download_task, daemon=True)
+        downloader.start()
+
     def on_job_error(self, job: BatchJob, row):
         """
         Handles jobs that stopped with errors.
         Can be overridden to provide custom behaviour.