diff --git a/bfabric/bfabric.py b/bfabric/bfabric.py index 88205f34..9401956d 100755 --- a/bfabric/bfabric.py +++ b/bfabric/bfabric.py @@ -16,6 +16,7 @@ import base64 import importlib.metadata import logging +import sys from contextlib import contextmanager from datetime import datetime from enum import Enum @@ -172,7 +173,7 @@ def read( page_offset = initial_offset for i_iter, i_page in enumerate(requested_pages): if not (i_iter == 0 and i_page == 1): - print("-- reading page", i_page, "of", n_available_pages) + print(f"-- reading page {i_page} of {n_available_pages}", file=sys.stderr) results = self.engine.read( endpoint=endpoint, obj=obj, auth=self.auth, page=i_page, return_id_only=return_id_only ) diff --git a/bfabric/scripts/bfabric_list_workunit_parameters.py b/bfabric/scripts/bfabric_list_workunit_parameters.py new file mode 100644 index 00000000..c15b299f --- /dev/null +++ b/bfabric/scripts/bfabric_list_workunit_parameters.py @@ -0,0 +1,107 @@ +import argparse +import json +import sys + +import polars as pl +import rich + +from bfabric import Bfabric + + +def bfabric_list_workunit_parameters(client: Bfabric, application_id: int, max_workunits: int, format: str) -> None: + """Lists the workunit parameters of the provided application. + :param client: The Bfabric client to use. + :param application_id: The application ID to list the workunit parameters for. + :param max_workunits: The maximum number of workunits to fetch. + :param format: The output format to use. + """ + workunits_table_full = get_workunits_table_full(application_id, client, max_workunits) + workunits_table_explode = workunits_table_full.explode("parameter").with_columns( + parameter_id=pl.col("parameter").struct[1] + ) + parameter_table_wide = get_parameter_table(client, workunits_table_explode) + + merged_result = workunits_table_full[ + ["workunit_id", "created", "createdby", "name", "container_id", "inputdataset_id", "resource_ids"] + ].join(parameter_table_wide, on="workunit_id", how="left") + + print_results(format, merged_result) + + +def get_workunits_table_full(application_id: int, client: Bfabric, max_workunits: int) -> pl.DataFrame: + """Returns a table with the workunits for the specified application.""" + # read the workunit data + workunits_table_full = ( + client.read("workunit", {"applicationid": application_id}, max_results=max_workunits) + .to_polars() + .rename({"id": "workunit_id"}) + ) + # add some extra columns flattening the structure for the output + workunits_table_full = workunits_table_full.with_columns( + container_id=pl.col("container").struct[1], + resource_ids=pl.col("resource").map_elements( + lambda x: json.dumps([xx["id"] for xx in x]), return_dtype=pl.String + ), + ) + if "inputdataset" in workunits_table_full.columns: + workunits_table_full = workunits_table_full.with_columns( + inputdataset_id=pl.col("inputdataset").struct[1], + ) + else: + workunits_table_full = workunits_table_full.with_columns( + inputdataset_id=pl.lit(None) + ) + return workunits_table_full + + +def print_results(format: str, merged_result: pl.DataFrame) -> None: + """Prints the results to the console, in the requested format.""" + if format == "tsv": + print(merged_result.write_csv(file=None, separator="\t")) + elif format == "json": + print(merged_result.write_json(file=None)) + elif format == "pretty": + # use rich + rich_table = rich.table.Table() + for column in merged_result.columns: + rich_table.add_column(column) + for row in merged_result.iter_rows(): + rich_table.add_row(*map(str, row)) + console = rich.console.Console() + console.print(rich_table) + else: + raise ValueError("Unsupported format") + + +def get_parameter_table(client: Bfabric, workunits_table_explode: pl.DataFrame) -> pl.DataFrame: + """Returns a wide format table for the specified parameters, with the key `workunit_id` indicating the source.""" + # load the parameters table + collect = [] + for i_frame, frame in enumerate(workunits_table_explode.iter_slices(100)): + print(f"-- Reading parameters chunk {i_frame + 1} of {len(workunits_table_explode) // 100 + 1}", file=sys.stderr) + chunk = ( + client.read("parameter", {"id": frame["parameter_id"].to_list()}).to_polars().rename({"id": "parameter_id"}) + ) + collect.append(chunk) + parameter_table_full = pl.concat(collect, how="align")[["parameter_id", "key", "value"]] + # add workunit id to parameter table + parameter_table_full = parameter_table_full.join( + workunits_table_explode[["workunit_id", "parameter_id"]], on="parameter_id", how="left" + ) + # convert to wide format + return parameter_table_full.pivot(values="value", index="workunit_id", columns="key") + + +def main() -> None: + """Parses command line arguments and calls `bfabric_list_workunit_parameters`.""" + client = Bfabric.from_config(verbose=True) + parser = argparse.ArgumentParser() + parser.add_argument("application_id", type=int, help="The application ID to list the workunit parameters for.") + parser.add_argument("--max-workunits", type=int, help="The maximum number of workunits to fetch.", default=200) + parser.add_argument("--format", choices=["tsv", "json", "pretty"], default="tsv") + args = vars(parser.parse_args()) + bfabric_list_workunit_parameters(client, **args) + + +if __name__ == "__main__": + main() diff --git a/pyproject.toml b/pyproject.toml index d0fcd25c..52b451e8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -47,6 +47,7 @@ Repository = "https://github.com/fgcz/bfabricPy" #bfabric_feeder_resource_autoQC="bfabric.scripts.bfabric_feeder_resource_autoQC:main" "bfabric_list_not_existing_storage_directories.py"="bfabric.scripts.bfabric_list_not_existing_storage_directories:main" "bfabric_list_not_available_proteomics_workunits.py"="bfabric.scripts.bfabric_list_not_available_proteomics_workunits:main" +"bfabric_list_workunit_parameters.py"="bfabric.scripts.bfabric_list_workunit_parameters:main" "bfabric_upload_resource.py"="bfabric.scripts.bfabric_upload_resource:main" "bfabric_logthis.py"="bfabric.scripts.bfabric_logthis:main" "bfabric_setResourceStatus_available.py"="bfabric.scripts.bfabric_setResourceStatus_available:main"