Skip to content

Commit

Permalink
do not make providing both flows mandatory
Browse files Browse the repository at this point in the history
  • Loading branch information
leoschwarz committed Sep 23, 2024
1 parent c275260 commit a10e111
Showing 1 changed file with 15 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
from __future__ import annotations

from pathlib import Path
from typing import Any
from typing import Any, Self

import yaml
from loguru import logger
from pydantic import BaseModel, ConfigDict
from pydantic import BaseModel, ConfigDict, model_validator

from bfabric import Bfabric
from bfabric.entities import Resource, Dataset
Expand All @@ -24,11 +24,17 @@ class ConfigDatasetFlow(BaseModel):


class ConfigDispatchIndividualResources(BaseModel):
resource_flow: ConfigResourceFlow
dataset_flow: ConfigDatasetFlow
resource_flow: ConfigResourceFlow | None
dataset_flow: ConfigDatasetFlow | None

@model_validator(mode="after")
def check_at_least_one_flow(self) -> Self:
if self.resource_flow is None and self.dataset_flow is None:
raise ValueError("either resource_flow or dataset_flow must be provided")
return self

def config_msi_imzml():

def config_msi_imzml() -> ConfigDispatchIndividualResources:
return ConfigDispatchIndividualResources(
resource_flow=ConfigResourceFlow(filter_suffix=".imzML"),
dataset_flow=ConfigDatasetFlow(resource_column="Imzml", param_columns=[("PanelDataset", "mass_list_id")]),
Expand Down Expand Up @@ -69,6 +75,8 @@ def _write_chunks(self, chunks: list[Path]) -> None:
yaml.safe_dump(data, f)

def _dispatch_jobs_resource_flow(self, definition: WorkunitDefinition, params: dict[str, Any]) -> list[Path]:
if self._config.resource_flow is None:
raise ValueError("resource_flow is not configured")
resources = Resource.find_all(ids=definition.execution.resources, client=self._client)
paths = []
for resource in sorted(resources.values()):
Expand All @@ -81,6 +89,8 @@ def _dispatch_jobs_resource_flow(self, definition: WorkunitDefinition, params: d
return paths

def _dispatch_jobs_dataset_flow(self, definition: WorkunitDefinition, params: dict[str, Any]) -> list[Path]:
if self._config.dataset_flow is None:
raise ValueError("dataset_flow is not configured")
dataset = Dataset.find(id=definition.execution.dataset, client=self._client)
dataset_df = dataset.to_polars()
resources = Resource.find_all(
Expand Down

0 comments on commit a10e111

Please sign in to comment.