diff --git a/doc/source/dev/build_a_job_runner.rst b/doc/source/dev/build_a_job_runner.rst index 0f0bc9de63b8..4a0e053034bf 100644 --- a/doc/source/dev/build_a_job_runner.rst +++ b/doc/source/dev/build_a_job_runner.rst @@ -4,9 +4,9 @@ Build a job runner A walk through the steps of building a runner for Galaxy ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -In this tutorial, we will build a runner in a block by block fashion -(like building blocks), so we will divide the runner into -components based on their function. +In this tutorial, we will look at how to build a runner in a block by block +fashion (like building blocks), so we will divide the runner into components +based on their function. We assume you are familiar with setting up and managing a local installation of Galaxy. @@ -20,43 +20,53 @@ What is required to make a runner for Galaxy? ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ `galaxy.jobs.runners.\_\_init\_\_.py `__ -has the base runner implementation. To create a new runner, that base -runner must be inherited and only certain methods need to be +contains the base runner implementation. To create a new runner, the base +runner class (in most cases, ``AsynchronousJobRunner``) must be inherited and only certain methods need to be overridden with your logic. These are the methods that need to be implemented: -1. ``__init__(app, nworkers, **kwargs)`` +1. ``queue_job(job_wrapper)`` -2. ``queue_job(job_wrapper)`` +2. ``check_watched_item(job_state)`` -3. ``check_watched_item(job_state)`` +3. ``stop_job(job)`` -4. ``stop_job(job)`` +4. ``recover(job, job_wrapper)`` -5. ``recover(job, job_wrapper)`` +In addition, you will almost certainly override the ``__init__(app, nworkers, **kwargs)`` +method in order to add custom logic to initialize your runner. In +doing so, make sure to call the parent class constructor: + +.. 
code-block:: python + + super().__init__(app, nworkers, **kwargs) + +Keep in mind that when you override a method, you should not reimplement any of +the logic that is present in the base class: the above call to ``super()`` +ensures that all such logic is handled automatically. The big picture --------------- The above methods are invoked at various stages of job execution in Galaxy. These methods will act as a mediator between the Galaxy -framework and the external execution platform. To know when and how +framework and the external execution platform. To learn when and how these methods are invoked, we will look at the implementation of the parent class and process lifecycle of a runner. Implementation of parent class (``galaxy.jobs.runners.__init__.py``) ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -- .. rubric:: Class Inheritance structure - :name: class-inheritance-structure +Class Inheritance structure +--------------------------- - .. image:: inherit.png +.. image:: inherit.png -- .. rubric:: The big picture! - :name: the-big-picture-1 +The big picture +--------------- - .. image:: runner_diag.png +.. image:: runner_diag.png The whole process is divided into different stages for ease of understanding. @@ -65,7 +75,7 @@ Runner Methods in detail ~~~~~~~~~~~~~~~~~~~~~~~~ 1. ``__init__`` method - STAGE 1 -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +-------------------------------- Input params: @@ -107,28 +117,28 @@ Have a look at the sample ``job_conf.xml``: -The following steps are followed to manipulate the data in ``job_conf.xml`` +The data in ``job_conf.xml`` is manipulated through the following steps: -A: Define structure of data under plugin tag (plugin tag in +**Step 1:** Define structure of data under plugins tag (in ``job_conf.xml``) as a dictionary. .. code-block:: python runner_param_specs = dict(user=dict(map=str), key=dict(map=str)) -B: Update the dictionary structure in kwargs.
+**Step 2:** Update the dictionary structure in kwargs. .. code-block:: python kwargs.update({'runner_param_specs': runner_param_specs}) -C: Now call the parent constructor to assign the values. +**Step 3:** Now call the parent constructor to assign the values. .. code-block:: python - super(GodockerJobRunner, self).__init__(app, nworkers, **kwargs) + super().__init__(app, nworkers, **kwargs) -D: The assigned values can be accessed in runner in the following way. +**Step 4:** The assigned values can be accessed in a runner in the following way. .. code-block:: python @@ -142,10 +152,10 @@ The output will be: gosc HELLOWORLD -E: Invoke the external API with the values obtained by the above method +**Step 5:** Invoke the external API with the values obtained by the above method for initialization. -Finally the worker threads and monitor threads are invoked for galaxy to +Finally, the worker threads and monitor threads are invoked for galaxy to listen for incoming tool submissions. .. code-block:: python @@ -154,30 +164,33 @@ listen for incoming tool submissions. self._init_worker_threads() 2. ``queue_job`` method - STAGE 2 -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +--------------------------------- -Input params: ``job_wrapper`` (Object of +Input params: ``job_wrapper`` (Object of type `galaxy.jobs.JobWrapper `__) Output params: None -``galaxy.jobs.JobWrapper`` is a Wrapper around 'model.Job' with convenience +``galaxy.jobs.JobWrapper`` is a wrapper around 'model.Job' with convenience methods for running processes and state management. -- Functioning of ``queue_job`` method. +Functioning of ``queue_job`` method +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +The logic in the ``queue_job`` method follows these steps: - A. ``prepare_job()`` method is invoked to do some sanity checks that all runners' ``queue_job()`` methods are - likely to want to do and also to build runner command line for that - job. 
Initial state and configuration of the job are set and every - data is associated with **job\_wrapper**. +**Step 1.** ``prepare_job()`` method is invoked to do some sanity checks that all runners' ``queue_job()`` methods are +likely to want to do and also to build the runner command line for that +job. Initial state and configuration of the job are set and all +data is associated with **job\_wrapper**. - B. Submit job to the external runner and return the jobid. Accessing - jobs data (tool submitted in Galaxy webframework) is purely from - ``job_wrapper``. eg: ``job_wrapper.get_state()`` -> gives state of a job - (queued/running/failed/success/...) +**Step 2.** Submit job to the external runner and return the job id. Accessing +jobs data (tool submitted in Galaxy webframework) is purely from +``job_wrapper``. eg: ``job_wrapper.get_state()`` -> gives state of a job +(queued/running/failed/success/...) -Let us look at a means of accessing external runner's configuration -present under destination tag of ``job_conf.xml`` in the above example. +Let us look at how to access the external runner's configuration +present under the destination tag of ``job_conf.xml`` in the above example. .. code-block:: python @@ -185,20 +198,20 @@ present under destination tag of ``job_conf.xml`` in the above example. docker_cpu = int(job_destination.params["docker_cpu"]) docker_ram = int(job_destination.params["docker_memory"]) -A special case: User Story: A docker based external runner is present. A +A special case. User Story: a docker based external runner is present. A default docker image for execution is set in ``job_conf.xml``. A tool can -also specify the docker image for its execution. Specification in tool -is given more priority than the default specification. To achieve such a -functionality. We can use the following statement: +also specify the docker image for its execution. Specification in the tool +is given more priority than the default specification. 
For this functionality +we can use the following statement: .. code-block:: python docker_image = self._find_container(job_wrapper).container_id Note: This pre-written method is only for getting the external -image/container/os.. +image/container/os. -C. After successful submission of a job to the external runner, submit the +**Step 3.** After successful submission of a job to the external runner, submit the job to the Galaxy framework. To do that, make an object of type AsynchronousJobState and put it in the ``monitor_queue``. @@ -208,14 +221,14 @@ AsynchronousJobState and put it in the ``monitor_queue``. self.monitor_queue.put(ajs) 3. ``check_watched_item`` method - STAGE 3 -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +------------------------------------------ -Input params: ``job_state`` (Object of +Input params: ``job_state`` (Object of type `galaxy.jobs.runners.AsynchronousJobState `__) Output params: ``AsynchronousJobState`` object -Without going into much detail, assume there is a queue to track the status of every job. eg: +Without going into much detail, assume there is a queue to track the status of every job: .. image:: queue.png :align: center @@ -236,15 +249,15 @@ Note: Iterating through the queue is already taken care of by the framework. To inform Galaxy about the status of the job: -- Get the job status from external runner using the ``job_id``. +- Get the job status from the external runner using ``job_id``. -- Check if the job is queued/running/completed.. etc. A general structure is provided below. +- Check if the job is queued/running/completed, etc. A general structure is provided below. -- Call ``self.mark_as_finished(job_state)``, if the job has been successfully executed. +- Call ``self.mark_as_finished(job_state)`` if the job has been successfully executed. -- Call ``self.mark_as_failed(job_state)``, if the job has failed during execution. +- Call ``self.mark_as_failed(job_state)`` if the job has failed during execution. 
-- To change state of a job, change ``job_state.running`` and ``job_state.job_wrapper.change_state()`` +- To change the state of a job, change ``job_state.running`` and call ``job_state.job_wrapper.change_state()`` .. code-block:: python @@ -280,23 +293,23 @@ Note: jobs (i.e. if it is running or pending). If it no longer needs to be watched (e.g. it has terminated either successfully or with an error) it should return None. -``create_log_files()`` are nothing but copying the files (``error_file``, +``create_log_files()`` is nothing but copying the files (``error_file``, ``output_file``, ``exit_code_file``) from the external runner's directory to the working directory of Galaxy. -Source of the files are from the output directory of your external -runner. Destination of the files will be: +The source of the files is the output directory of your external runner. +The destination of the files will be: -- output file -> ``job_state.output_file``. +- output file -> ``job_state.output_file`` -- error file -> ``job_state.error_file``. +- error file -> ``job_state.error_file`` -- exit code file -> ``job_state.exit_code_file``. +- exit code file -> ``job_state.exit_code_file`` 4. ``stop_job`` method - STAGE 4 -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +-------------------------------- -Input params: job (Object of +Input params: job (Object of type `galaxy.model.Job `__) Output params: None @@ -314,22 +327,22 @@ The ``job_id`` of the job to be deleted is accessed by job.id 5. ``recover`` method - STAGE 5 -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +------------------------------- Input params: -- ``job`` (Object of `galaxy.model.Job `__). +- ``job`` (Object of type `galaxy.model.Job `__). -- ``job_wrapper`` (Object of `galaxy.jobs.JobWrapper `__). +- ``job_wrapper`` (Object of type `galaxy.jobs.JobWrapper `__). Output params: None Functionality: Recovers any jobs stuck in a queued/running state when -Galaxy was started. +Galaxy starts. This method is invoked by Galaxy at the time of startup. 
Jobs in Running -& Queued status in Galaxy are put in the ``monitor_queue`` by creating an +and Queued state in Galaxy are put in the ``monitor_queue`` by creating an ``AsynchronousJobState`` object. The following is a generic code snippet for the ``recover`` method.
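
As a rough, self-contained illustration of the recovery flow described above, the sketch below re-queues jobs that were in a queued or running state at startup. It does not import Galaxy: ``FakeJob``, ``FakeJobState``, ``JobStatus``, ``SketchRunner``, and the ``external_id`` field are hypothetical stand-ins for ``galaxy.model.Job``, ``AsynchronousJobState``, and the runner class, so a real runner will differ in names and details.

```python
import queue
from dataclasses import dataclass
from enum import Enum
from typing import Optional


class JobStatus(Enum):
    """Hypothetical stand-in for the job states Galaxy tracks."""
    QUEUED = "queued"
    RUNNING = "running"
    OK = "ok"


@dataclass
class FakeJob:
    """Hypothetical stand-in for galaxy.model.Job."""
    id: int
    state: JobStatus
    external_id: Optional[str]  # assumed: id returned by the external runner


@dataclass
class FakeJobState:
    """Hypothetical stand-in for AsynchronousJobState."""
    job_id: str
    running: bool


class SketchRunner:
    """Only the pieces of a runner needed to illustrate recover()."""

    def __init__(self):
        # In Galaxy the base class owns the monitor queue; here it is a plain Queue.
        self.monitor_queue = queue.Queue()

    def recover(self, job, job_wrapper):
        # A job that never reached the external runner cannot be watched.
        if job.external_id is None:
            return
        # Only jobs that were queued or running at startup are recovered.
        if job.state in (JobStatus.QUEUED, JobStatus.RUNNING):
            job_state = FakeJobState(
                job_id=job.external_id,
                running=(job.state is JobStatus.RUNNING),
            )
            # Hand the job back to the monitor so its status is polled again.
            self.monitor_queue.put(job_state)


runner = SketchRunner()
runner.recover(FakeJob(1, JobStatus.RUNNING, "ext-1"), job_wrapper=None)
runner.recover(FakeJob(2, JobStatus.OK, "ext-2"), job_wrapper=None)
print(runner.monitor_queue.qsize())
```

Only the running job ends up in the monitor queue; the finished job is left alone because it no longer needs to be watched.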