Skip to content

Commit

Permalink
Merge pull request galaxyproject#16695 from davelopez/enhance_task_mo…
Browse files Browse the repository at this point in the history
…nitor_composable

Enhance task monitor composable
  • Loading branch information
mvdbeek committed Sep 20, 2023
2 parents 0daa4aa + a47abb9 commit d0c7f05
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 42 deletions.
Original file line number Diff line number Diff line change
@@ -1,26 +1,38 @@
import axios from "axios";
import MockAdapter from "axios-mock-adapter";
import { useTaskMonitor } from "composables/taskMonitor";
import flushPromises from "flush-promises";

import { useTaskMonitor } from "@/composables/taskMonitor";
import { mockFetcher } from "@/schema/__mocks__";

jest.mock("@/schema");

const PENDING_TASK_ID = "pending-fake-task-id";
const COMPLETED_TASK_ID = "completed-fake-task-id";
const FAILED_TASK_ID = "failed-fake-task-id";
const REQUEST_FAILED_TASK_ID = "request-failed-fake-task-id";

describe("useTaskMonitor", () => {
let axiosMock;
function getMockedTaskStatus({ task_id }: { task_id: string }) {
switch (task_id) {
case PENDING_TASK_ID:
return { data: "PENDING", status: 200 };

beforeEach(async () => {
axiosMock = new MockAdapter(axios);
});
case COMPLETED_TASK_ID:
return { data: "SUCCESS", status: 200 };

afterEach(async () => {
axiosMock.reset();
});
case FAILED_TASK_ID:
return { data: "FAILURE", status: 200 };

case REQUEST_FAILED_TASK_ID:
throw new Error("Request failed");

default:
return { data: "UNKNOWN", status: 404 };
}
}

mockFetcher.path("/api/tasks/{task_id}/state").method("get").mock(getMockedTaskStatus);

describe("useTaskMonitor", () => {
it("should indicate the task is running when it is still pending", async () => {
axiosMock.onGet(`/api/tasks/${PENDING_TASK_ID}/state`).reply(200, "PENDING");
const { waitForTask, isRunning } = useTaskMonitor();

expect(isRunning.value).toBe(false);
Expand All @@ -30,7 +42,6 @@ describe("useTaskMonitor", () => {
});

it("should indicate the task is successfully completed when the state is SUCCESS", async () => {
axiosMock.onGet(`/api/tasks/${COMPLETED_TASK_ID}/state`).reply(200, "SUCCESS");
const { waitForTask, isRunning, isCompleted } = useTaskMonitor();

expect(isCompleted.value).toBe(false);
Expand All @@ -41,7 +52,6 @@ describe("useTaskMonitor", () => {
});

it("should indicate the task has failed when the state is FAILED", async () => {
axiosMock.onGet(`/api/tasks/${FAILED_TASK_ID}/state`).reply(200, "FAILURE");
const { waitForTask, isRunning, hasFailed } = useTaskMonitor();

expect(hasFailed.value).toBe(false);
Expand All @@ -52,7 +62,6 @@ describe("useTaskMonitor", () => {
});

it("should indicate the task status request failed when the request failed", async () => {
axiosMock.onGet(`/api/tasks/${REQUEST_FAILED_TASK_ID}/state`).reply(400, "UNKNOWN");
const { waitForTask, requestHasFailed } = useTaskMonitor();

expect(requestHasFailed.value).toBe(false);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,57 +1,61 @@
import axios from "axios";
import { computed, readonly, ref } from "vue";

import { fetcher } from "@/schema";
import { errorMessageAsString } from "@/utils/simple-error";

const SUCCESS_STATE = "SUCCESS";
const FAILURE_STATE = "FAILURE";
const TASK_READY_STATES = [SUCCESS_STATE, FAILURE_STATE];
const DEFAULT_POLL_DELAY = 10000;

const getTaskStatus = fetcher.path("/api/tasks/{task_id}/state").method("get").create();

/**
* Composable for waiting on Galaxy background tasks.
*/
export function useTaskMonitor() {
let timeout = null;
let timeout: NodeJS.Timeout | null = null;
let pollDelay = DEFAULT_POLL_DELAY;

const isRunning = ref(false);
const status = ref(null);
const currentTaskId = ref(null);
const status = ref<string>();
const currentTaskId = ref<string>();
const requestHasFailed = ref(false);

const isCompleted = computed(() => status.value === SUCCESS_STATE);
const hasFailed = computed(() => status.value === FAILURE_STATE);
const queryStateUrl = computed(() => `/api/tasks/${currentTaskId.value}/state`);

function waitForTask(taskId, pollDelayInMs = DEFAULT_POLL_DELAY) {
async function waitForTask(taskId: string, pollDelayInMs = DEFAULT_POLL_DELAY) {
pollDelay = pollDelayInMs;
resetState();
currentTaskId.value = taskId;
isRunning.value = true;
fetchTaskStatus();
}

function fetchTaskStatus() {
axios.get(queryStateUrl.value).then(handleStatusResponse).catch(handleError);
return fetchTaskStatus(taskId);
}

function handleStatusResponse(response) {
status.value = response.data;
const isReady = TASK_READY_STATES.includes(status.value);
if (isReady) {
isRunning.value = false;
} else {
pollAfterDelay();
async function fetchTaskStatus(taskId: string) {
try {
const { data } = await getTaskStatus({ task_id: taskId });
status.value = data;
const isReady = TASK_READY_STATES.includes(status.value);
if (isReady) {
isRunning.value = false;
} else {
pollAfterDelay(taskId);
}
} catch (err) {
handleError(errorMessageAsString(err));
}
}

function pollAfterDelay(taskId) {
function pollAfterDelay(taskId: string) {
resetTimeout();
timeout = setTimeout(() => {
fetchTaskStatus(taskId);
}, pollDelay);
}

function handleError(err) {
function handleError(err: string) {
status.value = err;
requestHasFailed.value = true;
isRunning.value = false;
Expand All @@ -66,7 +70,7 @@ export function useTaskMonitor() {

function resetState() {
resetTimeout();
status.value = null;
status.value = undefined;
}

return {
Expand Down
8 changes: 7 additions & 1 deletion client/src/schema/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8688,6 +8688,12 @@ export interface components {
| "Page"
| "StoredWorkflow"
| "Visualization";
/**
* TaskState
* @description Enum representing the possible states of a task.
* @enum {string}
*/
TaskState: "PENDING" | "STARTED" | "RETRY" | "FAILURE" | "SUCCESS";
/** ToolDataDetails */
ToolDataDetails: {
/**
Expand Down Expand Up @@ -16677,7 +16683,7 @@ export interface operations {
/** @description String indicating task state. */
200: {
content: {
"application/json": string;
"application/json": components["schemas"]["TaskState"];
};
};
/** @description Validation Error */
Expand Down
27 changes: 24 additions & 3 deletions lib/galaxy/managers/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,32 @@
ABCMeta,
abstractmethod,
)
from enum import Enum
from typing import cast
from uuid import UUID

from celery.result import AsyncResult


class TaskState(str, Enum):
"""Enum representing the possible states of a task."""

PENDING = "PENDING"
"""The task is waiting for execution."""

STARTED = "STARTED"
"""The task has been started."""

RETRY = "RETRY"
"""The task is to be retried, possibly because of failure."""

FAILURE = "FAILURE"
"""The task raised an exception, or has exceeded the retry limit."""

SUCCESS = "SUCCESS"
"""The task executed successfully."""


class AsyncTasksManager(metaclass=ABCMeta):
@abstractmethod
def is_ready(self, task_uuid: UUID) -> bool:
Expand All @@ -28,7 +49,7 @@ def has_failed(self, task_uuid: UUID) -> bool:
"""Return `True` if the task failed."""

@abstractmethod
def get_state(self, task_uuid: UUID) -> str:
def get_state(self, task_uuid: UUID) -> TaskState:
"""Returns the current state of the task as a string."""


Expand All @@ -50,9 +71,9 @@ def has_failed(self, task_uuid: UUID) -> bool:
"""Return `True` if the task failed."""
return self._get_result(task_uuid).failed()

def get_state(self, task_uuid: UUID) -> str:
def get_state(self, task_uuid: UUID) -> TaskState:
"""Returns the tasks current state as a string."""
return str(self._get_result(task_uuid).state)
return cast(TaskState, str(self._get_result(task_uuid).state))

def _get_result(self, task_uuid: UUID) -> AsyncResult:
return AsyncResult(str(task_uuid))
7 changes: 5 additions & 2 deletions lib/galaxy/webapps/galaxy/api/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@
import logging
from uuid import UUID

from galaxy.managers.tasks import AsyncTasksManager
from galaxy.managers.tasks import (
AsyncTasksManager,
TaskState,
)
from . import (
depends,
Router,
Expand All @@ -24,5 +27,5 @@ class FastAPITasks:
summary="Determine state of task ID",
response_description="String indicating task state.",
)
def state(self, task_id: UUID) -> str:
def state(self, task_id: UUID) -> TaskState:
return self.manager.get_state(task_id)
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ def test_history_export_tracking(self):
history_export_tasks.toggle_options_link.wait_for_and_click()

# Export to FTP file source
history_export_tasks.file_source_tab.wait_for_present()
history_export_tasks.file_source_tab.wait_for_and_click()
self._export_to_ftp_with_filename("my_export.tar.gz")

Expand Down

0 comments on commit d0c7f05

Please sign in to comment.