From 304a760482455bd4e75e4078613474f3072785e4 Mon Sep 17 00:00:00 2001
From: Alexander Kozlov
Date: Mon, 18 Sep 2023 10:41:42 +0000
Subject: [PATCH 1/2] added parallel job for datapipe

---
 CHANGELOG.md              |   1 +
 datapipe/store/filedir.py | 145 ++++++++++++++------------------
 2 files changed, 52 insertions(+), 94 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 4ff06e72..c728dd38 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,6 @@
 # 0.13.2
 
 * Add `GPU` support for RayExecutor
+* Added parallel job for `TableStoreFiledir`
 
 # 0.13.1

diff --git a/datapipe/store/filedir.py b/datapipe/store/filedir.py
index a8e62feb..e4a821e8 100644
--- a/datapipe/store/filedir.py
+++ b/datapipe/store/filedir.py
@@ -15,6 +15,7 @@
 from datapipe.run_config import RunConfig
 from datapipe.store.table_store import TableStore
 from datapipe.types import DataDF, DataSchema, IndexDF, MetaSchema
+from joblib import Parallel, delayed
 
 
 class ItemStoreFileAdapter(ABC):
@@ -71,9 +72,7 @@ def _pattern_to_attrnames(pat: str) -> List[str]:
     assert len(attrnames) > 0, "The scheme is not valid."
     if len(attrnames) >= 2:
         duplicates_attrnames = list(duplicates(attrnames))
-        assert (
-            len(duplicates_attrnames) == 0
-        ), f"Some keys are repeated: {duplicates_attrnames}. Rename them."
+        assert len(duplicates_attrnames) == 0, f"Some keys are repeated: {duplicates_attrnames}. Rename them."
 
     return attrnames
 
@@ -81,26 +80,16 @@
 def _pattern_to_patterns_or(pat) -> List[str]:
     pattern_or = re.compile(r"(?P<or>\(([a-zA-Z0-9]+\|)+[a-zA-Z0-9]+\))")
     # Find occurrences of the form (aaa|bbb|ccc), as a list of lists [[aaa, bbb, ccc], [ddd, eee], ...]
-    values = [
-        list(dict.fromkeys(match.group("or")[1:-1].split("|")))
-        for match in pattern_or.finditer(pat)
-    ]
+    values = [list(dict.fromkeys(match.group("or")[1:-1].split("|"))) for match in pattern_or.finditer(pat)]
     # All possible combinations for the substitution [[aaa, ddd], [aaa, eee], [bbb, ddd], ...]
-    possible_combinatios_values = [
-        list(combination) for combination in itertools.product(*values)
-    ]
+    possible_combinatios_values = [list(combination) for combination in itertools.product(*values)]
    # Build every possible list of patterns from the combinations
-    filename_patterns = [
-        re.sub(pattern_or, Replacer(combination), pat)
-        for combination in possible_combinatios_values
-    ]
+    filename_patterns = [re.sub(pattern_or, Replacer(combination), pat) for combination in possible_combinatios_values]
     return filename_patterns
 
 
 def _pattern_to_glob(pat: str) -> str:
-    return re.sub(
-        r"\{([^/]+?)\}", "*", pat
-    )  # Replace every {id1}_{id2} occurrence with asterisks *_*
+    return re.sub(r"\{([^/]+?)\}", "*", pat)  # Replace every {id1}_{id2} occurrence with asterisks *_*
 
 
 def _pattern_to_match(pat: str) -> str:
@@ -108,12 +97,8 @@ def _pattern_to_match(pat: str) -> str:
     # * -> r'[^/]+'
     # ** -> r'([^/]+/)*?[^/]+'
-    pat = re.sub(
-        r"\*\*?", r"([^/]+/)*[^/]+", pat
-    )  # Replace every * and ** occurrence with arbitrary characters
-    pat = re.sub(
-        r"\{([^/]+?)\}", r"(?P<\1>[^/]+?)", pat
-    )  # Replace every {id} occurrence with a non-empty sequence of characters
+    pat = re.sub(r"\*\*?", r"([^/]+/)*[^/]+", pat)  # Replace every * and ** occurrence with arbitrary characters
+    pat = re.sub(r"\{([^/]+?)\}", r"(?P<\1>[^/]+?)", pat)  # Replace every {id} occurrence with a non-empty sequence of characters
     pat = f"{pat}\\Z"  # Anchor the match at the end of the string
     return pat
 
 
@@ -139,6 +124,7 @@
         readonly: Optional[bool] = None,
         enable_rm: bool = False,
         fsspec_kwargs: Dict[str, Any] = {},
+        max_workers: int = 16,
     ):
         """
         When constructing `TableStoreFiledir` there are two ways to specify the schema
@@ -176,6 +162,7 @@
         enable_rm -- if True, enable file deletion
         fsspec_kwargs -- kwargs passed to fsspec
+        max_workers -- number of workers for reading/writing/deleting files
         """
 
         self.fsspec_kwargs = fsspec_kwargs
@@ -184,9 +171,7 @@
             self.protocol = self.fsspec_kwargs["protocol"]
             self.filesystem = fsspec.filesystem(**self.fsspec_kwargs)
         else:
-            self.filesystem = fsspec.filesystem(
-                protocol=self.protocol, **self.fsspec_kwargs
-            )
+            self.filesystem = fsspec.filesystem(protocol=self.protocol, **self.fsspec_kwargs)
 
         if self.protocol is None or self.protocol == "file":
             filename_pattern = str(Path(path).resolve())
@@ -208,17 +193,13 @@
         # Multiply extensions check
         if len(self.filename_glob) >= 2:
             if readonly is not None and not readonly:
-                raise ValueError(
-                    "When `readonly=False`, in filename_pattern shouldn't be several extensions."
-                )
+                raise ValueError("When `readonly=False`, in filename_pattern shouldn't be several extensions.")
             elif readonly:
                 readonly = True
         # Any * and ** pattern check
         if "*" in path:
             if readonly is not None and not readonly:
-                raise ValueError(
-                    "When `readonly=False`, in filename_pattern shouldn't be any `*` characters."
-                )
+                raise ValueError("When `readonly=False`, in filename_pattern shouldn't be any `*` characters.")
             elif readonly is None:
                 readonly = True
         elif readonly is None:
@@ -235,22 +216,14 @@
         if primary_schema is not None:
             assert sorted(self.attrnames) == sorted(i.name for i in primary_schema)
-            assert all(
-                [
-                    isinstance(column.type, (String, Integer))
-                    for column in primary_schema
-                ]
-            )
+            assert all([isinstance(column.type, (String, Integer)) for column in primary_schema])
             self.primary_schema = primary_schema
         else:
-            self.primary_schema = [
-                Column(attrname, String, primary_key=True)
-                for attrname in self.attrnames
-            ]
+            self.primary_schema = [Column(attrname, String, primary_key=True) for attrname in self.attrnames]
 
         self.attrname_to_cls = {
-            column.name: type_to_cls[type(column.type)]  # type: ignore
-            for column in self.primary_schema
+            column.name: type_to_cls[type(column.type)] for column in self.primary_schema  # type: ignore
         }
+        self.max_workers = max_workers
 
     def get_primary_schema(self) -> DataSchema:
         return self.primary_schema
@@ -264,29 +237,27 @@
     def delete_rows(self, idx: IndexDF) -> None:
         assert not self.readonly
 
+        filepaths = []
         for row_idx in idx.index:
             attrnames_series = idx.loc[row_idx, self.attrnames]
             assert isinstance(attrnames_series, pd.Series)
             attrnames = cast(List[str], attrnames_series.tolist())
 
-            _, path = fsspec.core.split_protocol(
-                self._filenames_from_idxs_values(attrnames)[0]
-            )
-            self.filesystem.rm(path)
+            _, filepath = fsspec.core.split_protocol(self._filenames_from_idxs_values(attrnames)[0])
+            filepaths.append(filepath)
+
+        Parallel(n_jobs=self.max_workers, prefer="threads")(
+            delayed(self.filesystem.rm)(filepath) for filepath in filepaths
+        )
 
     def _filenames_from_idxs_values(self, idxs_values: List[str]) -> List[str]:
-        return [
-            re.sub(r"\{([^/]+?)\}", Replacer(idxs_values), pat)
-            for pat in self.filename_patterns
-        ]
+        return [re.sub(r"\{([^/]+?)\}", Replacer(idxs_values), pat) for pat in self.filename_patterns]
 
     def _idxs_values_from_filepath(self, filepath: str) -> Dict[str, Any]:
         _, filepath = fsspec.core.split_protocol(filepath)
 
         m = re.match(self.filename_match, filepath)
-        assert (
-            m is not None
-        ), f"Filepath {filepath} does not match the pattern {self.filename_match}"
+        assert m is not None, f"Filepath {filepath} does not match the pattern {self.filename_match}"
 
         data = {}
         for attrname in self.attrnames:
@@ -297,9 +268,7 @@
     def _assert_key_values(self, filepath: str, idxs_values: List[str]):
         idx_data = self._idxs_values_from_filepath(filepath)
         idxs_values_np = np.array(idxs_values)
-        idxs_values_parsed_from_filepath = np.array(
-            [idx_data[attrname] for attrname in self.attrnames]
-        )
+        idxs_values_parsed_from_filepath = np.array([idx_data[attrname] for attrname in self.attrnames])
 
         assert len(idxs_values_np) == len(idxs_values_parsed_from_filepath) and np.all(
             idxs_values_np == idxs_values_parsed_from_filepath
@@ -310,9 +279,7 @@
             f"{idxs_values_np=} not equals {idxs_values_parsed_from_filepath=}",
         )
 
-    def insert_rows(
-        self, df: pd.DataFrame, adapter: Optional[ItemStoreFileAdapter] = None
-    ) -> None:
+    def insert_rows(self, df: pd.DataFrame, adapter: Optional[ItemStoreFileAdapter] = None) -> None:
         if df.empty:
             return
         assert not self.readonly
@@ -320,11 +287,10 @@
             adapter = self.adapter
 
         # WARNING: .drop(columns=self.attrnames) is used here because the keys are stored outside the data, in the filename
+        filepaths, datas = [], []
         for row_idx, data in zip(
             df.index,
-            cast(
-                List[Dict[str, Any]], df.drop(columns=self.attrnames).to_dict("records")
-            ),
+            cast(List[Dict[str, Any]], df.drop(columns=self.attrnames).to_dict("records")),
         ):
             attrnames_series = df.loc[row_idx, self.attrnames]
             assert isinstance(attrnames_series, pd.Series)
@@ -334,10 +300,17 @@
             # Check that the key values cannot produce an ambiguous result when parsed by the regex
             self._assert_key_values(filepath, idxs_values)
 
+            filepaths.append(filepath)
+            datas.append(data)
+
+        def _write_filepath(filepath: str, data: Any):
             with fsspec.open(filepath, f"w{self.adapter.mode}") as f:
                 self.adapter.dump(data, f)
 
+        Parallel(n_jobs=self.max_workers, prefer="threads")(
+            delayed(_write_filepath)(filepath, data) for filepath, data in zip(filepaths, datas)
+        )
+
     def _read_rows_fast(
         self,
         idx: IndexDF,
@@ -351,9 +324,7 @@
         attrnames = cast(List[str], attrnames_series.tolist())
 
-        _, path = fsspec.core.split_protocol(
-            self._filenames_from_idxs_values(attrnames)[0]
-        )
+        _, path = fsspec.core.split_protocol(self._filenames_from_idxs_values(attrnames)[0])
 
         res.loc[row_idx, "filepath"] = f"{self.protocol_str}{path}"
 
@@ -370,37 +341,25 @@ def read_rows(
         if adapter is None:
             adapter = self.adapter
 
-        if (
-            (not read_data)
-            and (len(self.filename_patterns) == 1)
-            and (idx is not None)
-            and self.add_filepath_column
-        ):
+        if (not read_data) and (len(self.filename_patterns) == 1) and (idx is not None) and self.add_filepath_column:
             return self._read_rows_fast(idx)
 
-        def _iterate_files():
+        def _iterate_files() -> fsspec.core.OpenFile:
             if idx is None:
-                for file_open in fsspec.open_files(
-                    self.filename_glob, f"r{adapter.mode}", **self.fsspec_kwargs
-                ):
+                for file_open in fsspec.open_files(self.filename_glob, f"r{adapter.mode}", **self.fsspec_kwargs):
                     yield file_open
             else:
                 filepaths_extenstions = [
-                    self._filenames_from_idxs_values(idx.loc[row_idx, self.attrnames])
-                    for row_idx in idx.index
+                    self._filenames_from_idxs_values(idx.loc[row_idx, self.attrnames]) for row_idx in idx.index
                 ]
 
                 for filepaths in filepaths_extenstions:
                     found_files = [
                         file_open
-                        for file_open in fsspec.open_files(
-                            filepaths, f"r{adapter.mode}", **self.fsspec_kwargs
-                        )
+                        for file_open in fsspec.open_files(filepaths, f"r{adapter.mode}", **self.fsspec_kwargs)
                         if self.filesystem.exists(file_open.path)
                     ]
 
                     if len(found_files) == 0:
-                        raise FileNotFoundError(
-                            f"No such file: {' or '.join(filepaths)}"
-                        )
+                        raise FileNotFoundError(f"No such file: {' or '.join(filepaths)}")
                     elif len(found_files) > 1:
                         raise ValueError(
                             f"Some files are duplitcated as indexes in filepaths: {found_files}. "
@@ -409,17 +368,14 @@ def _iterate_files():
                         for file_open in found_files:
                             yield file_open
 
-        df_records = []
-        for file_open in _iterate_files():
+        def get_data(file_open) -> Dict[str, Any]:
            with file_open as f:
                 data = {}
 
                 if read_data:
                     data = adapter.load(f)
-                    attrnames_in_data = [
-                        attrname for attrname in self.attrnames if attrname in data
-                    ]
+                    attrnames_in_data = [attrname for attrname in self.attrnames if attrname in data]
                     assert len(attrnames_in_data) == 0, (
                         f"Found repeated keys inside data that are already used (from scheme): "
                         f"{attrnames_in_data}. "
@@ -435,8 +391,11 @@ def _iterate_files():
                        "Switch argument add_filepath_column to False or rename this key in input data."
                     )
                     data["filepath"] = f"{self.protocol_str}{file_open.path}"
+                return data
 
-            df_records.append(data)
+        df_records = Parallel(n_jobs=self.max_workers, prefer="threads")(
+            delayed(get_data)(file_open) for file_open in _iterate_files()
+        )
 
         df = pd.DataFrame(df_records)
@@ -466,9 +425,7 @@ def read_rows_meta_pseudo_df(
             ukeys.append(files.fs.ukey(f.path))
             filepaths.append(f"{self.protocol_str}{f.path}")
 
-        keys_values = [
-            (ids[attrname][i] for attrname in self.attrnames) for i in range(len(ukeys))
-        ]
+        keys_values = [(ids[attrname][i] for attrname in self.attrnames) for i in range(len(ukeys))]
         duplicates_keys_values = list(duplicates(keys_values))
         assert len(duplicates_keys_values) == 0, (
             f"Some files are duplitcated as indexes in filepaths: {duplicates_keys_values}. "

From d0db32c0aa69b87422e8ab7ba165aa03f8a720c6 Mon Sep 17 00:00:00 2001
From: Andrey Tatarinov
Date: Wed, 20 Sep 2023 15:09:47 +0400
Subject: [PATCH 2/2] joblib extra

---
 pyproject.toml | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/pyproject.toml b/pyproject.toml
index be885180..55ed55d0 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -51,6 +51,7 @@
 sphinx = {version = ">=4.5,<6.0", optional = true}
 myst-parser = {version = ">=0.17.2,<1.1.0", optional = true}
 
 ray = {version = "^2.5.0", optional = true, extras = ["default"]}
+joblib = {version = "^1.3.2", optional = true}
 
 [tool.poetry.extras]
@@ -62,6 +63,7 @@
 s3fs = ["s3fs"]
 redis = ["redis"]
 qdrant = ["qdrant-client"]
 ray = ["ray"]
+joblib = ["joblib"]
 
 docs = ["sphinx", "myst-parser"]
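
Note on the approach in PATCH 1/2: all three I/O paths that were touched (delete_rows, insert_rows and read_rows) now push their per-file work through the same joblib idiom, Parallel(n_jobs=self.max_workers, prefer="threads"). The thread backend is a reasonable fit here because the work is fsspec I/O that mostly waits on the filesystem or network, and threads avoid pickling the store and its filesystem object. Below is a minimal, self-contained sketch of that idiom, separate from the patch itself -- the slow_io helper and the numbers are illustrative stand-ins, not datapipe code:

    import time

    from joblib import Parallel, delayed


    def slow_io(i: int) -> int:
        # Stand-in for one fsspec read/write/rm call: the GIL is released
        # while sleeping, just as it is while waiting on real I/O.
        time.sleep(0.1)
        return i


    # Run up to 16 tasks at a time on threads, mirroring the new max_workers default.
    results = Parallel(n_jobs=16, prefer="threads")(delayed(slow_io)(i) for i in range(32))

    # Parallel returns results in submission order, which is why read_rows can
    # build df_records this way without reordering rows.
    assert results == list(range(32))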