Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix dask error in DFP Integrated training pipeline #1931

Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from morpheus.utils.column_info import PreparedDFInfo
from morpheus.utils.column_info import process_dataframe
from morpheus.utils.downloader import Downloader
from morpheus.utils.downloader import DownloadMethods

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -105,6 +106,9 @@ class FileToDFController:
Directory where cache will be stored.
timestamp_column_name : str
Name of the timestamp column.
download_method : typing.Union[DownloadMethods, str], optional, default = DownloadMethods.DASK_THREAD
The download method to use, if the `MORPHEUS_FILE_DOWNLOAD_TYPE` environment variable is set, it takes
presedence.
"""

def __init__(self,
Expand All @@ -113,7 +117,8 @@ def __init__(self,
file_type: FileTypes,
parser_kwargs: dict,
cache_dir: str,
timestamp_column_name: str):
timestamp_column_name: str,
download_method: typing.Union[DownloadMethods, str] = DownloadMethods.DASK_THREAD):

self._schema = schema
self._file_type = file_type
Expand All @@ -122,7 +127,7 @@ def __init__(self,
self._cache_dir = os.path.join(cache_dir, "file_cache")
self._timestamp_column_name = timestamp_column_name

self._downloader = Downloader()
self._downloader = Downloader(download_method=download_method)

def _get_or_create_dataframe_from_batch(
self, file_object_batch: typing.Tuple[fsspec.core.OpenFiles, int]) -> typing.Tuple[cudf.DataFrame, bool]:
Expand Down
4 changes: 3 additions & 1 deletion python/morpheus/morpheus/loaders/file_to_df_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from morpheus.controllers.file_to_df_controller import FileToDFController
from morpheus.messages import ControlMessage
from morpheus.messages.message_meta import MessageMeta
from morpheus.utils.downloader import DownloadMethods
from morpheus.utils.loader_ids import FILE_TO_DF_LOADER
from morpheus.utils.loader_utils import register_loader

Expand Down Expand Up @@ -97,7 +98,8 @@ def file_to_df_loader(control_message: ControlMessage, task: dict):
file_type=file_type,
parser_kwargs=parser_kwargs,
cache_dir=cache_dir,
timestamp_column_name=timestamp_column_name)
timestamp_column_name=timestamp_column_name,
download_method=DownloadMethods.SINGLE_THREAD)
pdf = controller.convert_to_dataframe(file_object_batch=(fsspec.open_files(files), n_groups))
df = cudf.from_pandas(pdf)

Expand Down
Loading