From 9d1debee4e305ca9cd6bd1ba7b68fe827a1da6c8 Mon Sep 17 00:00:00 2001
From: matthewpeterkort
Date: Tue, 22 Apr 2025 13:08:30 -0700
Subject: [PATCH 1/6] update versions

---
 Dockerfile       | 2 +-
 requirements.txt | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/Dockerfile b/Dockerfile
index ad36fc3..a5d3d66 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -37,7 +37,7 @@ RUN mkdir ~/.aws ~/.gen3 /root/studies
 
 RUN git clone https://github.com/bmeg/iceberg.git && \
     cd iceberg && \
-    git checkout feature/FHIR-resource-type
+    git checkout 7f6cfdb558d05370fc645b5ab894b98b38a01e1b
 
 COPY . /root

diff --git a/requirements.txt b/requirements.txt
index 7adde48..8123e45 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -35,7 +35,7 @@ fastapi
 uvicorn[standard]
 
 #aced submission
-aced-submission==0.0.9rc37
+aced-submission==0.0.10rc11
 
 #gen3 tracker
-gen3-tracker==0.0.5rc11
+gen3-tracker==0.0.7rc13

From 557d1ec64100cb8d8bf7472877d9b6ee82912e1a Mon Sep 17 00:00:00 2001
From: matthewpeterkort
Date: Tue, 22 Apr 2025 16:08:51 -0700
Subject: [PATCH 2/6] fix chunk size arg

---
 bundle_service/processing/process_bundle.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/bundle_service/processing/process_bundle.py b/bundle_service/processing/process_bundle.py
index d219a6c..e6f3ec0 100644
--- a/bundle_service/processing/process_bundle.py
+++ b/bundle_service/processing/process_bundle.py
@@ -116,7 +116,7 @@ async def process(rows: List[dict], project_id: str, access_token: str) -> list[
 
     try:
         db = LocalFHIRDatabase(db_name=f"{temp_dir}/local_fhir.db")
-        db.bulk_insert_data(resources=get_project_data(await _get_grip_service_name(), await _get_grip_graph_name(), project_id, logs, access_token))
+        db.bulk_insert_data(resources=get_project_data(await _get_grip_service_name(), await _get_grip_graph_name(), project_id, logs, access_token, 1024*1024))
 
         index_generator_dict = {
             'researchsubject': db.flattened_research_subjects,
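Patch 2 passes 1024*1024 (1 MiB) as a new trailing positional argument to get_project_data, which reads as a byte chunk size for streaming project rows out of grip. Only the call site appears in the diff; the sketch below shows the kind of chunked reader such an argument could drive. The endpoint URL and the entire function body are assumptions for illustration, not the real aced-submission implementation.

    import json
    from typing import Iterator, List

    import requests


    def get_project_data(service: str, graph: str, project_id: str,
                         logs: List[str], access_token: str,
                         chunk_size: int = 1024 * 1024) -> Iterator[dict]:
        """Hypothetical sketch: stream NDJSON rows, chunk_size bytes per read."""
        url = f"http://{service}/{graph}/export/{project_id}"  # assumed endpoint
        headers = {"Authorization": f"Bearer {access_token}"}
        with requests.get(url, headers=headers, stream=True) as resp:
            resp.raise_for_status()
            # iter_lines buffers chunk_size bytes from the socket per read
            for line in resp.iter_lines(chunk_size=chunk_size):
                if line:
                    yield json.loads(line)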
From 03bf0b83841db81326e89e712c2f063bdf5975fd Mon Sep 17 00:00:00 2001
From: matthewpeterkort
Date: Wed, 23 Apr 2025 08:55:02 -0700
Subject: [PATCH 3/6] update jsonschemagraph binary

---
 Dockerfile | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/Dockerfile b/Dockerfile
index a5d3d66..e059892 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -42,7 +42,7 @@ RUN git clone https://github.com/bmeg/iceberg.git && \
 COPY . /root
 
 #Add jsonschemagraph exe to image
-RUN wget https://github.com/bmeg/jsonschemagraph/releases/download/v0.0.2/jsonschemagraph-linux.amd64 -P /usr/local/bin/
+RUN wget https://github.com/bmeg/jsonschemagraph/releases/download/v0.0.3/jsonschemagraph-linux.amd64 -P /usr/local/bin/
 RUN mv /usr/local/bin/jsonschemagraph-linux.amd64 /usr/local/bin/jsonschemagraph
 RUN chmod +x /usr/local/bin/jsonschemagraph
 ENV PATH="/usr/local/bin:$PATH"

From 9cae42c3e65c38f72d3fa3b73194a811c3e030e0 Mon Sep 17 00:00:00 2001
From: matthewpeterkort
Date: Wed, 23 Apr 2025 11:02:29 -0700
Subject: [PATCH 4/6] update jsonschemagraph subprocess command

---
 bundle_service/processing/process_bundle.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/bundle_service/processing/process_bundle.py b/bundle_service/processing/process_bundle.py
index e6f3ec0..c64d2fe 100644
--- a/bundle_service/processing/process_bundle.py
+++ b/bundle_service/processing/process_bundle.py
@@ -109,7 +109,9 @@ async def process(rows: List[dict], project_id: str, access_token: str) -> list[
         temp_file.close()
 
     if files_written:
-        subprocess.run(["jsonschemagraph", "gen-dir", "iceberg/schemas/graph", f"{temp_dir}", f"{temp_dir}/OUT", "--project_id", f"{project_id}", "--gzip_files"])
+        program, project =project_id.split("-")
+        project_str_dict = f'{{"auth_resource_path":"/programs/{program}/projects/{project}"}}'
+        subprocess.run(["jsonschemagraph", "gen-dir", "iceberg/schemas/graph", f"{temp_dir}", f"{temp_dir}/OUT", "--extraArgs", f"{project_str_dict}", "--gzip_files"])
         res = bulk_load(await _get_grip_service_name(), await _get_grip_graph_name(), project_id, f"{temp_dir}/OUT", logs, access_token)
         if int(res[0]["status"]) != 200:
             server_errors.append(res[0]["message"])

From 4e214a21b5c16f8eec8be9f2893347ca8dc427cf Mon Sep 17 00:00:00 2001
From: matthewpeterkort
Date: Wed, 23 Apr 2025 12:09:29 -0700
Subject: [PATCH 5/6] fix tests

---
 bundle_service/processing/process_bundle.py | 4 ++--
 tests/server/test_bundle_service.py         | 4 ++--
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/bundle_service/processing/process_bundle.py b/bundle_service/processing/process_bundle.py
index c64d2fe..85106de 100644
--- a/bundle_service/processing/process_bundle.py
+++ b/bundle_service/processing/process_bundle.py
@@ -73,7 +73,7 @@ async def _can_create(access_token: str, project_id: str) -> bool | str | int:
     return True, f"HAS SERVICE create on resource {required_service}", None
 
 
-async def process(rows: List[dict], project_id: str, access_token: str) -> list[str]:
+async def process(rows: List[dict], project_id: str, access_token: str) -> List[str] | None:
     """Processes a bundle into a temp directory of NDJSON files
     that are compatible with existing loading functions
 
@@ -111,7 +111,7 @@ async def process(rows: List[dict], project_id: str, access_token: str) -> List[
     if files_written:
         program, project =project_id.split("-")
         project_str_dict = f'{{"auth_resource_path":"/programs/{program}/projects/{project}"}}'
-        subprocess.run(["jsonschemagraph", "gen-dir", "iceberg/schemas/graph", f"{temp_dir}", f"{temp_dir}/OUT", "--extraArgs", f"{project_str_dict}", "--gzip_files"])
+        subprocess.run(["jsonschemagraph", "gen-dir", "iceberg/schemas/graph", f"{temp_dir}", f"{temp_dir}/OUT", "--extraArgs", project_str_dict, "--gzip_files"])
         res = bulk_load(await _get_grip_service_name(), await _get_grip_graph_name(), project_id, f"{temp_dir}/OUT", logs, access_token)
         if int(res[0]["status"]) != 200:
             server_errors.append(res[0]["message"])
diff --git a/tests/server/test_bundle_service.py b/tests/server/test_bundle_service.py
index f6b53e3..1ee7b1f 100644
--- a/tests/server/test_bundle_service.py
+++ b/tests/server/test_bundle_service.py
@@ -227,13 +227,13 @@ def test_write_bundle_simple_ok(valid_bundle, valid_patient):
     vertex_id = request_bundle["entry"][0]["resource"]["id"]
     project_id = request_bundle["identifier"]["value"]
     endpoint = endpoint_from_token(ACCESS_TOKEN)
-    result = requests.get(f"{endpoint}/grip/writer/graphql/CALIPER/get-vertex/{vertex_id}/{project_id}",
+    result = requests.get(f"{endpoint}/grip/writer/CALIPER/get-vertex/{vertex_id}/{project_id}",
                           headers=HEADERS
                           ).json()
 
     print("RESULT: ", result)
     print("ENTRY: ", request_bundle["entry"][0]["resource"])
-    assert result['data']['gid'] == vertex_id
+    assert result['data']['id'] == vertex_id
 
 
 def test_write_bundle_missing_type(valid_bundle, valid_patient):
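Patches 4 and 5 build the --extraArgs payload as an f-string with doubled braces, which is easy to mis-escape. json.dumps produces the same string and cannot emit invalid JSON; a sketch of that alternative (not what the patches ship):

    import json

    # maxsplit=1 is an added safeguard: it tolerates hyphens in the project name
    program, project = project_id.split("-", 1)
    project_str_dict = json.dumps(
        {"auth_resource_path": f"/programs/{program}/projects/{project}"}
    )

The bare project_id.split("-") in the patches assumes exactly one hyphen in the project id and raises ValueError otherwise.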
From 0bb798528f20ed44100188ef3790128a1dc6068b Mon Sep 17 00:00:00 2001
From: matthewpeterkort
Date: Wed, 23 Apr 2025 12:43:14 -0700
Subject: [PATCH 6/6] Add more specific logging around subprocess command

---
 bundle_service/processing/process_bundle.py | 25 ++++++++++++++++++++-----
 1 file changed, 20 insertions(+), 5 deletions(-)

diff --git a/bundle_service/processing/process_bundle.py b/bundle_service/processing/process_bundle.py
index 85106de..71dfb01 100644
--- a/bundle_service/processing/process_bundle.py
+++ b/bundle_service/processing/process_bundle.py
@@ -109,12 +109,27 @@ async def process(rows: List[dict], project_id: str, access_token: str) -> List[
         temp_file.close()
 
     if files_written:
-        program, project =project_id.split("-")
+        program, project = project_id.split("-")
         project_str_dict = f'{{"auth_resource_path":"/programs/{program}/projects/{project}"}}'
-        subprocess.run(["jsonschemagraph", "gen-dir", "iceberg/schemas/graph", f"{temp_dir}", f"{temp_dir}/OUT", "--extraArgs", project_str_dict, "--gzip_files"])
-        res = bulk_load(await _get_grip_service_name(), await _get_grip_graph_name(), project_id, f"{temp_dir}/OUT", logs, access_token)
-        if int(res[0]["status"]) != 200:
-            server_errors.append(res[0]["message"])
+        print(f"Using project: {project_str_dict}")
+        result = subprocess.run(
+            ["jsonschemagraph", "gen-dir", "iceberg/schemas/graph", f"{temp_dir}", f"{temp_dir}/OUT", "--extraArgs", project_str_dict, "--gzip_files"],
+            capture_output=True,
+            text=True,
+            check=False
+        )
+        if result.returncode == 0:
+            print("jsonschemagraph ran successfully.")
+            res = bulk_load(await _get_grip_service_name(), await _get_grip_graph_name(), project_id, f"{temp_dir}/OUT", logs, access_token)
+            if int(res[0]["status"]) != 200:
+                server_errors.append(res[0]["message"])
+        else:
+            print(f"jsonschemagraph failed with exit code: {result.returncode}")
+            print("Stdout:")
+            print(result.stdout)
+            print("Stderr:")
+            print(result.stderr)
+            server_errors.append(f"jsonschemagraph failed: {result.stderr.strip()}")
 
     try:
         db = LocalFHIRDatabase(db_name=f"{temp_dir}/local_fhir.db")
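Patch 6 keeps check=False and branches on result.returncode by hand, which preserves the captured streams for logging. The same shape can be written with check=True, letting subprocess raise; a sketch of that alternative (CalledProcessError carries stdout and stderr when capture_output=True is set):

    import subprocess

    try:
        subprocess.run(
            ["jsonschemagraph", "gen-dir", "iceberg/schemas/graph",
             temp_dir, f"{temp_dir}/OUT",
             "--extraArgs", project_str_dict, "--gzip_files"],
            capture_output=True, text=True, check=True,
        )
        print("jsonschemagraph ran successfully.")
    except subprocess.CalledProcessError as exc:
        # exc.stdout / exc.stderr hold the captured output of the failed run
        print(f"jsonschemagraph failed with exit code: {exc.returncode}")
        server_errors.append(f"jsonschemagraph failed: {exc.stderr.strip()}")

Either way the failure is surfaced to the caller through server_errors instead of being silently swallowed, which is the point of the patch.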