Commit

Merge pull request #19 from pepkit/dev
Dev
xuebingjie1990 authored Aug 18, 2022

Verified: this commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
2 parents 62e6db0 + ec2085d commit d28d9d5
Showing 8 changed files with 366 additions and 166 deletions.
1 change: 1 addition & 0 deletions .github/workflows/run-pytest.yml
@@ -7,6 +7,7 @@ on:
    branches: [master]
  workflow_dispatch:
    inputs: null

jobs:
  pytest:
    strategy:
91 changes: 7 additions & 84 deletions README.md
@@ -4,100 +4,23 @@

<img src="https://raw.githubusercontent.com/pepkit/pipestat/master/docs/img/pipestat_logo.svg?sanitize=true" alt="pipestat" height="70"/><br>

# What is this?
Pipestat standardizes reporting of pipeline results. It provides 1) a standard specification for how pipeline outputs should be stored; and 2) an implementation to easily write results to that format from within Python or from the command line. A pipeline author defines all the outputs produced by a pipeline by writing a JSON-schema. The pipeline then uses pipestat to report pipeline outputs as the pipeline runs, either via the Python API or command line interface. The user configures results to be stored either in a [YAML-formatted file](https://yaml.org/spec/1.2/spec.html) or a [PostgreSQL database](https://www.postgresql.org/).

Pipestat standardizes reporting of pipeline results. It provides 1) a standard specification for how pipeline outputs should be stored; and 2) an implementation to easily write results to that format from within Python or from the command line.
See [Pipestat documentation](https://pipestat.databio.org) for complete details.

# How does it work?

A pipeline author defines all the outputs produced by a pipeline by writing a JSON-schema. The pipeline then uses pipestat to report pipeline outputs as the pipeline runs, either via the Python API or command line interface. The user configures results to be stored either in a [YAML-formatted file](https://yaml.org/spec/1.2/spec.html) or a [PostgreSQL database](https://www.postgresql.org/). The results are recorded according to the pipestat specification, in a standard, pipeline-agnostic way. This way, downstream software can use this specification to create universal tools for analyzing, monitoring, and visualizing pipeline results that will work with any pipeline or workflow.
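
For example, an output schema maps each result name to its type and description. A minimal sketch (the result name and description here are illustrative, not taken from this repository):

```yaml
# output_schema.yaml -- a minimal, illustrative pipestat output schema
result_name:
  type: number
  description: "An example numeric result reported by the pipeline"
```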
## Developer tests

First you need a local demo instance of Postgres running to test the database back-end. You can get one using Docker, matching the included config file, like this:

```console
docker run --rm -it -e POSTGRES_USER=postgres -e POSTGRES_PASSWORD=pipestat-password -e POSTGRES_DB=pipestat-test -p 5432:5432 postgres
```

Then, run tests:

```console
pytest
```
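
The docker command above mirrors what the included config file is expected to contain on the database side. A hedged sketch of such a pipestat config (the `database` key layout is an assumption, not copied from this repository):

```yaml
# Sketch of a pipestat config pointing at the demo Postgres instance above.
database:
  name: pipestat-test          # matches POSTGRES_DB
  user: postgres               # matches POSTGRES_USER
  password: pipestat-password  # matches POSTGRES_PASSWORD
  host: localhost
  port: 5432
  dialect: postgresql
  driver: psycopg2
```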

# Quick start

## Install pipestat

```console
pip install pipestat
```

## Set environment variables (optional)

```console
export PIPESTAT_RESULTS_SCHEMA=output_schema.yaml
export PIPESTAT_RECORD_ID=my_record
export PIPESTAT_RESULTS_FILE=results_file.yaml
export PIPESTAT_NAMESPACE=my_namespace
```
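
With these variables set, `PipestatManager()` can be created with no arguments, as in the examples below. A sketch of the explicit equivalent, where the keyword names are assumptions inferred from the variable names:

```python
import pipestat

# Explicit equivalent of relying on the PIPESTAT_* variables above;
# keyword argument names are illustrative assumptions.
psm = pipestat.PipestatManager(
    namespace="my_namespace",
    record_identifier="my_record",
    results_file_path="results_file.yaml",
    schema_path="output_schema.yaml",
)
```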

## Pipeline results reporting and retrieval

### Report a result

From command line:

```console
pipestat report -i result_name -v 1.1
```

From Python:

```python
import pipestat

psm = pipestat.PipestatManager()
psm.report(values={"result_name": 1.1})
```
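
With a file backend, the reported value lands in the configured YAML results file, keyed by namespace and record. A sketch of the expected layout after the call above (the nesting is an assumption, not copied from a real run):

```yaml
# results_file.yaml after reporting result_name for my_record
my_namespace:
  my_record:
    result_name: 1.1
```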

### Retrieve a result

From command line:

```console
pipestat retrieve -i result_name
```

From Python:

```python
import pipestat

psm = pipestat.PipestatManager()
psm.retrieve(result_identifier="result_name")
```
## Pipeline status management

### Set status

From command line:

```console
pipestat status set running
```

From Python:

```python
import pipestat

psm = pipestat.PipestatManager()
psm.set_status(status_identifier="running")
```

### Get status

From command line:

```console
pipestat status get
```

From Python:

```python
import pipestat

psm = pipestat.PipestatManager()
psm.get_status()
```
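
Putting the pieces together, a pipeline run might combine status and result reporting like this. A sketch using only the calls shown above; "completed" is an assumed identifier from pipestat's default status schema:

```python
import pipestat

# Backend and identifiers come from the PIPESTAT_* variables set earlier.
psm = pipestat.PipestatManager()

psm.set_status(status_identifier="running")    # flag the run as started
psm.report(values={"result_name": 1.1})        # record a schema-defined result
psm.set_status(status_identifier="completed")  # flag the run as finished
```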
126 changes: 72 additions & 54 deletions docs/img/pipestat_logo.svg
137 changes: 137 additions & 0 deletions docs/img/pipestat_logo_light.svg
119 changes: 119 additions & 0 deletions docs/img/pipestat_logo_old.svg
14 changes: 5 additions & 9 deletions pipestat/cli.py
Original file line number Diff line number Diff line change
@@ -40,15 +40,11 @@ def main():
    if psm.schema is None:
        raise SchemaNotFoundError(msg="report", cli=True)
    result_metadata = psm.schema[args.result_identifier]
    if (
        result_metadata[SCHEMA_TYPE_KEY]
        in [
            "object",
            "image",
            "file",
        ]
        and os.path.exists(expandpath(value))
    ):

    if result_metadata[SCHEMA_TYPE_KEY] in [
        "object",
        "image",
        "file",
    ] and os.path.exists(expandpath(value)):
from json import load

_LOGGER.info(
39 changes: 24 additions & 15 deletions pipestat/pipestat.py
Original file line number Diff line number Diff line change
@@ -471,10 +471,19 @@ def session(self):
"""
if not self.is_db_connected():
self.establish_db_connection()
with self[DB_SESSION_KEY]() as session:
_LOGGER.debug("Created session")
# with self[DB_SESSION_KEY]() as session:
session = self[DB_SESSION_KEY]()
_LOGGER.debug("Created session")
try:
yield session
_LOGGER.debug("Ending session")
except:
_LOGGER.info("session.rollback")
session.rollback()
raise
finally:
_LOGGER.info("session.close")
session.close()
_LOGGER.debug("Ending session")

def _get_flag_file(
self, record_identifier: str = None
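
For context, the rewritten `session` property above follows SQLAlchemy's documented "transactional scope" recipe: roll back on any error, always close. A generic sketch of that recipe, independent of pipestat (the commit-on-success line is part of the upstream recipe; pipestat itself commits at the call sites):

```python
from contextlib import contextmanager

@contextmanager
def session_scope(session_factory):
    """Provide a transactional scope around a series of operations."""
    session = session_factory()  # e.g. a sqlalchemy.orm.sessionmaker instance
    try:
        yield session
        session.commit()  # commit if the block succeeded
    except Exception:
        session.rollback()  # undo partial work on failure
        raise
    finally:
        session.close()  # always release the connection
```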
@@ -896,7 +905,7 @@ def _table_to_dict(self) -> None:
"""
Create a dictionary from the database table data
"""
with self.session as s:
with self[DB_SESSION_KEY]() as s:
records = s.query(self.get_orm(self.namespace)).all()
_LOGGER.debug(f"Reading data from database for '{self.namespace}' namespace")
for record in records:
@@ -937,7 +946,7 @@ def _check_table_exists(self, table_name: str) -> bool:
"""
from sqlalchemy import inspect

with self.session as s:
with self[DB_SESSION_KEY]() as s:
return inspect(s.bind).has_table(table_name=table_name)

def _count_rows(self, table_name: str) -> int:
@@ -947,7 +956,7 @@ def _count_rows(self, table_name: str) -> int:
:param str table_name: table to count rows for
:return int: number of rows in the selected table
"""
with self.session as s:
with self[DB_SESSION_KEY]() as s:
return s.query(self[DB_ORMS_KEY][table_name].id).count()

def get_orm(self, table_name: str = None) -> Any:
@@ -981,7 +990,7 @@ def check_record_exists(
:return bool: whether the record exists in the table
"""
if self.file is None:
with self.session as s:
with self[DB_SESSION_KEY]() as s:
return (
s.query(self.get_orm(table_name).id)
.filter_by(record_identifier=record_identifier)
@@ -1039,7 +1048,7 @@ def _check_which_results_exist_db(
"""
table_name = table_name or self.namespace
rid = self._strict_record_id(rid)
with self.session as s:
with self[DB_SESSION_KEY]() as s:
record = (
s.query(self.get_orm(table_name))
.filter_by(record_identifier=rid)
@@ -1102,7 +1111,7 @@ def select(
"""

ORM = self.get_orm(table_name or self.namespace)
with self.session as s:
with self[DB_SESSION_KEY]() as s:
if columns is not None:
query = s.query(*[getattr(ORM, column) for column in columns])
else:
@@ -1129,7 +1138,7 @@ def select_distinct(self, table_name, columns) -> List[Any]:
"""

ORM = self.get_orm(table_name or self.namespace)
with self.session as s:
with self[DB_SESSION_KEY]() as s:
query = s.query(*[getattr(ORM, column) for column in columns])
query = query.distinct()
result = query.all()
@@ -1202,7 +1211,7 @@ def _retrieve_db(
f"'{record_identifier}'"
)

with self.session as s:
with self[DB_SESSION_KEY]() as s:
record = (
s.query(self.get_orm(table_name))
.filter_by(record_identifier=record_identifier)
@@ -1249,7 +1258,7 @@ def select_txt(
f"This operation is not supported for file backend."
)
ORM = self.get_orm(table_name or self.namespace)
with self.session as s:
with self[DB_SESSION_KEY]() as s:
if columns is not None:
q = (
s.query(*[getattr(ORM, column) for column in columns])
@@ -1378,12 +1387,12 @@ def _report_db(
record_identifier=record_identifier, table_name=table_name
):
new_record = ORMClass(**values)
with self.session as s:
with self[DB_SESSION_KEY]() as s:
s.add(new_record)
s.commit()
returned_id = new_record.id
else:
with self.session as s:
with self[DB_SESSION_KEY]() as s:
record_to_update = (
s.query(ORMClass)
.filter(getattr(ORMClass, RECORD_ID) == record_identifier)
@@ -1501,7 +1510,7 @@ def _remove_db(
if self.check_record_exists(
record_identifier=record_identifier, table_name=table_name
):
with self.session as s:
with self[DB_SESSION_KEY]() as s:
records = s.query(ORMClass).filter(
getattr(ORMClass, RECORD_ID) == record_identifier
)
5 changes: 1 addition & 4 deletions setup.py
Original file line number Diff line number Diff line change
@@ -19,9 +19,6 @@
# DEPENDENCIES.append(line.split("=")[0].rstrip("<>"))
DEPENDENCIES.append(line)

# 2to3
if sys.version_info >= (3,):
    extra["use_2to3"] = True
extra["install_requires"] = DEPENDENCIES


@@ -73,7 +70,7 @@ def get_static(name, condition=None):
],
keywords="project, metadata, bioinformatics, sequencing, ngs, workflow",
url="https://github.com/pepkit/" + PACKAGE,
author=u"Michal Stolarczyk, Nathan Sheffield",
author="Michal Stolarczyk, Nathan Sheffield",
license="BSD2",
entry_points={
"console_scripts": ["pipestat = pipestat.__main__:main"],
