Skip to content

Commit

Permalink
Added multithreading
Browse files Browse the repository at this point in the history
Added multithreading for the convert method. 
Each patient will be converted in its own thread.
  • Loading branch information
ArthurRochette authored Jan 6, 2025
1 parent 337d837 commit eafb464
Showing 1 changed file with 21 additions and 7 deletions.
28 changes: 21 additions & 7 deletions pydicer/convert/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@
import shutil
from pathlib import Path
from typing import Union
from concurrent.futures import ThreadPoolExecutor, as_completed

import pandas as pd
import numpy as np
import SimpleITK as sitk
import pydicom
import matplotlib
from tqdm import tqdm

from platipy.dicom.io.rtdose_to_nifti import convert_rtdose
from pydicer.config import PyDicerConfig
Expand Down Expand Up @@ -319,14 +321,16 @@ def add_entry(self, entry: dict):
df_pat_data = df_pat_data.reset_index(drop=True)
df_pat_data.to_csv(converted_df_path)

def convert(self, patient: Union[str, list]=None, force: bool=True):
def convert(self, patient: Union[str, list]=None, force: bool=True, max_workers: int=1):
"""Converts the DICOM which was preprocessed into the pydicer output directory.
Args:
patient (str|list, optional): Patient ID or list of patient IDs to convert. Defaults to
None.
force (bool, optional): When True objects will be converted even if the output files
already exist. Defaults to True.
max_workers (int, optional): The maximum number of workers to use for the conversion.
Defaults to 1.
"""

# Create the output directory if it hasn't already been created
Expand All @@ -342,12 +346,22 @@ def convert(self, patient: Union[str, list]=None, force: bool=True):
patient = [patient]

df_preprocess = df_preprocess[df_preprocess["patient_id"].isin(patient)]

for key, df_files in get_iterator(
df_preprocess.groupby(["patient_id", "modality", "series_uid"]),
unit="objects",
name="convert",
):


with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = []
total_tasks = len(df_preprocess.groupby(["patient_id", "modality", "series_uid"]))

with tqdm(total=total_tasks, desc="Conversion Progress") as pbar:
for key, df_files in df_preprocess.groupby(["patient_id", "modality", "series_uid"]):
futures.append(executor.submit(
self.__convert_task, key, df_files, df_preprocess, force, config
))

for future in as_completed(futures):
pbar.update(1)

def __convert_task(self, key, df_files, df_preprocess, force, config):
patient_id, _, series_uid = key

logger.info("Converting data for patient: %s", patient_id)
Expand Down

0 comments on commit eafb464

Please sign in to comment.