diff --git a/osprey_worker/src/osprey/worker/_stdlibplugin/execution_result_store_chooser.py b/osprey_worker/src/osprey/worker/_stdlibplugin/execution_result_store_chooser.py new file mode 100644 index 00000000..a38c394e --- /dev/null +++ b/osprey_worker/src/osprey/worker/_stdlibplugin/execution_result_store_chooser.py @@ -0,0 +1,43 @@ +from typing import Optional + +from osprey.worker.adaptor.plugin_manager import bootstrap_execution_result_store +from osprey.worker.lib.singletons import CONFIG +from osprey.worker.lib.storage import ExecutionResultStorageBackendType +from osprey.worker.lib.storage.stored_execution_result import ( + ExecutionResultStore, + StoredExecutionResultBigTable, + StoredExecutionResultGCS, + StoredExecutionResultMinIO, +) + + +def get_rules_execution_result_storage_backend( + backend_type: ExecutionResultStorageBackendType, +) -> Optional[ExecutionResultStore]: + """Based on the `backend_type` constructs a configured execution result store that can be used to store execution + results. For more details, see `ExecutionResultStore`.""" + + config = CONFIG.instance() + + if backend_type == ExecutionResultStorageBackendType.BIGTABLE: + return StoredExecutionResultBigTable() + elif backend_type == ExecutionResultStorageBackendType.GCS: + return StoredExecutionResultGCS() + elif backend_type == ExecutionResultStorageBackendType.MINIO: + endpoint = config.get_str('OSPREY_MINIO_ENDPOINT', 'minio:9000') + access_key = config.get_str('OSPREY_MINIO_ACCESS_KEY', 'minioadmin') + secret_key = config.get_str('OSPREY_MINIO_SECRET_KEY', 'minioadmin123') + secure = config.get_bool('OSPREY_MINIO_SECURE', False) + bucket_name = config.get_str('OSPREY_MINIO_EXECUTION_RESULTS_BUCKET', 'execution-output') + + return StoredExecutionResultMinIO( + endpoint=endpoint, access_key=access_key, secret_key=secret_key, secure=secure, bucket_name=bucket_name + ) + elif backend_type == ExecutionResultStorageBackendType.PLUGIN: + store = bootstrap_execution_result_store(config=config) + if store is None: + raise AssertionError('No execution result store registered') + elif backend_type == ExecutionResultStorageBackendType.NONE: + return None + + return None diff --git a/osprey_worker/src/osprey/worker/_stdlibplugin/sink_register.py b/osprey_worker/src/osprey/worker/_stdlibplugin/sink_register.py index 4597f7b0..20a790a7 100644 --- a/osprey_worker/src/osprey/worker/_stdlibplugin/sink_register.py +++ b/osprey_worker/src/osprey/worker/_stdlibplugin/sink_register.py @@ -1,8 +1,10 @@ from typing import List, Sequence from kafka import KafkaProducer -from osprey.worker.adaptor.plugin_manager import bootstrap_execution_result_store, hookimpl_osprey +from osprey.worker._stdlibplugin.execution_result_store_chooser import get_rules_execution_result_storage_backend +from osprey.worker.adaptor.plugin_manager import hookimpl_osprey from osprey.worker.lib.config import Config +from osprey.worker.lib.storage import ExecutionResultStorageBackendType from osprey.worker.sinks.sink.kafka_output_sink import KafkaOutputSink from osprey.worker.sinks.sink.output_sink import BaseOutputSink, StdoutOutputSink from osprey.worker.sinks.sink.stored_execution_result_output_sink import StoredExecutionResultOutputSink @@ -23,7 +25,14 @@ def register_output_sinks(config: Config) -> Sequence[BaseOutputSink]: kafka_producer=KafkaProducer(bootstrap_servers=bootstrap_servers, client_id=client_id), ) ) - execution_result_store = bootstrap_execution_result_store(config=config) - if execution_result_store is not None: + + storage_backend_type = ExecutionResultStorageBackendType( + config.get_str('OSPREY_EXECUTION_RESULT_STORAGE_BACKEND', 'none').lower() + ) + storage_backend = get_rules_execution_result_storage_backend(backend_type=storage_backend_type) + + # There may not be an execution result store configured, so check before adding the output sink + if storage_backend is not None: sinks.append(StoredExecutionResultOutputSink()) + return sinks diff --git a/osprey_worker/src/osprey/worker/_stdlibplugin/storage_register.py b/osprey_worker/src/osprey/worker/_stdlibplugin/storage_register.py index f870691c..b860efb3 100644 --- a/osprey_worker/src/osprey/worker/_stdlibplugin/storage_register.py +++ b/osprey_worker/src/osprey/worker/_stdlibplugin/storage_register.py @@ -1,18 +1,17 @@ from typing import Optional +from osprey.worker._stdlibplugin.execution_result_store_chooser import get_rules_execution_result_storage_backend from osprey.worker.adaptor.plugin_manager import hookimpl_osprey from osprey.worker.lib.config import Config -from osprey.worker.lib.storage.stored_execution_result import ExecutionResultStore, StoredExecutionResultMinIO +from osprey.worker.lib.storage import ExecutionResultStorageBackendType +from osprey.worker.lib.storage.stored_execution_result import ExecutionResultStore @hookimpl_osprey(trylast=True) def register_execution_result_store(config: Config) -> Optional[ExecutionResultStore]: - endpoint = config.get_str('OSPREY_MINIO_ENDPOINT', 'minio:9000') - access_key = config.get_str('OSPREY_MINIO_ACCESS_KEY', 'minioadmin') - secret_key = config.get_str('OSPREY_MINIO_SECRET_KEY', 'minioadmin123') - secure = config.get_bool('OSPREY_MINIO_SECURE', False) - bucket_name = config.get_str('OSPREY_MINIO_EXECUTION_RESULTS_BUCKET', 'execution-output') - - return StoredExecutionResultMinIO( - endpoint=endpoint, access_key=access_key, secret_key=secret_key, secure=secure, bucket_name=bucket_name + storage_backend_type = ExecutionResultStorageBackendType( + config.get_str('OSPREY_EXECUTION_RESULT_STORAGE_BACKEND', 'none').lower() ) + storage_backend = get_rules_execution_result_storage_backend(backend_type=storage_backend_type) + + return storage_backend diff --git a/osprey_worker/src/osprey/worker/lib/storage/__init__.py b/osprey_worker/src/osprey/worker/lib/storage/__init__.py index 7c8c166b..444b418e 100644 --- a/osprey_worker/src/osprey/worker/lib/storage/__init__.py +++ b/osprey_worker/src/osprey/worker/lib/storage/__init__.py @@ -1,6 +1,37 @@ +from enum import StrEnum, auto + # Import all models to ensure they're registered with SQLAlchemy # This is required for metadata.create_all() to create all tables from .bulk_action_task import BulkActionJob, BulkActionTask # noqa: F401 from .bulk_label_task import BulkLabelTask # noqa: F401 from .queries import Query, SavedQuery # noqa: F401 from .temporary_ability_token import TemporaryAbilityToken # noqa: F401 + + +class ExecutionResultStorageBackendType(StrEnum): + """Type of store used for execution results.""" + + BIGTABLE = auto() + """ + Bigtable execution result store + """ + + GCS = auto() + """ + Google Cloud Storage execution result store + """ + + MINIO = auto() + """ + Minio execution result store + """ + + PLUGIN = auto() + """ + Execution result store that is defined via register_execution_result_store + """ + + NONE = auto() + """ + Disable execution results from being stored. This may cause certain elements of Osprey to break, such as the events stream and individual event details in the UI + """ diff --git a/osprey_worker/src/osprey/worker/lib/storage/stored_execution_result.py b/osprey_worker/src/osprey/worker/lib/storage/stored_execution_result.py index ea082c73..93251fa0 100644 --- a/osprey_worker/src/osprey/worker/lib/storage/stored_execution_result.py +++ b/osprey_worker/src/osprey/worker/lib/storage/stored_execution_result.py @@ -19,6 +19,7 @@ from osprey.worker.lib.instruments import metrics from osprey.worker.lib.osprey_shared.logging import get_logger from osprey.worker.lib.snowflake import Snowflake +from osprey.worker.lib.storage import ExecutionResultStorageBackendType from osprey.worker.lib.storage.bigtable import osprey_bigtable from pydantic.main import BaseModel @@ -497,9 +498,17 @@ def get_many( def bootstrap_execution_result_storage_service() -> ExecutionResultStorageService: """Create an ExecutionResultStorageService with the configured storage backend.""" - from osprey.worker.adaptor.plugin_manager import bootstrap_execution_result_store + from osprey.worker._stdlibplugin.execution_result_store_chooser import get_rules_execution_result_storage_backend from osprey.worker.lib.singletons import CONFIG config = CONFIG.instance() - storage_backend = bootstrap_execution_result_store(config) + + storage_backend_type = ExecutionResultStorageBackendType( + config.get_str('OSPREY_EXECUTION_RESULT_STORAGE_BACKEND', 'none').lower() + ) + storage_backend = get_rules_execution_result_storage_backend(backend_type=storage_backend_type) + + if storage_backend is None: + raise AssertionError('No storage backend registered') + return ExecutionResultStorageService(storage_backend)