From 493eff12c6c066a7112a70cb8583dff25378c51e Mon Sep 17 00:00:00 2001 From: Chinmaya Jena Date: Fri, 24 Jun 2022 21:17:55 +0530 Subject: [PATCH 01/10] Add callback function as parameter in the feature usage functions --- feathr_project/feathr/client.py | 58 ++++++++++++++++++++++++++------- 1 file changed, 47 insertions(+), 11 deletions(-) diff --git a/feathr_project/feathr/client.py b/feathr_project/feathr/client.py index 0da845e4c..6f8556f2c 100644 --- a/feathr_project/feathr/client.py +++ b/feathr_project/feathr/client.py @@ -2,6 +2,7 @@ import logging import os import tempfile +import asyncio from datetime import datetime, timedelta from pathlib import Path from typing import Dict, List, Optional, Union @@ -264,13 +265,16 @@ def _get_registry_client(self): """ return self.registry._get_registry_client() - def get_online_features(self, feature_table, key, feature_names): - """Fetches feature value for a certain key from a online feature table. + def get_online_features(self, feature_table, key, feature_names, callback: callable = None, params: dict = None): + """Fetches feature value for a certain key from a online feature table. There is an optional callback function + and the params to extend this function's capability.For eg. cosumer of the features. Args: feature_table: the name of the feature table. key: the key of the entity feature_names: list of feature names to fetch + callback: an async callback function that will be called after execution of the original logic. This callback should not block the thread. + params: a dictionary of parameters for the callback function Return: A list of feature values for this entity. It's ordered by the requested feature names. @@ -283,15 +287,21 @@ def get_online_features(self, feature_table, key, feature_names): """ redis_key = self._construct_redis_key(feature_table, key) res = self.redis_clint.hmget(redis_key, *feature_names) - return self._decode_proto(res) + feature_values = self._decode_proto(res) + if (callback is not None) and (params is not None): + event_loop = asyncio.get_event_loop() + event_loop.create_task(callback(params)) + return feature_values - def multi_get_online_features(self, feature_table, keys, feature_names): + def multi_get_online_features(self, feature_table, keys, feature_names, callback: callable = None, params: dict = None): """Fetches feature value for a list of keys from a online feature table. This is the batch version of the get API. Args: feature_table: the name of the feature table. keys: list of keys for the entities feature_names: list of feature names to fetch + callback: an async callback function that will be called after execution of the original logic. This callback should not block the thread. + params: a dictionary of parameters for the callback function Return: A list of feature values for the requested entities. It's ordered by the requested feature names. For @@ -312,6 +322,10 @@ def multi_get_online_features(self, feature_table, keys, feature_names): for feature_list in pipeline_result: decoded_pipeline_result.append(self._decode_proto(feature_list)) + if (callback is not None) and (params is not None): + event_loop = asyncio.get_event_loop() + event_loop.create_task(callback(params)) + return dict(zip(keys, decoded_pipeline_result)) def _decode_proto(self, feature_list): @@ -412,15 +426,20 @@ def get_offline_features(self, output_path: str, execution_configuratons: Union[SparkExecutionConfiguration ,Dict[str,str]] = {}, udf_files = None, - verbose: bool = False + verbose: bool = False, + callback: callable = None, + params: dict = None ): """ - Get offline features for the observation dataset + Get offline features for the observation dataset. There is an optional callback function and the params + to extend this function's capability.For eg. cosumer of the features. Args: observation_settings: settings of the observation data, e.g. timestamp columns, input path, etc. feature_query: features that are requested to add onto the observation data output_path: output path of job, i.e. the observation data with features attached. execution_configuratons: a dict that will be passed to spark job when the job starts up, i.e. the "spark configurations". Note that not all of the configuration will be honored since some of the configurations are managed by the Spark platform, such as Databricks or Azure Synapse. Refer to the [spark documentation](https://spark.apache.org/docs/latest/configuration.html) for a complete list of spark configurations. + callback: an async callback function that will be called after execution of the original logic. This callback should not block the thread. + params: a dictionary of parameters for the callback function """ feature_queries = feature_query if isinstance(feature_query, List) else [feature_query] feature_names = [] @@ -457,7 +476,11 @@ def get_offline_features(self, FeaturePrinter.pretty_print_feature_query(feature_query) write_to_file(content=config, full_file_name=config_file_path) - return self._get_offline_features_with_config(config_file_path, execution_configuratons, udf_files=udf_files) + job_info = self._get_offline_features_with_config(config_file_path, execution_configuratons, udf_files=udf_files) + if (callback is not None) and (params is not None): + event_loop = asyncio.get_event_loop() + event_loop.create_task(callback(params)) + return job_info def _get_offline_features_with_config(self, feature_join_conf_path='feature_join_conf/feature_join.conf', execution_configuratons: Dict[str,str] = {}, udf_files=[]): """Joins the features to your offline observation dataset based on the join config. @@ -534,21 +557,30 @@ def wait_job_to_finish(self, timeout_sec: int = 300): else: raise RuntimeError('Spark job failed.') - def monitor_features(self, settings: MonitoringSettings, execution_configuratons: Union[SparkExecutionConfiguration ,Dict[str,str]] = {}, verbose: bool = False): - """Create a offline job to generate statistics to monitor feature data + def monitor_features(self, settings: MonitoringSettings, execution_configuratons: Union[SparkExecutionConfiguration ,Dict[str,str]] = {}, verbose: bool = False, callback: callable = None, params: dict = None): + """Create a offline job to generate statistics to monitor feature data. There is an optional + callback function and the params to extend this function's capability.For eg. cosumer of the features. Args: settings: Feature monitoring settings execution_configuratons: a dict that will be passed to spark job when the job starts up, i.e. the "spark configurations". Note that not all of the configuration will be honored since some of the configurations are managed by the Spark platform, such as Databricks or Azure Synapse. Refer to the [spark documentation](https://spark.apache.org/docs/latest/configuration.html) for a complete list of spark configurations. + callback: an async callback function that will be called after execution of the original logic. This callback should not block the thread. + params: a dictionary of parameters for the callback function. """ self.materialize_features(settings, execution_configuratons, verbose) + if (callback is not None) and (params is not None): + event_loop = asyncio.get_event_loop() + event_loop.create_task(callback(params)) - def materialize_features(self, settings: MaterializationSettings, execution_configuratons: Union[SparkExecutionConfiguration ,Dict[str,str]] = {}, verbose: bool = False): - """Materialize feature data + def materialize_features(self, settings: MaterializationSettings, execution_configuratons: Union[SparkExecutionConfiguration ,Dict[str,str]] = {}, verbose: bool = False, callback: callable = None, params: dict = None): + """Materialize feature data. There is an optional callback function and the params + to extend this function's capability.For eg. cosumer of the feature store. Args: settings: Feature materialization settings execution_configuratons: a dict that will be passed to spark job when the job starts up, i.e. the "spark configurations". Note that not all of the configuration will be honored since some of the configurations are managed by the Spark platform, such as Databricks or Azure Synapse. Refer to the [spark documentation](https://spark.apache.org/docs/latest/configuration.html) for a complete list of spark configurations. + callback: an async callback function that will be called after execution of the original logic. This callback should not block the thread. + params: a dictionary of parameters for the callback function """ # produce materialization config for end in settings.get_backfill_cutoff_time(): @@ -575,6 +607,10 @@ def materialize_features(self, settings: MaterializationSettings, execution_conf # Pretty print feature_names of materialized features if verbose and settings: FeaturePrinter.pretty_print_materialize_features(settings) + + if (callback is not None) and (params is not None): + event_loop = asyncio.get_event_loop() + event_loop.create_task(callback(params)) def _materialize_features_with_config(self, feature_gen_conf_path: str = 'feature_gen_conf/feature_gen.conf',execution_configuratons: Dict[str,str] = {}, udf_files=[]): """Materializes feature data based on the feature generation config. The feature From 308bafa431f1476edb59e7270538bb9131800be1 Mon Sep 17 00:00:00 2001 From: Shivam Sanju Date: Sat, 25 Jun 2022 13:39:21 +0530 Subject: [PATCH 02/10] added documentation for using callback function in client.py --- .../how-to-guides/client-callback-function.md | 43 +++++++++++++++++++ 1 file changed, 43 insertions(+) create mode 100644 docs/how-to-guides/client-callback-function.md diff --git a/docs/how-to-guides/client-callback-function.md b/docs/how-to-guides/client-callback-function.md new file mode 100644 index 000000000..ddf1f8d29 --- /dev/null +++ b/docs/how-to-guides/client-callback-function.md @@ -0,0 +1,43 @@ +--- +layout: default +title: How to use callback function in feathr client +parent: Feathr How-to Guides +--- + +# How to use callback function in feathr client + +This doc shows how to build feathr registry docker image locally and publish to registry + +## What is a callback function + +A callback function is a function that is sent to another function as an argument. It can be used to extend the function as per thr user needs. + +## How to use callback functions + +Currently these functions in feathr client support callbacks + +- get_online_features +- multi_get_online_features +- get_offline_features +- monitor_features +- materialize_features + +They accept two optional parameters named **callback** and **params**, where callback is of type function and params is a dictionary where user can pass the arguments for the callback function. + +An example on how to use it: + +```python +async def callback(params): + import httpx + async with httpx.AsyncClient() as client: + response = await client.post('https://some-endpoint', json = payload) + return response + +params = {"param1":"value1", "param2":"value2"} + +# inside the notebook +client = FeathrClient(config_path) +client.get_offline_features(observation_settings,feature_query,output_path, callback, params) + + +``` From abad7d4d6a3233d3d80aedf4b2015611a2dda5ed Mon Sep 17 00:00:00 2001 From: Shivam Sanju Date: Sat, 25 Jun 2022 15:05:17 +0530 Subject: [PATCH 03/10] corrected payload to params --- docs/how-to-guides/client-callback-function.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/how-to-guides/client-callback-function.md b/docs/how-to-guides/client-callback-function.md index ddf1f8d29..fb005fd83 100644 --- a/docs/how-to-guides/client-callback-function.md +++ b/docs/how-to-guides/client-callback-function.md @@ -30,7 +30,7 @@ An example on how to use it: async def callback(params): import httpx async with httpx.AsyncClient() as client: - response = await client.post('https://some-endpoint', json = payload) + response = await client.post('https://some-endpoint', json = params) return response params = {"param1":"value1", "param2":"value2"} From c41f0aaf88d183f2d02b441422510ae286a93ef0 Mon Sep 17 00:00:00 2001 From: Shivam Sanju Date: Sun, 26 Jun 2022 21:12:43 +0530 Subject: [PATCH 04/10] added asyncio to setup.py and requirements.txt --- feathr_project/docs/requirements.txt | 3 ++- feathr_project/setup.py | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/feathr_project/docs/requirements.txt b/feathr_project/docs/requirements.txt index eb4f05184..2bb0a9e42 100644 --- a/feathr_project/docs/requirements.txt +++ b/feathr_project/docs/requirements.txt @@ -14,4 +14,5 @@ google>=3.0.0 google-api-python-client>=2.41.0 azure-keyvault-secrets confluent-kafka -azure-core<=1.22.1 \ No newline at end of file +azure-core<=1.22.1 +asyncio \ No newline at end of file diff --git a/feathr_project/setup.py b/feathr_project/setup.py index 9791fee95..7dda80343 100644 --- a/feathr_project/setup.py +++ b/feathr_project/setup.py @@ -51,7 +51,8 @@ # https://github.com/Azure/azure-sdk-for-python/pull/22891 # using a version lower than that to workaround this issue "azure-core<=1.22.1", - "typing_extensions>=4.2.0" + "typing_extensions>=4.2.0", + "asyncio" ], tests_require=[ 'pytest', From bc6b9c324c95c9b95d65e1b699abd9a982d9b9ec Mon Sep 17 00:00:00 2001 From: Shivam Sanju Date: Mon, 27 Jun 2022 11:32:42 +0000 Subject: [PATCH 05/10] removed asyncio and fixed documentation --- docs/how-to-guides/client-callback-function.md | 9 ++------- feathr_project/docs/requirements.txt | 3 +-- feathr_project/setup.py | 1 - 3 files changed, 3 insertions(+), 10 deletions(-) diff --git a/docs/how-to-guides/client-callback-function.md b/docs/how-to-guides/client-callback-function.md index fb005fd83..dd37a622b 100644 --- a/docs/how-to-guides/client-callback-function.md +++ b/docs/how-to-guides/client-callback-function.md @@ -4,17 +4,13 @@ title: How to use callback function in feathr client parent: Feathr How-to Guides --- -# How to use callback function in feathr client - -This doc shows how to build feathr registry docker image locally and publish to registry - ## What is a callback function -A callback function is a function that is sent to another function as an argument. It can be used to extend the function as per thr user needs. +A callback function is a function that is sent to another function as an argument. It can be used to extend the function as per the user needs. ## How to use callback functions -Currently these functions in feathr client support callbacks +Currently below functions in feathr client support callback - get_online_features - multi_get_online_features @@ -39,5 +35,4 @@ params = {"param1":"value1", "param2":"value2"} client = FeathrClient(config_path) client.get_offline_features(observation_settings,feature_query,output_path, callback, params) - ``` diff --git a/feathr_project/docs/requirements.txt b/feathr_project/docs/requirements.txt index 2bb0a9e42..eb4f05184 100644 --- a/feathr_project/docs/requirements.txt +++ b/feathr_project/docs/requirements.txt @@ -14,5 +14,4 @@ google>=3.0.0 google-api-python-client>=2.41.0 azure-keyvault-secrets confluent-kafka -azure-core<=1.22.1 -asyncio \ No newline at end of file +azure-core<=1.22.1 \ No newline at end of file diff --git a/feathr_project/setup.py b/feathr_project/setup.py index 7dda80343..dd940cada 100644 --- a/feathr_project/setup.py +++ b/feathr_project/setup.py @@ -52,7 +52,6 @@ # using a version lower than that to workaround this issue "azure-core<=1.22.1", "typing_extensions>=4.2.0", - "asyncio" ], tests_require=[ 'pytest', From e5cd8375f51fb2bcce9de7ba17ac53d196dc2043 Mon Sep 17 00:00:00 2001 From: Shivam Sanju Date: Mon, 27 Jun 2022 18:39:44 +0530 Subject: [PATCH 06/10] fixed docs and added tests --- .../how-to-guides/client-callback-function.md | 11 +- feathr_project/docs/requirements.txt | 2 +- feathr_project/setup.py | 2 +- feathr_project/test/test_client_callback.py | 124 ++++++++++++++++++ 4 files changed, 130 insertions(+), 9 deletions(-) create mode 100644 feathr_project/test/test_client_callback.py diff --git a/docs/how-to-guides/client-callback-function.md b/docs/how-to-guides/client-callback-function.md index fb005fd83..e213171e9 100644 --- a/docs/how-to-guides/client-callback-function.md +++ b/docs/how-to-guides/client-callback-function.md @@ -4,17 +4,13 @@ title: How to use callback function in feathr client parent: Feathr How-to Guides --- -# How to use callback function in feathr client - -This doc shows how to build feathr registry docker image locally and publish to registry - ## What is a callback function -A callback function is a function that is sent to another function as an argument. It can be used to extend the function as per thr user needs. +A callback function is a function that is sent to another function as an argument. It can be used to extend the function as per the user needs. ## How to use callback functions -Currently these functions in feathr client support callbacks +Currently the below functions in feathr client support passing a callback as an argument: - get_online_features - multi_get_online_features @@ -22,7 +18,8 @@ Currently these functions in feathr client support callbacks - monitor_features - materialize_features -They accept two optional parameters named **callback** and **params**, where callback is of type function and params is a dictionary where user can pass the arguments for the callback function. +These functions accept two optional parameters named **callback** and **params**. +callback is of type function and params is a dictionary where user can pass the arguments for the callback function. An example on how to use it: diff --git a/feathr_project/docs/requirements.txt b/feathr_project/docs/requirements.txt index 2bb0a9e42..fbdfd2def 100644 --- a/feathr_project/docs/requirements.txt +++ b/feathr_project/docs/requirements.txt @@ -15,4 +15,4 @@ google-api-python-client>=2.41.0 azure-keyvault-secrets confluent-kafka azure-core<=1.22.1 -asyncio \ No newline at end of file +mock \ No newline at end of file diff --git a/feathr_project/setup.py b/feathr_project/setup.py index 7dda80343..8b7eadc7b 100644 --- a/feathr_project/setup.py +++ b/feathr_project/setup.py @@ -52,7 +52,7 @@ # using a version lower than that to workaround this issue "azure-core<=1.22.1", "typing_extensions>=4.2.0", - "asyncio" + "mock" ], tests_require=[ 'pytest', diff --git a/feathr_project/test/test_client_callback.py b/feathr_project/test/test_client_callback.py new file mode 100644 index 000000000..e8eed77b2 --- /dev/null +++ b/feathr_project/test/test_client_callback.py @@ -0,0 +1,124 @@ +import os +import asyncio +import mock +import time +from subprocess import call +from datetime import datetime, timedelta + +from pathlib import Path +from feathr import ValueType +from feathr import FeatureQuery +from feathr import ObservationSettings +from feathr import TypedKey +from test_fixture import basic_test_setup +from test_fixture import get_online_test_table_name +from feathr.definition._materialization_utils import _to_materialization_config +from feathr import (BackfillTime, MaterializationSettings) +from feathr import (BackfillTime, MaterializationSettings, FeatureQuery, + ObservationSettings, SparkExecutionConfiguration) +from feathr import RedisSink, HdfsSink + + +params = {"wait" : 0.1} +async def sample_callback(params): + print(params) + await asyncio.sleep(0.1) + +callback = mock.MagicMock(return_value=sample_callback(params)) + +def test_client_callback_offline_feature(): + test_workspace_dir = Path(__file__).parent.resolve() / "test_user_workspace" + client = basic_test_setup(os.path.join(test_workspace_dir, "feathr_config.yaml")) + + location_id = TypedKey(key_column="DOLocationID", + key_column_type=ValueType.INT32, + description="location id in NYC", + full_name="nyc_taxi.location_id") + feature_query = FeatureQuery(feature_list=["f_location_avg_fare"], key=location_id) + + settings = ObservationSettings( + observation_path="wasbs://public@azurefeathrstorage.blob.core.windows.net/sample_data/green_tripdata_2020-04.csv", + event_timestamp_column="lpep_dropoff_datetime", + timestamp_format="yyyy-MM-dd HH:mm:ss") + + now = datetime.now() + output_path = ''.join(['dbfs:/feathrazure_cijob','_', str(now.minute), '_', str(now.second), ".avro"]) + + res = client.get_offline_features(observation_settings=settings, + feature_query=feature_query, + output_path=output_path, + callback=callback, + params=params) + callback.assert_called_with(params) + + +def test_client_callback_materialization(): + online_test_table = get_online_test_table_name("nycTaxiCITable") + test_workspace_dir = Path(__file__).parent.resolve() / "test_user_workspace" + + client = basic_test_setup(os.path.join(test_workspace_dir, "feathr_config.yaml")) + backfill_time = BackfillTime(start=datetime(2020, 5, 20), end=datetime(2020, 5, 20), step=timedelta(days=1)) + redisSink = RedisSink(table_name=online_test_table) + settings = MaterializationSettings("nycTaxiTable", + sinks=[redisSink], + feature_names=[ + "f_location_avg_fare", "f_location_max_fare"], + backfill_time=backfill_time) + client.materialize_features(settings, callback=callback, params=params) + callback.assert_called_with(params) + +def test_client_callback_monitor_features(): + online_test_table = get_online_test_table_name("nycTaxiCITable") + test_workspace_dir = Path(__file__).parent.resolve() / "test_user_workspace" + + client = basic_test_setup(os.path.join(test_workspace_dir, "feathr_config.yaml")) + backfill_time = BackfillTime(start=datetime(2020, 5, 20), end=datetime(2020, 5, 20), step=timedelta(days=1)) + redisSink = RedisSink(table_name=online_test_table) + settings = MaterializationSettings("nycTaxiTable", + sinks=[redisSink], + feature_names=[ + "f_location_avg_fare", "f_location_max_fare"], + backfill_time=backfill_time) + client.monitor_features(settings, callback=callback, params=params) + callback.assert_called_with(params) + +def test_client_callback_get_online_features(): + online_test_table = get_online_test_table_name("nycTaxiCITable") + test_workspace_dir = Path(__file__).parent.resolve() / "test_user_workspace" + + client = basic_test_setup(os.path.join(test_workspace_dir, "feathr_config.yaml")) + backfill_time = BackfillTime(start=datetime(2020, 5, 20), end=datetime(2020, 5, 20), step=timedelta(days=1)) + redisSink = RedisSink(table_name=online_test_table) + settings = MaterializationSettings("nycTaxiTable", + sinks=[redisSink], + feature_names=[ + "f_location_avg_fare", "f_location_max_fare"], + backfill_time=backfill_time) + client.materialize_features(settings) + callback.assert_called_with(params) + client.wait_job_to_finish(timeout_sec=900) + # wait for a few secs for the data to come in redis + time.sleep(5) + client.get_online_features('nycTaxiDemoFeature', '265', ['f_location_avg_fare', 'f_location_max_fare'], callback=callback, params=params) + callback.assert_called_with(params) + + +def test_client_callback_multi_get_online_features(): + online_test_table = get_online_test_table_name("nycTaxiCITable") + test_workspace_dir = Path(__file__).parent.resolve() / "test_user_workspace" + + client = basic_test_setup(os.path.join(test_workspace_dir, "feathr_config.yaml")) + backfill_time = BackfillTime(start=datetime(2020, 5, 20), end=datetime(2020, 5, 20), step=timedelta(days=1)) + redisSink = RedisSink(table_name=online_test_table) + settings = MaterializationSettings("nycTaxiTable", + sinks=[redisSink], + feature_names=[ + "f_location_avg_fare", "f_location_max_fare"], + backfill_time=backfill_time) + client.materialize_features(settings) + callback.assert_called_with(params) + client.wait_job_to_finish(timeout_sec=900) + # wait for a few secs for the data to come in redis + time.sleep(5) + client.multi_get_online_features('nycTaxiDemoFeature', ["239", "265"], ['f_location_avg_fare', 'f_location_max_fare'], callback=callback, params=params) + callback.assert_called_with(params) \ No newline at end of file From 525e15b52b03546bd0c91f1846107bf447e8acca Mon Sep 17 00:00:00 2001 From: Shivam Sanju Date: Mon, 27 Jun 2022 18:47:05 +0530 Subject: [PATCH 07/10] fixed docs example code --- docs/how-to-guides/client-callback-function.md | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/docs/how-to-guides/client-callback-function.md b/docs/how-to-guides/client-callback-function.md index 945d6c0aa..1d87fa57d 100644 --- a/docs/how-to-guides/client-callback-function.md +++ b/docs/how-to-guides/client-callback-function.md @@ -24,16 +24,17 @@ callback is of type function and params is a dictionary where user can pass the An example on how to use it: ```python -async def callback(params): - import httpx - async with httpx.AsyncClient() as client: - response = await client.post('https://some-endpoint', json = params) - return response +# inside notebook +client = FeathrClient(config_path) +client.get_offline_features(observation_settings,feature_query,output_path, callback, params) +# users can define their own callback function and params params = {"param1":"value1", "param2":"value2"} -# inside the notebook -client = FeathrClient(config_path) -client.get_offline_features(observation_settings,feature_query,output_path, callback, params) +async def callback(params): + import httpx + async with httpx.AsyncClient() as requestHandler: + response = await requestHandler.post('https://some-endpoint', json = params) + return response ``` From f39e07c8d223174afefdff5057b36eaeddaa4eb2 Mon Sep 17 00:00:00 2001 From: Shivam Sanju Date: Mon, 27 Jun 2022 18:51:43 +0530 Subject: [PATCH 08/10] moved mock to tests_require in setup.py --- feathr_project/docs/requirements.txt | 4 ---- feathr_project/setup.py | 7 ++----- 2 files changed, 2 insertions(+), 9 deletions(-) diff --git a/feathr_project/docs/requirements.txt b/feathr_project/docs/requirements.txt index 5ffc612a7..05db397e7 100644 --- a/feathr_project/docs/requirements.txt +++ b/feathr_project/docs/requirements.txt @@ -14,9 +14,5 @@ google>=3.0.0 google-api-python-client>=2.41.0 azure-keyvault-secrets confluent-kafka -<<<<<<< HEAD azure-core<=1.22.1 mock -======= -azure-core<=1.22.1 ->>>>>>> bc6b9c324c95c9b95d65e1b699abd9a982d9b9ec diff --git a/feathr_project/setup.py b/feathr_project/setup.py index f72ba234b..d22d7402a 100644 --- a/feathr_project/setup.py +++ b/feathr_project/setup.py @@ -51,14 +51,11 @@ # https://github.com/Azure/azure-sdk-for-python/pull/22891 # using a version lower than that to workaround this issue "azure-core<=1.22.1", - "typing_extensions>=4.2.0", -<<<<<<< HEAD - "mock" -======= ->>>>>>> bc6b9c324c95c9b95d65e1b699abd9a982d9b9ec + "typing_extensions>=4.2.0" ], tests_require=[ 'pytest', + 'mock' ], entry_points={ 'console_scripts': ['feathr=feathrcli.cli:cli'] From e20914bac6fe94b1869900d85653dccba830c3c9 Mon Sep 17 00:00:00 2001 From: Shivam Sanju Date: Mon, 27 Jun 2022 19:19:42 +0530 Subject: [PATCH 09/10] changed mock to unittest.mock --- feathr_project/docs/requirements.txt | 3 +-- feathr_project/setup.py | 1 - feathr_project/test/test_client_callback.py | 2 +- 3 files changed, 2 insertions(+), 4 deletions(-) diff --git a/feathr_project/docs/requirements.txt b/feathr_project/docs/requirements.txt index 05db397e7..eb4f05184 100644 --- a/feathr_project/docs/requirements.txt +++ b/feathr_project/docs/requirements.txt @@ -14,5 +14,4 @@ google>=3.0.0 google-api-python-client>=2.41.0 azure-keyvault-secrets confluent-kafka -azure-core<=1.22.1 -mock +azure-core<=1.22.1 \ No newline at end of file diff --git a/feathr_project/setup.py b/feathr_project/setup.py index d22d7402a..9791fee95 100644 --- a/feathr_project/setup.py +++ b/feathr_project/setup.py @@ -55,7 +55,6 @@ ], tests_require=[ 'pytest', - 'mock' ], entry_points={ 'console_scripts': ['feathr=feathrcli.cli:cli'] diff --git a/feathr_project/test/test_client_callback.py b/feathr_project/test/test_client_callback.py index e8eed77b2..b3b543426 100644 --- a/feathr_project/test/test_client_callback.py +++ b/feathr_project/test/test_client_callback.py @@ -1,6 +1,6 @@ import os import asyncio -import mock +import unittest.mock as mock import time from subprocess import call from datetime import datetime, timedelta From aeb3e45c8c9b992ffccec4392a7492233ea8d882 Mon Sep 17 00:00:00 2001 From: Shivam Sanju Date: Tue, 28 Jun 2022 10:07:24 +0530 Subject: [PATCH 10/10] moved callback to constructor --- .../how-to-guides/client-callback-function.md | 18 ++-- feathr_project/feathr/client.py | 40 ++++--- feathr_project/test/test_client_callback.py | 101 +++++++++++++++--- 3 files changed, 117 insertions(+), 42 deletions(-) diff --git a/docs/how-to-guides/client-callback-function.md b/docs/how-to-guides/client-callback-function.md index 1d87fa57d..e7bfca830 100644 --- a/docs/how-to-guides/client-callback-function.md +++ b/docs/how-to-guides/client-callback-function.md @@ -10,7 +10,13 @@ A callback function is a function that is sent to another function as an argumen ## How to use callback functions -Currently the below functions in feathr client support passing a callback as an argument: +We can pass a callback function when initializing the feathr client. + +```python +client = FeathrClient(config_path, callback) +``` + +The below functions accept an optional parameters named **params**. params is a dictionary where user can pass the arguments for the callback function. - get_online_features - multi_get_online_features @@ -18,19 +24,15 @@ Currently the below functions in feathr client support passing a callback as an - monitor_features - materialize_features -These functions accept two optional parameters named **callback** and **params**. -callback is of type function and params is a dictionary where user can pass the arguments for the callback function. - An example on how to use it: ```python # inside notebook -client = FeathrClient(config_path) -client.get_offline_features(observation_settings,feature_query,output_path, callback, params) - -# users can define their own callback function and params +client = FeathrClient(config_path, callback) params = {"param1":"value1", "param2":"value2"} +client.get_offline_features(observation_settings,feature_query,output_path, params) +# users can define their own callback function async def callback(params): import httpx async with httpx.AsyncClient() as requestHandler: diff --git a/feathr_project/feathr/client.py b/feathr_project/feathr/client.py index 3aac1b123..2a8c4cd86 100644 --- a/feathr_project/feathr/client.py +++ b/feathr_project/feathr/client.py @@ -83,12 +83,13 @@ class FeathrClient(object): local_workspace_dir (str, optional): set where is the local work space dir. If not set, Feathr will create a temporary folder to store local workspace related files. credential (optional): credential to access cloud resources, most likely to be the returned result of DefaultAzureCredential(). If not set, Feathr will initialize DefaultAzureCredential() inside the __init__ function to get credentials. project_registry_tag (Dict[str, str]): adding tags for project in Feathr registry. This might be useful if you want to tag your project as deprecated, or allow certain customizations on project leve. Default is empty + callback: an async callback function that will be called after execution of the original logic. This callback should not block the thread. This is optional. Raises: RuntimeError: Fail to create the client since necessary environment variables are not set for Redis client creation. """ - def __init__(self, config_path:str = "./feathr_config.yaml", local_workspace_dir: str = None, credential=None, project_registry_tag: Dict[str, str]=None): + def __init__(self, config_path:str = "./feathr_config.yaml", local_workspace_dir: str = None, credential=None, project_registry_tag: Dict[str, str]=None, callback:callable = None): self.logger = logging.getLogger(__name__) # Redis key separator self._KEY_SEPARATOR = ':' @@ -183,6 +184,7 @@ def __init__(self, config_path:str = "./feathr_config.yaml", local_workspace_dir 'feature_registry', 'purview', 'purview_name') # initialize the registry no matter whether we set purview name or not, given some of the methods are used there. self.registry = _FeatureRegistry(self.project_name, self.azure_purview_name, self.registry_delimiter, project_registry_tag, config_path = config_path, credential=self.credential) + self.callback = callback def _check_required_environment_variables_exist(self): """Checks if the required environment variables(form feathr_config.yaml) is set. @@ -265,7 +267,7 @@ def _get_registry_client(self): """ return self.registry._get_registry_client() - def get_online_features(self, feature_table, key, feature_names, callback: callable = None, params: dict = None): + def get_online_features(self, feature_table, key, feature_names, params: dict = None): """Fetches feature value for a certain key from a online feature table. There is an optional callback function and the params to extend this function's capability.For eg. cosumer of the features. @@ -273,7 +275,6 @@ def get_online_features(self, feature_table, key, feature_names, callback: calla feature_table: the name of the feature table. key: the key of the entity feature_names: list of feature names to fetch - callback: an async callback function that will be called after execution of the original logic. This callback should not block the thread. params: a dictionary of parameters for the callback function Return: @@ -288,19 +289,18 @@ def get_online_features(self, feature_table, key, feature_names, callback: calla redis_key = self._construct_redis_key(feature_table, key) res = self.redis_clint.hmget(redis_key, *feature_names) feature_values = self._decode_proto(res) - if (callback is not None) and (params is not None): + if (self.callback is not None) and (params is not None): event_loop = asyncio.get_event_loop() - event_loop.create_task(callback(params)) + event_loop.create_task(self.callback(params)) return feature_values - def multi_get_online_features(self, feature_table, keys, feature_names, callback: callable = None, params: dict = None): + def multi_get_online_features(self, feature_table, keys, feature_names, params: dict = None): """Fetches feature value for a list of keys from a online feature table. This is the batch version of the get API. Args: feature_table: the name of the feature table. keys: list of keys for the entities feature_names: list of feature names to fetch - callback: an async callback function that will be called after execution of the original logic. This callback should not block the thread. params: a dictionary of parameters for the callback function Return: @@ -322,9 +322,9 @@ def multi_get_online_features(self, feature_table, keys, feature_names, callback for feature_list in pipeline_result: decoded_pipeline_result.append(self._decode_proto(feature_list)) - if (callback is not None) and (params is not None): + if (self.callback is not None) and (params is not None): event_loop = asyncio.get_event_loop() - event_loop.create_task(callback(params)) + event_loop.create_task(self.callback(params)) return dict(zip(keys, decoded_pipeline_result)) @@ -427,7 +427,6 @@ def get_offline_features(self, execution_configurations: Union[SparkExecutionConfiguration ,Dict[str,str]] = {}, udf_files = None, verbose: bool = False, - callback: callable = None, params: dict = None ): """ @@ -438,7 +437,6 @@ def get_offline_features(self, feature_query: features that are requested to add onto the observation data output_path: output path of job, i.e. the observation data with features attached. execution_configuratons: a dict that will be passed to spark job when the job starts up, i.e. the "spark configurations". Note that not all of the configuration will be honored since some of the configurations are managed by the Spark platform, such as Databricks or Azure Synapse. Refer to the [spark documentation](https://spark.apache.org/docs/latest/configuration.html) for a complete list of spark configurations. - callback: an async callback function that will be called after execution of the original logic. This callback should not block the thread. params: a dictionary of parameters for the callback function """ feature_queries = feature_query if isinstance(feature_query, List) else [feature_query] @@ -476,10 +474,10 @@ def get_offline_features(self, FeaturePrinter.pretty_print_feature_query(feature_query) write_to_file(content=config, full_file_name=config_file_path) - job_info = self._get_offline_features_with_config(config_file_path, execution_configuratons, udf_files=udf_files) - if (callback is not None) and (params is not None): + job_info = self._get_offline_features_with_config(config_file_path, execution_configurations, udf_files=udf_files) + if (self.callback is not None) and (params is not None): event_loop = asyncio.get_event_loop() - event_loop.create_task(callback(params)) + event_loop.create_task(self.callback(params)) return job_info def _get_offline_features_with_config(self, feature_join_conf_path='feature_join_conf/feature_join.conf', execution_configurations: Dict[str,str] = {}, udf_files=[]): @@ -557,29 +555,27 @@ def wait_job_to_finish(self, timeout_sec: int = 300): else: raise RuntimeError('Spark job failed.') - def monitor_features(self, settings: MonitoringSettings, execution_configuratons: Union[SparkExecutionConfiguration ,Dict[str,str]] = {}, verbose: bool = False, callback: callable = None, params: dict = None): + def monitor_features(self, settings: MonitoringSettings, execution_configuratons: Union[SparkExecutionConfiguration ,Dict[str,str]] = {}, verbose: bool = False, params: dict = None): """Create a offline job to generate statistics to monitor feature data. There is an optional callback function and the params to extend this function's capability.For eg. cosumer of the features. Args: settings: Feature monitoring settings execution_configuratons: a dict that will be passed to spark job when the job starts up, i.e. the "spark configurations". Note that not all of the configuration will be honored since some of the configurations are managed by the Spark platform, such as Databricks or Azure Synapse. Refer to the [spark documentation](https://spark.apache.org/docs/latest/configuration.html) for a complete list of spark configurations. - callback: an async callback function that will be called after execution of the original logic. This callback should not block the thread. params: a dictionary of parameters for the callback function. """ self.materialize_features(settings, execution_configuratons, verbose) - if (callback is not None) and (params is not None): + if (self.callback is not None) and (params is not None): event_loop = asyncio.get_event_loop() - event_loop.create_task(callback(params)) + event_loop.create_task(self.callback(params)) - def materialize_features(self, settings: MaterializationSettings, execution_configuratons: Union[SparkExecutionConfiguration ,Dict[str,str]] = {}, verbose: bool = False, callback: callable = None, params: dict = None): + def materialize_features(self, settings: MaterializationSettings, execution_configurations: Union[SparkExecutionConfiguration ,Dict[str,str]] = {}, verbose: bool = False, params: dict = None): """Materialize feature data. There is an optional callback function and the params to extend this function's capability.For eg. cosumer of the feature store. Args: settings: Feature materialization settings execution_configuratons: a dict that will be passed to spark job when the job starts up, i.e. the "spark configurations". Note that not all of the configuration will be honored since some of the configurations are managed by the Spark platform, such as Databricks or Azure Synapse. Refer to the [spark documentation](https://spark.apache.org/docs/latest/configuration.html) for a complete list of spark configurations. - callback: an async callback function that will be called after execution of the original logic. This callback should not block the thread. params: a dictionary of parameters for the callback function """ # produce materialization config @@ -608,9 +604,9 @@ def materialize_features(self, settings: MaterializationSettings, execution_conf if verbose and settings: FeaturePrinter.pretty_print_materialize_features(settings) - if (callback is not None) and (params is not None): + if (self.callback is not None) and (params is not None): event_loop = asyncio.get_event_loop() - event_loop.create_task(callback(params)) + event_loop.create_task(self.callback(params)) def _materialize_features_with_config(self, feature_gen_conf_path: str = 'feature_gen_conf/feature_gen.conf',execution_configurations: Dict[str,str] = {}, udf_files=[]): """Materializes feature data based on the feature generation config. The feature diff --git a/feathr_project/test/test_client_callback.py b/feathr_project/test/test_client_callback.py index b3b543426..544c4c20b 100644 --- a/feathr_project/test/test_client_callback.py +++ b/feathr_project/test/test_client_callback.py @@ -10,14 +10,16 @@ from feathr import FeatureQuery from feathr import ObservationSettings from feathr import TypedKey -from test_fixture import basic_test_setup from test_fixture import get_online_test_table_name from feathr.definition._materialization_utils import _to_materialization_config from feathr import (BackfillTime, MaterializationSettings) from feathr import (BackfillTime, MaterializationSettings, FeatureQuery, ObservationSettings, SparkExecutionConfiguration) from feathr import RedisSink, HdfsSink - +from feathr import (BOOLEAN, FLOAT, INPUT_CONTEXT, INT32, STRING, + DerivedFeature, Feature, FeatureAnchor, HdfsSource, + TypedKey, ValueType, WindowAggTransformation) +from feathr import FeathrClient params = {"wait" : 0.1} async def sample_callback(params): @@ -26,9 +28,85 @@ async def sample_callback(params): callback = mock.MagicMock(return_value=sample_callback(params)) + +def basic_test_setup_with_callback(config_path: str, callback: callable): + + now = datetime.now() + # set workspace folder by time; make sure we don't have write conflict if there are many CI tests running + os.environ['SPARK_CONFIG__DATABRICKS__WORK_DIR'] = ''.join(['dbfs:/feathrazure_cijob','_', str(now.minute), '_', str(now.second), '_', str(now.microsecond)]) + os.environ['SPARK_CONFIG__AZURE_SYNAPSE__WORKSPACE_DIR'] = ''.join(['abfss://feathrazuretest3fs@feathrazuretest3storage.dfs.core.windows.net/feathr_github_ci','_', str(now.minute), '_', str(now.second) ,'_', str(now.microsecond)]) + + client = FeathrClient(config_path=config_path, callback=callback) + batch_source = HdfsSource(name="nycTaxiBatchSource", + path="wasbs://public@azurefeathrstorage.blob.core.windows.net/sample_data/green_tripdata_2020-04.csv", + event_timestamp_column="lpep_dropoff_datetime", + timestamp_format="yyyy-MM-dd HH:mm:ss") + + f_trip_distance = Feature(name="f_trip_distance", + feature_type=FLOAT, transform="trip_distance") + f_trip_time_duration = Feature(name="f_trip_time_duration", + feature_type=INT32, + transform="(to_unix_timestamp(lpep_dropoff_datetime) - to_unix_timestamp(lpep_pickup_datetime))/60") + + features = [ + f_trip_distance, + f_trip_time_duration, + Feature(name="f_is_long_trip_distance", + feature_type=BOOLEAN, + transform="cast_float(trip_distance)>30"), + Feature(name="f_day_of_week", + feature_type=INT32, + transform="dayofweek(lpep_dropoff_datetime)"), + ] + + + request_anchor = FeatureAnchor(name="request_features", + source=INPUT_CONTEXT, + features=features) + + f_trip_time_distance = DerivedFeature(name="f_trip_time_distance", + feature_type=FLOAT, + input_features=[ + f_trip_distance, f_trip_time_duration], + transform="f_trip_distance * f_trip_time_duration") + + f_trip_time_rounded = DerivedFeature(name="f_trip_time_rounded", + feature_type=INT32, + input_features=[f_trip_time_duration], + transform="f_trip_time_duration % 10") + + location_id = TypedKey(key_column="DOLocationID", + key_column_type=ValueType.INT32, + description="location id in NYC", + full_name="nyc_taxi.location_id") + agg_features = [Feature(name="f_location_avg_fare", + key=location_id, + feature_type=FLOAT, + transform=WindowAggTransformation(agg_expr="cast_float(fare_amount)", + agg_func="AVG", + window="90d")), + Feature(name="f_location_max_fare", + key=location_id, + feature_type=FLOAT, + transform=WindowAggTransformation(agg_expr="cast_float(fare_amount)", + agg_func="MAX", + window="90d")) + ] + + agg_anchor = FeatureAnchor(name="aggregationFeatures", + source=batch_source, + features=agg_features) + + client.build_features(anchor_list=[agg_anchor, request_anchor], derived_feature_list=[ + f_trip_time_distance, f_trip_time_rounded]) + + return client + + + def test_client_callback_offline_feature(): test_workspace_dir = Path(__file__).parent.resolve() / "test_user_workspace" - client = basic_test_setup(os.path.join(test_workspace_dir, "feathr_config.yaml")) + client = basic_test_setup_with_callback(os.path.join(test_workspace_dir, "feathr_config.yaml"),callback) location_id = TypedKey(key_column="DOLocationID", key_column_type=ValueType.INT32, @@ -47,7 +125,6 @@ def test_client_callback_offline_feature(): res = client.get_offline_features(observation_settings=settings, feature_query=feature_query, output_path=output_path, - callback=callback, params=params) callback.assert_called_with(params) @@ -56,7 +133,7 @@ def test_client_callback_materialization(): online_test_table = get_online_test_table_name("nycTaxiCITable") test_workspace_dir = Path(__file__).parent.resolve() / "test_user_workspace" - client = basic_test_setup(os.path.join(test_workspace_dir, "feathr_config.yaml")) + client = basic_test_setup_with_callback(os.path.join(test_workspace_dir, "feathr_config.yaml"),callback) backfill_time = BackfillTime(start=datetime(2020, 5, 20), end=datetime(2020, 5, 20), step=timedelta(days=1)) redisSink = RedisSink(table_name=online_test_table) settings = MaterializationSettings("nycTaxiTable", @@ -64,14 +141,14 @@ def test_client_callback_materialization(): feature_names=[ "f_location_avg_fare", "f_location_max_fare"], backfill_time=backfill_time) - client.materialize_features(settings, callback=callback, params=params) + client.materialize_features(settings, params=params) callback.assert_called_with(params) def test_client_callback_monitor_features(): online_test_table = get_online_test_table_name("nycTaxiCITable") test_workspace_dir = Path(__file__).parent.resolve() / "test_user_workspace" - client = basic_test_setup(os.path.join(test_workspace_dir, "feathr_config.yaml")) + client = basic_test_setup_with_callback(os.path.join(test_workspace_dir, "feathr_config.yaml"),callback) backfill_time = BackfillTime(start=datetime(2020, 5, 20), end=datetime(2020, 5, 20), step=timedelta(days=1)) redisSink = RedisSink(table_name=online_test_table) settings = MaterializationSettings("nycTaxiTable", @@ -79,14 +156,14 @@ def test_client_callback_monitor_features(): feature_names=[ "f_location_avg_fare", "f_location_max_fare"], backfill_time=backfill_time) - client.monitor_features(settings, callback=callback, params=params) + client.monitor_features(settings, params=params) callback.assert_called_with(params) def test_client_callback_get_online_features(): online_test_table = get_online_test_table_name("nycTaxiCITable") test_workspace_dir = Path(__file__).parent.resolve() / "test_user_workspace" - client = basic_test_setup(os.path.join(test_workspace_dir, "feathr_config.yaml")) + client = basic_test_setup_with_callback(os.path.join(test_workspace_dir, "feathr_config.yaml"),callback) backfill_time = BackfillTime(start=datetime(2020, 5, 20), end=datetime(2020, 5, 20), step=timedelta(days=1)) redisSink = RedisSink(table_name=online_test_table) settings = MaterializationSettings("nycTaxiTable", @@ -99,7 +176,7 @@ def test_client_callback_get_online_features(): client.wait_job_to_finish(timeout_sec=900) # wait for a few secs for the data to come in redis time.sleep(5) - client.get_online_features('nycTaxiDemoFeature', '265', ['f_location_avg_fare', 'f_location_max_fare'], callback=callback, params=params) + client.get_online_features('nycTaxiDemoFeature', '265', ['f_location_avg_fare', 'f_location_max_fare'], params=params) callback.assert_called_with(params) @@ -107,7 +184,7 @@ def test_client_callback_multi_get_online_features(): online_test_table = get_online_test_table_name("nycTaxiCITable") test_workspace_dir = Path(__file__).parent.resolve() / "test_user_workspace" - client = basic_test_setup(os.path.join(test_workspace_dir, "feathr_config.yaml")) + client = basic_test_setup_with_callback(os.path.join(test_workspace_dir, "feathr_config.yaml"),callback) backfill_time = BackfillTime(start=datetime(2020, 5, 20), end=datetime(2020, 5, 20), step=timedelta(days=1)) redisSink = RedisSink(table_name=online_test_table) settings = MaterializationSettings("nycTaxiTable", @@ -120,5 +197,5 @@ def test_client_callback_multi_get_online_features(): client.wait_job_to_finish(timeout_sec=900) # wait for a few secs for the data to come in redis time.sleep(5) - client.multi_get_online_features('nycTaxiDemoFeature', ["239", "265"], ['f_location_avg_fare', 'f_location_max_fare'], callback=callback, params=params) + client.multi_get_online_features('nycTaxiDemoFeature', ["239", "265"], ['f_location_avg_fare', 'f_location_max_fare'], params=params) callback.assert_called_with(params) \ No newline at end of file