Skip to content

Commit 13e334d

Browse files
committed
Updates to task manager
1 parent 7199a50 commit 13e334d

File tree

6 files changed

+63
-60
lines changed

6 files changed

+63
-60
lines changed

turbinia/state_manager.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -269,7 +269,6 @@ def update_request_task(self, task) -> None:
269269
task (TurbiniaTask): Turbinia task object.
270270
"""
271271
request_key = self.redis_client.build_key_name('request', task.request_id)
272-
task_key = self.redis_client.build_key_name('task', task.id)
273272
try:
274273
self.redis_client.add_to_list(request_key, 'task_ids', task.id)
275274
request_last_update = datetime.strptime(
@@ -291,12 +290,12 @@ def update_request_task(self, task) -> None:
291290
elif task.result.successful is False:
292291
self.redis_client.add_to_list(request_key, 'failed_tasks', task.id)
293292
statuses_to_remove.remove('failed_tasks')
294-
task_status = self.redis_client.get_attribute(task_key, 'status')
295-
if task_status and 'Task is running' in task_status:
293+
task_status = task.celery_state
294+
if task_status == 'STARTED':
296295
self.redis_client.add_to_list(request_key, 'running_tasks', task.id)
297296
statuses_to_remove.remove('running_tasks')
298-
elif (task_status is None or task_status == 'queued' or
299-
task_status == 'pending'):
297+
elif (task_status is None or task_status == 'RECEIVED' or
298+
task_status == 'PENDING'):
300299
self.redis_client.add_to_list(request_key, 'queued_tasks', task.id)
301300
statuses_to_remove.remove('queued_tasks')
302301
for status_name in statuses_to_remove:

turbinia/task_manager.py

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -687,6 +687,14 @@ def _backend_setup(self, *args, **kwargs):
687687
self.celery_runner = self.celery.app.task(
688688
task_utils.task_runner, name='task_runner')
689689

690+
def _get_worker_name(self, celery_stub):
691+
"""Gets the Celery worker name from the AsyncResult object."""
692+
worker_name = celery_stub.result.get('hostname', None)
693+
if worker_name:
694+
# example hostname: celery@turbinia-hostname
695+
worker_name = worker_name.split('@')[1]
696+
return worker_name
697+
690698
def process_tasks(self):
691699
"""Determine the current state of our tasks.
692700
@@ -700,28 +708,33 @@ def process_tasks(self):
700708
celery_task = task.stub
701709
# ref: https://docs.celeryq.dev/en/stable/reference/celery.states.html
702710
if not celery_task:
703-
log.debug(f'Task {task.stub.task_id:s} not yet created.')
711+
log.info(f'Task {task.stub.task_id:s} not yet created.')
704712
check_timeout = True
705713
elif celery_task.status == celery_states.STARTED:
706-
# Task status will be set to running when the worker executes run_wrapper()
707-
log.debug(f'Task {celery_task.id:s} not finished.')
714+
log.warning(f'Task {celery_task.id:s} {task.id} started.')
715+
task.worker_name = self._get_worker_name(celery_task)
716+
task.celery_state = celery_states.STARTED
708717
check_timeout = True
709718
elif celery_task.status == celery_states.FAILURE:
710-
log.warning(f'Task {celery_task.id:s} failed.')
719+
log.info(f'Task {celery_task.id:s} failed.')
720+
task.celery_state = celery_states.FAILURE
711721
self.close_failed_task(task)
712722
completed_tasks.append(task)
713723
elif celery_task.status == celery_states.SUCCESS:
724+
task.celery_state = celery_states.SUCCESS
714725
task.result = workers.TurbiniaTaskResult.deserialize(celery_task.result)
715726
completed_tasks.append(task)
716727
elif celery_task.status == celery_states.PENDING:
717-
task.status = 'pending'
728+
log.info(f'Task {celery_task.id:s} is pending.')
729+
task.celery_state = celery_states.PENDING
718730
check_timeout = True
719-
log.debug(f'Task {celery_task.id:s} is pending.')
720731
elif celery_task.status == celery_states.RECEIVED:
721-
task.status = 'queued'
732+
log.info(f'Task {celery_task.id:s} is queued.')
733+
task.worker_name = self._get_worker_name(celery_task)
734+
task.celery_state = celery_states.RECEIVED
722735
check_timeout = True
723-
log.debug(f'Task {celery_task.id:s} is queued.')
724736
elif celery_task.status == celery_states.REVOKED:
737+
task.celery_state = celery_states.REVOKED
725738
message = (
726739
f'Celery task {celery_task.id:s} associated with Turbinia '
727740
f'task {task.id} was revoked. This could be caused if the task is '
@@ -746,6 +759,9 @@ def process_tasks(self):
746759
task = self.timeout_task(task, timeout)
747760
completed_tasks.append(task)
748761

762+
# Update task metadata so we have an up to date state.
763+
self.state_manager.update_task(task)
764+
749765
outstanding_task_count = len(self.tasks) - len(completed_tasks)
750766
if outstanding_task_count > 0:
751767
log.info(f'{outstanding_task_count:d} Tasks still outstanding.')

turbinia/tcelery.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ def setup(self):
5757
task_default_queue=config.INSTANCE_ID,
5858
# Re-queue task if Celery worker abruptly exists
5959
task_reject_on_worker_lost=True,
60+
task_track_started=True,
6061
worker_cancel_long_running_tasks_on_connection_loss=True,
6162
worker_concurrency=1,
6263
worker_prefetch_multiplier=1,

turbinia/workers/__init__.py

Lines changed: 8 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -129,8 +129,8 @@ class TurbiniaTaskResult:
129129

130130
# The list of attributes that we will persist into storage
131131
STORED_ATTRIBUTES = [
132-
'worker_name', 'report_data', 'report_priority', 'run_time', 'status',
133-
'saved_paths', 'successful', 'evidence_size'
132+
'report_data', 'report_priority', 'run_time', 'status', 'saved_paths',
133+
'successful', 'evidence_size'
134134
]
135135

136136
def __init__(
@@ -454,7 +454,7 @@ class TurbiniaTask:
454454
STORED_ATTRIBUTES = [
455455
'id', 'job_id', 'job_name', 'start_time', 'last_update', 'name',
456456
'evidence_name', 'evidence_id', 'request_id', 'requester', 'group_name',
457-
'reason', 'group_id', 'celery_id'
457+
'reason', 'group_id', 'celery_id', 'celery_state', 'worker_name'
458458
]
459459

460460
# The list of evidence states that are required by a Task in order to run.
@@ -483,6 +483,7 @@ def __init__(
483483

484484
self.id = uuid.uuid4().hex
485485
self.celery_id = None
486+
self.celery_state = None
486487
self.is_finalize_task = False
487488
self.job_id = None
488489
self.job_name = None
@@ -508,6 +509,7 @@ def __init__(
508509
self.group_name = group_name
509510
self.reason = reason
510511
self.group_id = group_id
512+
self.worker_name = None
511513

512514
def serialize(self):
513515
"""Converts the TurbiniaTask object into a serializable dict.
@@ -821,6 +823,7 @@ def setup(self, evidence):
821823
TurbiniaException: If the evidence can not be found.
822824
"""
823825
self.setup_metrics()
826+
self.state_manager = state_manager.get_state_manager()
824827
self.output_manager.setup(self.name, self.id, self.request_id)
825828
self.tmp_dir, self.output_dir = self.output_manager.get_local_output_dirs()
826829
if not self.result:
@@ -1039,21 +1042,10 @@ def run_wrapper(self, evidence):
10391042

10401043
log.debug(f'Task {self.name:s} {self.id:s} awaiting execution')
10411044
failure_message = None
1042-
worker_name = platform.node()
1045+
self.worker_name = platform.node()
10431046
try:
10441047
evidence = evidence_decode(evidence)
10451048
self.result = self.setup(evidence)
1046-
# Call update_task_status to update status
1047-
# We cannot call update_task() here since it will clobber previously
1048-
# stored data by the Turbinia server when the task was created, which is
1049-
# not present in the TurbiniaTask object the worker currently has in its
1050-
# runtime.
1051-
self.update_task_status(self, 'queued')
1052-
# Beucase of the same reason, we perform a single attribute update
1053-
# for the worker name.
1054-
task_key = self.state_manager.redis_client.build_key_name('task', self.id)
1055-
self.state_manager.redis_client.set_attribute(
1056-
task_key, 'worker_name', json.dumps(worker_name))
10571049
turbinia_worker_tasks_queued_total.inc()
10581050
task_runtime_metrics = self.get_metrics()
10591051
except TurbiniaException as exception:
@@ -1108,9 +1100,6 @@ def run_wrapper(self, evidence):
11081100
self._evidence_config = evidence.config
11091101
self.task_config = self.get_task_recipe(evidence.config)
11101102
self.worker_start_time = datetime.now()
1111-
# Update task status so we know which worker the task executed on.
1112-
updated_status = f'Task is running on {worker_name}'
1113-
self.update_task_status(self, updated_status)
11141103
self.result = self.run(evidence, self.result)
11151104

11161105
# pylint: disable=broad-except
@@ -1161,7 +1150,7 @@ def run_wrapper(self, evidence):
11611150
failure_message = (
11621151
'Task Result was auto-closed from task executor on {0:s} likely '
11631152
'due to previous failures. Previous status: [{1:s}]'.format(
1164-
self.result.worker_name, status))
1153+
self.worker_name, status))
11651154
self.result.log(failure_message)
11661155
try:
11671156
self.result.close(self, False, failure_message)
@@ -1197,23 +1186,3 @@ def run(self, evidence, result):
11971186
TurbiniaTaskResult object.
11981187
"""
11991188
raise NotImplementedError
1200-
1201-
def update_task_status(self, task, status=None):
1202-
"""Updates the task status and pushes it directly to datastore.
1203-
1204-
Args:
1205-
task (TurbiniaTask): The calling Task object
1206-
status (str): Brief word or phrase for Task state. If not supplied, the
1207-
existing Task status will be used.
1208-
"""
1209-
if not status:
1210-
return
1211-
if not self.state_manager:
1212-
self.state_manager = state_manager.get_state_manager()
1213-
if self.state_manager:
1214-
task_key = self.state_manager.redis_client.build_key_name('task', task.id)
1215-
self.state_manager.redis_client.set_attribute(
1216-
task_key, 'status', json.dumps(status))
1217-
self.state_manager.update_request_task(task)
1218-
else:
1219-
log.info('No state_manager initialized, not updating Task info')

web/src/components/TaskDetails.vue

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,17 @@ limitations under the License.
3939
<v-alert v-if="taskDetails.successful === true" type="success" prominent>
4040
{{ taskDetails.status }}
4141
</v-alert>
42-
<v-alert v-else-if="taskDetails.successful === null" type="info" prominent>
43-
<div v-if="taskDetails.status">
44-
{{ taskDetails.status }}
45-
</div>
46-
<div v-else>Task {{ taskDetails.id }} is pending</div>
42+
<v-alert v-else-if="taskDetails.successful === false" type="error" prominent>
43+
{{ taskDetails.status }}
44+
</v-alert>
45+
<v-alert v-else-if="taskDetails.celery_state === 'STARTED'" type="info" prominent>
46+
Task {{ taskDetails.id }} is running on {{ taskDetails.worker_name }}
47+
</v-alert>
48+
<v-alert v-else-if="taskDetails.celery_state === 'PENDING'" type="info" prominent>
49+
Task {{ taskDetails.id }} is pending.
50+
</v-alert>
51+
<v-alert v-else-if="taskDetails.celery_state === 'RECEIVED'" type="info" prominent>
52+
Task {{ taskDetails.id }} is queued.
4753
</v-alert>
4854
<v-alert v-else type="error" prominent>
4955
{{ taskDetails.status }}
@@ -72,6 +78,12 @@ limitations under the License.
7278
</div>
7379
<div v-else>N/A</div>
7480
</v-list-item>
81+
<v-list-item title="Celery State:">
82+
<div v-if="taskDetails.celery_state">
83+
{{ taskDetails.celery_state }}
84+
</div>
85+
<div v-else>N/A</div>
86+
</v-list-item>
7587
<v-list-item title="Evidence ID:">
7688
<div v-if="taskDetails.evidence_id">
7789
{{ taskDetails.evidence_id }}

web/src/components/TaskList.vue

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -73,14 +73,20 @@ export default {
7373
let data = response.data['tasks']
7474
for (const task in data) {
7575
let task_dict = data[task]
76-
let taskStatusTemp = task_dict.status
76+
let taskStatusTemp = task_dict.celery_state
7777
// As pending status requests show as null or pending
78-
if (taskStatusTemp === null || taskStatusTemp === "pending") {
78+
if (taskStatusTemp === null || taskStatusTemp === "PENDING") {
7979
taskStatusTemp = 'is pending on server.'
8080
}
81-
else if (taskStatusTemp == "queued") {
81+
else if (taskStatusTemp == "RECEIVED") {
8282
taskStatusTemp = 'is queued for execution.'
8383
}
84+
else if (taskStatusTemp == "STARTED") {
85+
taskStatusTemp = 'is running on ' + task_dict.worker_name
86+
}
87+
else {
88+
taskStatusTemp = task_dict.status
89+
}
8490
if (this.filterJobs.length > 0) {
8591
let jobName = task_dict.job_name.toLowerCase()
8692
if (this.radioFilter && !this.filterJobs.includes(jobName)) {

0 commit comments

Comments
 (0)