Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 1 addition & 34 deletions gen3_tracker/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,12 @@
import typing
import uuid
from collections import OrderedDict
from typing import Union, Optional
from typing import Optional

import click
import pydantic
from click import Context, Command
from pydantic import BaseModel, field_validator


# Namespace UUID derived from the 'aced-idp.org' DNS name.
# The name is passed as ``str`` rather than ``bytes``: ``uuid.uuid3`` rejects
# ``bytes`` before Python 3.12, while a ``str`` is UTF-8 encoded internally and
# therefore hashes to exactly the same UUID on every version.
ACED_NAMESPACE = uuid.uuid3(uuid.NAMESPACE_DNS, 'aced-idp.org')
# NOTE(review): presumably the prefix of environment variables read by the
# CLI -- its usage is not visible here, confirm against the callers.
ENV_VARIABLE_PREFIX = 'G3T_'

Expand All @@ -24,28 +22,6 @@
}


def monkey_patch_url_validate():
    """Patch ``fhir.resources.fhirtypes.Url.validate`` so ``file:`` urls pass.

    See https://github.com/pydantic/pydantic/issues/1983.
    bugfix: addresses issue introduced with `fhir.resources`==7.0.1

    Calling this more than once is harmless: a validator that is already
    patched is left in place instead of being wrapped a second time.
    """
    # monkey patch to allow file: urls
    import fhir.resources.fhirtypes
    from pydantic import FileUrl

    url_type = fhir.resources.fhirtypes.Url
    original_url_validate = url_type.validate

    # Attribute lookups on a bound method fall through to the wrapped function,
    # so the marker set below is visible here on a repeat call.
    if getattr(original_url_validate, "_allows_file_urls", False):
        return

    def better_url_validate(cls, value: str, field: "ModelField", config: "BaseConfig") -> Union["AnyUrl", str]:  # noqa
        """Accept ``file:`` urls as-is; delegate every other value to the original validator."""
        if value.startswith("file:"):
            # Built only for its checking side effect (presumably raises on a
            # malformed url -- confirm); the raw string is what gets returned.
            _ = FileUrl(value)
            return value
        return original_url_validate(value, field, config)

    better_url_validate._allows_file_urls = True
    url_type.validate = classmethod(better_url_validate)


class LogConfig(BaseModel):
format: str
"""https://docs.python.org/3/library/logging.html#logging.Formatter"""
Expand Down Expand Up @@ -177,12 +153,3 @@ def resolve_command(
# os._exit(1) # noqa

raise e


# main
# Runs at import time: replaces fhir.resources.fhirtypes.Url.validate with the
# wrapper that accepts "file:" urls.
monkey_patch_url_validate()

# default initializers for path
# Register ``str`` as the pydantic v1 JSON encoder for the pathlib path types
# listed below.
pydantic.v1.json.ENCODERS_BY_TYPE[pathlib.PosixPath] = str
pydantic.v1.json.ENCODERS_BY_TYPE[pathlib.WindowsPath] = str
pydantic.v1.json.ENCODERS_BY_TYPE[pathlib.Path] = str
8 changes: 4 additions & 4 deletions gen3_tracker/common/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ def create_resource_id(resource, project_id) -> str:
assert resource, "resource required"
assert project_id, "project_id required"
identifier_string = identifier_to_string(resource.identifier)
return str(uuid.uuid5(ACED_NAMESPACE, f"{project_id}/{resource.resource_type}/{identifier_string}"))
return str(uuid.uuid5(ACED_NAMESPACE, f"{project_id}/{resource.get_resource_type()}/{identifier_string}"))


def create_object_id(path: str, project_id: str) -> str:
Expand All @@ -344,7 +344,7 @@ def assert_valid_id(resource, project_id):
"""Ensure that the id is correct."""
assert resource, "resource required"
assert project_id, "project_id required"
if resource.resource_type == "DocumentReference":
if resource.get_resource_type() == "DocumentReference":
document_reference: DocumentReference = resource
official_identifier = document_reference.content[0].attachment.url
recreate_id = create_object_id(official_identifier, project_id)
Expand All @@ -354,7 +354,7 @@ def assert_valid_id(resource, project_id):
recreate_id = create_resource_id(resource, project_id)
if resource.id == recreate_id:
return
msg = f"The current {resource.resource_type}.id {resource.id} does not equal the calculated one {recreate_id}, has the project id changed? current:{project_id} {resource.resource_type}:{official_identifier}"
msg = f"The current {resource.get_resource_type()}.id {resource.id} does not equal the calculated one {recreate_id}, has the project id changed? current:{project_id} {resource.get_resource_type()}:{official_identifier}"
raise Exception(msg)


Expand Down Expand Up @@ -523,7 +523,7 @@ def __exit__(self, exc_type, exc_val, exc_tb):
elif hasattr(self.output.obj, 'model_dump'):
_.update(self.output.obj.model_dump())
else:
_.update(self.output.obj.dict())
_.update(self.output.obj.model_dump())
rc = self.output.exit_code
if exc_type is not None:
if isinstance(self.output.obj, dict):
Expand Down
128 changes: 83 additions & 45 deletions gen3_tracker/gen3/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,40 +14,52 @@
from gen3_tracker import Config
from gen3_tracker.common import Push, Commit
from gen3_tracker.gen3.indexd import write_indexd
from gen3_tracker.git import calculate_hash, DVC, run_command, DVCMeta, DVCItem, modified_date
from gen3_tracker.git import (
calculate_hash,
DVC,
run_command,
DVCMeta,
DVCItem,
modified_date,
)


def _validate_parameters(from_: str) -> pathlib.Path:

assert len(urlparse(from_).scheme) == 0, f"{from_} appears to be an url. url to url cp not supported"
assert (
len(urlparse(from_).scheme) == 0
), f"{from_} appears to be an url. url to url cp not supported"

return from_


def cp(config: Config,
from_: str,
project_id: str,
ignore_state: bool,
auth=None,
user=None,
object_name=None,
bucket_name=None,
metadata: dict = {},
):
def cp(
config: Config,
from_: str,
project_id: str,
ignore_state: bool,
auth=None,
user=None,
object_name=None,
bucket_name=None,
metadata: dict = {},
):
"""Copy meta to bucket, used by etl_pod job"""
from_ = _validate_parameters(str(from_))
if not isinstance(from_, pathlib.Path):
from_ = pathlib.Path(from_)

assert auth, "auth is required"

metadata = dict({'submitter': None, 'metadata_version': '0.0.1', 'is_metadata': True} | metadata)
if not metadata['submitter']:
metadata = dict(
{"submitter": None, "metadata_version": "0.0.1", "is_metadata": True} | metadata
)
if not metadata["submitter"]:
if not user:
user = auth.curl('/user/user').json()
metadata['submitter'] = user['name']
user = auth.curl("/user/user").json()
metadata["submitter"] = user["name"]

program, project = project_id.split('-')
program, project = project_id.split("-")

assert bucket_name, f"could not find bucket for {program}"

Expand All @@ -57,27 +69,26 @@ def cp(config: Config,

if not object_name:
now = datetime.now().strftime("%Y%m%d-%H%M%S")
object_name = f'_{project_id}-{now}_meta.zip'
object_name = f"_{project_id}-{now}_meta.zip"

zipfile_path = temp_dir / object_name
with ZipFile(zipfile_path, 'w') as zip_object:
with ZipFile(zipfile_path, "w") as zip_object:
for _ in from_.glob("*.ndjson"):
zip_object.write(_)

stat = zipfile_path.stat()
md5_sum = calculate_hash('md5', zipfile_path)
md5_sum = calculate_hash("md5", zipfile_path)
my_dvc = DVC(
meta=DVCMeta(),
outs=[
DVCItem(
path=object_name,
md5=md5_sum,
hash='md5',
hash="md5",
modified=modified_date(zipfile_path),
size=stat.st_size,

)
]
],
)

metadata = write_indexd(
Expand All @@ -92,64 +103,91 @@ def cp(config: Config,
# document = file_client.upload_file_to_guid(guid=id_, file_name=object_name, bucket=bucket_name)
# print(document, file=sys.stderr)

run_command(f"gen3-client upload-single --bucket {bucket_name} --guid {my_dvc.object_id} --file {zipfile_path} --profile {config.gen3.profile}", no_capture=False)
run_command(
f"gen3-client upload-single --bucket {bucket_name} --guid {my_dvc.object_id} --file {zipfile_path} --profile {config.gen3.profile}",
no_capture=False,
)

return {'msg': f"Uploaded {zipfile_path} to {bucket_name}", "object_id": my_dvc.object_id, "object_name": object_name}
return {
"msg": f"Uploaded {zipfile_path} to {bucket_name}",
"object_id": my_dvc.object_id,
"object_name": object_name,
}


def publish_commits(
    config: Config, wait: bool, auth: Gen3Auth, bucket_name: str, spinner=None
) -> dict:
    """Publish commits to the portal.

    Uploads the ``META`` directory with ``cp`` and then submits a
    ``fhir_import_export`` job that references the uploaded object.

    Args:
        config: Project configuration; ``config.gen3.project_id`` is read here.
        wait: When true, run the job through ``async_run_job_and_wait`` and
            block until it returns; otherwise only create the job.
        auth: Gen3 auth provider, used for the ``/user/user`` lookup and for
            the jobs client.
        bucket_name: Bucket passed through to ``cp``.
        spinner: Optional object forwarded to ``async_run_job_and_wait``.

    Returns:
        The job result as a dict. A non-dict result is wrapped as
        ``{"output": result}``; a string ``output`` is JSON-decoded when it
        parses, and left untouched otherwise.
    """

    # TODO legacy fhir-import-export job: copies meta to bucket and triggers job,
    #  meta information is already in git REPO,
    #  we should consider changing the fhir_import_export job to use the git REPO

    user = auth.curl("/user/user").json()

    # copy meta to bucket
    upload_result = cp(
        config=config,
        from_="META",
        project_id=config.gen3.project_id,
        ignore_state=True,
        auth=auth,
        user=user,
        bucket_name=bucket_name,
    )

    object_id = upload_result["object_id"]

    push = Push(config=config)
    jobs_client = Gen3Jobs(auth_provider=auth)

    # create "legacy" commit object, read by fhir-import-export job
    push.commits.append(
        Commit(
            object_id=object_id,
            message="From g3t-git",
            meta_path=upload_result["object_name"],
            commit_id=object_id,
        )
    )
    args = {
        "push": push.model_dump(),
        "project_id": config.gen3.project_id,
        "method": "put",
    }

    # capture logging from gen3.jobs
    from cdislogging import get_logger  # noqa

    # NOTE(review): the literal string "__name__" (not this module's __name__)
    # is kept exactly as written; presumably it matches the logger name used
    # inside gen3.jobs -- confirm before changing.
    cdis_logging = get_logger("__name__")
    cdis_logging.setLevel(logging.WARN)

    if wait:
        # async_run_job_and_wait monkeypatched below
        result = asyncio.run(
            jobs_client.async_run_job_and_wait(
                job_name="fhir_import_export", job_input=args, spinner=spinner
            )
        )
    else:
        result = jobs_client.create_job("fhir_import_export", args)

    if not isinstance(result, dict):
        result = {"output": result}

    # A dict response is not guaranteed to carry an "output" key, so look it
    # up defensively instead of indexing (which raised KeyError).
    output = result.get("output")
    if isinstance(output, str):
        try:
            result["output"] = json.loads(output)
        except json.JSONDecodeError:
            # deliberate best effort: non-JSON output is returned as the raw string
            pass
    return result


# monkey patch for gen3.jobs.Gen3Jobs.async_run_job_and_wait
# make it less noisy and sleep less (max of 30 seconds)
async def async_run_job_and_wait(self, job_name, job_input, spinner=None, _ssl=None, **kwargs):
async def async_run_job_and_wait(
self, job_name, job_input, spinner=None, _ssl=None, **kwargs
):
"""
Asynchronous function to create a job, wait for output, and return. Will
sleep in a linear delay until the job is done, starting with 1 second.
Expand Down Expand Up @@ -188,12 +226,12 @@ async def async_run_job_and_wait(self, job_name, job_input, spinner=None, _ssl=N
if status.get("status") != "Completed":
# write failed output to log file before raising exception
response = await self.async_get_output(job_create_response.get("uid"))
with open("logs/publish.log", 'a') as f:
log_msg = {'timestamp': datetime.now(pytz.UTC).isoformat()}
log_msg.update(response)
f.write(json.dumps(log_msg, separators=(',', ':')))
f.write('\n')
with open("logs/publish.log", "a") as f:
log_msg = {"timestamp": datetime.now(pytz.UTC).isoformat()}
log_msg.update(response)
f.write(json.dumps(log_msg, separators=(",", ":")))
f.write("\n")

raise Exception(f"Job status not complete: {status.get('status')}")

response = await self.async_get_output(job_create_response.get("uid"))
Expand Down
Loading