From 3fc4258835a96fb2d493c60cc22f7a1c73330d90 Mon Sep 17 00:00:00 2001 From: "jiarui.wang" Date: Tue, 8 Apr 2025 19:24:34 +0800 Subject: [PATCH] estimator function --- flaml/automl/automl.py | 227 ++++++++++++++++++++++------------------- 1 file changed, 120 insertions(+), 107 deletions(-) diff --git a/flaml/automl/automl.py b/flaml/automl/automl.py index d2d9f98956..ed7bddf63a 100644 --- a/flaml/automl/automl.py +++ b/flaml/automl/automl.py @@ -2502,6 +2502,122 @@ def _search_sequential(self): if time_left < time_ensemble < 2 * time_left: break + def build_ensemble(self, automl_object = None): + + if automl_object is not None: + self.__dict__.update(automl_object.__dict__) + + if not self._ensemble or self._state.task not in ("binary", "multiclass", "regression"): + logger.info("Ensemble is disabled or task type is not supported.") + return + + search_states = list(x for x in self._search_states.items() if x[1].best_config) + search_states.sort(key = lambda x: x[1].best_loss) + estimators = [ + ( + x[0], + x[1].learner_class( + task = self._state.task, + n_jobs = self._state.n_jobs, + **AutoMLState.sanitize(x[1].best_config), + ), + ) + for x in search_states[:2] + ] + estimators += [ + ( + x[0], + x[1].learner_class( + task = self._state.task, + n_jobs = self._state.n_jobs, + **AutoMLState.sanitize(x[1].best_config), + ), + ) + for x in search_states[2:] + if x[1].best_loss < 4 * self._selected.best_loss + ] + logger.info([(estimator[0], estimator[1].params) for estimator in estimators]) + + if len(estimators) <= 1: + logger.info("Not enough estimators to build an ensemble.") + return + + if self._state.task in CLASSIFICATION: + from sklearn.ensemble import StackingClassifier as Stacker + else: + from sklearn.ensemble import StackingRegressor as Stacker + + if self._use_ray is not False: + import ray + n_cpus = ray.is_initialized() and ray.available_resources()["CPU"] or os.cpu_count() + elif self._use_spark: + from flaml.tune.spark.utils import get_n_cpus + n_cpus = get_n_cpus() + else: + n_cpus = os.cpu_count() + + ensemble_n_jobs = ( + -self._state.n_jobs + if abs(self._state.n_jobs) == 1 + else max(1, int(n_cpus / 2 / self._state.n_jobs)) + ) + + if isinstance(self._ensemble, dict): + final_estimator = self._ensemble.get("final_estimator", self._trained_estimator) + passthrough = self._ensemble.get("passthrough", True) + ensemble_n_jobs = self._ensemble.get("n_jobs", ensemble_n_jobs) + else: + final_estimator = self._trained_estimator + passthrough = True + + stacker = Stacker( + estimators, + final_estimator = final_estimator, + n_jobs = ensemble_n_jobs, + passthrough = passthrough, + ) + + sample_weight_dict = ( + {"sample_weight": self._sample_weight_full} if self._sample_weight_full is not None else {} + ) + + try: + logger.info("Building ensemble with tuned estimators") + stacker.fit( + self._X_train_all, + self._y_train_all, + **sample_weight_dict, + ) + logger.info(f"Ensemble built successfully: {stacker}") + self._trained_estimator = stacker + self._trained_estimator.model = stacker + except ValueError as e: + if passthrough: + logger.warning( + "Using passthrough=False for ensemble because the data contain categorical features." + ) + stacker = Stacker( + estimators, + final_estimator = final_estimator, + n_jobs = ensemble_n_jobs, + passthrough = False, + ) + stacker.fit( + self._X_train_all, + self._y_train_all, + **sample_weight_dict, + ) + logger.info(f"Ensemble built successfully with passthrough=False: {stacker}") + self._trained_estimator = stacker + self._trained_estimator.model = stacker + else: + raise e + except joblib.externals.loky.process_executor.TerminatedWorkerError: + logger.error( + "Not enough memory to build the ensemble. " + "Please try increasing available RAM, decreasing n_jobs for ensemble, or disabling ensemble." + ) + def _search(self): # initialize the search_states self._eci = [] @@ -2548,115 +2664,12 @@ def _search(self): self.modelcount = sum(search_state.total_iter for search_state in self._search_states.values()) if self._trained_estimator: logger.info(f"selected model: {self._trained_estimator.model}") - estimators = [] - if self._ensemble and self._state.task in ( - "binary", - "multiclass", - "regression", - ): - search_states = list(x for x in self._search_states.items() if x[1].best_config) - search_states.sort(key=lambda x: x[1].best_loss) - estimators = [ - ( - x[0], - x[1].learner_class( - task=self._state.task, - n_jobs=self._state.n_jobs, - **AutoMLState.sanitize(x[1].best_config), - ), - ) - for x in search_states[:2] - ] - estimators += [ - ( - x[0], - x[1].learner_class( - task=self._state.task, - n_jobs=self._state.n_jobs, - **AutoMLState.sanitize(x[1].best_config), - ), - ) - for x in search_states[2:] - if x[1].best_loss < 4 * self._selected.best_loss - ] - logger.info([(estimator[0], estimator[1].params) for estimator in estimators]) - if len(estimators) > 1: - if self._state.task.is_classification(): - from sklearn.ensemble import StackingClassifier as Stacker - else: - from sklearn.ensemble import StackingRegressor as Stacker - if self._use_ray is not False: - import ray - - n_cpus = ray.is_initialized() and ray.available_resources()["CPU"] or os.cpu_count() - elif self._use_spark: - from flaml.tune.spark.utils import get_n_cpus - n_cpus = get_n_cpus() - else: - n_cpus = os.cpu_count() - ensemble_n_jobs = ( - -self._state.n_jobs # maximize total parallelization degree - if abs(self._state.n_jobs) == 1 # 1 and -1 correspond to min/max parallelization - else max(1, int(n_cpus / 2 / self._state.n_jobs)) - # the total degree of parallelization = parallelization degree per estimator * parallelization degree of ensemble - ) - if isinstance(self._ensemble, dict): - final_estimator = self._ensemble.get("final_estimator", self._trained_estimator) - passthrough = self._ensemble.get("passthrough", True) - ensemble_n_jobs = self._ensemble.get("n_jobs", ensemble_n_jobs) - else: - final_estimator = self._trained_estimator - passthrough = True - stacker = Stacker( - estimators, - final_estimator, - n_jobs=ensemble_n_jobs, - passthrough=passthrough, - ) - sample_weight_dict = ( - (self._sample_weight_full is not None) and {"sample_weight": self._sample_weight_full} or {} - ) - for e in estimators: - e[1].__class__.init() - import joblib + if len(estimators) > 1 and self._ensemble: + self.build_ensemble() + else: + logger.info("Skipping ensemble building due to insufficient estimators or disabled ensemble.") - try: - logger.info("Building ensemble with tuned estimators") - stacker.fit( - self._X_train_all, - self._y_train_all, - **sample_weight_dict, # NOTE: _search is after kwargs is updated to fit_kwargs_by_estimator - ) - logger.info(f"ensemble: {stacker}") - self._trained_estimator = stacker - self._trained_estimator.model = stacker - except ValueError as e: - if passthrough: - logger.warning( - "Using passthrough=False for ensemble because the data contain categorical features." - ) - stacker = Stacker( - estimators, - final_estimator, - n_jobs=self._state.n_jobs, - passthrough=False, - ) - stacker.fit( - self._X_train_all, - self._y_train_all, - **sample_weight_dict, # NOTE: _search is after kwargs is updated to fit_kwargs_by_estimator - ) - logger.info(f"ensemble: {stacker}") - self._trained_estimator = stacker - self._trained_estimator.model = stacker - else: - raise e - except joblib.externals.loky.process_executor.TerminatedWorkerError: - logger.error( - "No enough memory to build the ensemble." - " Please try increasing available RAM, decreasing n_jobs for ensemble, or disabling ensemble." - ) elif self._state.retrain_final: # reset time budget for retraining if self._max_iter > 1: