From 18a048fa2565b04bafc64bf36b63a6fe417b2b08 Mon Sep 17 00:00:00 2001 From: augustak Date: Thu, 27 Jun 2024 11:26:49 +0200 Subject: [PATCH 1/3] Added workflows execute-all --- CHANGELOG.md | 4 ++++ lascli/__version__.py | 2 +- lascli/parser/workflows.py | 27 +++++++++++++++++++++++++++ 3 files changed, 32 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8e9147a..551454d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # Changelog +## Version 13.3.0 - 2024-06-27 + +- Added `workflows execute-all` which starts an execution on all documents in a dataset + ## Version 13.2.2 - 2024-06-13 - Bugfix `models update-training` now works as intended when specifying `--deployment-environment-id` diff --git a/lascli/__version__.py b/lascli/__version__.py index 07e6414..458dd32 100644 --- a/lascli/__version__.py +++ b/lascli/__version__.py @@ -7,4 +7,4 @@ __maintainer_email__ = 'magnus@lucidtech.ai' __title__ = 'lucidtech-las-cli' __url__ = 'https://github.com/LucidtechAI/las-cli' -__version__ = '13.2.2' +__version__ = '13.3.0' diff --git a/lascli/parser/workflows.py b/lascli/parser/workflows.py index f34f9a9..eaf6ee7 100644 --- a/lascli/parser/workflows.py +++ b/lascli/parser/workflows.py @@ -3,6 +3,7 @@ import pathlib import textwrap from argparse import RawTextHelpFormatter +from functools import partial import dateparser from las import Client @@ -32,6 +33,27 @@ def execute_workflow(las_client: Client, workflow_id, path): return las_client.execute_workflow(workflow_id, content) +def execute_all_workflow(las_client: Client, workflow_id, dataset_id): + list_fn = partial(las_client.list_documents, dataset_id=dataset_id) + list_response = list_fn() + documents = list_response['documents'] + while next_token := list_response['nextToken']: + list_response = list_fn(next_token=next_token) + documents.extend(list_response['documents']) + + executions = [] + for i, document in enumerate(documents): + content = {'documentId': document['documentId'], 'source': 'CLI', 'initialSleepInSeconds': i * 4} + if originalFilePath := document.get('metadata', {}).get('originalFilePath'): + file_path = pathlib.Path(originalFilePath) + content['title'] = file_path.name + execution = las_client.execute_workflow(workflow_id, content) + print(json.dumps(execution, indent=2)) + executions.append(execution) + + return f'Started {len(executions)} executions' + + def list_workflow_executions(las_client: Client, workflow_id, **optional_args): return las_client.list_workflow_executions(workflow_id, **optional_args) @@ -196,6 +218,11 @@ def create_workflows_parser(subparsers): execute_workflow_parser.add_argument('path', help='path to json-file with input to the first state of the workflow') execute_workflow_parser.set_defaults(cmd=execute_workflow) + execute_workflow_parser = subparsers.add_parser('execute-all') + execute_workflow_parser.add_argument('workflow_id') + execute_workflow_parser.add_argument('dataset_id', help='Start execution on all documents in dataset') + execute_workflow_parser.set_defaults(cmd=execute_all_workflow) + list_executions_parser = subparsers.add_parser('list-executions') list_executions_parser.add_argument('workflow_id') list_executions_parser.add_argument('--status', '-s', nargs='+', help='Only return those with the given status') From dfe5d38fda0c8c807a1c6c067cdd9325f1ab57fe Mon Sep 17 00:00:00 2001 From: augustak Date: Thu, 27 Jun 2024 12:34:02 +0200 Subject: [PATCH 2/3] Update --- lascli/parser/datasets.py | 6 +++--- lascli/parser/workflows.py | 16 +++++----------- tests/test_workflow_executions.py | 10 ++++++++++ 3 files changed, 18 insertions(+), 14 deletions(-) diff --git a/lascli/parser/datasets.py b/lascli/parser/datasets.py index bb3cf0a..dba9b2d 100644 --- a/lascli/parser/datasets.py +++ b/lascli/parser/datasets.py @@ -121,7 +121,7 @@ def _get_document_worker(las_client: Client, document_id, output_dir): return None -def _list_all_documents_in_dataset(las_client: Client, dataset_id): +def list_all_documents_in_dataset(las_client: Client, dataset_id): list_response = las_client.list_documents(dataset_id=dataset_id) yield from list_response['documents'] next_token = list_response.get('nextToken') @@ -332,7 +332,7 @@ def get_documents(las_client: Client, dataset_id, output_dir, num_threads, chunk already_downloaded_from_dataset = set() with ThreadPoolExecutor(max_workers=num_threads) as executor: documents = [] - for document in _list_all_documents_in_dataset(las_client, dataset_id): + for document in list_all_documents_in_dataset(las_client, dataset_id): if document['documentId'] in already_downloaded: already_downloaded_from_dataset.add(document['documentId']) else: @@ -480,7 +480,7 @@ def create_datasets_parser(subparsers): "options": {} (optional) }, ... - ] + ] Examples: [{"type": "remove-duplicates", "options": {}}] ''')) diff --git a/lascli/parser/workflows.py b/lascli/parser/workflows.py index eaf6ee7..050360c 100644 --- a/lascli/parser/workflows.py +++ b/lascli/parser/workflows.py @@ -8,8 +8,9 @@ import dateparser from las import Client -from lascli.util import nullable, NotProvided, json_path, json_or_json_path +from .datasets import list_all_documents_in_dataset from lascli.actions import workflows +from lascli.util import nullable, NotProvided, json_path, json_or_json_path def list_workflows(las_client: Client, **optional_args): @@ -34,18 +35,11 @@ def execute_workflow(las_client: Client, workflow_id, path): def execute_all_workflow(las_client: Client, workflow_id, dataset_id): - list_fn = partial(las_client.list_documents, dataset_id=dataset_id) - list_response = list_fn() - documents = list_response['documents'] - while next_token := list_response['nextToken']: - list_response = list_fn(next_token=next_token) - documents.extend(list_response['documents']) - executions = [] - for i, document in enumerate(documents): + for i, document in enumerate(list_all_documents_in_dataset(las_client, dataset_id)): content = {'documentId': document['documentId'], 'source': 'CLI', 'initialSleepInSeconds': i * 4} - if originalFilePath := document.get('metadata', {}).get('originalFilePath'): - file_path = pathlib.Path(originalFilePath) + if original_file_path := (document.get('metadata') or {}).get('originalFilePath'): + file_path = pathlib.Path(original_file_path) content['title'] = file_path.name execution = las_client.execute_workflow(workflow_id, content) print(json.dumps(execution, indent=2)) diff --git a/tests/test_workflow_executions.py b/tests/test_workflow_executions.py index 6520f41..7ea229f 100644 --- a/tests/test_workflow_executions.py +++ b/tests/test_workflow_executions.py @@ -12,6 +12,16 @@ def test_executions_create(parser, client): util.main_parser(parser, client, args) +def test_executions_create_all(parser, client): + args = [ + 'workflows', + 'execute-all', + service.create_workflow_id(), + service.create_dataset_id(), + ] + util.main_parser(parser, client, args) + + @pytest.mark.parametrize('sort_by', [ ('--sort-by', 'startTime'), ('--sort-by', 'endTime'), From a6eb3c74d1c47814579685ae16d25928ba86301c Mon Sep 17 00:00:00 2001 From: augustak Date: Thu, 27 Jun 2024 12:42:46 +0200 Subject: [PATCH 3/3] Update --- lascli/parser/workflows.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/lascli/parser/workflows.py b/lascli/parser/workflows.py index 050360c..763123c 100644 --- a/lascli/parser/workflows.py +++ b/lascli/parser/workflows.py @@ -2,6 +2,7 @@ import json import pathlib import textwrap +import time from argparse import RawTextHelpFormatter from functools import partial @@ -42,8 +43,9 @@ def execute_all_workflow(las_client: Client, workflow_id, dataset_id): file_path = pathlib.Path(original_file_path) content['title'] = file_path.name execution = las_client.execute_workflow(workflow_id, content) - print(json.dumps(execution, indent=2)) executions.append(execution) + print(f'Execution {execution["executionId"]} started on {document["documentId"]}') + time.sleep(1) return f'Started {len(executions)} executions'