diff --git a/bfabric/experimental/app_interface/_workunit_definition.py b/bfabric/experimental/app_interface/_workunit_definition.py
deleted file mode 100644
index 6b6e738f..00000000
--- a/bfabric/experimental/app_interface/_workunit_definition.py
+++ /dev/null
@@ -1,107 +0,0 @@
-# deprecated, new code will replace this
-from __future__ import annotations
-import warnings
-
-warnings.warn("This module is deprecated", DeprecationWarning)
-
-
-from pathlib import Path
-from typing import TYPE_CHECKING
-
-import polars as pl
-from pydantic import BaseModel, ConfigDict
-
-from bfabric.entities import Workunit, Project, ExternalJob, Resource
-
-if TYPE_CHECKING:
-    from bfabric.bfabric import Bfabric
-
-
-class InputResourceDefinition(BaseModel):
-    id: int
-    scp_address: str | None
-    app_id: int
-    app_name: str
-
-    @classmethod
-    def from_resource(cls, resource: Resource) -> InputResourceDefinition:
-        # TODO optimize: find generic mechanism to preload entities with an arena-like cache
-        scp_address = (
-            f"{resource.storage.scp_prefix}{resource['relativepath']}" if resource.storage.scp_prefix else None
-        )
-        data = {
-            "id": resource.id,
-            "scp_address": scp_address,
-            "app_id": resource.workunit.application.id,
-            "app_name": resource.workunit.application["name"],
-        }
-        return cls.model_validate(data)
-
-
-class WorkunitExecutionDefinition(BaseModel):
-    model_config = ConfigDict(arbitrary_types_allowed=True)
-
-    parameter_values: dict[str, str]
-    executable_path: Path
-    input_dataset: pl.DataFrame | None
-    input_resources: list[InputResourceDefinition]
-
-    @classmethod
-    def from_workunit(cls, workunit: Workunit) -> WorkunitExecutionDefinition:
-        input_resources = []
-        for resource in workunit.input_resources:
-            input_resources.append(InputResourceDefinition.from_resource(resource))
-
-        data = {}
-        data["parameter_values"] = workunit.parameter_values
-        data["executable_path"] = Path(workunit.application.executable["program"])
-        data["input_dataset"] = workunit.input_dataset.to_polars() if workunit.input_dataset else None
-        data["input_resources"] = input_resources
-        return cls.model_validate(data)
-
-
-class WorkunitRegistrationDefinition(BaseModel):
-    workunit_id: int
-    project_id: int
-    order_id: int | None
-
-    @classmethod
-    def from_workunit(cls, workunit: Workunit) -> WorkunitRegistrationDefinition:
-        data = {"workunit_id": workunit.id}
-        if isinstance(workunit.container, Project):
-            data["project_id"] = workunit.container.id
-            data["order_id"] = None
-        else:
-            data["project_id"] = workunit.container.project.id
-            data["order_id"] = workunit.container.id
-        return cls.model_validate(data)
-
-
-class WorkunitDefinition:
-    def __init__(self, executon: WorkunitExecutionDefinition, registration: WorkunitRegistrationDefinition) -> None:
-        self._execution = executon
-        self._registration = registration
-
-    execution = property(lambda self: self._execution)
-    registration = property(lambda self: self._registration)
-
-    # TODO keep these?
-    workunit_id = property(lambda self: self._registration.workunit_id)
-    project_id = property(lambda self: self._registration.project_id)
-    order_id = property(lambda self: self._registration.order_id)
-    parameter_values = property(lambda self: self._execution.parameter_values)
-    executable_path = property(lambda self: self._execution.executable_path)
-    input_resource_ids = property(lambda self: self._execution.input_resource_ids)
-    input_dataset_ids = property(lambda self: self._execution.input_dataset_ids)
-
-    @classmethod
-    def from_workunit(cls, client: Bfabric, workunit: Workunit) -> WorkunitDefinition:
-        return cls(
-            executon=WorkunitExecutionDefinition.from_workunit(workunit),
-            registration=WorkunitRegistrationDefinition.from_workunit(workunit),
-        )
-
-    @classmethod
-    def from_external_job_id(cls, client: Bfabric, external_job_id: int) -> WorkunitDefinition:
-        external_job = ExternalJob.find(id=external_job_id, client=client)
-        return cls.from_workunit(client=client, workunit=external_job.workunit)
diff --git a/bfabric/wrapper_creator/bfabric_wrapper_creator.py b/bfabric/wrapper_creator/bfabric_wrapper_creator.py
index 7ff8444a..a982d549 100644
--- a/bfabric/wrapper_creator/bfabric_wrapper_creator.py
+++ b/bfabric/wrapper_creator/bfabric_wrapper_creator.py
@@ -13,7 +13,7 @@
 from bfabric import Bfabric
 from bfabric.bfabric_legacy import bfabricEncoder
 from bfabric.entities import Workunit, ExternalJob, Application, Resource, Storage, Order, Project
-from bfabric.experimental.app_interface._workunit_definition import WorkunitDefinition
+from bfabric.experimental.app_interface.workunit.definition import WorkunitDefinition
 from bfabric.wrapper_creator.bfabric_external_job import BfabricExternalJob
 
 
@@ -24,7 +24,7 @@ def __init__(self, client: Bfabric, external_job_id: int) -> None:
 
     @cached_property
     def workunit_definition(self) -> WorkunitDefinition:
-        return WorkunitDefinition.from_external_job_id(client=self._client, external_job_id=self._external_job_id)
+        return WorkunitDefinition.from_workunit(self._external_job.workunit)
 
     @cached_property
     def _external_job(self) -> ExternalJob:
@@ -70,7 +70,7 @@ def create_log_resource(self, variant: Literal["out", "err"], output_resource: R
             "resource",
             {
                 "name": f"slurm_std{variant}",
-                "workunitid": self.workunit_definition.workunit_id,
+                "workunitid": self.workunit_definition.registration.workunit_id,
                 "storageid": self._log_storage.id,
                 "relativepath": f"/workunitid-{self._workunit.id}_resourceid-{output_resource.id}.{variant}",
             },
@@ -80,10 +80,10 @@ def get_application_section(self, output_resource: Resource) -> dict[str, Any]:
         output_url = f"bfabric@{self._application.storage.data_dict['host']}:{self._application.storage.data_dict['basepath']}{output_resource.data_dict['relativepath']}"
         inputs = defaultdict(list)
-        for resource in self.workunit_definition.execution.input_resources:
-            inputs[resource.app_name].append(f"bfabric@{resource.scp_address}")
+        for resource in Resource.find_all(self.workunit_definition.execution.resources, client=self._client).values():
+            inputs[resource.application.name].append(f"bfabric@{resource.storage.scp_address}")
         return {
-            "parameters": self.workunit_definition.parameter_values,
+            "parameters": self.workunit_definition.execution.raw_parameters,
             "protocol": "scp",
             "input": dict(inputs),
            "output": [output_url],
@@ -102,18 +102,18 @@ def get_job_configuration_section(
         }
         inputs = defaultdict(list)
-        for resource in self.workunit_definition.execution.input_resources:
+        for resource in Resource.find_all(self.workunit_definition.execution.resources, client=self._client).values():
             web_url = Resource({"id": resource.id}, client=self._client).web_url
-            inputs[resource.app_name].append({"resource_id": resource.id, "resource_url": web_url})
+            inputs[resource.storage.name].append({"resource_id": resource.id, "resource_url": web_url})
         return {
-            "executable": str(self.workunit_definition.executable_path),
+            "executable": str(self.workunit_definition.execution.executable),
             "external_job_id": self._external_job_id,
             "fastasequence": self._fasta_sequence,
             "input": dict(inputs),
             "inputdataset": None,
-            "order_id": self.workunit_definition.order_id,
-            "project_id": self.workunit_definition.project_id,
+            "order_id": self._order.id,
+            "project_id": self._project.id,
             "output": {
                 "protocol": "scp",
                 "resource_id": output_resource.id,
@@ -122,7 +122,7 @@ def get_job_configuration_section(
             "stderr": log_resource["stderr"],
             "stdout": log_resource["stdout"],
             "workunit_createdby": self._workunit.data_dict["createdby"],
-            "workunit_id": self.workunit_definition.workunit_id,
+            "workunit_id": self.workunit_definition.registration.workunit_id,
             "workunit_url": self._workunit.web_url,
         }
 
@@ -147,7 +147,7 @@ def write_results(self, config_serialized: str) -> None:
             {
                 "name": "job configuration (executable) in YAML",
                 "context": "WORKUNIT",
-                "workunitid": self.workunit_definition.workunit_id,
+                "workunitid": self.workunit_definition.registration.workunit_id,
                 "description": "This is a job configuration as YAML base64 encoded. It is configured to be executed by the B-Fabric yaml submitter.",
                 "base64": base64.b64encode(config_serialized.encode()).decode(),
                 "version": "10",
@@ -156,7 +156,7 @@ def write_results(self, config_serialized: str) -> None:
         yaml_workunit_externaljob = self._client.save(
             "externaljob",
             {
-                "workunitid": self.workunit_definition.workunit_id,
+                "workunitid": self.workunit_definition.registration.workunit_id,
                 "status": "new",
                 "executableid": yaml_workunit_executable["id"],
                 "action": "WORKUNIT",
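
Reviewer note (not part of the patch above): a minimal usage sketch of the WorkunitDefinition API that the wrapper creator now relies on. It only uses calls visible in this diff (WorkunitDefinition.from_workunit, registration.workunit_id, execution.raw_parameters, execution.executable, execution.resources, Resource.find_all, ExternalJob.find); the client construction and the external job id are illustrative assumptions, not part of the change set.

# Hedged sketch: resolve a WorkunitDefinition the way BfabricWrapperCreator.workunit_definition
# now does, then read the fields the wrapper creator uses after this change.
from bfabric import Bfabric
from bfabric.entities import ExternalJob, Resource
from bfabric.experimental.app_interface.workunit.definition import WorkunitDefinition

client = Bfabric.from_config()  # assumption: standard client construction
external_job = ExternalJob.find(id=12345, client=client)  # hypothetical external job id
definition = WorkunitDefinition.from_workunit(external_job.workunit)

workunit_id = definition.registration.workunit_id   # replaces workunit_definition.workunit_id
parameters = definition.execution.raw_parameters    # replaces workunit_definition.parameter_values
executable = str(definition.execution.executable)   # replaces workunit_definition.executable_path
# Input resources are now kept as ids and resolved on demand, as the wrapper creator does:
resources = Resource.find_all(definition.execution.resources, client=client)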