[Refactoring] More ruff/mypy/djlint fixes #38

Merged 11 commits on Mar 7, 2024
171 changes: 81 additions & 90 deletions python/main.py
@@ -5,14 +5,14 @@
import datetime
import hashlib
import json
import logging
import os
import re
import secrets
import shutil
import sys
import time
from functools import lru_cache
from glob import glob
from io import BytesIO
from pathlib import Path
from typing import TYPE_CHECKING, Any
@@ -22,11 +22,11 @@
import numpy as np
import pandas as pd
import pydicom
import tensorflow as tf
import torch
from fastapi import Body, FastAPI, UploadFile
from fastapi.responses import FileResponse
from fastapi import Body, FastAPI, Request, UploadFile
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from PIL import Image
from pydantic import BaseModel
from pydicom.errors import InvalidDicomError
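Note on the typing imports kept above: with `from typing import TYPE_CHECKING, Any`, annotation-only imports can be guarded so they never run at import time. A minimal sketch of that mypy-oriented pattern (the `downscale` helper is hypothetical, not part of this file):

    from __future__ import annotations

    from typing import TYPE_CHECKING, Any

    if TYPE_CHECKING:
        # Only evaluated by type checkers; no runtime import of numpy.typing.
        from numpy.typing import NDArray


    def downscale(img: NDArray[Any], factor: int) -> NDArray[Any]:
        # Hypothetical helper, shown only to illustrate the annotation style.
        return img[::factor, ::factor]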
@@ -120,12 +120,12 @@ def clean_imgs() -> None:
dp, _, fps = next(iter(os.walk("./tmp/session-data/raw")))
for fp in fps:
if fp != ".gitkeep":
os.remove(dp + "/" + fp)
fps = glob("./tmp/session-data/clean/*")
Path(dp + "/" + fp).unlink()
fps = list(Path("./tmp/session-data/clean").glob("*")) # type: ignore[arg-type]
for fp in fps:
if fp.split(".")[-1] == "png":
os.remove(fp)
if os.path.exists("./tmp/session-data/clean/de-identified-files"):
if str(fp).split(".")[-1] == "png":
Path(fp).unlink()
if Path("./tmp/session-data/clean/de-identified-files").exists():
shutil.rmtree("./tmp/session-data/clean/de-identified-files")
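This hunk is the pathlib migration ruff suggests (PTH rules): `os.remove`, `glob.glob`, and `os.path.exists` become `Path.unlink`, `Path.glob`, and `Path.exists`. A minimal sketch of the same cleanup routine, assuming an illustrative directory name rather than the project's tmp layout:

    import shutil
    from pathlib import Path

    def clean_dir(clean_dp: str = "./example-clean") -> None:
        # Path.glob + Path.unlink replace glob.glob + os.remove.
        for fp in Path(clean_dp).glob("*.png"):
            fp.unlink()
        # Path.exists replaces os.path.exists before removing the subtree.
        subdir = Path(clean_dp) / "de-identified-files"
        if subdir.exists():
            shutil.rmtree(subdir)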


@@ -156,13 +156,14 @@ def dcm2dictmetadata(ds: pydicom.dataset.Dataset) -> dict[str, dict[str, str]]:


app = FastAPI()
app.mount(path="/static", app=StaticFiles(directory="static"), name="sttc")
templates = Jinja2Templates(directory="templates")
app.mount(path="/static", app=StaticFiles(directory="static"), name="static")


@app.get("/")
async def get_root() -> FileResponse:
@app.get("/", response_class=HTMLResponse)
async def get_root(request: Request) -> HTMLResponse:
clean_all()
return FileResponse("./templates/index.html")
return templates.TemplateResponse("index.html", {"request": request})
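The root route switches from serving the template as a static FileResponse to rendering it through Jinja2Templates with an HTMLResponse. A minimal standalone FastAPI sketch of that pattern (the app and template names are illustrative):

    from fastapi import FastAPI, Request
    from fastapi.responses import HTMLResponse
    from fastapi.staticfiles import StaticFiles
    from fastapi.templating import Jinja2Templates

    app = FastAPI()
    app.mount("/static", StaticFiles(directory="static"), name="static")
    templates = Jinja2Templates(directory="templates")

    @app.get("/", response_class=HTMLResponse)
    async def index(request: Request) -> HTMLResponse:
        # TemplateResponse needs the request object in its context dict.
        return templates.TemplateResponse("index.html", {"request": request})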


class MedsamLite(nn.Module):
@@ -292,18 +293,18 @@ def image_preprocessing(


@lru_cache(maxsize=2**32)
def cache_bbox_img(dcm_hash):
fp = os.path.join("./tmp/session-data/clean", dcm_hash + "_bbox.png")
if not os.path.exists(fp):
return None
def cache_bbox_img(dcm_hash: str) -> str:
fp = Path("./tmp/session-data/clean") / (dcm_hash + "_bbox.png")
if not fp.exists():
return None # type: ignore[return-value]
bbox_pil_img = Image.open(fp)
bbox_img_buf = BytesIO()
bbox_pil_img.save(bbox_img_buf, format="PNG")
return base64.b64encode(bbox_img_buf.getvalue()).decode("utf-8")
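cache_bbox_img memoizes the base64-encoded PNG preview per DICOM hash with functools.lru_cache. A hedged sketch of the same caching idea, with placeholder paths and a smaller cache size:

    import base64
    from functools import lru_cache
    from io import BytesIO
    from pathlib import Path

    from PIL import Image

    @lru_cache(maxsize=1024)
    def cached_png_b64(png_fp: str) -> str | None:
        # The string path is the cache key; repeat calls skip the disk round trip.
        fp = Path(png_fp)
        if not fp.exists():
            return None
        buf = BytesIO()
        Image.open(fp).save(buf, format="PNG")
        return base64.b64encode(buf.getvalue()).decode("utf-8")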


@app.post("/conversion_info")
async def conversion_info(dicom_pair_fp: list[str] = Body(...)):
async def conversion_info(dicom_pair_fp: list[str]) -> dict: # type: ignore[type-arg]
dcm_hash = dicom_pair_fp[1].split("/")[-1].split(".")[0]
downscale_dimensionality = 1024
raw_dcm = pydicom.dcmread(dicom_pair_fp[0])
@@ -361,7 +362,7 @@ async def modify_dicom(data: DicomData) -> ModifyResponse:
return ModifyResponse(success=True)


@app.post("/upload_files/")
@app.post("/upload_files/", name="upload_files")
async def get_files(files: list[UploadFile]) -> UploadFilesResponse:
clean_imgs()
proper_dicom_paths = []
@@ -437,7 +438,8 @@ def segment_sequence_homogeneity_check(fps: list[str]) -> bool:
found_classes = dcm.SegmentSequence[0].SegmentDescription.split(";")
if len(found_classes) != (len(np.unique(mask))):
return False
except: # noqa: E722
except Exception:
logging.exception("Exception occurred")
return False
if (
len(set(found_classes)) > 1
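The bare `except:` flagged by ruff (E722) becomes `except Exception` plus `logging.exception`, so the traceback is recorded before the function bails out with False. A small sketch of that error-handling pattern, independent of the DICOM parsing around it:

    import logging

    def parse_segment_count(description: str) -> int:
        try:
            return int(description.split(";")[0])
        except Exception:
            # logging.exception logs the message at ERROR level with the traceback.
            logging.exception("Failed to parse segment description")
            return 0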
@@ -484,14 +486,14 @@ async def align_classes(classes: list[str]) -> None:
renew_segm_seq(fps, classes) # type: ignore[arg-type]


@app.post("/session")
@app.post("/session", name="session")
async def handle_session_button_click(session_dict: dict[str, Any]) -> None:
session_fp = Path("./tmp/session-data/session.json")
with session_fp.open("w") as file:
json.dump(session_dict, file)


@app.post("/custom_config/")
@app.post("/custom_config/", name="custom_config")
async def custom_config(config_file: UploadFile) -> None:
contents = await config_file.read()
custom_fp = Path("./tmp/session-data/custom-config.csv")
@@ -658,22 +660,22 @@ def bbox_area_remover(
x2, y2 = bbox[2, 0 : (1 + 1)]
x3, y3 = bbox[3, 0 : (1 + 1)]
rectangle = np.array([[[x0, y0], [x1, y1], [x2, y2], [x3, y3]]], dtype=np.int32)
cv2.polylines(bbox_mask, rectangle, True, 1, 2)
cv2.fillPoly(multiplicative_mask, rectangle, 0)
multiplicative_mask = cv2.resize(
cv2.polylines(bbox_mask, rectangle, isClosed=True, color=1, thickness=2) # type: ignore[call-overload]
cv2.fillPoly(multiplicative_mask, rectangle, 0) # type: ignore[call-overload]
multiplicative_mask = cv2.resize( # type: ignore[assignment]
multiplicative_mask,
(initial_array_shape[1], initial_array_shape[0]), # type: ignore[misc]
interpolation=cv2.INTER_NEAREST,
)
bbox_mask = cv2.resize(
bbox_mask = cv2.resize( # type: ignore[assignment]
bbox_mask,
(initial_array_shape[1], initial_array_shape[0]),
(initial_array_shape[1], initial_array_shape[0]), # type: ignore[misc]
interpolation=cv2.INTER_NEAREST,
)
additive_mask = reducted_region_color * (multiplicative_mask == 0)
cleaned_img = img * multiplicative_mask + additive_mask
bbox_img = np.maximum(bbox_mask * np.max(img), img)
return cleaned_img, bbox_img
return cleaned_img, bbox_img # type: ignore[return-value]
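bbox_area_remover draws the detected text boxes into a mask, zeroes them with cv2.fillPoly, resizes the mask back to the original array shape, and composites the redaction color. A minimal sketch of the core masking step (the fill value and shapes are illustrative):

    import cv2
    import numpy as np

    def redact_box(img: np.ndarray, box: np.ndarray, fill_value: int = 0) -> np.ndarray:
        # `box` is a (4, 2) array of corner coordinates; cv2 wants int32 points.
        mask = np.ones(img.shape[:2], dtype=np.uint8)
        cv2.fillPoly(mask, [box.astype(np.int32)], 0)
        # Keep pixels outside the box, replace pixels inside it with fill_value.
        return img * mask + fill_value * (mask == 0)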


def image_deintentifier(
Expand Down Expand Up @@ -714,44 +716,46 @@ def image_deintentifier(
dcm.PixelData = cleaned_img.tobytes()
else:
pass
return dcm, bbox_img
return dcm, bbox_img # type: ignore[return-value]


def merge_action(
primary_srs: pd.core.series.Series,
action2beassigned_srs: pd.core.series.Series,
) -> pd.core.series.Series:
return primary_srs.where(
cond=action2beassigned_srs.isna(),
other=action2beassigned_srs,
axis=0,
inplace=False,
)


def merge_with_custom_user_config_file(
requested_action_group_df: pd.core.frame.DataFrame,
custom_config_df: pd.core.frame.DataFrame,
) -> pd.core.frame.DataFrame:
valid_actions = {"X", "K", "C"}
if not set(custom_config_df["Action"]).issubset(valid_actions):
sys.exit()
requested_action_group_df = requested_action_group_df.merge(
custom_config_df[["Action"]],
left_index=True,
right_index=True,
how="left",
)
requested_action_group_df.loc[
requested_action_group_df["Action"].isin(["X", "K", "C"]),
"Requested Action Group",
] = requested_action_group_df["Action"]
return requested_action_group_df.drop(columns=["Action"])

def get_action_group( # noqa: C901

def get_action_group(
user_input: dict[str, Any],
action_groups_df: pd.core.frame.DataFrame,
custom_config_df: pd.core.frame.DataFrame | None,
) -> pd.core.frame.DataFrame:
def merge_action(
primary_srs: pd.core.series.Series,
action2beassigned_srs: pd.core.series.Series,
) -> pd.core.series.Series:
return primary_srs.where(
cond=action2beassigned_srs.isna(),
other=action2beassigned_srs,
axis=0,
inplace=False,
)

def merge_with_custom_user_config_file(
requested_action_group_df: pd.core.frame.DataFrame,
custom_config_df: pd.core.frame.DataFrame,
) -> pd.core.frame.DataFrame:
valid_actions = {"X", "K", "C"}
if not set(custom_config_df["Action"]).issubset(valid_actions):
sys.exit()
requested_action_group_df = requested_action_group_df.merge(
custom_config_df[["Action"]],
left_index=True,
right_index=True,
how="left",
)
requested_action_group_df.loc[
requested_action_group_df["Action"].isin(["X", "K", "C"]),
"Requested Action Group",
] = requested_action_group_df["Action"]
return requested_action_group_df.drop(columns=["Action"])

requested_action_group_df = pd.DataFrame(
data=action_groups_df["Default"].to_list(),
columns=["Requested Action Group"],
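merge_with_custom_user_config_file first checks that the uploaded config only uses the X/K/C actions, then left-joins its Action column onto the default table by tag index and lets the custom value override the default. A rough pandas sketch of that overlay with made-up data:

    import pandas as pd

    defaults = pd.DataFrame(
        {"Requested Action Group": ["X", "K"]},
        index=["(0010,0010)", "(0008,0020)"],  # illustrative DICOM tag index
    )
    custom = pd.DataFrame({"Action": ["C"]}, index=["(0010,0010)"])

    merged = defaults.merge(custom[["Action"]], left_index=True, right_index=True, how="left")
    # Rows with a valid custom action take it over the default group.
    merged.loc[merged["Action"].isin(["X", "K", "C"]), "Requested Action Group"] = merged["Action"]
    result = merged.drop(columns=["Action"])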
Expand Down Expand Up @@ -908,7 +912,7 @@ def __init__(
in_dp = in_dp + "/"
self.raw_data_dp = in_dp
self.raw_dicom_paths = sorted(self.get_dicom_paths(data_dp=self.raw_data_dp))
self.dicom_pair_fps = []
self.dicom_pair_fps = [] # type: ignore[var-annotated]
self.out_dp = out_dp
self.clean_data_dp = out_dp + "/" + "de-identified-files/"
already_cleaned_dicom_paths = str(
@@ -937,11 +941,9 @@ def get_dicom_paths(
dicom_paths = list(Path(data_dp).rglob("*"))
proper_dicom_paths = []
for dicom_path in dicom_paths:
try:
pydicom.dcmread(dicom_path)
ds = pydicom.dcmread(dicom_path, stop_before_pixels=True)
if ds:
proper_dicom_paths.append(dicom_path)
except InvalidDicomError: # noqa: PERF203
continue
return proper_dicom_paths
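get_dicom_paths now calls pydicom.dcmread with stop_before_pixels=True, so candidate files are validated from their headers without decoding PixelData. A hedged sketch that combines this with the InvalidDicomError guard the previous version relied on:

    from pathlib import Path

    import pydicom
    from pydicom.errors import InvalidDicomError

    def find_dicom_files(data_dp: str) -> list[Path]:
        proper = []
        for fp in Path(data_dp).rglob("*"):
            if not fp.is_file():
                continue
            try:
                # Reads the header only; large pixel data is never decoded.
                pydicom.dcmread(fp, stop_before_pixels=True)
            except InvalidDicomError:
                continue
            proper.append(fp)
        return sorted(proper)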

def parse_file(
@@ -957,7 +959,7 @@ def parse_file(
def export_processed_file(
self, # noqa: ANN101
dcm: pydicom.dataset.FileDataset,
bbox_img,
bbox_img: NDArray[Any],
) -> None:
self.clean_dicom_dp = (
self.clean_data_dp
@@ -967,8 +969,8 @@ def export_processed_file(
+ "/"
+ str(dcm[0x0020, 0x0011].value)
)
if not os.path.exists(self.clean_dicom_dp):
os.makedirs(self.clean_dicom_dp)
if not Path(self.clean_dicom_dp).exists():
Path(self.clean_dicom_dp).mkdir(parents=True)
if bbox_img is not None:
bbox_img_fp = self.out_dp + "/" + self.input_dicom_hash + "_bbox" + ".png"
Image.fromarray(bbox_img).save(bbox_img_fp)
@@ -986,19 +988,10 @@ def export_session(
json.dump(session, file)


def dicom_deidentifier( # noqa: C901, PLR0912, PLR0915
def dicom_deidentifier( # noqa: PLR0912, PLR0915
session_filepath: None | str = None,
) -> tuple[dict[str, dict[str, str]], list[tuple[str]]]:
gpu = True
if not gpu:
tf.config.set_visible_devices([], "GPU")
elif (
len(tf.config.list_physical_devices("GPU")) == 0
or tf.config.list_physical_devices("GPU")[0][1] == "GPU"
):
pass
custom_fp = Path("./tmp/session-data/custom-config.csv")
if custom_fp.is_file():
if Path("./tmp/session-data/custom-config.csv").is_file():
custom_config_df = pd.read_csv(
filepath_or_buffer="./tmp/session-data/custom-config.csv",
index_col=0,
@@ -1013,12 +1006,10 @@ def dicom_deidentifier( # noqa: C901, PLR0912, PLR0915
if session_filepath is None or not Path(session_filepath).is_file():
session = {}
else:
session_fp = Path("./tmp/session-data/session.json")
with session_fp.open() as file:
with Path("./tmp/session-data/session.json").open() as file:
session = json.load(file)
user_fp = Path("./tmp/session-data/user-options.json")
if user_fp.is_file():
with user_fp.open() as file:
if Path("./tmp/session-data/user-options.json").is_file():
with Path("./tmp/session-data/user-options.json").open() as file:
user_input = json.load(file)
else:
sys.exit("E: No client de-identification configuration was provided")
@@ -1041,7 +1032,7 @@ def dicom_deidentifier( # noqa: C901, PLR0912, PLR0915
dcm = rw_obj.parse_file()
if dcm is False:
msg = "E: DICOM file is corrupted or missing"
raise Exception(msg)
raise ValueError(msg)
date_processing_choices = {"keep", "offset", "remove"}
if user_input["date_processing"] not in date_processing_choices:
msg = "E: Invalid date processing input"
@@ -1075,16 +1066,16 @@ def dicom_deidentifier( # noqa: C901, PLR0912, PLR0915
]
dcm = deidentification_attributes(user_input=user_input, dcm=dcm)
if user_input["clean_image"]:
dcm, bbox_img = image_deintentifier(dcm=dcm)
bbox_img = image_preprocessing(
bbox_img,
downscale_dimensionality=max(bbox_img.shape),
dcm, bbox_img = image_deintentifier(dcm=dcm) # type: ignore[assignment]
bbox_img = image_preprocessing( # type: ignore[assignment]
bbox_img, # type: ignore[arg-type]
downscale_dimensionality=max(bbox_img.shape), # type: ignore[attr-defined]
multichannel=True,
retain_aspect_ratio=True,
)
else:
bbox_img = None
rw_obj.export_processed_file(dcm=dcm, bbox_img=bbox_img)
rw_obj.export_processed_file(dcm=dcm, bbox_img=bbox_img) # type: ignore[arg-type]
rw_obj.export_session(session=session)
return session, rw_obj.dicom_pair_fps
