|
20 | 20 | from datetime import datetime
|
21 | 21 | from datetime import timedelta
|
22 | 22 |
|
23 |
| -import httplib2 |
24 |
| -import json |
25 | 23 | import logging
|
26 | 24 | from operator import itemgetter
|
27 | 25 | from operator import attrgetter
|
28 | 26 | import os
|
29 | 27 | import time
|
30 | 28 |
|
31 |
| -from google import auth |
32 | 29 | from turbinia import config
|
33 | 30 | from turbinia.config import logger
|
34 | 31 | from turbinia.config import DATETIME_FORMAT
|
@@ -67,9 +64,7 @@ def get_turbinia_client():
|
67 | 64 | """
|
68 | 65 | # pylint: disable=no-else-return
|
69 | 66 | setup(is_client=True)
|
70 |
| - if config.TASK_MANAGER.lower() == 'psq': |
71 |
| - return BaseTurbiniaClient() |
72 |
| - elif config.TASK_MANAGER.lower() == 'celery': |
| 67 | + if config.TASK_MANAGER.lower() == 'celery': |
73 | 68 | return TurbiniaCeleryClient()
|
74 | 69 | else:
|
75 | 70 | msg = f'Task Manager type "{config.TASK_MANAGER:s}" not implemented'
|
@@ -340,123 +335,6 @@ def wait_for_request(
|
340 | 335 |
|
341 | 336 | log.info(f'All {len(task_results):d} Tasks completed')
|
342 | 337 |
|
343 |
| - def get_task_data( |
344 |
| - self, instance, project, region, days=0, task_id=None, request_id=None, |
345 |
| - group_id=None, user=None, function_name='gettasks', output_json=False): |
346 |
| - """Gets task data from Google Cloud Functions. |
347 |
| -
|
348 |
| - Args: |
349 |
| - instance (string): The Turbinia instance name (by default the same as the |
350 |
| - INSTANCE_ID in the config). |
351 |
| - project (string): The name of the project. |
352 |
| - region (string): The name of the region to execute in. |
353 |
| - days (int): The number of days we want history for. |
354 |
| - task_id (string): The Id of the task. |
355 |
| - group_id (string): The group Id of the requests. |
356 |
| - request_id (string): The Id of the request we want tasks for. |
357 |
| - user (string): The user of the request we want tasks for. |
358 |
| - function_name (string): The GCF function we want to call. |
359 |
| - output_json (bool): Whether to return JSON output. |
360 |
| -
|
361 |
| - Returns: |
362 |
| - (List|JSON string) of Task dict objects |
363 |
| - """ |
364 |
| - cloud_function = gcp_function.GoogleCloudFunction(project) |
365 |
| - func_args = {'instance': instance, 'kind': 'TurbiniaTask'} |
366 |
| - |
367 |
| - if days: |
368 |
| - start_time = datetime.now() - timedelta(days=days) |
369 |
| - # Format this like '1990-01-01T00:00:00z' so we can cast it directly to a |
370 |
| - # javascript Date() object in the cloud function. |
371 |
| - start_string = start_time.strftime(DATETIME_FORMAT) |
372 |
| - func_args.update({'start_time': start_string}) |
373 |
| - elif task_id: |
374 |
| - func_args.update({'task_id': task_id}) |
375 |
| - elif group_id: |
376 |
| - func_args.update({'group_id': group_id}) |
377 |
| - elif request_id: |
378 |
| - func_args.update({'request_id': request_id}) |
379 |
| - |
380 |
| - if user: |
381 |
| - func_args.update({'user': user}) |
382 |
| - |
383 |
| - response = {} |
384 |
| - retry_count = 0 |
385 |
| - credential_error_count = 0 |
386 |
| - while not response and retry_count < MAX_RETRIES: |
387 |
| - try: |
388 |
| - response = cloud_function.ExecuteFunction( |
389 |
| - function_name, region, func_args) |
390 |
| - except auth.exceptions.RefreshError as exception: |
391 |
| - if credential_error_count == 0: |
392 |
| - log.info( |
393 |
| - 'GCP Credentials need to be refreshed by running gcloud auth ' |
394 |
| - 'application-default login, please refresh in another terminal ' |
395 |
| - 'and run turbiniactl -w status -r {0!s} and this process will ' |
396 |
| - 'resume. Error: {1!s}'.format(request_id, exception)) |
397 |
| - else: |
398 |
| - log.debug( |
399 |
| - 'GCP Credentials need to be refreshed by running gcloud auth ' |
400 |
| - 'application-default login, please refresh in another terminal ' |
401 |
| - 'and run turbiniactl -w status -r {0!s} and this process will ' |
402 |
| - 'resume. Attempt {1:d}. Error: ' |
403 |
| - '{2!s}'.format(request_id, credential_error_count + 1, exception)) |
404 |
| - # Note, we are intentionally not incrementing the retry_count here because |
405 |
| - # we will retry indefinitely while we wait for the user to reauth. |
406 |
| - credential_error_count += 1 |
407 |
| - except httplib2.ServerNotFoundError as exception: |
408 |
| - log.info( |
409 |
| - 'Error connecting to server, will retry [{0:d} of {1:d} retries]: ' |
410 |
| - '{2!s}'.format(retry_count, MAX_RETRIES, exception)) |
411 |
| - retry_count += 1 |
412 |
| - |
413 |
| - if not response: |
414 |
| - retry_count += 1 |
415 |
| - time.sleep(RETRY_SLEEP) |
416 |
| - elif response.get('error', {}).get('code') == 503: |
417 |
| - log.warning( |
418 |
| - 'Retriable error response from cloud functions: [{0!s}]'.format( |
419 |
| - response.get('error'))) |
420 |
| - retry_count += 1 |
421 |
| - response = {} |
422 |
| - time.sleep(RETRY_SLEEP) |
423 |
| - |
424 |
| - if not response or 'result' not in response: |
425 |
| - log.error('No results found') |
426 |
| - if response.get('error'): |
427 |
| - msg = f"Error executing Cloud Function: [{response.get('error')!s}]." |
428 |
| - log.error(msg) |
429 |
| - log.debug(f'Invalid or empty GCF response: {response!s}') |
430 |
| - raise TurbiniaException( |
431 |
| - f'Cloud Function {function_name:s} returned no results.') |
432 |
| - |
433 |
| - try: |
434 |
| - results = json.loads(response.get('result')) |
435 |
| - except (TypeError, ValueError) as exception: |
436 |
| - raise TurbiniaException( |
437 |
| - 'Could not deserialize result [{0!s}] from GCF: [{1!s}]'.format( |
438 |
| - response.get('result'), exception)) |
439 |
| - |
440 |
| - task_data = results[0] |
441 |
| - if output_json: |
442 |
| - try: |
443 |
| - json_data = json.dumps(task_data) |
444 |
| - except (TypeError, ValueError) as exception: |
445 |
| - raise TurbiniaException( |
446 |
| - 'Could not re-serialize result [{0!s}] from GCF: [{1!s}]'.format( |
447 |
| - str(task_data), exception)) |
448 |
| - return json_data |
449 |
| - |
450 |
| - # Convert run_time/last_update back into datetime objects |
451 |
| - for task in task_data: |
452 |
| - if task.get('run_time'): |
453 |
| - task['run_time'] = timedelta(seconds=task['run_time']) |
454 |
| - if task.get('last_update'): |
455 |
| - task['last_update'] = datetime.strptime( |
456 |
| - task['last_update'], DATETIME_FORMAT) |
457 |
| - |
458 |
| - return task_data |
459 |
| - |
460 | 338 | def format_task_detail(self, task, show_files=False):
|
461 | 339 | """Formats a single task in detail.
|
462 | 340 |
|
@@ -1019,36 +897,7 @@ def send_request(self, request):
|
1019 | 897 | Args:
|
1020 | 898 | request: A TurbiniaRequest object.
|
1021 | 899 | """
|
1022 |
| - self.task_manager.server_pubsub.send_request(request) |
1023 |
| - |
1024 |
| - def close_tasks( |
1025 |
| - self, instance, project, region, request_id=None, task_id=None, user=None, |
1026 |
| - requester=None): |
1027 |
| - """Close Turbinia Tasks based on Request ID. |
1028 |
| -
|
1029 |
| - Args: |
1030 |
| - instance (string): The Turbinia instance name (by default the same as the |
1031 |
| - INSTANCE_ID in the config). |
1032 |
| - project (string): The name of the project. |
1033 |
| - region (string): The name of the zone to execute in. |
1034 |
| - request_id (string): The Id of the request we want tasks for. |
1035 |
| - task_id (string): The Id of the request we want task for. |
1036 |
| - user (string): The user of the request we want tasks for. |
1037 |
| - requester (string): The user making the request to close tasks. |
1038 |
| -
|
1039 |
| - Returns: String of closed Task IDs. |
1040 |
| - """ |
1041 |
| - cloud_function = gcp_function.GoogleCloudFunction(project) |
1042 |
| - func_args = { |
1043 |
| - 'instance': instance, |
1044 |
| - 'kind': 'TurbiniaTask', |
1045 |
| - 'request_id': request_id, |
1046 |
| - 'task_id': task_id, |
1047 |
| - 'user': user, |
1048 |
| - 'requester': requester |
1049 |
| - } |
1050 |
| - response = cloud_function.ExecuteFunction('closetasks', region, func_args) |
1051 |
| - return f"Closed Task IDs: {response.get('result')}" |
| 900 | + pass |
1052 | 901 |
|
1053 | 902 |
|
1054 | 903 | class TurbiniaCeleryClient(BaseTurbiniaClient):
|
|
0 commit comments