diff --git a/python/morpheus/morpheus/controllers/file_to_df_controller.py b/python/morpheus/morpheus/controllers/file_to_df_controller.py index c5b7e9a6c2..c8478c6ce1 100644 --- a/python/morpheus/morpheus/controllers/file_to_df_controller.py +++ b/python/morpheus/morpheus/controllers/file_to_df_controller.py @@ -32,6 +32,7 @@ from morpheus.utils.column_info import PreparedDFInfo from morpheus.utils.column_info import process_dataframe from morpheus.utils.downloader import Downloader +from morpheus.utils.downloader import DownloadMethods logger = logging.getLogger(__name__) @@ -105,6 +106,9 @@ class FileToDFController: Directory where cache will be stored. timestamp_column_name : str Name of the timestamp column. + download_method : typing.Union[DownloadMethods, str], optional, default = DownloadMethods.DASK_THREAD + The download method to use, if the `MORPHEUS_FILE_DOWNLOAD_TYPE` environment variable is set, it takes + presedence. """ def __init__(self, @@ -113,7 +117,8 @@ def __init__(self, file_type: FileTypes, parser_kwargs: dict, cache_dir: str, - timestamp_column_name: str): + timestamp_column_name: str, + download_method: typing.Union[DownloadMethods, str] = DownloadMethods.DASK_THREAD): self._schema = schema self._file_type = file_type @@ -122,7 +127,7 @@ def __init__(self, self._cache_dir = os.path.join(cache_dir, "file_cache") self._timestamp_column_name = timestamp_column_name - self._downloader = Downloader() + self._downloader = Downloader(download_method=download_method) def _get_or_create_dataframe_from_batch( self, file_object_batch: typing.Tuple[fsspec.core.OpenFiles, int]) -> typing.Tuple[cudf.DataFrame, bool]: diff --git a/python/morpheus/morpheus/loaders/file_to_df_loader.py b/python/morpheus/morpheus/loaders/file_to_df_loader.py index 27b162d130..de4495496a 100644 --- a/python/morpheus/morpheus/loaders/file_to_df_loader.py +++ b/python/morpheus/morpheus/loaders/file_to_df_loader.py @@ -24,6 +24,7 @@ from morpheus.controllers.file_to_df_controller import FileToDFController from morpheus.messages import ControlMessage from morpheus.messages.message_meta import MessageMeta +from morpheus.utils.downloader import DownloadMethods from morpheus.utils.loader_ids import FILE_TO_DF_LOADER from morpheus.utils.loader_utils import register_loader @@ -97,7 +98,8 @@ def file_to_df_loader(control_message: ControlMessage, task: dict): file_type=file_type, parser_kwargs=parser_kwargs, cache_dir=cache_dir, - timestamp_column_name=timestamp_column_name) + timestamp_column_name=timestamp_column_name, + download_method=DownloadMethods.SINGLE_THREAD) pdf = controller.convert_to_dataframe(file_object_batch=(fsspec.open_files(files), n_groups)) df = cudf.from_pandas(pdf)