From 26053ef6d8cf6bd8f5db85e6dd7137129fd8d219 Mon Sep 17 00:00:00 2001 From: joostr Date: Wed, 22 Dec 2021 15:28:44 +0100 Subject: [PATCH] Revert "First release (#4)" This reverts commit 0a92c8cfc756cfa68fba661e996476b72de8733a. --- .github/workflows/python-package.yaml | 42 ------------------- .github/workflows/python-publish.yml | 36 ---------------- README.md | 2 +- example_feature_repo/example.py | 7 +--- feast_spark_offline_store/__init__.py | 7 +--- feast_spark_offline_store/spark.py | 46 +++++++-------------- feast_spark_offline_store/spark_source.py | 41 +++++++----------- feast_spark_offline_store/spark_type_map.py | 23 +++++------ setup.py | 23 ++++++++--- tests/test_spark_offline_store.py | 21 +++++----- 10 files changed, 73 insertions(+), 175 deletions(-) delete mode 100644 .github/workflows/python-package.yaml delete mode 100644 .github/workflows/python-publish.yml diff --git a/.github/workflows/python-package.yaml b/.github/workflows/python-package.yaml deleted file mode 100644 index 4ee6306..0000000 --- a/.github/workflows/python-package.yaml +++ /dev/null @@ -1,42 +0,0 @@ -# This workflow will install Python dependencies, run tests and lint with a variety of Python versions -# For more information see: https://help.github.com/actions/language-and-framework-guides/using-python-with-github-actions - -name: Python package - -on: - push: - branches: [ develop ] - pull_request: - branches: [ develop ] - -jobs: - build: - - runs-on: ubuntu-latest - strategy: - fail-fast: false - matrix: - python-version: ["3.8", "3.9", "3.10"] - - steps: - - uses: actions/checkout@v2 - - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v2 - with: - python-version: ${{ matrix.python-version }} - - name: Install dependencies - run: | - python -m pip install --upgrade pip - pip install -e '.[dev]' - - name: Lint with flake8 - run: | - # stop the build if there are Python syntax errors or undefined names - flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics - # exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide - flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics - - name: Verify black formatting - run: | - black --diff --check . - - name: Test with pytest - run: | - pytest diff --git a/.github/workflows/python-publish.yml b/.github/workflows/python-publish.yml deleted file mode 100644 index df48b51..0000000 --- a/.github/workflows/python-publish.yml +++ /dev/null @@ -1,36 +0,0 @@ -# This workflow will upload a Python Package using Twine when a release is created -# For more information see: https://help.github.com/en/actions/language-and-framework-guides/using-python-with-github-actions#publishing-to-package-registries - -# This workflow uses actions that are not certified by GitHub. -# They are provided by a third-party and are governed by -# separate terms of service, privacy policy, and support -# documentation. - -name: Publish to PyPI - -on: - release: - types: [published] - -jobs: - deploy: - - runs-on: ubuntu-latest - - steps: - - uses: actions/checkout@v2 - - name: Set up Python - uses: actions/setup-python@v2 - with: - python-version: '3.8' - - name: Install dependencies - run: | - python -m pip install --upgrade pip - pip install build - - name: Build package - run: python -m build - - name: Publish package - uses: pypa/gh-action-pypi-publish@27b31702a0e7fc50959f5ad993c78deac1bdfc29 - with: - user: __token__ - password: ${{ secrets.PYPI_API_TOKEN }} diff --git a/README.md b/README.md index 0d5059c..b90acf4 100644 --- a/README.md +++ b/README.md @@ -15,7 +15,7 @@ Or to install from source: ```bash git clone git@github.com:Adyen/feast-spark-offline-store.git cd feast-spark-offline-store -pip install -e '.[dev]' +pip install -e .[dev] ``` ## Usage diff --git a/example_feature_repo/example.py b/example_feature_repo/example.py index d0bb2ac..8d0df08 100644 --- a/example_feature_repo/example.py +++ b/example_feature_repo/example.py @@ -1,3 +1,4 @@ + # This is an example feature definition file from google.protobuf.duration_pb2 import Duration @@ -28,11 +29,7 @@ ) # Define an entity for the driver. -driver = Entity( - name="driver_id", - value_type=ValueType.INT64, - description="driver id", -) +driver = Entity(name="driver_id", value_type=ValueType.INT64, description="driver id", ) # Define FeatureView driver_hourly_stats_view = FeatureView( diff --git a/feast_spark_offline_store/__init__.py b/feast_spark_offline_store/__init__.py index f933e4c..a290fd7 100644 --- a/feast_spark_offline_store/__init__.py +++ b/feast_spark_offline_store/__init__.py @@ -9,9 +9,4 @@ # package is not installed pass -__all__ = [ - "SparkOptions", - "SparkSource", - "SparkOfflineStoreConfig", - "SparkOfflineStore", -] +__all__ = ["SparkOptions", "SparkSource", "SparkOfflineStoreConfig", "SparkOfflineStore"] diff --git a/feast_spark_offline_store/spark.py b/feast_spark_offline_store/spark.py index 1260bae..a0f721c 100644 --- a/feast_spark_offline_store/spark.py +++ b/feast_spark_offline_store/spark.py @@ -45,9 +45,7 @@ def pull_latest_from_table_or_query( start_date: datetime, end_date: datetime, ) -> RetrievalJob: - spark_session = get_spark_session_or_start_new_with_repoconfig( - config.offline_store - ) + spark_session = get_spark_session_or_start_new_with_repoconfig(config.offline_store) assert isinstance(config.offline_store, SparkOfflineStoreConfig) assert isinstance(data_source, SparkSource) @@ -85,7 +83,7 @@ def pull_latest_from_table_or_query( spark_session=spark_session, query=query, full_feature_names=False, - on_demand_feature_views=None, + on_demand_feature_views=None ) @staticmethod @@ -100,9 +98,7 @@ def get_historical_features( ) -> RetrievalJob: assert isinstance(config.offline_store, SparkOfflineStoreConfig) - spark_session = get_spark_session_or_start_new_with_repoconfig( - config.offline_store - ) + spark_session = get_spark_session_or_start_new_with_repoconfig(config.offline_store) table_name = offline_utils.get_temp_entity_table_name() @@ -110,8 +106,8 @@ def get_historical_features( spark_session, table_name, entity_df ) - entity_df_event_timestamp_col = ( - offline_utils.infer_event_timestamp_from_entity_df(entity_schema) + entity_df_event_timestamp_col = offline_utils.infer_event_timestamp_from_entity_df( + entity_schema ) expected_join_keys = offline_utils.get_expected_join_keys( @@ -123,10 +119,7 @@ def get_historical_features( ) query_context = offline_utils.get_feature_view_query_context( - feature_refs, - feature_views, - registry, - project, + feature_refs, feature_views, registry, project, ) query = offline_utils.build_point_in_time_query( @@ -171,9 +164,7 @@ def on_demand_feature_views(self) -> Optional[List[OnDemandFeatureView]]: return self._on_demand_feature_views def to_spark_df(self) -> pyspark.sql.DataFrame: - statements = self.query.split( - "---EOS---" - ) # TODO can do better than this dirty split + statements = self.query.split("---EOS---") # TODO can do better than this dirty split *_, last = map(self.spark_session.sql, statements) return last @@ -194,7 +185,9 @@ def to_arrow(self) -> pyarrow.Table: def _upload_entity_df_and_get_entity_schema( - spark_session, table_name, entity_df + spark_session, + table_name, + entity_df ) -> Dict[str, np.dtype]: if isinstance(entity_df, pd.DataFrame): spark_session.createDataFrame(entity_df).createOrReplaceTempView(table_name) @@ -204,19 +197,12 @@ def _upload_entity_df_and_get_entity_schema( limited_entity_df = spark_session.table(table_name) # limited_entity_df = spark_session.table(table_name).limit(1).toPandas() - return dict( - zip( - limited_entity_df.columns, - spark_schema_to_np_dtypes(limited_entity_df.dtypes), - ) - ) + return dict(zip(limited_entity_df.columns, spark_schema_to_np_dtypes(limited_entity_df.dtypes))) else: raise InvalidEntityType(type(entity_df)) -def get_spark_session_or_start_new_with_repoconfig( - store_config: SparkOfflineStoreConfig, -) -> SparkSession: +def get_spark_session_or_start_new_with_repoconfig(store_config: SparkOfflineStoreConfig) -> SparkSession: spark_session = SparkSession.getActiveSession() if not spark_session: @@ -224,15 +210,11 @@ def get_spark_session_or_start_new_with_repoconfig( spark_conf = store_config.spark_conf if spark_conf: - spark_builder = spark_builder.config( - conf=SparkConf().setAll(spark_conf.items()) - ) # noqa + spark_builder = spark_builder.config(conf=SparkConf().setAll(spark_conf.items())) # noqa spark_session = spark_builder.getOrCreate() - spark_session.conf.set( - "spark.sql.parser.quotedRegexColumnNames", "true" - ) # important! + spark_session.conf.set("spark.sql.parser.quotedRegexColumnNames", "true") # important! return spark_session diff --git a/feast_spark_offline_store/spark_source.py b/feast_spark_offline_store/spark_source.py index 3d21714..309c771 100644 --- a/feast_spark_offline_store/spark_source.py +++ b/feast_spark_offline_store/spark_source.py @@ -11,18 +11,18 @@ class SparkSource(DataSource): def __init__( - self, - table: Optional[str] = None, - query: Optional[str] = None, - # TODO support file readers - # path: Optional[str] = None, - # jdbc=None, - # format: Optional[str] = None, - # options: Optional[Dict[str, Any]] = None, - event_timestamp_column: Optional[str] = None, - created_timestamp_column: Optional[str] = None, - field_mapping: Optional[Dict[str, str]] = None, - date_partition_column: Optional[str] = None, + self, + table: Optional[str] = None, + query: Optional[str] = None, + # TODO support file readers + # path: Optional[str] = None, + # jdbc=None, + # format: Optional[str] = None, + # options: Optional[Dict[str, Any]] = None, + event_timestamp_column: Optional[str] = None, + created_timestamp_column: Optional[str] = None, + field_mapping: Optional[Dict[str, str]] = None, + date_partition_column: Optional[str] = None, ): super().__init__( event_timestamp_column, @@ -112,20 +112,11 @@ def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]: def get_table_column_names_and_types( self, config: RepoConfig ) -> Iterable[Tuple[str, str]]: - from feast_spark_offline_store.spark import ( - get_spark_session_or_start_new_with_repoconfig, - ) - - spark_session = get_spark_session_or_start_new_with_repoconfig( - config.offline_store - ) + from feast_spark_offline_store.spark import get_spark_session_or_start_new_with_repoconfig + spark_session = get_spark_session_or_start_new_with_repoconfig(config.offline_store) try: - return ( - (fields["name"], fields["type"]) - for fields in spark_session.table(self.table).schema.jsonValue()[ - "fields" - ] - ) + return ((fields['name'], fields['type']) + for fields in spark_session.table(self.table).schema.jsonValue()["fields"]) except AnalysisException: raise DataSourceNotFoundException(self.table) diff --git a/feast_spark_offline_store/spark_type_map.py b/feast_spark_offline_store/spark_type_map.py index 2e302af..a6ab28a 100644 --- a/feast_spark_offline_store/spark_type_map.py +++ b/feast_spark_offline_store/spark_type_map.py @@ -33,17 +33,16 @@ def spark_schema_to_np_dtypes(dtypes: List[Tuple[str, str]]) -> Iterator[dtype]: # TODO recheck all typing (also tz for timestamp) # https://spark.apache.org/docs/latest/api/python/user_guide/arrow_pandas.html#timestamp-with-time-zone-semantics - type_map = defaultdict( - lambda: dtype("O"), - { - "boolean": dtype("bool"), - "double": dtype("float64"), - "float": dtype("float64"), - "int": dtype("int64"), - "bigint": dtype("int64"), - "smallint": dtype("int64"), - "timestamp": dtype("datetime64[ns]"), - }, - ) + type_map = defaultdict(lambda: dtype("O"), { + 'boolean': dtype('bool'), + 'double': dtype('float64'), + 'float': dtype('float64'), + 'int': dtype('int64'), + 'bigint': dtype('int64'), + 'smallint': dtype('int64'), + 'timestamp': dtype('datetime64[ns]') + }) return (type_map[t] for _, t in dtypes) + + diff --git a/setup.py b/setup.py index f1b692e..3892dfc 100644 --- a/setup.py +++ b/setup.py @@ -9,15 +9,17 @@ "numpy", "pandas", "pytz>=2021.3", - "pydantic>=1.6", + "pydantic>=1.6" ] DEV_REQUIRES = INSTALL_REQUIRES + [ "wheel", - "black", - "flake8", + "black" +] + +TEST_REQUIRES = INSTALL_REQUIRES + [ "pytest>=6.2.5", - "google", + "google" ] setup( @@ -29,8 +31,17 @@ long_description_content_type="text/markdown", url="https://github.com/Adyen/feast-spark-offline-store", license="MIT", - python_requires=">=3.8.0", + python_requires=">=3.7.0", packages=find_packages(include=["feast_spark_offline_store"]), + test_requires=TEST_REQUIRES, install_requires=INSTALL_REQUIRES, - extras_require={"dev": DEV_REQUIRES}, + extras_require={ + "dev": DEV_REQUIRES + TEST_REQUIRES, + "test": TEST_REQUIRES, + }, + package_data={ + "feast_spark_offline_store": [ + "multiple_feature_view_point_in_time_join.sql", + ], + }, ) diff --git a/tests/test_spark_offline_store.py b/tests/test_spark_offline_store.py index d96c7a5..98c9520 100644 --- a/tests/test_spark_offline_store.py +++ b/tests/test_spark_offline_store.py @@ -18,17 +18,14 @@ def test_end_to_end(): fs.materialize_incremental(end_date=datetime.now()) entity_df = pd.DataFrame( - {"driver_id": [1001], "event_timestamp": [datetime.now()]} + {"driver_id": [1001], + "event_timestamp": [datetime.now()]} ) # Read features from offline store - feature_vector = ( - fs.get_historical_features( - features=["driver_hourly_stats:conv_rate"], entity_df=entity_df - ) - .to_df() - .to_dict() - ) + feature_vector = fs.get_historical_features( + features=["driver_hourly_stats:conv_rate"], entity_df=entity_df + ).to_df().to_dict() conv_rate = feature_vector["conv_rate"][0] assert conv_rate > 0 finally: @@ -38,7 +35,9 @@ def test_end_to_end(): def test_cli(): repo_name = "example_feature_repo" - os.system(f"PYTHONPATH=$PYTHONPATH:/$(pwd) feast -c {repo_name} apply") + os.system( + f"PYTHONPATH=$PYTHONPATH:/$(pwd) feast -c {repo_name} apply" + ) try: os.system( f"PYTHONPATH=$PYTHONPATH:/$(pwd) feast -c {repo_name} materialize-incremental 2021-08-19T22:29:28 > output" @@ -52,4 +51,6 @@ def test_cli(): 'Failed to successfully use provider from CLI. See "output" for more details.' ) finally: - os.system(f"PYTHONPATH=$PYTHONPATH:/$(pwd) feast -c {repo_name} teardown") + os.system( + f"PYTHONPATH=$PYTHONPATH:/$(pwd) feast -c {repo_name} teardown" + )