Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
from typing import Optional

from osprey.worker.adaptor.plugin_manager import bootstrap_execution_result_store
from osprey.worker.lib.singletons import CONFIG
from osprey.worker.lib.storage import ExecutionResultStorageBackendType
from osprey.worker.lib.storage.stored_execution_result import (
ExecutionResultStore,
StoredExecutionResultBigTable,
StoredExecutionResultGCS,
StoredExecutionResultMinIO,
)


def get_rules_execution_result_storage_backend(
backend_type: ExecutionResultStorageBackendType,
) -> Optional[ExecutionResultStore]:
"""Based on the `backend_type` constructs a configured execution result store that can be used to store execution
results. For more details, see `ExecutionResultStore`."""

config = CONFIG.instance()

if backend_type == ExecutionResultStorageBackendType.BIGTABLE:
return StoredExecutionResultBigTable()
elif backend_type == ExecutionResultStorageBackendType.GCS:
return StoredExecutionResultGCS()
elif backend_type == ExecutionResultStorageBackendType.MINIO:
endpoint = config.get_str('OSPREY_MINIO_ENDPOINT', 'minio:9000')
access_key = config.get_str('OSPREY_MINIO_ACCESS_KEY', 'minioadmin')
secret_key = config.get_str('OSPREY_MINIO_SECRET_KEY', 'minioadmin123')
secure = config.get_bool('OSPREY_MINIO_SECURE', False)
bucket_name = config.get_str('OSPREY_MINIO_EXECUTION_RESULTS_BUCKET', 'execution-output')

return StoredExecutionResultMinIO(
endpoint=endpoint, access_key=access_key, secret_key=secret_key, secure=secure, bucket_name=bucket_name
)
elif backend_type == ExecutionResultStorageBackendType.PLUGIN:
store = bootstrap_execution_result_store(config=config)
if store is None:
raise AssertionError('No execution result store registered')
elif backend_type == ExecutionResultStorageBackendType.NONE:
return None

return None
15 changes: 12 additions & 3 deletions osprey_worker/src/osprey/worker/_stdlibplugin/sink_register.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
from typing import List, Sequence

from kafka import KafkaProducer
from osprey.worker.adaptor.plugin_manager import bootstrap_execution_result_store, hookimpl_osprey
from osprey.worker._stdlibplugin.execution_result_store_chooser import get_rules_execution_result_storage_backend
from osprey.worker.adaptor.plugin_manager import hookimpl_osprey
from osprey.worker.lib.config import Config
from osprey.worker.lib.storage import ExecutionResultStorageBackendType
from osprey.worker.sinks.sink.kafka_output_sink import KafkaOutputSink
from osprey.worker.sinks.sink.output_sink import BaseOutputSink, StdoutOutputSink
from osprey.worker.sinks.sink.stored_execution_result_output_sink import StoredExecutionResultOutputSink
Expand All @@ -23,7 +25,14 @@ def register_output_sinks(config: Config) -> Sequence[BaseOutputSink]:
kafka_producer=KafkaProducer(bootstrap_servers=bootstrap_servers, client_id=client_id),
)
)
execution_result_store = bootstrap_execution_result_store(config=config)
if execution_result_store is not None:

storage_backend_type = ExecutionResultStorageBackendType(
config.get_str('OSPREY_EXECUTION_RESULT_STORAGE_BACKEND', 'none').lower()
)
storage_backend = get_rules_execution_result_storage_backend(backend_type=storage_backend_type)

# There may not be an execution result store configured, so check before adding the output sink
if storage_backend is not None:
sinks.append(StoredExecutionResultOutputSink())

return sinks
Original file line number Diff line number Diff line change
@@ -1,18 +1,17 @@
from typing import Optional

from osprey.worker._stdlibplugin.execution_result_store_chooser import get_rules_execution_result_storage_backend
from osprey.worker.adaptor.plugin_manager import hookimpl_osprey
from osprey.worker.lib.config import Config
from osprey.worker.lib.storage.stored_execution_result import ExecutionResultStore, StoredExecutionResultMinIO
from osprey.worker.lib.storage import ExecutionResultStorageBackendType
from osprey.worker.lib.storage.stored_execution_result import ExecutionResultStore


@hookimpl_osprey(trylast=True)
def register_execution_result_store(config: Config) -> Optional[ExecutionResultStore]:
endpoint = config.get_str('OSPREY_MINIO_ENDPOINT', 'minio:9000')
access_key = config.get_str('OSPREY_MINIO_ACCESS_KEY', 'minioadmin')
secret_key = config.get_str('OSPREY_MINIO_SECRET_KEY', 'minioadmin123')
secure = config.get_bool('OSPREY_MINIO_SECURE', False)
bucket_name = config.get_str('OSPREY_MINIO_EXECUTION_RESULTS_BUCKET', 'execution-output')

return StoredExecutionResultMinIO(
endpoint=endpoint, access_key=access_key, secret_key=secret_key, secure=secure, bucket_name=bucket_name
storage_backend_type = ExecutionResultStorageBackendType(
config.get_str('OSPREY_EXECUTION_RESULT_STORAGE_BACKEND', 'none').lower()
)
storage_backend = get_rules_execution_result_storage_backend(backend_type=storage_backend_type)

return storage_backend
31 changes: 31 additions & 0 deletions osprey_worker/src/osprey/worker/lib/storage/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,37 @@
from enum import StrEnum, auto

# Import all models to ensure they're registered with SQLAlchemy
# This is required for metadata.create_all() to create all tables
from .bulk_action_task import BulkActionJob, BulkActionTask # noqa: F401
from .bulk_label_task import BulkLabelTask # noqa: F401
from .queries import Query, SavedQuery # noqa: F401
from .temporary_ability_token import TemporaryAbilityToken # noqa: F401


class ExecutionResultStorageBackendType(StrEnum):
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

afaict, the ones i added here are the only ones that exist

"""Type of store used for execution results."""

BIGTABLE = auto()
"""
Bigtable execution result store
"""

GCS = auto()
"""
Google Cloud Storage execution result store
"""

MINIO = auto()
"""
Minio execution result store
"""

PLUGIN = auto()
"""
Execution result store that is defined via register_execution_result_store
"""

NONE = auto()
"""
Disable execution results from being stored. This may cause certain elements of Osprey to break, such as the events stream and individual event details in the UI
"""
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from osprey.worker.lib.instruments import metrics
from osprey.worker.lib.osprey_shared.logging import get_logger
from osprey.worker.lib.snowflake import Snowflake
from osprey.worker.lib.storage import ExecutionResultStorageBackendType
from osprey.worker.lib.storage.bigtable import osprey_bigtable
from pydantic.main import BaseModel

Expand Down Expand Up @@ -497,9 +498,17 @@ def get_many(

def bootstrap_execution_result_storage_service() -> ExecutionResultStorageService:
"""Create an ExecutionResultStorageService with the configured storage backend."""
from osprey.worker.adaptor.plugin_manager import bootstrap_execution_result_store
from osprey.worker._stdlibplugin.execution_result_store_chooser import get_rules_execution_result_storage_backend
from osprey.worker.lib.singletons import CONFIG

config = CONFIG.instance()
storage_backend = bootstrap_execution_result_store(config)

storage_backend_type = ExecutionResultStorageBackendType(
config.get_str('OSPREY_EXECUTION_RESULT_STORAGE_BACKEND', 'none').lower()
)
storage_backend = get_rules_execution_result_storage_backend(backend_type=storage_backend_type)

if storage_backend is None:
raise AssertionError('No storage backend registered')

Comment on lines +511 to +513
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

afaict, it makes sense to raise here. the codepaths i see calling bootstrap_execution_result_storage_service are all in the ui api or in one of the cli tools, and they probably should be raising an error when there isn't a configured backend.

return ExecutionResultStorageService(storage_backend)
Loading