Skip to content

Commit

Permalink
Simple assemblies ingestion and json dump
Browse files Browse the repository at this point in the history
  • Loading branch information
mberacochea committed Jul 28, 2023
1 parent 2975c91 commit a05454b
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 112 deletions.
100 changes: 57 additions & 43 deletions dump/cli.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,20 @@
from typing import Optional
import csv
import os
import shutil
import json
import os

import typer
from sqlalchemy.future.engine import Engine
from sqlmodel import Session, SQLModel, create_engine
from sqlmodel import Session, SQLModel, create_engine, select
from typing_extensions import Annotated

from rich.progress import track

from dump.models import Assembly, CopyStatus, File
from dump.models import Assembly, File

app = typer.Typer()


FTP_PATH = "/nfs/ftp/public/databases/metagenomics/temp/protein-db-dump-files"

__version__ = "0.1.0"


Expand All @@ -33,50 +30,63 @@ def create_database(db: str) -> Engine:
return engine


@app.command(help=("Copy the files for an assembly."))
def copy(database: str, assembly_accession: str):
@app.command(help=("Dump all the assemblies in the db."))
def dump_all(database: str, output_folder: str):
    """Export the JSON metadata files for every assembly in the database.

    :param database: path/URL of the database holding the Assembly/File tables.
    :param output_folder: root folder the per-assembly JSON files are written to.
    """
    engine = create_database(database)
    with Session(engine) as session:
        # Stream assemblies from the DB; track() renders a progress bar.
        assemblies = session.exec(select(Assembly))
        for assembly in track(
            assemblies, description="Exporting the assemblies json files"
        ):
            # NOTE(review): dump_json opens a fresh engine/session per assembly;
            # acceptable for a one-off export, refactor if this becomes slow.
            dump_json(database, assembly.accession, output_folder)

if not os.path.exists(assembly.dest_folder):
os.makedirs(assembly.dest_folder)

pending_files = [
f for f in assembly.highest_pipeline_files if f.status == CopyStatus.PENDING
]

for file_ in pending_files:
file_.copy()
session.add(file_)
session.commit()
@app.command(help=("Dump the assembly json files."))
def dump_json(database: str, assembly_accession: str, output_folder: str):
    """Write one JSON file per pipeline version describing an assembly's files.

    Each JSON contains the assembly accession, the pipeline version, a privacy
    flag, and the list of file paths with their descriptions.

    :param database: path/URL of the database holding the Assembly/File tables.
    :param assembly_accession: primary key of the assembly to export.
    :param output_folder: root folder the JSON files are written to.
    :raises ValueError: if the accession is not present in the database.
    """
    engine = create_database(database)
    with Session(engine) as session:
        assembly = session.get(Assembly, assembly_accession)

        if assembly is None:
            raise ValueError(f"Assembly {assembly_accession} not found")

        pipelines = set(file_.pipeline_version for file_ in assembly.files)

        for pipeline_version in pipelines:
            # Fixed annotation: this helper returns a list of File, not a File.
            files_for_pipeline = assembly.files_for_pipeline_version(
                pipeline_version
            )

            # The export is private only if EVERY file carries a private flag
            # (generator instead of a materialized list lets all() short-circuit).
            is_private = all(
                file_.job_is_private
                or file_.sample_is_private
                or file_.study_is_private
                or file_.assembly_is_private
                for file_ in files_for_pipeline
            )

            json_content = {
                "assembly": assembly.accession,
                "pipeline_version": pipeline_version,
                "is_private": is_private,
                "files": [
                    {
                        "file_path": file_.file_path,
                        "description": file_.file_description,
                    }
                    for file_ in files_for_pipeline
                ],
            }

            output = assembly.dest_folder(output_folder)
            os.makedirs(output, exist_ok=True)
            # Typo fix: was "ouput_json".
            output_json = f"{output}/{assembly.accession}_{pipeline_version}.json"
            with open(output_json, "w", encoding="utf-8") as f:
                json.dump(json_content, f, indent=4)


@app.command(help="Add entries to the, template and entries.")
Expand Down Expand Up @@ -111,6 +121,10 @@ def init(
file_alias=assembly_file_row[3],
file_description=assembly_file_row[4],
pipeline_version=float(assembly_file_row[5]),
job_is_private=row[6] == "1",
sample_is_private=row[7] == "1",
study_is_private=row[8] == "1",
assembly_is_private=row[9] == "1",
)
session.add(file_)
session.commit()
Expand Down
77 changes: 8 additions & 69 deletions dump/models.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,9 @@
import subprocess
from datetime import datetime
from enum import Enum
from typing import Optional
import os

from sqlmodel import Field, Relationship, SQLModel

FTP_PATH = "/nfs/ftp/public/databases/metagenomics/temp/protein-db-dump-files"


class TimeStampedMixin(SQLModel):
created_at: datetime = Field(default=datetime.utcnow(), nullable=False)
Expand All @@ -18,48 +14,17 @@ class Assembly(TimeStampedMixin, table=True):
accession: str = Field(primary_key=True)
files: list["File"] = Relationship(back_populates="assembly")

@property
def dest_folder(self):
return f"{FTP_PATH}/{self.accession[:6]}/{self.accession}_data"

@property
def tarball_path(self):
return f"{FTP_PATH}/{self.accession[:6]}/{self.accession}"
def dest_folder(self, output_folder):
    """Build the destination folder path: <output_folder>/<accession[:6]>/<accession>."""
    accession_prefix = self.accession[:6]
    return f"{output_folder}/{accession_prefix}/{self.accession}"

@property
def highest_pipeline_files(self):
    """Files that belong to the most recent (highest) pipeline version.

    Raises ValueError if the assembly has no files at all.
    """
    # max() replaces sorted(..., reverse=True)[0]: O(n) instead of O(n log n).
    latest = max(file_.pipeline_version for file_ in self.files)
    return [file_ for file_ in self.files if file_.pipeline_version == latest]

@property
def ready(self):
    """True when every file of this assembly was copied successfully.

    Prints a human-readable status line as a side effect (CLI feedback).
    Returns False for any mixture of pending/missing/errored files.
    """
    total = len(self.files)
    # Generator sums read cleaner than sum(map(lambda ...)) and skip the lambdas.
    pending = sum(f.status == CopyStatus.PENDING for f in self.files)
    completed = sum(f.status == CopyStatus.COMPLETED for f in self.files)
    missing = sum(f.status == CopyStatus.MISSING for f in self.files)
    error = sum(f.status == CopyStatus.ERROR for f in self.files)

    if completed == total:
        print("Done")
        return True
    # Typo fix: was "completly".
    if missing == total or error == total:
        print(f"Assembly {self.accession} completely failed.")
        return False
    if pending:
        print("Files copy pending.")
        return False
    if missing:
        print("Missing files.")
        return False
    if error:
        print("Some failed")
        return False
    # Was "WUT?" — reachable only if a file has an unknown status value.
    print("Unexpected file status combination.")
    return False

def dump_metadata(self):
    # Stub: metadata dumping is not implemented yet; intentionally a no-op.
    pass
def files_for_pipeline_version(self, pipeline_version):
return [f for f in self.files if f.pipeline_version == pipeline_version]


class CopyStatus(str, Enum):
Expand All @@ -82,33 +47,7 @@ class File(TimeStampedMixin, table=True):
file_alias: str = Field(nullable=False)
file_description: str = Field(nullable=False)
pipeline_version: float = Field(nullable=False)

status: CopyStatus = Field(default=CopyStatus.PENDING, nullable=False)

copy_exitcode: int = Field(nullable=True)
copy_stdout: str = Field(nullable=True)
copy_stderr: str = Field(nullable=True)

def copy(self):
    """Copy this file into its assembly's destination folder, recording the outcome.

    Side effects: sets `status` to MISSING (source absent), COMPLETED or ERROR,
    and stores the cp exit code / stdout / stderr on the instance when it ran.
    Returns self so the caller can re-add it to the session.
    """
    dest_file = f"{self.assembly.dest_folder}/{self.file_alias}"
    if not os.path.exists(self.file_path):
        self.status = CopyStatus.MISSING
    else:
        # Single log line (the original printed the command twice).
        print(f"cp {self.file_path} {dest_file}")
        # Argument list with shell=False: immune to shell metacharacters in paths.
        run_return = subprocess.run(
            ["cp", self.file_path, dest_file],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        )
        if run_return.returncode == 0:
            self.status = CopyStatus.COMPLETED
        else:
            self.status = CopyStatus.ERROR
        self.copy_exitcode = run_return.returncode
        self.copy_stdout = run_return.stdout
        self.copy_stderr = run_return.stderr
    print(f"{self.status}")
    return self
job_is_private: bool = Field(nullable=False)
sample_is_private: bool = Field(nullable=False)
study_is_private: bool = Field(nullable=False)
assembly_is_private: bool = Field(nullable=False)

0 comments on commit a05454b

Please sign in to comment.