From ac05a60dc488e8655a7721d42f11914ac291ae06 Mon Sep 17 00:00:00 2001 From: Hailey Date: Thu, 16 Oct 2025 12:28:35 -0700 Subject: [PATCH 01/14] make execution store configurable --- .../execution_result_store_chooser.py | 43 +++++++++++++++++++ .../worker/_stdlibplugin/sink_register.py | 13 ++++-- .../src/osprey/worker/lib/storage/__init__.py | 30 +++++++++++++ .../lib/storage/stored_execution_result.py | 16 +++++-- .../src/osprey/worker/ui_api/osprey/cli.py | 3 ++ 5 files changed, 99 insertions(+), 6 deletions(-) create mode 100644 osprey_worker/src/osprey/worker/_stdlibplugin/execution_result_store_chooser.py diff --git a/osprey_worker/src/osprey/worker/_stdlibplugin/execution_result_store_chooser.py b/osprey_worker/src/osprey/worker/_stdlibplugin/execution_result_store_chooser.py new file mode 100644 index 00000000..708d3d83 --- /dev/null +++ b/osprey_worker/src/osprey/worker/_stdlibplugin/execution_result_store_chooser.py @@ -0,0 +1,43 @@ +from typing import Optional + +from osprey.worker.adaptor.plugin_manager import bootstrap_execution_result_store +from osprey.worker.lib.singletons import CONFIG +from osprey.worker.lib.storage import ExecutionResultStoreType +from osprey.worker.lib.storage.stored_execution_result import ( + ExecutionResultStore, + StoredExecutionResultBigTable, + StoredExecutionResultGCS, + StoredExecutionResultMinIO, +) + + +def get_rules_execution_result_store( + execution_result_store_type: ExecutionResultStoreType, +) -> Optional[ExecutionResultStore]: + """Based on the `execution_result_store_type` constructs a configured execution result store that can be used to store execution + results. For more details, see `ExecutionResultStore`.""" + + config = CONFIG.instance() + + if execution_result_store_type == ExecutionResultStoreType.BIGTABLE: + return StoredExecutionResultBigTable() + elif execution_result_store_type == ExecutionResultStoreType.GCS: + return StoredExecutionResultGCS() + elif execution_result_store_type == ExecutionResultStoreType.MINIO: + endpoint = config.get_str('OSPREY_MINIO_ENDPOINT', 'minio:9000') + access_key = config.get_str('OSPREY_MINIO_ACCESS_KEY', 'minioadmin') + secret_key = config.get_str('OSPREY_MINIO_SECRET_KEY', 'minioadmin123') + secure = config.get_bool('OSPREY_MINIO_SECURE', False) + bucket_name = config.get_str('OSPREY_MINIO_EXECUTION_RESULTS_BUCKET', 'execution-output') + + return StoredExecutionResultMinIO( + endpoint=endpoint, access_key=access_key, secret_key=secret_key, secure=secure, bucket_name=bucket_name + ) + elif execution_result_store_type == ExecutionResultStoreType.PLUGIN: + store = bootstrap_execution_result_store(config=config) + if store is None: + raise AssertionError('No execution result store registered') + elif execution_result_store_type == ExecutionResultStoreType.NONE: + return None + + return None diff --git a/osprey_worker/src/osprey/worker/_stdlibplugin/sink_register.py b/osprey_worker/src/osprey/worker/_stdlibplugin/sink_register.py index 4597f7b0..79f1a502 100644 --- a/osprey_worker/src/osprey/worker/_stdlibplugin/sink_register.py +++ b/osprey_worker/src/osprey/worker/_stdlibplugin/sink_register.py @@ -1,8 +1,10 @@ from typing import List, Sequence from kafka import KafkaProducer -from osprey.worker.adaptor.plugin_manager import bootstrap_execution_result_store, hookimpl_osprey +from osprey.worker.adaptor.plugin_manager import hookimpl_osprey from osprey.worker.lib.config import Config +from osprey.worker.lib.storage import ExecutionResultStoreType +from osprey.worker.lib.storage.stored_execution_result import get_rules_execution_result_store from osprey.worker.sinks.sink.kafka_output_sink import KafkaOutputSink from osprey.worker.sinks.sink.output_sink import BaseOutputSink, StdoutOutputSink from osprey.worker.sinks.sink.stored_execution_result_output_sink import StoredExecutionResultOutputSink @@ -23,7 +25,12 @@ def register_output_sinks(config: Config) -> Sequence[BaseOutputSink]: kafka_producer=KafkaProducer(bootstrap_servers=bootstrap_servers, client_id=client_id), ) ) - execution_result_store = bootstrap_execution_result_store(config=config) - if execution_result_store is not None: + + store_backend = ExecutionResultStoreType(config.get_str('OSPREY_EXECUTION_RESULT_STORAGE_BACKEND', 'none')) + execution_result_store = get_rules_execution_result_store(execution_result_store_type=store_backend) + + # There may not be an execution result store configured, so check before adding the output sink + if execution_result_store: sinks.append(StoredExecutionResultOutputSink()) + return sinks diff --git a/osprey_worker/src/osprey/worker/lib/storage/__init__.py b/osprey_worker/src/osprey/worker/lib/storage/__init__.py index e69de29b..a367d4b5 100644 --- a/osprey_worker/src/osprey/worker/lib/storage/__init__.py +++ b/osprey_worker/src/osprey/worker/lib/storage/__init__.py @@ -0,0 +1,30 @@ +from enum import StrEnum, auto + + +class ExecutionResultStoreType(StrEnum): + """Type of store used for execution results.""" + + BIGTABLE = auto() + """ + Bigtable execution result store + """ + + GCS = auto() + """ + Google Cloud Storage execution result store + """ + + MINIO = auto() + """ + Minio execution result store + """ + + PLUGIN = auto() + """ + Execution result store that is sdefined via register_execution_result_store + """ + + NONE = auto() + """ + Disable execution results from being stored + """ diff --git a/osprey_worker/src/osprey/worker/lib/storage/stored_execution_result.py b/osprey_worker/src/osprey/worker/lib/storage/stored_execution_result.py index 0e990421..e2443126 100644 --- a/osprey_worker/src/osprey/worker/lib/storage/stored_execution_result.py +++ b/osprey_worker/src/osprey/worker/lib/storage/stored_execution_result.py @@ -16,9 +16,11 @@ from minio import Minio from minio.error import S3Error from osprey.engine.executor.execution_context import ExecutionResult +from osprey.worker._stdlibplugin.execution_result_store_chooser import get_rules_execution_result_store from osprey.worker.lib.instruments import metrics from osprey.worker.lib.osprey_shared.logging import get_logger from osprey.worker.lib.snowflake import Snowflake +from osprey.worker.lib.storage import ExecutionResultStoreType from osprey.worker.lib.storage.bigtable import osprey_bigtable from pydantic.main import BaseModel @@ -480,11 +482,19 @@ def get_many( return StoredExecutionResult.get_many(action_ids, self._storage_backend, data_censor_abilities) -def bootstrap_execution_result_storage_service() -> ExecutionResultStorageService: +def bootstrap_execution_result_storage_service() -> Optional[ExecutionResultStorageService]: """Create an ExecutionResultStorageService with the configured storage backend.""" - from osprey.worker.adaptor.plugin_manager import bootstrap_execution_result_store from osprey.worker.lib.singletons import CONFIG config = CONFIG.instance() - storage_backend = bootstrap_execution_result_store(config) + + storage_backend = get_rules_execution_result_store( + execution_result_store_type=ExecutionResultStoreType( + config.get_str('OSPREY_EXECUTION_RESULT_STORAGE_BACKEND', 'none') + ) + ) + + if not storage_backend: + raise AssertionError('No storage backend registered') + return ExecutionResultStorageService(storage_backend) diff --git a/osprey_worker/src/osprey/worker/ui_api/osprey/cli.py b/osprey_worker/src/osprey/worker/ui_api/osprey/cli.py index 60b4274e..8fec101c 100644 --- a/osprey_worker/src/osprey/worker/ui_api/osprey/cli.py +++ b/osprey_worker/src/osprey/worker/ui_api/osprey/cli.py @@ -1,4 +1,5 @@ # ruff: noqa: E402 +from google.cloud import storage from osprey.worker.lib.patcher import patch_all # isort: skip patch_all() # please ensure this occurs before *any* other imports ! @@ -83,6 +84,8 @@ def row(event: StoredExecutionResult) -> Dict[str, str]: return {'id': d['id'], 'timestamp': d['timestamp'], 'action_data': d['action_data']} storage_service = bootstrap_execution_result_storage_service() + if not storage_service: + raise druid_result = query_druid() while druid_result.action_ids: events = storage_service.get_many(action_ids=druid_result.action_ids) From 1ea7404d17c9d3467b78cb40a3d395db65ce246e Mon Sep 17 00:00:00 2001 From: Hailey Date: Thu, 16 Oct 2025 12:31:47 -0700 Subject: [PATCH 02/14] rm unnecessary optional type --- .../src/osprey/worker/lib/storage/stored_execution_result.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/osprey_worker/src/osprey/worker/lib/storage/stored_execution_result.py b/osprey_worker/src/osprey/worker/lib/storage/stored_execution_result.py index e2443126..686884d3 100644 --- a/osprey_worker/src/osprey/worker/lib/storage/stored_execution_result.py +++ b/osprey_worker/src/osprey/worker/lib/storage/stored_execution_result.py @@ -482,7 +482,7 @@ def get_many( return StoredExecutionResult.get_many(action_ids, self._storage_backend, data_censor_abilities) -def bootstrap_execution_result_storage_service() -> Optional[ExecutionResultStorageService]: +def bootstrap_execution_result_storage_service() -> ExecutionResultStorageService: """Create an ExecutionResultStorageService with the configured storage backend.""" from osprey.worker.lib.singletons import CONFIG From d27951da6af4e0d81a396065222de47fb97e2ba6 Mon Sep 17 00:00:00 2001 From: Hailey Date: Thu, 16 Oct 2025 12:33:36 -0700 Subject: [PATCH 03/14] nit: naming --- .../execution_result_store_chooser.py | 16 ++++++++-------- .../osprey/worker/_stdlibplugin/sink_register.py | 10 ++++++---- .../src/osprey/worker/lib/storage/__init__.py | 2 +- .../lib/storage/stored_execution_result.py | 10 ++++------ 4 files changed, 19 insertions(+), 19 deletions(-) diff --git a/osprey_worker/src/osprey/worker/_stdlibplugin/execution_result_store_chooser.py b/osprey_worker/src/osprey/worker/_stdlibplugin/execution_result_store_chooser.py index 708d3d83..7a347813 100644 --- a/osprey_worker/src/osprey/worker/_stdlibplugin/execution_result_store_chooser.py +++ b/osprey_worker/src/osprey/worker/_stdlibplugin/execution_result_store_chooser.py @@ -2,7 +2,7 @@ from osprey.worker.adaptor.plugin_manager import bootstrap_execution_result_store from osprey.worker.lib.singletons import CONFIG -from osprey.worker.lib.storage import ExecutionResultStoreType +from osprey.worker.lib.storage import ExecutionResultStoreBackendType from osprey.worker.lib.storage.stored_execution_result import ( ExecutionResultStore, StoredExecutionResultBigTable, @@ -11,19 +11,19 @@ ) -def get_rules_execution_result_store( - execution_result_store_type: ExecutionResultStoreType, +def get_rules_execution_result_store_backend( + backend_type: ExecutionResultStoreBackendType, ) -> Optional[ExecutionResultStore]: """Based on the `execution_result_store_type` constructs a configured execution result store that can be used to store execution results. For more details, see `ExecutionResultStore`.""" config = CONFIG.instance() - if execution_result_store_type == ExecutionResultStoreType.BIGTABLE: + if backend_type == ExecutionResultStoreBackendType.BIGTABLE: return StoredExecutionResultBigTable() - elif execution_result_store_type == ExecutionResultStoreType.GCS: + elif backend_type == ExecutionResultStoreBackendType.GCS: return StoredExecutionResultGCS() - elif execution_result_store_type == ExecutionResultStoreType.MINIO: + elif backend_type == ExecutionResultStoreBackendType.MINIO: endpoint = config.get_str('OSPREY_MINIO_ENDPOINT', 'minio:9000') access_key = config.get_str('OSPREY_MINIO_ACCESS_KEY', 'minioadmin') secret_key = config.get_str('OSPREY_MINIO_SECRET_KEY', 'minioadmin123') @@ -33,11 +33,11 @@ def get_rules_execution_result_store( return StoredExecutionResultMinIO( endpoint=endpoint, access_key=access_key, secret_key=secret_key, secure=secure, bucket_name=bucket_name ) - elif execution_result_store_type == ExecutionResultStoreType.PLUGIN: + elif backend_type == ExecutionResultStoreBackendType.PLUGIN: store = bootstrap_execution_result_store(config=config) if store is None: raise AssertionError('No execution result store registered') - elif execution_result_store_type == ExecutionResultStoreType.NONE: + elif backend_type == ExecutionResultStoreBackendType.NONE: return None return None diff --git a/osprey_worker/src/osprey/worker/_stdlibplugin/sink_register.py b/osprey_worker/src/osprey/worker/_stdlibplugin/sink_register.py index 79f1a502..b864a2c8 100644 --- a/osprey_worker/src/osprey/worker/_stdlibplugin/sink_register.py +++ b/osprey_worker/src/osprey/worker/_stdlibplugin/sink_register.py @@ -3,8 +3,8 @@ from kafka import KafkaProducer from osprey.worker.adaptor.plugin_manager import hookimpl_osprey from osprey.worker.lib.config import Config -from osprey.worker.lib.storage import ExecutionResultStoreType -from osprey.worker.lib.storage.stored_execution_result import get_rules_execution_result_store +from osprey.worker.lib.storage import ExecutionResultStoreBackendType +from osprey.worker.lib.storage.stored_execution_result import get_rules_execution_result_store_backend from osprey.worker.sinks.sink.kafka_output_sink import KafkaOutputSink from osprey.worker.sinks.sink.output_sink import BaseOutputSink, StdoutOutputSink from osprey.worker.sinks.sink.stored_execution_result_output_sink import StoredExecutionResultOutputSink @@ -26,8 +26,10 @@ def register_output_sinks(config: Config) -> Sequence[BaseOutputSink]: ) ) - store_backend = ExecutionResultStoreType(config.get_str('OSPREY_EXECUTION_RESULT_STORAGE_BACKEND', 'none')) - execution_result_store = get_rules_execution_result_store(execution_result_store_type=store_backend) + storage_backend_type = ExecutionResultStoreBackendType( + config.get_str('OSPREY_EXECUTION_RESULT_STORAGE_BACKEND', 'none') + ) + execution_result_store = get_rules_execution_result_store_backend(backend_type=storage_backend_type) # There may not be an execution result store configured, so check before adding the output sink if execution_result_store: diff --git a/osprey_worker/src/osprey/worker/lib/storage/__init__.py b/osprey_worker/src/osprey/worker/lib/storage/__init__.py index a367d4b5..93ea3266 100644 --- a/osprey_worker/src/osprey/worker/lib/storage/__init__.py +++ b/osprey_worker/src/osprey/worker/lib/storage/__init__.py @@ -1,7 +1,7 @@ from enum import StrEnum, auto -class ExecutionResultStoreType(StrEnum): +class ExecutionResultStoreBackendType(StrEnum): """Type of store used for execution results.""" BIGTABLE = auto() diff --git a/osprey_worker/src/osprey/worker/lib/storage/stored_execution_result.py b/osprey_worker/src/osprey/worker/lib/storage/stored_execution_result.py index 686884d3..04c35a8d 100644 --- a/osprey_worker/src/osprey/worker/lib/storage/stored_execution_result.py +++ b/osprey_worker/src/osprey/worker/lib/storage/stored_execution_result.py @@ -16,11 +16,11 @@ from minio import Minio from minio.error import S3Error from osprey.engine.executor.execution_context import ExecutionResult -from osprey.worker._stdlibplugin.execution_result_store_chooser import get_rules_execution_result_store +from osprey.worker._stdlibplugin.execution_result_store_chooser import get_rules_execution_result_store_backend from osprey.worker.lib.instruments import metrics from osprey.worker.lib.osprey_shared.logging import get_logger from osprey.worker.lib.snowflake import Snowflake -from osprey.worker.lib.storage import ExecutionResultStoreType +from osprey.worker.lib.storage import ExecutionResultStoreBackendType from osprey.worker.lib.storage.bigtable import osprey_bigtable from pydantic.main import BaseModel @@ -488,10 +488,8 @@ def bootstrap_execution_result_storage_service() -> ExecutionResultStorageServic config = CONFIG.instance() - storage_backend = get_rules_execution_result_store( - execution_result_store_type=ExecutionResultStoreType( - config.get_str('OSPREY_EXECUTION_RESULT_STORAGE_BACKEND', 'none') - ) + storage_backend = get_rules_execution_result_store_backend( + backend_type=ExecutionResultStoreBackendType(config.get_str('OSPREY_EXECUTION_RESULT_STORAGE_BACKEND', 'none')) ) if not storage_backend: From cd650af96a0f4e378972f93f44e6bfff6793ff7e Mon Sep 17 00:00:00 2001 From: Hailey Date: Thu, 16 Oct 2025 12:35:10 -0700 Subject: [PATCH 04/14] tidy --- osprey_worker/src/osprey/worker/ui_api/osprey/cli.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/osprey_worker/src/osprey/worker/ui_api/osprey/cli.py b/osprey_worker/src/osprey/worker/ui_api/osprey/cli.py index 8fec101c..60b4274e 100644 --- a/osprey_worker/src/osprey/worker/ui_api/osprey/cli.py +++ b/osprey_worker/src/osprey/worker/ui_api/osprey/cli.py @@ -1,5 +1,4 @@ # ruff: noqa: E402 -from google.cloud import storage from osprey.worker.lib.patcher import patch_all # isort: skip patch_all() # please ensure this occurs before *any* other imports ! @@ -84,8 +83,6 @@ def row(event: StoredExecutionResult) -> Dict[str, str]: return {'id': d['id'], 'timestamp': d['timestamp'], 'action_data': d['action_data']} storage_service = bootstrap_execution_result_storage_service() - if not storage_service: - raise druid_result = query_druid() while druid_result.action_ids: events = storage_service.get_many(action_ids=druid_result.action_ids) From ff5834d32c7f5f69a06aea721fe431f7e9876d21 Mon Sep 17 00:00:00 2001 From: Hailey Date: Thu, 16 Oct 2025 12:41:06 -0700 Subject: [PATCH 05/14] nit: naming again --- .../execution_result_store_chooser.py | 18 +++++++++--------- .../worker/_stdlibplugin/sink_register.py | 8 ++++---- .../src/osprey/worker/lib/storage/__init__.py | 2 +- .../lib/storage/stored_execution_result.py | 10 ++++++---- 4 files changed, 20 insertions(+), 18 deletions(-) diff --git a/osprey_worker/src/osprey/worker/_stdlibplugin/execution_result_store_chooser.py b/osprey_worker/src/osprey/worker/_stdlibplugin/execution_result_store_chooser.py index 7a347813..a38c394e 100644 --- a/osprey_worker/src/osprey/worker/_stdlibplugin/execution_result_store_chooser.py +++ b/osprey_worker/src/osprey/worker/_stdlibplugin/execution_result_store_chooser.py @@ -2,7 +2,7 @@ from osprey.worker.adaptor.plugin_manager import bootstrap_execution_result_store from osprey.worker.lib.singletons import CONFIG -from osprey.worker.lib.storage import ExecutionResultStoreBackendType +from osprey.worker.lib.storage import ExecutionResultStorageBackendType from osprey.worker.lib.storage.stored_execution_result import ( ExecutionResultStore, StoredExecutionResultBigTable, @@ -11,19 +11,19 @@ ) -def get_rules_execution_result_store_backend( - backend_type: ExecutionResultStoreBackendType, +def get_rules_execution_result_storage_backend( + backend_type: ExecutionResultStorageBackendType, ) -> Optional[ExecutionResultStore]: - """Based on the `execution_result_store_type` constructs a configured execution result store that can be used to store execution + """Based on the `backend_type` constructs a configured execution result store that can be used to store execution results. For more details, see `ExecutionResultStore`.""" config = CONFIG.instance() - if backend_type == ExecutionResultStoreBackendType.BIGTABLE: + if backend_type == ExecutionResultStorageBackendType.BIGTABLE: return StoredExecutionResultBigTable() - elif backend_type == ExecutionResultStoreBackendType.GCS: + elif backend_type == ExecutionResultStorageBackendType.GCS: return StoredExecutionResultGCS() - elif backend_type == ExecutionResultStoreBackendType.MINIO: + elif backend_type == ExecutionResultStorageBackendType.MINIO: endpoint = config.get_str('OSPREY_MINIO_ENDPOINT', 'minio:9000') access_key = config.get_str('OSPREY_MINIO_ACCESS_KEY', 'minioadmin') secret_key = config.get_str('OSPREY_MINIO_SECRET_KEY', 'minioadmin123') @@ -33,11 +33,11 @@ def get_rules_execution_result_store_backend( return StoredExecutionResultMinIO( endpoint=endpoint, access_key=access_key, secret_key=secret_key, secure=secure, bucket_name=bucket_name ) - elif backend_type == ExecutionResultStoreBackendType.PLUGIN: + elif backend_type == ExecutionResultStorageBackendType.PLUGIN: store = bootstrap_execution_result_store(config=config) if store is None: raise AssertionError('No execution result store registered') - elif backend_type == ExecutionResultStoreBackendType.NONE: + elif backend_type == ExecutionResultStorageBackendType.NONE: return None return None diff --git a/osprey_worker/src/osprey/worker/_stdlibplugin/sink_register.py b/osprey_worker/src/osprey/worker/_stdlibplugin/sink_register.py index b864a2c8..0d3f0ed8 100644 --- a/osprey_worker/src/osprey/worker/_stdlibplugin/sink_register.py +++ b/osprey_worker/src/osprey/worker/_stdlibplugin/sink_register.py @@ -3,8 +3,8 @@ from kafka import KafkaProducer from osprey.worker.adaptor.plugin_manager import hookimpl_osprey from osprey.worker.lib.config import Config -from osprey.worker.lib.storage import ExecutionResultStoreBackendType -from osprey.worker.lib.storage.stored_execution_result import get_rules_execution_result_store_backend +from osprey.worker.lib.storage import ExecutionResultStorageBackendType +from osprey.worker.lib.storage.stored_execution_result import get_rules_execution_result_storage_backend from osprey.worker.sinks.sink.kafka_output_sink import KafkaOutputSink from osprey.worker.sinks.sink.output_sink import BaseOutputSink, StdoutOutputSink from osprey.worker.sinks.sink.stored_execution_result_output_sink import StoredExecutionResultOutputSink @@ -26,10 +26,10 @@ def register_output_sinks(config: Config) -> Sequence[BaseOutputSink]: ) ) - storage_backend_type = ExecutionResultStoreBackendType( + storage_backend_type = ExecutionResultStorageBackendType( config.get_str('OSPREY_EXECUTION_RESULT_STORAGE_BACKEND', 'none') ) - execution_result_store = get_rules_execution_result_store_backend(backend_type=storage_backend_type) + execution_result_store = get_rules_execution_result_storage_backend(backend_type=storage_backend_type) # There may not be an execution result store configured, so check before adding the output sink if execution_result_store: diff --git a/osprey_worker/src/osprey/worker/lib/storage/__init__.py b/osprey_worker/src/osprey/worker/lib/storage/__init__.py index 93ea3266..fa6e69df 100644 --- a/osprey_worker/src/osprey/worker/lib/storage/__init__.py +++ b/osprey_worker/src/osprey/worker/lib/storage/__init__.py @@ -1,7 +1,7 @@ from enum import StrEnum, auto -class ExecutionResultStoreBackendType(StrEnum): +class ExecutionResultStorageBackendType(StrEnum): """Type of store used for execution results.""" BIGTABLE = auto() diff --git a/osprey_worker/src/osprey/worker/lib/storage/stored_execution_result.py b/osprey_worker/src/osprey/worker/lib/storage/stored_execution_result.py index 04c35a8d..0b467a68 100644 --- a/osprey_worker/src/osprey/worker/lib/storage/stored_execution_result.py +++ b/osprey_worker/src/osprey/worker/lib/storage/stored_execution_result.py @@ -16,11 +16,11 @@ from minio import Minio from minio.error import S3Error from osprey.engine.executor.execution_context import ExecutionResult -from osprey.worker._stdlibplugin.execution_result_store_chooser import get_rules_execution_result_store_backend +from osprey.worker._stdlibplugin.execution_result_store_chooser import get_rules_execution_result_storage_backend from osprey.worker.lib.instruments import metrics from osprey.worker.lib.osprey_shared.logging import get_logger from osprey.worker.lib.snowflake import Snowflake -from osprey.worker.lib.storage import ExecutionResultStoreBackendType +from osprey.worker.lib.storage import ExecutionResultStorageBackendType from osprey.worker.lib.storage.bigtable import osprey_bigtable from pydantic.main import BaseModel @@ -488,8 +488,10 @@ def bootstrap_execution_result_storage_service() -> ExecutionResultStorageServic config = CONFIG.instance() - storage_backend = get_rules_execution_result_store_backend( - backend_type=ExecutionResultStoreBackendType(config.get_str('OSPREY_EXECUTION_RESULT_STORAGE_BACKEND', 'none')) + storage_backend = get_rules_execution_result_storage_backend( + backend_type=ExecutionResultStorageBackendType( + config.get_str('OSPREY_EXECUTION_RESULT_STORAGE_BACKEND', 'none') + ) ) if not storage_backend: From 9a23a78d65c7c1a7ccbf6a02b58c7e1896c5eabb Mon Sep 17 00:00:00 2001 From: Hailey Date: Thu, 16 Oct 2025 12:43:42 -0700 Subject: [PATCH 06/14] type --- osprey_worker/src/osprey/worker/_stdlibplugin/sink_register.py | 2 +- .../src/osprey/worker/lib/storage/stored_execution_result.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/osprey_worker/src/osprey/worker/_stdlibplugin/sink_register.py b/osprey_worker/src/osprey/worker/_stdlibplugin/sink_register.py index 0d3f0ed8..79a9fbd0 100644 --- a/osprey_worker/src/osprey/worker/_stdlibplugin/sink_register.py +++ b/osprey_worker/src/osprey/worker/_stdlibplugin/sink_register.py @@ -32,7 +32,7 @@ def register_output_sinks(config: Config) -> Sequence[BaseOutputSink]: execution_result_store = get_rules_execution_result_storage_backend(backend_type=storage_backend_type) # There may not be an execution result store configured, so check before adding the output sink - if execution_result_store: + if execution_result_store is not None: sinks.append(StoredExecutionResultOutputSink()) return sinks diff --git a/osprey_worker/src/osprey/worker/lib/storage/stored_execution_result.py b/osprey_worker/src/osprey/worker/lib/storage/stored_execution_result.py index 0b467a68..27348672 100644 --- a/osprey_worker/src/osprey/worker/lib/storage/stored_execution_result.py +++ b/osprey_worker/src/osprey/worker/lib/storage/stored_execution_result.py @@ -494,7 +494,7 @@ def bootstrap_execution_result_storage_service() -> ExecutionResultStorageServic ) ) - if not storage_backend: + if storage_backend is None: raise AssertionError('No storage backend registered') return ExecutionResultStorageService(storage_backend) From a2ff50cf0e323c2080a26922de767d99c96ea4b8 Mon Sep 17 00:00:00 2001 From: Hailey Date: Thu, 16 Oct 2025 12:53:59 -0700 Subject: [PATCH 07/14] nit: naming yet again --- .../src/osprey/worker/_stdlibplugin/sink_register.py | 4 ++-- .../osprey/worker/lib/storage/stored_execution_result.py | 7 +++---- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/osprey_worker/src/osprey/worker/_stdlibplugin/sink_register.py b/osprey_worker/src/osprey/worker/_stdlibplugin/sink_register.py index 79a9fbd0..6e78030c 100644 --- a/osprey_worker/src/osprey/worker/_stdlibplugin/sink_register.py +++ b/osprey_worker/src/osprey/worker/_stdlibplugin/sink_register.py @@ -29,10 +29,10 @@ def register_output_sinks(config: Config) -> Sequence[BaseOutputSink]: storage_backend_type = ExecutionResultStorageBackendType( config.get_str('OSPREY_EXECUTION_RESULT_STORAGE_BACKEND', 'none') ) - execution_result_store = get_rules_execution_result_storage_backend(backend_type=storage_backend_type) + storage_backend = get_rules_execution_result_storage_backend(backend_type=storage_backend_type) # There may not be an execution result store configured, so check before adding the output sink - if execution_result_store is not None: + if storage_backend is not None: sinks.append(StoredExecutionResultOutputSink()) return sinks diff --git a/osprey_worker/src/osprey/worker/lib/storage/stored_execution_result.py b/osprey_worker/src/osprey/worker/lib/storage/stored_execution_result.py index 27348672..843cd2f6 100644 --- a/osprey_worker/src/osprey/worker/lib/storage/stored_execution_result.py +++ b/osprey_worker/src/osprey/worker/lib/storage/stored_execution_result.py @@ -488,11 +488,10 @@ def bootstrap_execution_result_storage_service() -> ExecutionResultStorageServic config = CONFIG.instance() - storage_backend = get_rules_execution_result_storage_backend( - backend_type=ExecutionResultStorageBackendType( - config.get_str('OSPREY_EXECUTION_RESULT_STORAGE_BACKEND', 'none') - ) + storage_backend_type = ExecutionResultStorageBackendType( + config.get_str('OSPREY_EXECUTION_RESULT_STORAGE_BACKEND', 'none') ) + storage_backend = get_rules_execution_result_storage_backend(backend_type=storage_backend_type) if storage_backend is None: raise AssertionError('No storage backend registered') From 715a3324c561f58fc49d10f1470b577d0db36b61 Mon Sep 17 00:00:00 2001 From: Hailey Date: Thu, 16 Oct 2025 13:02:42 -0700 Subject: [PATCH 08/14] circular import --- .../src/osprey/worker/lib/storage/stored_execution_result.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/osprey_worker/src/osprey/worker/lib/storage/stored_execution_result.py b/osprey_worker/src/osprey/worker/lib/storage/stored_execution_result.py index 843cd2f6..55844ce2 100644 --- a/osprey_worker/src/osprey/worker/lib/storage/stored_execution_result.py +++ b/osprey_worker/src/osprey/worker/lib/storage/stored_execution_result.py @@ -16,7 +16,6 @@ from minio import Minio from minio.error import S3Error from osprey.engine.executor.execution_context import ExecutionResult -from osprey.worker._stdlibplugin.execution_result_store_chooser import get_rules_execution_result_storage_backend from osprey.worker.lib.instruments import metrics from osprey.worker.lib.osprey_shared.logging import get_logger from osprey.worker.lib.snowflake import Snowflake @@ -484,6 +483,7 @@ def get_many( def bootstrap_execution_result_storage_service() -> ExecutionResultStorageService: """Create an ExecutionResultStorageService with the configured storage backend.""" + from osprey.worker._stdlibplugin.execution_result_store_chooser import get_rules_execution_result_storage_backend from osprey.worker.lib.singletons import CONFIG config = CONFIG.instance() From 29b89ec2eb16caf1a093f10d2f629a5e3d2e45d1 Mon Sep 17 00:00:00 2001 From: Hailey Date: Thu, 16 Oct 2025 13:08:54 -0700 Subject: [PATCH 09/14] type --- osprey_worker/src/osprey/worker/_stdlibplugin/sink_register.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/osprey_worker/src/osprey/worker/_stdlibplugin/sink_register.py b/osprey_worker/src/osprey/worker/_stdlibplugin/sink_register.py index 6e78030c..2f4c851a 100644 --- a/osprey_worker/src/osprey/worker/_stdlibplugin/sink_register.py +++ b/osprey_worker/src/osprey/worker/_stdlibplugin/sink_register.py @@ -1,10 +1,10 @@ from typing import List, Sequence from kafka import KafkaProducer +from osprey.worker._stdlibplugin.execution_result_store_chooser import get_rules_execution_result_storage_backend from osprey.worker.adaptor.plugin_manager import hookimpl_osprey from osprey.worker.lib.config import Config from osprey.worker.lib.storage import ExecutionResultStorageBackendType -from osprey.worker.lib.storage.stored_execution_result import get_rules_execution_result_storage_backend from osprey.worker.sinks.sink.kafka_output_sink import KafkaOutputSink from osprey.worker.sinks.sink.output_sink import BaseOutputSink, StdoutOutputSink from osprey.worker.sinks.sink.stored_execution_result_output_sink import StoredExecutionResultOutputSink From c27f0b825c86d4ac34bf2b1ac7cde429aeda663c Mon Sep 17 00:00:00 2001 From: hailey Date: Thu, 16 Oct 2025 14:09:58 -0700 Subject: [PATCH 10/14] Update osprey_worker/src/osprey/worker/lib/storage/__init__.py Co-authored-by: ayu --- osprey_worker/src/osprey/worker/lib/storage/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/osprey_worker/src/osprey/worker/lib/storage/__init__.py b/osprey_worker/src/osprey/worker/lib/storage/__init__.py index fa6e69df..72275d7f 100644 --- a/osprey_worker/src/osprey/worker/lib/storage/__init__.py +++ b/osprey_worker/src/osprey/worker/lib/storage/__init__.py @@ -21,7 +21,7 @@ class ExecutionResultStorageBackendType(StrEnum): PLUGIN = auto() """ - Execution result store that is sdefined via register_execution_result_store + Execution result store that is defined via register_execution_result_store """ NONE = auto() From b50c895aedc46314566ae4c7ff4ab8cc174b9187 Mon Sep 17 00:00:00 2001 From: hailey Date: Thu, 16 Oct 2025 14:10:18 -0700 Subject: [PATCH 11/14] Update osprey_worker/src/osprey/worker/lib/storage/__init__.py Co-authored-by: ayu --- osprey_worker/src/osprey/worker/lib/storage/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/osprey_worker/src/osprey/worker/lib/storage/__init__.py b/osprey_worker/src/osprey/worker/lib/storage/__init__.py index 72275d7f..8661a2ad 100644 --- a/osprey_worker/src/osprey/worker/lib/storage/__init__.py +++ b/osprey_worker/src/osprey/worker/lib/storage/__init__.py @@ -26,5 +26,5 @@ class ExecutionResultStorageBackendType(StrEnum): NONE = auto() """ - Disable execution results from being stored + Disable execution results from being stored. This may cause certain elements of Osprey to break, such as the events stream and individual event details in the UI """ From 4833806d75c1ca5de391427b9f86512ef671a6db Mon Sep 17 00:00:00 2001 From: hailey Date: Thu, 16 Oct 2025 14:17:09 -0700 Subject: [PATCH 12/14] Update osprey_worker/src/osprey/worker/lib/storage/stored_execution_result.py --- .../src/osprey/worker/lib/storage/stored_execution_result.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/osprey_worker/src/osprey/worker/lib/storage/stored_execution_result.py b/osprey_worker/src/osprey/worker/lib/storage/stored_execution_result.py index 55844ce2..646e3dbc 100644 --- a/osprey_worker/src/osprey/worker/lib/storage/stored_execution_result.py +++ b/osprey_worker/src/osprey/worker/lib/storage/stored_execution_result.py @@ -489,7 +489,7 @@ def bootstrap_execution_result_storage_service() -> ExecutionResultStorageServic config = CONFIG.instance() storage_backend_type = ExecutionResultStorageBackendType( - config.get_str('OSPREY_EXECUTION_RESULT_STORAGE_BACKEND', 'none') + config.get_str('OSPREY_EXECUTION_RESULT_STORAGE_BACKEND', 'none').lower() ) storage_backend = get_rules_execution_result_storage_backend(backend_type=storage_backend_type) From 8d48923e1c1991c9fd455d221c660800b816d15c Mon Sep 17 00:00:00 2001 From: Hailey Date: Thu, 16 Oct 2025 14:38:52 -0700 Subject: [PATCH 13/14] more lower --- .../worker/_stdlibplugin/sink_register.py | 2 +- .../worker/_stdlibplugin/storage_register.py | 21 ++++++++++--------- 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/osprey_worker/src/osprey/worker/_stdlibplugin/sink_register.py b/osprey_worker/src/osprey/worker/_stdlibplugin/sink_register.py index 2f4c851a..20a790a7 100644 --- a/osprey_worker/src/osprey/worker/_stdlibplugin/sink_register.py +++ b/osprey_worker/src/osprey/worker/_stdlibplugin/sink_register.py @@ -27,7 +27,7 @@ def register_output_sinks(config: Config) -> Sequence[BaseOutputSink]: ) storage_backend_type = ExecutionResultStorageBackendType( - config.get_str('OSPREY_EXECUTION_RESULT_STORAGE_BACKEND', 'none') + config.get_str('OSPREY_EXECUTION_RESULT_STORAGE_BACKEND', 'none').lower() ) storage_backend = get_rules_execution_result_storage_backend(backend_type=storage_backend_type) diff --git a/osprey_worker/src/osprey/worker/_stdlibplugin/storage_register.py b/osprey_worker/src/osprey/worker/_stdlibplugin/storage_register.py index d6c21dc7..94e9fe4f 100644 --- a/osprey_worker/src/osprey/worker/_stdlibplugin/storage_register.py +++ b/osprey_worker/src/osprey/worker/_stdlibplugin/storage_register.py @@ -1,16 +1,17 @@ +from typing import Optional + +from osprey.worker._stdlibplugin.execution_result_store_chooser import get_rules_execution_result_storage_backend from osprey.worker.adaptor.plugin_manager import hookimpl_osprey from osprey.worker.lib.config import Config -from osprey.worker.lib.storage.stored_execution_result import ExecutionResultStore, StoredExecutionResultMinIO +from osprey.worker.lib.storage import ExecutionResultStorageBackendType +from osprey.worker.lib.storage.stored_execution_result import ExecutionResultStore @hookimpl_osprey -def register_execution_result_store(config: Config) -> ExecutionResultStore: - endpoint = config.get_str('OSPREY_MINIO_ENDPOINT', 'minio:9000') - access_key = config.get_str('OSPREY_MINIO_ACCESS_KEY', 'minioadmin') - secret_key = config.get_str('OSPREY_MINIO_SECRET_KEY', 'minioadmin123') - secure = config.get_bool('OSPREY_MINIO_SECURE', False) - bucket_name = config.get_str('OSPREY_MINIO_EXECUTION_RESULTS_BUCKET', 'execution-output') - - return StoredExecutionResultMinIO( - endpoint=endpoint, access_key=access_key, secret_key=secret_key, secure=secure, bucket_name=bucket_name +def register_execution_result_store(config: Config) -> Optional[ExecutionResultStore]: + storage_backend_type = ExecutionResultStorageBackendType( + config.get_str('OSPREY_EXECUTION_RESULT_STORAGE_BACKEND', 'none').lower() ) + storage_backend = get_rules_execution_result_storage_backend(backend_type=storage_backend_type) + + return storage_backend From c1d81385eb4aa6968787a0f7d07d832196526188 Mon Sep 17 00:00:00 2001 From: Hailey Date: Fri, 9 Jan 2026 14:55:02 -0800 Subject: [PATCH 14/14] add back `trylast` to register --- .../src/osprey/worker/_stdlibplugin/storage_register.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/osprey_worker/src/osprey/worker/_stdlibplugin/storage_register.py b/osprey_worker/src/osprey/worker/_stdlibplugin/storage_register.py index 94e9fe4f..b860efb3 100644 --- a/osprey_worker/src/osprey/worker/_stdlibplugin/storage_register.py +++ b/osprey_worker/src/osprey/worker/_stdlibplugin/storage_register.py @@ -7,7 +7,7 @@ from osprey.worker.lib.storage.stored_execution_result import ExecutionResultStore -@hookimpl_osprey +@hookimpl_osprey(trylast=True) def register_execution_result_store(config: Config) -> Optional[ExecutionResultStore]: storage_backend_type = ExecutionResultStorageBackendType( config.get_str('OSPREY_EXECUTION_RESULT_STORAGE_BACKEND', 'none').lower()