diff --git a/.github/workflows/python-package.yaml b/.github/workflows/python-package.yaml
new file mode 100644
index 0000000..4ee6306
--- /dev/null
+++ b/.github/workflows/python-package.yaml
@@ -0,0 +1,42 @@
+# This workflow will install Python dependencies, run tests and lint with a variety of Python versions
+# For more information see: https://help.github.com/actions/language-and-framework-guides/using-python-with-github-actions
+
+name: Python package
+
+on:
+  push:
+    branches: [ develop ]
+  pull_request:
+    branches: [ develop ]
+
+jobs:
+  build:
+
+    runs-on: ubuntu-latest
+    strategy:
+      fail-fast: false
+      matrix:
+        python-version: ["3.8", "3.9", "3.10"]
+
+    steps:
+    - uses: actions/checkout@v2
+    - name: Set up Python ${{ matrix.python-version }}
+      uses: actions/setup-python@v2
+      with:
+        python-version: ${{ matrix.python-version }}
+    - name: Install dependencies
+      run: |
+        python -m pip install --upgrade pip
+        pip install -e '.[dev]'
+    - name: Lint with flake8
+      run: |
+        # stop the build if there are Python syntax errors or undefined names
+        flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics
+        # exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide
+        flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics
+    - name: Verify black formatting
+      run: |
+        black --diff --check .
+    - name: Test with pytest
+      run: |
+        pytest
diff --git a/.github/workflows/python-publish.yml b/.github/workflows/python-publish.yml
new file mode 100644
index 0000000..df48b51
--- /dev/null
+++ b/.github/workflows/python-publish.yml
@@ -0,0 +1,36 @@
+# This workflow will upload a Python Package using Twine when a release is created
+# For more information see: https://help.github.com/en/actions/language-and-framework-guides/using-python-with-github-actions#publishing-to-package-registries
+
+# This workflow uses actions that are not certified by GitHub.
+# They are provided by a third-party and are governed by
+# separate terms of service, privacy policy, and support
+# documentation.
+
+name: Publish to PyPI
+
+on:
+  release:
+    types: [published]
+
+jobs:
+  deploy:
+
+    runs-on: ubuntu-latest
+
+    steps:
+    - uses: actions/checkout@v2
+    - name: Set up Python
+      uses: actions/setup-python@v2
+      with:
+        python-version: '3.8'
+    - name: Install dependencies
+      run: |
+        python -m pip install --upgrade pip
+        pip install build
+    - name: Build package
+      run: python -m build
+    - name: Publish package
+      uses: pypa/gh-action-pypi-publish@27b31702a0e7fc50959f5ad993c78deac1bdfc29
+      with:
+        user: __token__
+        password: ${{ secrets.PYPI_API_TOKEN }}
diff --git a/README.md b/README.md
index b90acf4..0d5059c 100644
--- a/README.md
+++ b/README.md
@@ -15,7 +15,7 @@ Or to install from source:
 ```bash
 git clone git@github.com:Adyen/feast-spark-offline-store.git
 cd feast-spark-offline-store
-pip install -e .[dev]
+pip install -e '.[dev]'
 ```
 
 ## Usage
diff --git a/example_feature_repo/example.py b/example_feature_repo/example.py
index 8d0df08..d0bb2ac 100644
--- a/example_feature_repo/example.py
+++ b/example_feature_repo/example.py
@@ -1,4 +1,3 @@
-
 # This is an example feature definition file
 
 from google.protobuf.duration_pb2 import Duration
@@ -29,7 +28,11 @@
 )
 
 # Define an entity for the driver.
-driver = Entity(name="driver_id", value_type=ValueType.INT64, description="driver id", )
+driver = Entity(
+    name="driver_id",
+    value_type=ValueType.INT64,
+    description="driver id",
+)
 
 # Define FeatureView
 driver_hourly_stats_view = FeatureView(
diff --git a/feast_spark_offline_store/__init__.py b/feast_spark_offline_store/__init__.py
index a290fd7..f933e4c 100644
--- a/feast_spark_offline_store/__init__.py
+++ b/feast_spark_offline_store/__init__.py
@@ -9,4 +9,9 @@
     # package is not installed
     pass
 
-__all__ = ["SparkOptions", "SparkSource", "SparkOfflineStoreConfig", "SparkOfflineStore"]
+__all__ = [
+    "SparkOptions",
+    "SparkSource",
+    "SparkOfflineStoreConfig",
+    "SparkOfflineStore",
+]
diff --git a/feast_spark_offline_store/spark.py b/feast_spark_offline_store/spark.py
index a0f721c..1260bae 100644
--- a/feast_spark_offline_store/spark.py
+++ b/feast_spark_offline_store/spark.py
@@ -45,7 +45,9 @@ def pull_latest_from_table_or_query(
         start_date: datetime,
         end_date: datetime,
     ) -> RetrievalJob:
-        spark_session = get_spark_session_or_start_new_with_repoconfig(config.offline_store)
+        spark_session = get_spark_session_or_start_new_with_repoconfig(
+            config.offline_store
+        )
 
         assert isinstance(config.offline_store, SparkOfflineStoreConfig)
         assert isinstance(data_source, SparkSource)
@@ -83,7 +85,7 @@ def pull_latest_from_table_or_query(
             spark_session=spark_session,
             query=query,
             full_feature_names=False,
-            on_demand_feature_views=None
+            on_demand_feature_views=None,
         )
 
     @staticmethod
@@ -98,7 +100,9 @@ def get_historical_features(
     ) -> RetrievalJob:
         assert isinstance(config.offline_store, SparkOfflineStoreConfig)
 
-        spark_session = get_spark_session_or_start_new_with_repoconfig(config.offline_store)
+        spark_session = get_spark_session_or_start_new_with_repoconfig(
+            config.offline_store
+        )
 
         table_name = offline_utils.get_temp_entity_table_name()
 
@@ -106,8 +110,8 @@ def get_historical_features(
             spark_session, table_name, entity_df
         )
 
-        entity_df_event_timestamp_col = offline_utils.infer_event_timestamp_from_entity_df(
-            entity_schema
+        entity_df_event_timestamp_col = (
+            offline_utils.infer_event_timestamp_from_entity_df(entity_schema)
         )
 
         expected_join_keys = offline_utils.get_expected_join_keys(
@@ -119,7 +123,10 @@
         )
 
         query_context = offline_utils.get_feature_view_query_context(
-            feature_refs, feature_views, registry, project,
+            feature_refs,
+            feature_views,
+            registry,
+            project,
         )
 
         query = offline_utils.build_point_in_time_query(
@@ -164,7 +171,9 @@ def on_demand_feature_views(self) -> Optional[List[OnDemandFeatureView]]:
         return self._on_demand_feature_views
 
     def to_spark_df(self) -> pyspark.sql.DataFrame:
-        statements = self.query.split("---EOS---")  # TODO can do better than this dirty split
+        statements = self.query.split(
+            "---EOS---"
+        )  # TODO can do better than this dirty split
         *_, last = map(self.spark_session.sql, statements)
         return last
 
@@ -185,9 +194,7 @@ def to_arrow(self) -> pyarrow.Table:
 
 
 def _upload_entity_df_and_get_entity_schema(
-    spark_session,
-    table_name,
-    entity_df
+    spark_session, table_name, entity_df
 ) -> Dict[str, np.dtype]:
     if isinstance(entity_df, pd.DataFrame):
         spark_session.createDataFrame(entity_df).createOrReplaceTempView(table_name)
@@ -197,12 +204,19 @@ def _upload_entity_df_and_get_entity_schema(
 
         limited_entity_df = spark_session.table(table_name)
         # limited_entity_df = spark_session.table(table_name).limit(1).toPandas()
-        return dict(zip(limited_entity_df.columns, spark_schema_to_np_dtypes(limited_entity_df.dtypes)))
+        return dict(
+            zip(
+                limited_entity_df.columns,
+                spark_schema_to_np_dtypes(limited_entity_df.dtypes),
+            )
+        )
     else:
         raise InvalidEntityType(type(entity_df))
 
 
-def get_spark_session_or_start_new_with_repoconfig(store_config: SparkOfflineStoreConfig) -> SparkSession:
+def get_spark_session_or_start_new_with_repoconfig(
+    store_config: SparkOfflineStoreConfig,
+) -> SparkSession:
     spark_session = SparkSession.getActiveSession()
 
     if not spark_session:
@@ -210,11 +224,15 @@ def get_spark_session_or_start_new_with_repoconfig(store_config: SparkOfflineSto
         spark_conf = store_config.spark_conf
 
         if spark_conf:
-            spark_builder = spark_builder.config(conf=SparkConf().setAll(spark_conf.items()))  # noqa
+            spark_builder = spark_builder.config(
+                conf=SparkConf().setAll(spark_conf.items())
+            )  # noqa
 
         spark_session = spark_builder.getOrCreate()
 
-    spark_session.conf.set("spark.sql.parser.quotedRegexColumnNames", "true")  # important!
+    spark_session.conf.set(
+        "spark.sql.parser.quotedRegexColumnNames", "true"
+    )  # important!
 
     return spark_session
 
diff --git a/feast_spark_offline_store/spark_source.py b/feast_spark_offline_store/spark_source.py
index 309c771..3d21714 100644
--- a/feast_spark_offline_store/spark_source.py
+++ b/feast_spark_offline_store/spark_source.py
@@ -11,18 +11,18 @@
 
 class SparkSource(DataSource):
     def __init__(
-            self,
-            table: Optional[str] = None,
-            query: Optional[str] = None,
-            # TODO support file readers
-            # path: Optional[str] = None,
-            # jdbc=None,
-            # format: Optional[str] = None,
-            # options: Optional[Dict[str, Any]] = None,
-            event_timestamp_column: Optional[str] = None,
-            created_timestamp_column: Optional[str] = None,
-            field_mapping: Optional[Dict[str, str]] = None,
-            date_partition_column: Optional[str] = None,
+        self,
+        table: Optional[str] = None,
+        query: Optional[str] = None,
+        # TODO support file readers
+        # path: Optional[str] = None,
+        # jdbc=None,
+        # format: Optional[str] = None,
+        # options: Optional[Dict[str, Any]] = None,
+        event_timestamp_column: Optional[str] = None,
+        created_timestamp_column: Optional[str] = None,
+        field_mapping: Optional[Dict[str, str]] = None,
+        date_partition_column: Optional[str] = None,
     ):
         super().__init__(
             event_timestamp_column,
@@ -112,11 +112,20 @@ def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]:
     def get_table_column_names_and_types(
         self, config: RepoConfig
     ) -> Iterable[Tuple[str, str]]:
-        from feast_spark_offline_store.spark import get_spark_session_or_start_new_with_repoconfig
-        spark_session = get_spark_session_or_start_new_with_repoconfig(config.offline_store)
+        from feast_spark_offline_store.spark import (
+            get_spark_session_or_start_new_with_repoconfig,
+        )
+
+        spark_session = get_spark_session_or_start_new_with_repoconfig(
+            config.offline_store
+        )
 
         try:
-            return ((fields['name'], fields['type'])
-                    for fields in spark_session.table(self.table).schema.jsonValue()["fields"])
+            return (
+                (fields["name"], fields["type"])
+                for fields in spark_session.table(self.table).schema.jsonValue()[
+                    "fields"
+                ]
+            )
         except AnalysisException:
             raise DataSourceNotFoundException(self.table)
diff --git a/feast_spark_offline_store/spark_type_map.py b/feast_spark_offline_store/spark_type_map.py
index a6ab28a..2e302af 100644
--- a/feast_spark_offline_store/spark_type_map.py
+++ b/feast_spark_offline_store/spark_type_map.py
@@ -33,16 +33,17 @@ def spark_schema_to_np_dtypes(dtypes: List[Tuple[str, str]]) -> Iterator[dtype]:
 
     # TODO recheck all typing (also tz for timestamp)
     # https://spark.apache.org/docs/latest/api/python/user_guide/arrow_pandas.html#timestamp-with-time-zone-semantics
-    type_map = defaultdict(lambda: dtype("O"), {
-        'boolean': dtype('bool'),
-        'double': dtype('float64'),
-        'float': dtype('float64'),
-        'int': dtype('int64'),
-        'bigint': dtype('int64'),
-        'smallint': dtype('int64'),
-        'timestamp': dtype('datetime64[ns]')
-    })
+    type_map = defaultdict(
+        lambda: dtype("O"),
+        {
+            "boolean": dtype("bool"),
+            "double": dtype("float64"),
+            "float": dtype("float64"),
+            "int": dtype("int64"),
+            "bigint": dtype("int64"),
+            "smallint": dtype("int64"),
+            "timestamp": dtype("datetime64[ns]"),
+        },
+    )
 
     return (type_map[t] for _, t in dtypes)
-
-
diff --git a/setup.py b/setup.py
index 3892dfc..47d2719 100644
--- a/setup.py
+++ b/setup.py
@@ -9,39 +9,28 @@
     "numpy",
     "pandas",
     "pytz>=2021.3",
-    "pydantic>=1.6"
+    "pydantic>=1.6",
 ]
 
 DEV_REQUIRES = INSTALL_REQUIRES + [
     "wheel",
-    "black"
-]
-
-TEST_REQUIRES = INSTALL_REQUIRES + [
+    "black",
+    "flake8",
     "pytest>=6.2.5",
-    "google"
+    "google",
 ]
 
 setup(
     name="feast_spark_offline_store",
-    version="0.0.2",
+    version="0.0.3",
     author="Thijs Brits",
     description="Spark support for Feast offline store",
     long_description=open("README.md").read(),
     long_description_content_type="text/markdown",
     url="https://github.com/Adyen/feast-spark-offline-store",
     license="MIT",
-    python_requires=">=3.7.0",
+    python_requires=">=3.8.0",
     packages=find_packages(include=["feast_spark_offline_store"]),
-    test_requires=TEST_REQUIRES,
     install_requires=INSTALL_REQUIRES,
-    extras_require={
-        "dev": DEV_REQUIRES + TEST_REQUIRES,
-        "test": TEST_REQUIRES,
-    },
-    package_data={
-        "feast_spark_offline_store": [
-            "multiple_feature_view_point_in_time_join.sql",
-        ],
-    },
+    extras_require={"dev": DEV_REQUIRES},
 )
diff --git a/tests/test_spark_offline_store.py b/tests/test_spark_offline_store.py
index 98c9520..d96c7a5 100644
--- a/tests/test_spark_offline_store.py
+++ b/tests/test_spark_offline_store.py
@@ -18,14 +18,17 @@ def test_end_to_end():
         fs.materialize_incremental(end_date=datetime.now())
 
         entity_df = pd.DataFrame(
-            {"driver_id": [1001],
-             "event_timestamp": [datetime.now()]}
+            {"driver_id": [1001], "event_timestamp": [datetime.now()]}
         )
 
         # Read features from offline store
-        feature_vector = fs.get_historical_features(
-            features=["driver_hourly_stats:conv_rate"], entity_df=entity_df
-        ).to_df().to_dict()
+        feature_vector = (
+            fs.get_historical_features(
+                features=["driver_hourly_stats:conv_rate"], entity_df=entity_df
+            )
+            .to_df()
+            .to_dict()
+        )
         conv_rate = feature_vector["conv_rate"][0]
         assert conv_rate > 0
     finally:
@@ -35,9 +38,7 @@ def test_cli():
 
     repo_name = "example_feature_repo"
 
-    os.system(
-        f"PYTHONPATH=$PYTHONPATH:/$(pwd) feast -c {repo_name} apply"
-    )
+    os.system(f"PYTHONPATH=$PYTHONPATH:/$(pwd) feast -c {repo_name} apply")
     try:
         os.system(
             f"PYTHONPATH=$PYTHONPATH:/$(pwd) feast -c {repo_name} materialize-incremental 2021-08-19T22:29:28 > output"
@@ -51,6 +52,4 @@ def test_cli():
             'Failed to successfully use provider from CLI. See "output" for more details.'
         )
     finally:
-        os.system(
-            f"PYTHONPATH=$PYTHONPATH:/$(pwd) feast -c {repo_name} teardown"
-        )
+        os.system(f"PYTHONPATH=$PYTHONPATH:/$(pwd) feast -c {repo_name} teardown")