From 8b3a6e4c5ccabd344b5c5b3871801e5014426285 Mon Sep 17 00:00:00 2001
From: kenneth
Date: Fri, 5 Jul 2024 17:00:23 +0200
Subject: [PATCH 1/8] handle null features in fg insert

---
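Notes: pyarrow infers the type "null" for a dataframe column whose values are
all None, so no offline type can be derived from the data itself; this patch
falls back to the type already declared on the feature group's schema. A
minimal sketch of the inference behaviour the fallback relies on (assumes
pandas and pyarrow are installed; the column name "a" is arbitrary):

    import pandas as pd
    import pyarrow as pa

    # A column holding only None values carries no type information.
    df = pd.DataFrame({"a": [None, None]})
    schema = pa.Schema.from_pandas(df, preserve_index=False)
    print(schema.field("a").type)  # -> null, hence the fallback to the existing schema
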
 python/hsfs/core/feature_group_engine.py |  2 +-
 python/hsfs/engine/python.py             | 34 +++++++++++++++---------
 2 files changed, 22 insertions(+), 14 deletions(-)

diff --git a/python/hsfs/core/feature_group_engine.py b/python/hsfs/core/feature_group_engine.py
index 3e88805eda..8e6e84c5e2 100644
--- a/python/hsfs/core/feature_group_engine.py
+++ b/python/hsfs/core/feature_group_engine.py
@@ -88,7 +88,7 @@ def insert(
         validation_options: dict = None,
     ):
         dataframe_features = engine.get_instance().parse_schema_feature_group(
-            feature_dataframe, feature_group.time_travel_format
+            feature_dataframe, feature_group.time_travel_format, feature_group.features
         )
         util.validate_embedding_feature_type(
             feature_group.embedding_index, dataframe_features
diff --git a/python/hsfs/engine/python.py b/python/hsfs/engine/python.py
index a755c58862..42cafc5b2e 100644
--- a/python/hsfs/engine/python.py
+++ b/python/hsfs/engine/python.py
@@ -768,7 +768,12 @@ def parse_schema_feature_group(
         self,
         dataframe: Union[pd.DataFrame, pl.DataFrame],
         time_travel_format: Optional[str] = None,
+        features: Optional[List[feature.Feature]] = None,
     ) -> List[feature.Feature]:
+        feature_type_map = {}
+        if features:
+            for _feature in features:
+                feature_type_map[_feature.name] = _feature.type
         if isinstance(dataframe, pd.DataFrame):
             arrow_schema = pa.Schema.from_pandas(dataframe, preserve_index=False)
         elif isinstance(dataframe, pl.DataFrame) or isinstance(
@@ -776,12 +781,17 @@ def parse_schema_feature_group(
         ):
             arrow_schema = dataframe.to_arrow().schema
         features = []
-        for feat_name in arrow_schema.names:
+        for i in range(len(arrow_schema.names)):
+            feat_name = arrow_schema.names[i]
             name = util.autofix_feature_name(feat_name)
             try:
-                converted_type = self._convert_pandas_dtype_to_offline_type(
-                    arrow_schema.field(feat_name).type
-                )
+                pd_type = arrow_schema.field(feat_name).type
+                if pd_type == "null" and feature_type_map.get(name):
+                    converted_type = feature_type_map.get(name)
+                else:
+                    converted_type = self._convert_pandas_dtype_to_offline_type(
+                        pd_type
+                    )
             except ValueError as e:
                 raise FeatureStoreException(f"Feature '{name}': {str(e)}") from e
             features.append(feature.Feature(name, converted_type))
@@ -1186,7 +1196,7 @@ def _apply_transformation_function(
             str, transformation_function_attached.TransformationFunctionAttached
         ],
         dataset: Union[pd.DataFrame, pl.DataFrame],
-        inplace=True,
+        inplace=False,
     ) -> Union[pd.DataFrame, pl.DataFrame]:
         for (
             feature_name,
@@ -1195,13 +1205,16 @@ def _apply_transformation_function(
             if isinstance(dataset, pl.DataFrame) or isinstance(
                 dataset, pl.dataframe.frame.DataFrame
             ):
+                if inplace:
+                    dataset = dataset.clone()
                 dataset = dataset.with_columns(
                     pl.col(feature_name).map_elements(
                         transformation_fn.transformation_fn
                     )
                 )
             else:
-                dataset = pd.DataFrame.copy(dataset)
+                if inplace:
+                    dataset = pd.DataFrame.copy(dataset)
                 dataset[feature_name] = dataset[feature_name].map(
                     transformation_fn.transformation_fn
                 )
@@ -1615,15 +1628,10 @@ def get_feature_logging_df(fg,
             prediction[f.name] = Engine._cast_column_to_offline_type(prediction[f.name], f.type)
         if not set(prediction.columns).intersection(set(features.columns)):
             features = pd.concat([features, prediction], axis=1)
-        # need to case the column type as if it is None, type cannot be inferred.
-        features[td_col_name] = Engine._cast_column_to_offline_type(
-            pd.Series([training_dataset_version for _ in range(len(features))]), fg.get_feature(td_col_name).type
-        )
+        features[td_col_name] = pd.Series([training_dataset_version for _ in range(len(features))])
         # _cast_column_to_offline_type cannot cast string type
         features[model_col_name] = pd.Series([FeatureViewEngine.get_hsml_model_value(hsml_model) if hsml_model else None for _ in range(len(features))], dtype=pd.StringDtype())
         now = datetime.now()
-        features[time_col_name] = Engine._cast_column_to_offline_type(
-            pd.Series([now for _ in range(len(features))]), fg.get_feature(time_col_name).type
-        )
+        features[time_col_name] = pd.Series([now for _ in range(len(features))])
         features["log_id"] = [str(uuid.uuid4()) for _ in range(len(features))]
         return features[[feat.name for feat in fg.features]]

From d0d826d809fc745cee09dad9ea99ba5f3bb0d1ec Mon Sep 17 00:00:00 2001
From: kenneth
Date: Mon, 8 Jul 2024 10:27:49 +0200
Subject: [PATCH 2/8] update logging, get_feature_logging_df in spark

---
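Notes: get_feature_logging_df is switched to keyword arguments with defaults
so the Python and Spark engines can share one call site, each engine simply
ignoring the keywords it does not use. A rough sketch of the pattern (the
function names here are hypothetical, not the real engine API):

    def python_impl(features, fg=None, td_col_name=None, **kwargs):
        return features  # uses most of the keywords

    def spark_impl(features, prediction=None, **kwargs):
        return features  # surplus keywords land in kwargs and are dropped

    for impl in (python_impl, spark_impl):
        impl([[1, 2]], fg=None, td_col_name="td_version", prediction=None)

The FeatureLogging metadata is also passed into log_features explicitly so a
cached copy can be reused instead of being refetched on every call.
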
 python/hsfs/core/feature_logging.py     |  5 +++++
 python/hsfs/core/feature_view_api.py    |  2 +-
 python/hsfs/core/feature_view_engine.py | 30 ++++++++++++++-----------
 python/hsfs/engine/python.py            | 14 ++++++------
 python/hsfs/engine/spark.py             |  2 +-
 python/hsfs/feature_view.py             | 11 ++++++---
 6 files changed, 39 insertions(+), 25 deletions(-)

diff --git a/python/hsfs/core/feature_logging.py b/python/hsfs/core/feature_logging.py
index b29a7317dd..bd17a4aac6 100644
--- a/python/hsfs/core/feature_logging.py
+++ b/python/hsfs/core/feature_logging.py
@@ -26,6 +26,11 @@ def from_response_json(cls, json_dict: Dict[str, Any]) -> 'FeatureLogging':
         untransformed_features = FeatureGroup.from_response_json(untransformed_features)
         return cls(json_decamelized.get('id'), transformed_features, untransformed_features)

+    def update(self, others):
+        self._transformed_features = others._transformed_features
+        self._untransformed_features = others._untransformed_features
+        return self
+
     @property
     def transformed_features(self) -> "feature_group.FeatureGroup":
         return self._transformed_features
diff --git a/python/hsfs/core/feature_view_api.py b/python/hsfs/core/feature_view_api.py
index 732160748d..47e6e61e09 100644
--- a/python/hsfs/core/feature_view_api.py
+++ b/python/hsfs/core/feature_view_api.py
@@ -460,4 +460,4 @@ def delete_feature_logs(
             path_params += [self._TRANSFORMED_lOG]
         else:
             path_params += [self._UNTRANSFORMED_LOG]
-        _client._send_request("DELETE", path_params, {})
+        return _client._send_request("DELETE", path_params, {})
diff --git a/python/hsfs/core/feature_view_engine.py b/python/hsfs/core/feature_view_engine.py
index 9926a6fc01..a3834f7ee9 100644
--- a/python/hsfs/core/feature_view_engine.py
+++ b/python/hsfs/core/feature_view_engine.py
@@ -948,29 +948,32 @@ def get_feature_logging(self, fv):

     def _get_logging_fg(self, fv, transformed):
         feature_logging = self.get_feature_logging(fv)
+        return self._get_logging_fg_feature_logging(feature_logging, transformed)
+
+    def _get_logging_fg_feature_logging(self, feature_logging, transformed):
         if transformed:
             return feature_logging.transformed_features
         else:
             return feature_logging.untransformed_features

-    def log_features(self, fv, features, prediction=None, transformed=False, write_options=None, training_dataset_version=None, hsml_model=None):
+    def log_features(self, fv, feature_logging, features_rows, prediction=None, transformed=False, write_options=None, training_dataset_version=None, hsml_model=None):
         default_write_options = {
             "start_offline_materialization": False,
         }
         if write_options:
             default_write_options.update(write_options)
-        fg = self._get_logging_fg(fv, transformed)
+        fg = self._get_logging_fg_feature_logging(feature_logging, transformed)
         df = engine.get_instance().get_feature_logging_df(
-            fg,
-            features,
-            [feature for feature in fv.features if not feature.label],
-            [feature for feature in fv.features if feature.label],
-            FeatureViewEngine._LOG_TD_VERSION,
-            FeatureViewEngine._LOG_TIME,
-            FeatureViewEngine._HSML_MODEL,
-            prediction,
-            training_dataset_version,
-            hsml_model,
+            features_rows,
+            fg=fg,
+            fg_features=[feature for feature in fv.features if not feature.label],
+            td_predictions=[feature for feature in fv.features if feature.label],
+            td_col_name=FeatureViewEngine._LOG_TD_VERSION,
+            time_col_name=FeatureViewEngine._LOG_TIME,
+            model_col_name=FeatureViewEngine._HSML_MODEL,
+            prediction=prediction,
+            training_dataset_version=training_dataset_version,
+            hsml_model=hsml_model,
         )
         return fg.insert(df, write_options=default_write_options)
@@ -1067,7 +1070,8 @@ def materialize_feature_logs(self, fv, wait):
                 pass
         return jobs

-    def delete_feature_logs(self, fv, transformed):
+    def delete_feature_logs(self, fv, feature_logging, transformed):
         self._feature_view_api.delete_feature_logs(
             fv.name, fv.version, transformed
         )
+        feature_logging.update(self.get_feature_logging(fv))
diff --git a/python/hsfs/engine/python.py b/python/hsfs/engine/python.py
index 42cafc5b2e..43d36fea21 100644
--- a/python/hsfs/engine/python.py
+++ b/python/hsfs/engine/python.py
@@ -1609,13 +1609,13 @@ def _convert_feature_log_to_df(feature_log, cols):
         return feature_log.copy(deep=False)

     @staticmethod
-    def get_feature_logging_df(fg,
-                               features,
-                               fg_features: List[TrainingDatasetFeature],
-                               td_predictions: List[TrainingDatasetFeature],
-                               td_col_name,
-                               time_col_name,
-                               model_col_name,
+    def get_feature_logging_df(features,
+                               fg=None,
+                               fg_features: List[TrainingDatasetFeature]=None,
+                               td_predictions: List[TrainingDatasetFeature]=None,
+                               td_col_name=None,
+                               time_col_name=None,
+                               model_col_name=None,
                                prediction=None,
                                training_dataset_version=None,
                                hsml_model=None,
diff --git a/python/hsfs/engine/spark.py b/python/hsfs/engine/spark.py
index 1a9fcd3872..5dfad8c428 100644
--- a/python/hsfs/engine/spark.py
+++ b/python/hsfs/engine/spark.py
@@ -1403,7 +1403,7 @@ def is_connector_type_supported(type):
         return True

     @staticmethod
-    def get_feature_logging_df(features, prediction=None):
+    def get_feature_logging_df(features, prediction=None, **kwargs):
         # do not take prediction separately because spark ml framework usually return feature together with the prediction
         # and it is costly to join them back
         return features
diff --git a/python/hsfs/feature_view.py b/python/hsfs/feature_view.py
index d28b53da15..d931d5936f 100644
--- a/python/hsfs/feature_view.py
+++ b/python/hsfs/feature_view.py
@@ -146,6 +146,7 @@ def __init__(
             featurestore_id, self.ENTITY_TYPE
         )
         self._logging_enabled = logging_enabled
+        self._feature_logging = None

         if self._id:
             self._init_feature_monitoring_engine()
@@ -3463,7 +3464,9 @@ def enable_logging(self) -> None:
        # Raises
            `hsfs.client.exceptions.RestAPIError` in case the backend fails to enable feature logging.
        """
-        return self._feature_view_engine.enable_feature_logging(self)
+        fv = self._feature_view_engine.enable_feature_logging(self)
+        self._feature_logging = self._feature_view_engine.get_feature_logging(fv)
+        return fv

     def log(self,
             features: Union[pd.DataFrame, list[list], np.ndarray],
@@ -3503,7 +3506,7 @@ def log(self,
             )
             self.enable_logging()
         return self._feature_view_engine.log_features(
-            self, features, predictions, transformed,
+            self, self._feature_logging, features, predictions, transformed,
             write_options,
             training_dataset_version=(
                 training_dataset_version or self.get_last_accessed_training_dataset()
@@ -3640,7 +3643,9 @@ def delete_log(self, transformed: Optional[bool]=None):
        # Raises
            `hsfs.client.exceptions.RestAPIError` in case the backend fails to delete the log.
        """
-        return self._feature_view_engine.delete_feature_logs(self, transformed)
+        return self._feature_view_engine.delete_feature_logs(
+            self, self._feature_logging, transformed
+        )

     @staticmethod
     def _update_attribute_if_present(this: "FeatureView", new: Any, key: str) -> None:

From 817077a143e0507527941c9e2599537e6572ecfe Mon Sep 17 00:00:00 2001
From: kenneth
Date: Mon, 8 Jul 2024 10:39:07 +0200
Subject: [PATCH 3/8] spark read log

---
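Notes: reading the log is moved behind the engine because dropping the
bookkeeping columns is engine specific: pandas' drop takes a list of labels
plus axis=1, while PySpark's DataFrame.drop takes the column names as
positional arguments. Sketch of the two call shapes ("log_time" stands in
for the real FeatureViewEngine._LOG_TIME constant):

    import pandas as pd

    df_pd = pd.DataFrame({"log_id": ["a"], "log_time": [0], "f1": [1.0]})
    df_pd = df_pd.drop(["log_id", "log_time"], axis=1)   # pandas: list + axis
    # PySpark equivalent (assuming df_spark is a Spark DataFrame):
    # df_spark = df_spark.drop("log_id", "log_time")     # positional names
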
""" - return self._feature_view_engine.enable_feature_logging(self) + fv = self._feature_view_engine.enable_feature_logging(self) + self._feature_logging = self._feature_view_engine.get_feature_logging(fv) + return fv def log(self, features: Union[pd.DataFrame, list[list], np.ndarray], @@ -3503,7 +3506,7 @@ def log(self, ) self.enable_logging() return self._feature_view_engine.log_features( - self, features, predictions, transformed, + self, self._feature_logging, features, predictions, transformed, write_options, training_dataset_version=( training_dataset_version or self.get_last_accessed_training_dataset() @@ -3640,7 +3643,9 @@ def delete_log(self, transformed: Optional[bool]=None): # Raises `hsfs.client.exceptions.RestAPIError` in case the backend fails to delete the log. """ - return self._feature_view_engine.delete_feature_logs(self, transformed) + return self._feature_view_engine.delete_feature_logs( + self, self._feature_logging, transformed + ) @staticmethod def _update_attribute_if_present(this: "FeatureView", new: Any, key: str) -> None: From 817077a143e0507527941c9e2599537e6572ecfe Mon Sep 17 00:00:00 2001 From: kenneth Date: Mon, 8 Jul 2024 10:39:07 +0200 Subject: [PATCH 3/8] spark read log --- python/hsfs/core/feature_view_engine.py | 4 +--- python/hsfs/engine/python.py | 5 +++++ python/hsfs/engine/spark.py | 5 +++++ 3 files changed, 11 insertions(+), 3 deletions(-) diff --git a/python/hsfs/core/feature_view_engine.py b/python/hsfs/core/feature_view_engine.py index a3834f7ee9..8f5a6a2902 100644 --- a/python/hsfs/core/feature_view_engine.py +++ b/python/hsfs/core/feature_view_engine.py @@ -1000,9 +1000,7 @@ def read_feature_logs(self, fv, query = query.filter(fg.get_feature(FeatureViewEngine._HSML_MODEL) == self.get_hsml_model_value(hsml_model)) if filter: query = query.filter(self._convert_to_log_fg_filter(fg, fv, filter, fv_feat_name_map)) - df = query.read() - df = df.drop(["log_id", FeatureViewEngine._LOG_TIME], axis=1) - return df + return engine.get_instance().read_feature_log(query) @staticmethod def get_hsml_model_value(hsml_model): diff --git a/python/hsfs/engine/python.py b/python/hsfs/engine/python.py index 43d36fea21..1265bff453 100644 --- a/python/hsfs/engine/python.py +++ b/python/hsfs/engine/python.py @@ -1635,3 +1635,8 @@ def get_feature_logging_df(features, features[time_col_name] = pd.Series([now for _ in range(len(features))]) features["log_id"] = [str(uuid.uuid4()) for _ in range(len(features))] return features[[feat.name for feat in fg.features]] + + @staticmethod + def read_feature_log(query): + df = query.read() + return df.drop(["log_id", FeatureViewEngine._LOG_TIME], axis=1) \ No newline at end of file diff --git a/python/hsfs/engine/spark.py b/python/hsfs/engine/spark.py index 5dfad8c428..929bb206a6 100644 --- a/python/hsfs/engine/spark.py +++ b/python/hsfs/engine/spark.py @@ -25,6 +25,7 @@ from datetime import date, datetime, timezone from typing import TYPE_CHECKING, Any, Dict, List, Optional, TypeVar, Union +from hsfs.core.feature_view_engine import FeatureViewEngine if TYPE_CHECKING: import great_expectations @@ -1408,6 +1409,10 @@ def get_feature_logging_df(features, prediction=None, **kwargs): # and it is costly to join them back return features + @staticmethod + def read_feature_log(query): + df = query.read() + return df.drop("log_id", FeatureViewEngine._LOG_TIME) class SchemaError(Exception): """Thrown when schemas don't match""" From f3b8093fc4551e8cf22b8e74a1721805c2a47d71 Mon Sep 17 00:00:00 2001 From: kenneth Date: Mon, 8 Jul 2024 
 python/hsfs/engine/python.py |  1 -
 python/hsfs/engine/spark.py  | 47 +++++++++++++++++++++++++++++++++---
 2 files changed, 43 insertions(+), 5 deletions(-)

diff --git a/python/hsfs/engine/python.py b/python/hsfs/engine/python.py
index 1a29c70ec4..19141e0a3f 100644
--- a/python/hsfs/engine/python.py
+++ b/python/hsfs/engine/python.py
@@ -1620,7 +1620,6 @@ def get_feature_logging_df(features,
                                training_dataset_version=None,
                                hsml_model=None,
                                ) -> pd.DataFrame:
-        import uuid
         features = Engine._convert_feature_log_to_df(features, [f.name for f in fg_features])
         if td_predictions:
             prediction = Engine._convert_feature_log_to_df(prediction, [f.name for f in td_predictions])
diff --git a/python/hsfs/engine/spark.py b/python/hsfs/engine/spark.py
index 929bb206a6..040d5c5521 100644
--- a/python/hsfs/engine/spark.py
+++ b/python/hsfs/engine/spark.py
@@ -21,11 +21,13 @@
 import os
 import re
 import shutil
+import uuid
 import warnings
 from datetime import date, datetime, timezone
 from typing import TYPE_CHECKING, Any, Dict, List, Optional, TypeVar, Union

 from hsfs.core.feature_view_engine import FeatureViewEngine
+from hsfs.training_dataset_feature import TrainingDatasetFeature

 if TYPE_CHECKING:
     import great_expectations
@@ -1078,7 +1080,12 @@ def read_options(self, data_format, provided_options):
             options.update(provided_options)
         return options

-    def parse_schema_feature_group(self, dataframe, time_travel_format=None):
+    def parse_schema_feature_group(self,
+                                   dataframe,
+                                   time_travel_format=None,
+                                   features: Optional[
+                                       List[feature.Feature]] = None,
+                                   ):
         features = []
         using_hudi = time_travel_format == "HUDI"
         for feat in dataframe.schema:
@@ -1403,11 +1410,43 @@ def cast_columns(df, schema, online=False):
     def is_connector_type_supported(type):
         return True

-    @staticmethod
-    def get_feature_logging_df(features, prediction=None, **kwargs):
+    def get_feature_logging_df(self, features,
+                               fg=None,
+                               fg_features: List[TrainingDatasetFeature]=None,
+                               td_predictions: List[TrainingDatasetFeature]=None,
+                               td_col_name=None,
+                               time_col_name=None,
+                               model_col_name=None,
+                               prediction=None,
+                               training_dataset_version=None,
+                               hsml_model=None,
+                               **kwargs):
         # do not take prediction separately because spark ml framework usually return feature together with the prediction
         # and it is costly to join them back
-        return features
+        df = self.convert_to_default_dataframe(features)
+        if td_predictions:
+            for f in td_predictions:
+                if f.name not in df.columns:
+                    df = df.withColumn(f.name, lit(None).cast(
+                        Engine._convert_offline_type_to_spark_type(f.type)))
+
+        uuid_udf = udf(lambda: str(uuid.uuid4()), StringType())
+
+        # Add new columns to the DataFrame
+        df = df.withColumn(td_col_name,
+                           lit(training_dataset_version).cast(LongType()))
+        if hsml_model is not None:
+            hsml_str = FeatureViewEngine.get_hsml_model_value(hsml_model)
+        else:
+            hsml_str = None
+        df = df.withColumn(model_col_name, lit(hsml_str).cast(StringType()))
+        now = datetime.now()
+        df = df.withColumn(time_col_name,
+                           lit(now).cast(TimestampType()))
+        df = df.withColumn("log_id", uuid_udf())
+
+        # Select the required columns
+        return df.select(*[feat.name for feat in fg.features])

     @staticmethod
     def read_feature_log(query):

From 5099d5dd17cb73464ae590f48873a830c0ec0923 Mon Sep 17 00:00:00 2001
From: kenneth
Date: Tue, 9 Jul 2024 08:44:57 +0200
Subject: [PATCH 6/8] fix style

---
 python/hsfs/engine/spark.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/python/hsfs/engine/spark.py b/python/hsfs/engine/spark.py
index 040d5c5521..e74609988b 100644
--- a/python/hsfs/engine/spark.py
+++ b/python/hsfs/engine/spark.py
@@ -29,6 +29,7 @@
 from hsfs.core.feature_view_engine import FeatureViewEngine
 from hsfs.training_dataset_feature import TrainingDatasetFeature

+
 if TYPE_CHECKING:
     import great_expectations

From 16e40f71069884727a480e18c8e50ae5ad2647be Mon Sep 17 00:00:00 2001
From: kenneth
Date: Tue, 9 Jul 2024 11:51:11 +0200
Subject: [PATCH 7/8] fix _apply_transformation_function

---
 python/hsfs/engine/spark.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/python/hsfs/engine/spark.py b/python/hsfs/engine/spark.py
index e74609988b..5ded426bd8 100644
--- a/python/hsfs/engine/spark.py
+++ b/python/hsfs/engine/spark.py
@@ -1199,7 +1199,7 @@ def add_cols_to_delta_table(self, feature_group, new_features):
             "spark.databricks.delta.schema.autoMerge.enabled", "true"
         ).save(feature_group.location)

-    def _apply_transformation_function(self, transformation_functions, dataset):
+    def _apply_transformation_function(self, transformation_functions, dataset, **kwargs):
         # generate transformation function expressions
         transformed_feature_names = []
         transformation_fn_expressions = []

From fb6642dcadad218ae261ea21cd4253ec766e64e1 Mon Sep 17 00:00:00 2001
From: kenneth
Date: Tue, 9 Jul 2024 12:06:01 +0200
Subject: [PATCH 8/8] fix inplace

---
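Notes: patch 1 inverted the flag — the dataframe was copied precisely when
inplace was requested. After this fix the pandas copy (or polars clone)
happens only when inplace=False, so inplace=True mutates the caller's
dataframe. Sketch of the intended contract (pandas shown; the polars branch
uses .clone() instead of .copy(); column "x" is an arbitrary example):

    def apply_fn(df, fn, inplace=True):
        if not inplace:
            df = df.copy()  # work on a copy, leave the caller's dataframe untouched
        df["x"] = df["x"].map(fn)
        return df
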
 python/hsfs/engine/python.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/python/hsfs/engine/python.py b/python/hsfs/engine/python.py
index 19141e0a3f..0f4b213d68 100644
--- a/python/hsfs/engine/python.py
+++ b/python/hsfs/engine/python.py
@@ -1196,7 +1196,7 @@ def _apply_transformation_function(
             str, transformation_function_attached.TransformationFunctionAttached
         ],
         dataset: Union[pd.DataFrame, pl.DataFrame],
-        inplace=False,
+        inplace=True,
     ) -> Union[pd.DataFrame, pl.DataFrame]:
         for (
             feature_name,
@@ -1205,7 +1205,7 @@ def _apply_transformation_function(
             if isinstance(dataset, pl.DataFrame) or isinstance(
                 dataset, pl.dataframe.frame.DataFrame
             ):
-                if inplace:
+                if not inplace:
                     dataset = dataset.clone()
                 dataset = dataset.with_columns(
                     pl.col(feature_name).map_elements(
                         transformation_fn.transformation_fn
                     )
                 )
             else:
-                if inplace:
+                if not inplace:
                     dataset = pd.DataFrame.copy(dataset)
                 dataset[feature_name] = dataset[feature_name].map(
                     transformation_fn.transformation_fn