[APPEND][FSTORE-1424] Feature logging for spark #1367

Closed
2 changes: 1 addition & 1 deletion python/hsfs/core/feature_group_engine.py
@@ -88,7 +88,7 @@ def insert(
validation_options: dict = None,
):
dataframe_features = engine.get_instance().parse_schema_feature_group(
feature_dataframe, feature_group.time_travel_format
feature_dataframe, feature_group.time_travel_format, feature_group.features
)
util.validate_embedding_feature_type(
feature_group.embedding_index, dataframe_features
5 changes: 5 additions & 0 deletions python/hsfs/core/feature_logging.py
@@ -26,6 +26,11 @@ def from_response_json(cls, json_dict: Dict[str, Any]) -> 'FeatureLogging':
untransformed_features = FeatureGroup.from_response_json(untransformed_features)
return cls(json_decamelized.get('id'), transformed_features, untransformed_features)

def update(self, others):
self._transformed_features = others._transformed_features
self._untransformed_features = others._untransformed_features
return self

Comment on lines +29 to +33 (Contributor):
Why this method instead of the setter methods for the two fields? @transformed_features.setter and @untransformed_features.setter?
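A minimal sketch of the setter-based alternative the comment suggests; the attribute names follow the diff, but the simplified constructor is an illustrative assumption, not the real class:

```python
class FeatureLogging:
    """Sketch only: the real class in feature_logging.py carries more state."""

    def __init__(self, transformed_features=None, untransformed_features=None):
        self._transformed_features = transformed_features
        self._untransformed_features = untransformed_features

    @property
    def transformed_features(self):
        return self._transformed_features

    @transformed_features.setter
    def transformed_features(self, value):
        self._transformed_features = value

    @property
    def untransformed_features(self):
        return self._untransformed_features

    @untransformed_features.setter
    def untransformed_features(self, value):
        self._untransformed_features = value
```

With setters in place, `update(others)` reduces to two plain assignments at the call site, e.g. `fl.transformed_features = other.transformed_features`.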

@property
def transformed_features(self) -> "feature_group.FeatureGroup":
return self._transformed_features
2 changes: 1 addition & 1 deletion python/hsfs/core/feature_view_api.py
@@ -460,4 +460,4 @@ def delete_feature_logs(
path_params += [self._TRANSFORMED_lOG]
else:
path_params += [self._UNTRANSFORMED_LOG]
_client._send_request("DELETE", path_params, {})
return _client._send_request("DELETE", path_params, {})
Contributor:
What is this returning? Normally delete methods return void since the corresponding item gets deleted.

34 changes: 18 additions & 16 deletions python/hsfs/core/feature_view_engine.py
@@ -948,29 +948,32 @@ def get_feature_logging(self, fv):

def _get_logging_fg(self, fv, transformed):
feature_logging = self.get_feature_logging(fv)
return self._get_logging_fg_feature_logging(feature_logging, transformed)

def _get_logging_fg_feature_logging(self, feature_logging, transformed):
if transformed:
return feature_logging.transformed_features
else:
return feature_logging.untransformed_features
Comment on lines 949 to 957 (Contributor):
I think these lists are not easy to read.
_get_logging_fg_feature_logging returns features that can be transformed or untransformed, but the method name suggests feature_logging.
_get_logging_fg retrieves the feature_logging for a fv and then calls the previous method to return the (transformed or untransformed) features.

I wouldn't guess their behavior from the method names :)

Contributor:
Actually, based on their use in log_features, it seems that untransformed_features and transformed_features are actually fg objects.


def log_features(self, fv, features, prediction=None, transformed=False, write_options=None, training_dataset_version=None, hsml_model=None):
def log_features(self, fv, feature_logging, features_rows, prediction=None, transformed=False, write_options=None, training_dataset_version=None, hsml_model=None):
Contributor:
Can we add the types for all these parameters?
What is the type of features_rows?
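One possible annotated form of the signature in question; the types here are inferred from how the parameters are used elsewhere in the diff and are assumptions, not the project's declared types:

```python
from typing import Any, Dict, List, Optional, Union


def log_features(
    self,
    fv: "FeatureView",
    feature_logging: "FeatureLogging",
    # features_rows appears to accept a DataFrame or row-oriented data
    features_rows: Union["pd.DataFrame", List[list], "np.ndarray"],
    prediction: Optional["pd.DataFrame"] = None,
    transformed: bool = False,
    write_options: Optional[Dict[str, Any]] = None,
    training_dataset_version: Optional[int] = None,
    hsml_model: Optional[Any] = None,
):
    ...
```

String forward references keep the annotations import-free at definition time.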

default_write_options = {
"start_offline_materialization": False,
}
if write_options:
default_write_options.update(write_options)
fg = self._get_logging_fg(fv, transformed)
fg = self._get_logging_fg_feature_logging(feature_logging, transformed)
Contributor:
Could we better have a method called get_feature_group in the feature_logging class, that accepts a transformed boolean parameter?

def get_feature_group(self, transformed):
    return self._transformed_features if transformed else self._untransformed_features

Then:

  • we don't need _get_logging_fg_feature_logging and _get_logging_fg methods. We can just call get_feature_logging from the engine and then the internal method if needed.
  • we don't need to pass feature_logging as a separate parameter. We can use get_feature_logging(fv).get_feature_group(transformed)
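The proposed get_feature_group can be sketched as a small method on FeatureLogging (simplified constructor; the real class holds more state):

```python
class FeatureLogging:
    """Sketch of the reviewer's proposal, not the shipped class."""

    def __init__(self, transformed_features, untransformed_features):
        self._transformed_features = transformed_features
        self._untransformed_features = untransformed_features

    def get_feature_group(self, transformed: bool):
        # Select the logging feature group for the requested variant.
        return self._transformed_features if transformed else self._untransformed_features
```

The engine call site then becomes a single chained expression, e.g. `fg = self.get_feature_logging(fv).get_feature_group(transformed)`.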

df = engine.get_instance().get_feature_logging_df(
fg,
features,
[feature for feature in fv.features if not feature.label],
[feature for feature in fv.features if feature.label],
FeatureViewEngine._LOG_TD_VERSION,
FeatureViewEngine._LOG_TIME,
FeatureViewEngine._HSML_MODEL,
prediction,
training_dataset_version,
hsml_model,
features_rows,
fg=fg,
fg_features=[feature for feature in fv.features if not feature.label],
td_predictions=[feature for feature in fv.features if feature.label],
td_col_name=FeatureViewEngine._LOG_TD_VERSION,
time_col_name=FeatureViewEngine._LOG_TIME,
model_col_name=FeatureViewEngine._HSML_MODEL,
prediction=prediction,
Contributor:
Is prediction one or multiple predictions? If many, can we use predictions instead?

training_dataset_version=training_dataset_version,
hsml_model=hsml_model,
)
return fg.insert(df, write_options=default_write_options)

@@ -997,9 +1000,7 @@ def read_feature_logs(self, fv,
query = query.filter(fg.get_feature(FeatureViewEngine._HSML_MODEL) == self.get_hsml_model_value(hsml_model))
if filter:
query = query.filter(self._convert_to_log_fg_filter(fg, fv, filter, fv_feat_name_map))
df = query.read()
df = df.drop(["log_id", FeatureViewEngine._LOG_TIME], axis=1)
return df
return engine.get_instance().read_feature_log(query)

@staticmethod
def get_hsml_model_value(hsml_model):
@@ -1067,7 +1068,8 @@ def materialize_feature_logs(self, fv, wait):
pass
return jobs

def delete_feature_logs(self, fv, transformed):
def delete_feature_logs(self, fv, feature_logging, transformed):
self._feature_view_api.delete_feature_logs(
fv.name, fv.version, transformed
)
feature_logging.update(self.get_feature_logging(fv))
52 changes: 32 additions & 20 deletions python/hsfs/engine/python.py
@@ -768,20 +768,30 @@ def parse_schema_feature_group(
self,
dataframe: Union[pd.DataFrame, pl.DataFrame],
time_travel_format: Optional[str] = None,
features: Optional[List[feature.Feature]] = None,
) -> List[feature.Feature]:
feature_type_map = {}
if features:
for _feature in features:
feature_type_map[_feature.name] = _feature.type
if isinstance(dataframe, pd.DataFrame):
arrow_schema = pa.Schema.from_pandas(dataframe, preserve_index=False)
elif isinstance(dataframe, pl.DataFrame) or isinstance(
dataframe, pl.dataframe.frame.DataFrame
):
arrow_schema = dataframe.to_arrow().schema
features = []
for feat_name in arrow_schema.names:
for i in range(len(arrow_schema.names)):
feat_name = arrow_schema.names[i]
name = util.autofix_feature_name(feat_name)
try:
converted_type = self._convert_pandas_dtype_to_offline_type(
arrow_schema.field(feat_name).type
)
pd_type = arrow_schema.field(feat_name).type
if pd_type == "null" and feature_type_map.get(name):
converted_type = feature_type_map.get(name)
else:
converted_type = self._convert_pandas_dtype_to_offline_type(
pd_type
)
except ValueError as e:
raise FeatureStoreException(f"Feature '{name}': {str(e)}") from e
features.append(feature.Feature(name, converted_type))
@@ -1195,13 +1205,16 @@ def _apply_transformation_function(
if isinstance(dataset, pl.DataFrame) or isinstance(
dataset, pl.dataframe.frame.DataFrame
):
if not inplace:
dataset = dataset.clone()
dataset = dataset.with_columns(
pl.col(feature_name).map_elements(
transformation_fn.transformation_fn
)
)
else:
dataset = pd.DataFrame.copy(dataset)
if not inplace:
dataset = pd.DataFrame.copy(dataset)
dataset[feature_name] = dataset[feature_name].map(
transformation_fn.transformation_fn
)
@@ -1596,34 +1609,33 @@ def _convert_feature_log_to_df(feature_log, cols):
return feature_log.copy(deep=False)

@staticmethod
def get_feature_logging_df(fg,
features,
fg_features: List[TrainingDatasetFeature],
td_predictions: List[TrainingDatasetFeature],
td_col_name,
time_col_name,
model_col_name,
def get_feature_logging_df(features,
Contributor:
Can we add types to all parameters?

fg=None,
fg_features: List[TrainingDatasetFeature]=None,
td_predictions: List[TrainingDatasetFeature]=None,
td_col_name=None,
time_col_name=None,
model_col_name=None,
prediction=None,
training_dataset_version=None,
hsml_model=None,
) -> pd.DataFrame:
import uuid
features = Engine._convert_feature_log_to_df(features, [f.name for f in fg_features])
if td_predictions:
prediction = Engine._convert_feature_log_to_df(prediction, [f.name for f in td_predictions])
for f in td_predictions:
prediction[f.name] = Engine._cast_column_to_offline_type(prediction[f.name], f.type)
if not set(prediction.columns).intersection(set(features.columns)):
features = pd.concat([features, prediction], axis=1)
# need to case the column type as if it is None, type cannot be inferred.
features[td_col_name] = Engine._cast_column_to_offline_type(
pd.Series([training_dataset_version for _ in range(len(features))]), fg.get_feature(td_col_name).type
)
features[td_col_name] = pd.Series([training_dataset_version for _ in range(len(features))])
# _cast_column_to_offline_type cannot cast string type
features[model_col_name] = pd.Series([FeatureViewEngine.get_hsml_model_value(hsml_model) if hsml_model else None for _ in range(len(features))], dtype=pd.StringDtype())
now = datetime.now()
features[time_col_name] = Engine._cast_column_to_offline_type(
pd.Series([now for _ in range(len(features))]), fg.get_feature(time_col_name).type
)
features[time_col_name] = pd.Series([now for _ in range(len(features))])
features["log_id"] = [str(uuid.uuid4()) for _ in range(len(features))]
return features[[feat.name for feat in fg.features]]

@staticmethod
def read_feature_log(query):
df = query.read()
return df.drop(["log_id", FeatureViewEngine._LOG_TIME], axis=1)
55 changes: 50 additions & 5 deletions python/hsfs/engine/spark.py
@@ -21,10 +21,14 @@
import os
import re
import shutil
import uuid
import warnings
from datetime import date, datetime, timezone
from typing import TYPE_CHECKING, Any, Dict, List, Optional, TypeVar, Union

from hsfs.core.feature_view_engine import FeatureViewEngine
from hsfs.training_dataset_feature import TrainingDatasetFeature


if TYPE_CHECKING:
import great_expectations
@@ -1077,7 +1081,12 @@ def read_options(self, data_format, provided_options):
options.update(provided_options)
return options

def parse_schema_feature_group(self, dataframe, time_travel_format=None):
def parse_schema_feature_group(self,
dataframe,
time_travel_format=None,
features: Optional[
List[feature.Feature]] = None,
):
features = []
using_hudi = time_travel_format == "HUDI"
for feat in dataframe.schema:
@@ -1190,7 +1199,7 @@ def add_cols_to_delta_table(self, feature_group, new_features):
"spark.databricks.delta.schema.autoMerge.enabled", "true"
).save(feature_group.location)

def _apply_transformation_function(self, transformation_functions, dataset):
def _apply_transformation_function(self, transformation_functions, dataset, **kwargs):
# generate transformation function expressions
transformed_feature_names = []
transformation_fn_expressions = []
@@ -1402,12 +1411,48 @@ def cast_columns(df, schema, online=False):
def is_connector_type_supported(type):
return True

@staticmethod
def get_feature_logging_df(features, prediction=None):
def get_feature_logging_df(self, features,
fg=None,
fg_features: List[TrainingDatasetFeature]=None,
td_predictions: List[TrainingDatasetFeature]=None,
td_col_name=None,
time_col_name=None,
model_col_name=None,
prediction=None,
training_dataset_version=None,
hsml_model=None,
**kwargs):
Comment on lines +1414 to +1424 (Contributor):
Can we add types to these parameters?

# do not take prediction separately because spark ml framework usually return feature together with the prediction
# and it is costly to join them back
return features
df = self.convert_to_default_dataframe(features)
if td_predictions:
for f in td_predictions:
if f.name not in df.columns:
df = df.withColumn(f.name, lit(None).cast(
Engine._convert_offline_type_to_spark_type(f.type)))

uuid_udf = udf(lambda: str(uuid.uuid4()), StringType())

# Add new columns to the DataFrame
df = df.withColumn(td_col_name,
lit(training_dataset_version).cast(LongType()))
Comment on lines +1437 to +1438 (Contributor):
Indentation seems off here.

if hsml_model is not None:
hsml_str = FeatureViewEngine.get_hsml_model_value(hsml_model)
else:
hsml_str = None
df = df.withColumn(model_col_name, lit(hsml_str).cast(StringType()))
now = datetime.now()
df = df.withColumn(time_col_name,
lit(now).cast(TimestampType()))
Comment on lines +1445 to +1446 (Contributor):
Indentation here as well

df = df.withColumn("log_id", uuid_udf())

# Select the required columns
return df.select(*[feat.name for feat in fg.features])

@staticmethod
def read_feature_log(query):
df = query.read()
return df.drop("log_id", FeatureViewEngine._LOG_TIME)

class SchemaError(Exception):
"""Thrown when schemas don't match"""
13 changes: 10 additions & 3 deletions python/hsfs/feature_view.py
@@ -146,6 +146,7 @@ def __init__(
featurestore_id, self.ENTITY_TYPE
)
self._logging_enabled = logging_enabled
self._feature_logging = None

if self._id:
self._init_feature_monitoring_engine()
@@ -3463,7 +3464,9 @@ def enable_logging(self) -> None:
# Raises
`hsfs.client.exceptions.RestAPIError` in case the backend fails to enable feature logging.
"""
return self._feature_view_engine.enable_feature_logging(self)
fv = self._feature_view_engine.enable_feature_logging(self)
self._feature_logging = self._feature_view_engine.get_feature_logging(fv)
return fv

def log(self,
features: Union[pd.DataFrame, list[list], np.ndarray],
@@ -3503,7 +3506,7 @@ def log(self,
)
self.enable_logging()
return self._feature_view_engine.log_features(
self, features, predictions, transformed,
self, self._feature_logging, features, predictions, transformed,
write_options,
training_dataset_version=(
training_dataset_version or self.get_last_accessed_training_dataset()
@@ -3640,7 +3643,11 @@ def delete_log(self, transformed: Optional[bool]=None):
# Raises
`hsfs.client.exceptions.RestAPIError` in case the backend fails to delete the log.
"""
return self._feature_view_engine.delete_feature_logs(self, transformed)
if self._feature_logging is None:
self._feature_logging = self._feature_view_engine.get_feature_logging(self)
Contributor:
What if delete_log is called on a new FV without feature logging enabled (no call to enable_feature_logging() or log())?
I guess this line will return None, and then fail in delete_feature_logs()?

return self._feature_view_engine.delete_feature_logs(
self, self._feature_logging, transformed
)

@staticmethod
def _update_attribute_if_present(this: "FeatureView", new: Any, key: str) -> None: