This repository has been archived by the owner on Dec 1, 2022. It is now read-only.

Revert "First release (#4)"
This reverts commit 0a92c8c.
joostr committed Dec 22, 2021
1 parent 0a92c8c commit 26053ef
Showing 10 changed files with 73 additions and 175 deletions.
42 changes: 0 additions & 42 deletions .github/workflows/python-package.yaml

This file was deleted.

36 changes: 0 additions & 36 deletions .github/workflows/python-publish.yml

This file was deleted.

2 changes: 1 addition & 1 deletion README.md
@@ -15,7 +15,7 @@ Or to install from source:
```bash
git clone git@github.com:Adyen/feast-spark-offline-store.git
cd feast-spark-offline-store
pip install -e '.[dev]'
pip install -e .[dev]
```

## Usage
7 changes: 2 additions & 5 deletions example_feature_repo/example.py
@@ -1,3 +1,4 @@

# This is an example feature definition file

from google.protobuf.duration_pb2 import Duration
@@ -28,11 +29,7 @@
)

# Define an entity for the driver.
driver = Entity(
name="driver_id",
value_type=ValueType.INT64,
description="driver id",
)
driver = Entity(name="driver_id", value_type=ValueType.INT64, description="driver id", )

# Define FeatureView
driver_hourly_stats_view = FeatureView(
7 changes: 1 addition & 6 deletions feast_spark_offline_store/__init__.py
@@ -9,9 +9,4 @@
# package is not installed
pass

__all__ = [
"SparkOptions",
"SparkSource",
"SparkOfflineStoreConfig",
"SparkOfflineStore",
]
__all__ = ["SparkOptions", "SparkSource", "SparkOfflineStoreConfig", "SparkOfflineStore"]
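
The reverted `__init__.py` keeps the same four exports, just on one line. As a quick sanity check (a sketch, not part of this commit), the names listed in `__all__` should import cleanly once the package is installed from source as described in the README:

```python
# Minimal smoke test; assumes `pip install -e .[dev]` was run from the repo root.
# The imported names are exactly those in __all__ above.
from feast_spark_offline_store import (
    SparkOfflineStore,
    SparkOfflineStoreConfig,
    SparkOptions,
    SparkSource,
)

print(SparkOfflineStore, SparkOfflineStoreConfig, SparkOptions, SparkSource)
```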
46 changes: 14 additions & 32 deletions feast_spark_offline_store/spark.py
@@ -45,9 +45,7 @@ def pull_latest_from_table_or_query(
start_date: datetime,
end_date: datetime,
) -> RetrievalJob:
spark_session = get_spark_session_or_start_new_with_repoconfig(
config.offline_store
)
spark_session = get_spark_session_or_start_new_with_repoconfig(config.offline_store)

assert isinstance(config.offline_store, SparkOfflineStoreConfig)
assert isinstance(data_source, SparkSource)
@@ -85,7 +83,7 @@ def pull_latest_from_table_or_query(
spark_session=spark_session,
query=query,
full_feature_names=False,
on_demand_feature_views=None,
on_demand_feature_views=None
)

@staticmethod
@@ -100,18 +98,16 @@ def get_historical_features(
) -> RetrievalJob:
assert isinstance(config.offline_store, SparkOfflineStoreConfig)

spark_session = get_spark_session_or_start_new_with_repoconfig(
config.offline_store
)
spark_session = get_spark_session_or_start_new_with_repoconfig(config.offline_store)

table_name = offline_utils.get_temp_entity_table_name()

entity_schema = _upload_entity_df_and_get_entity_schema(
spark_session, table_name, entity_df
)

entity_df_event_timestamp_col = (
offline_utils.infer_event_timestamp_from_entity_df(entity_schema)
entity_df_event_timestamp_col = offline_utils.infer_event_timestamp_from_entity_df(
entity_schema
)

expected_join_keys = offline_utils.get_expected_join_keys(
@@ -123,10 +119,7 @@ def get_historical_features(
)

query_context = offline_utils.get_feature_view_query_context(
feature_refs,
feature_views,
registry,
project,
feature_refs, feature_views, registry, project,
)

query = offline_utils.build_point_in_time_query(
@@ -171,9 +164,7 @@ def on_demand_feature_views(self) -> Optional[List[OnDemandFeatureView]]:
return self._on_demand_feature_views

def to_spark_df(self) -> pyspark.sql.DataFrame:
statements = self.query.split(
"---EOS---"
) # TODO can do better than this dirty split
statements = self.query.split("---EOS---") # TODO can do better than this dirty split
*_, last = map(self.spark_session.sql, statements)
return last

@@ -194,7 +185,9 @@ def to_arrow(self) -> pyarrow.Table:


def _upload_entity_df_and_get_entity_schema(
spark_session, table_name, entity_df
spark_session,
table_name,
entity_df
) -> Dict[str, np.dtype]:
if isinstance(entity_df, pd.DataFrame):
spark_session.createDataFrame(entity_df).createOrReplaceTempView(table_name)
@@ -204,35 +197,24 @@ def _upload_entity_df_and_get_entity_schema(
limited_entity_df = spark_session.table(table_name)
# limited_entity_df = spark_session.table(table_name).limit(1).toPandas()

return dict(
zip(
limited_entity_df.columns,
spark_schema_to_np_dtypes(limited_entity_df.dtypes),
)
)
return dict(zip(limited_entity_df.columns, spark_schema_to_np_dtypes(limited_entity_df.dtypes)))
else:
raise InvalidEntityType(type(entity_df))


def get_spark_session_or_start_new_with_repoconfig(
store_config: SparkOfflineStoreConfig,
) -> SparkSession:
def get_spark_session_or_start_new_with_repoconfig(store_config: SparkOfflineStoreConfig) -> SparkSession:
spark_session = SparkSession.getActiveSession()

if not spark_session:
spark_builder = SparkSession.builder
spark_conf = store_config.spark_conf

if spark_conf:
spark_builder = spark_builder.config(
conf=SparkConf().setAll(spark_conf.items())
) # noqa
spark_builder = spark_builder.config(conf=SparkConf().setAll(spark_conf.items())) # noqa

spark_session = spark_builder.getOrCreate()

spark_session.conf.set(
"spark.sql.parser.quotedRegexColumnNames", "true"
) # important!
spark_session.conf.set("spark.sql.parser.quotedRegexColumnNames", "true") # important!

return spark_session

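
For orientation, here is a hedged usage sketch of `get_spark_session_or_start_new_with_repoconfig` as it reads after this revert. The constructor of `SparkOfflineStoreConfig` is not shown in this diff, so the sketch substitutes a plain stand-in object exposing only the `spark_conf` attribute the helper actually reads; the configuration keys are illustrative.

```python
# Sketch only: a stand-in config object is used because SparkOfflineStoreConfig's
# fields are not visible in this diff. The helper reuses the active SparkSession
# or builds one from spark_conf, then enables quoted regex column names.
from types import SimpleNamespace

from feast_spark_offline_store.spark import (
    get_spark_session_or_start_new_with_repoconfig,
)

fake_store_config = SimpleNamespace(
    spark_conf={
        "spark.master": "local[1]",
        "spark.ui.enabled": "false",
    }
)

spark = get_spark_session_or_start_new_with_repoconfig(fake_store_config)
print(spark.conf.get("spark.sql.parser.quotedRegexColumnNames"))  # "true"
```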
41 changes: 16 additions & 25 deletions feast_spark_offline_store/spark_source.py
@@ -11,18 +11,18 @@

class SparkSource(DataSource):
def __init__(
self,
table: Optional[str] = None,
query: Optional[str] = None,
# TODO support file readers
# path: Optional[str] = None,
# jdbc=None,
# format: Optional[str] = None,
# options: Optional[Dict[str, Any]] = None,
event_timestamp_column: Optional[str] = None,
created_timestamp_column: Optional[str] = None,
field_mapping: Optional[Dict[str, str]] = None,
date_partition_column: Optional[str] = None,
self,
table: Optional[str] = None,
query: Optional[str] = None,
# TODO support file readers
# path: Optional[str] = None,
# jdbc=None,
# format: Optional[str] = None,
# options: Optional[Dict[str, Any]] = None,
event_timestamp_column: Optional[str] = None,
created_timestamp_column: Optional[str] = None,
field_mapping: Optional[Dict[str, str]] = None,
date_partition_column: Optional[str] = None,
):
super().__init__(
event_timestamp_column,
@@ -112,20 +112,11 @@ def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]:
def get_table_column_names_and_types(
self, config: RepoConfig
) -> Iterable[Tuple[str, str]]:
from feast_spark_offline_store.spark import (
get_spark_session_or_start_new_with_repoconfig,
)

spark_session = get_spark_session_or_start_new_with_repoconfig(
config.offline_store
)
from feast_spark_offline_store.spark import get_spark_session_or_start_new_with_repoconfig
spark_session = get_spark_session_or_start_new_with_repoconfig(config.offline_store)
try:
return (
(fields["name"], fields["type"])
for fields in spark_session.table(self.table).schema.jsonValue()[
"fields"
]
)
return ((fields['name'], fields['type'])
for fields in spark_session.table(self.table).schema.jsonValue()["fields"])
except AnalysisException:
raise DataSourceNotFoundException(self.table)

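
For context, a minimal `SparkSource` declaration matching the constructor signature above. The table and column names are illustrative assumptions, not values taken from this repository.

```python
# Hedged sketch: assumes a table named "driver_hourly_stats" is reachable from
# the active Spark session. Only keyword arguments shown in the constructor
# above are used.
from feast_spark_offline_store import SparkSource

driver_hourly_stats_source = SparkSource(
    table="driver_hourly_stats",
    event_timestamp_column="event_timestamp",
    created_timestamp_column="created",
)
```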
23 changes: 11 additions & 12 deletions feast_spark_offline_store/spark_type_map.py
@@ -33,17 +33,16 @@ def spark_schema_to_np_dtypes(dtypes: List[Tuple[str, str]]) -> Iterator[dtype]:
# TODO recheck all typing (also tz for timestamp)
# https://spark.apache.org/docs/latest/api/python/user_guide/arrow_pandas.html#timestamp-with-time-zone-semantics

type_map = defaultdict(
lambda: dtype("O"),
{
"boolean": dtype("bool"),
"double": dtype("float64"),
"float": dtype("float64"),
"int": dtype("int64"),
"bigint": dtype("int64"),
"smallint": dtype("int64"),
"timestamp": dtype("datetime64[ns]"),
},
)
type_map = defaultdict(lambda: dtype("O"), {
'boolean': dtype('bool'),
'double': dtype('float64'),
'float': dtype('float64'),
'int': dtype('int64'),
'bigint': dtype('int64'),
'smallint': dtype('int64'),
'timestamp': dtype('datetime64[ns]')
})

return (type_map[t] for _, t in dtypes)
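
A small, hedged example of what the reverted mapping produces for a typical `DataFrame.dtypes` result; the column names are illustrative.

```python
# Sketch of the Spark-to-numpy dtype mapping defined above. Types missing from
# the map (e.g. "string") fall back to numpy object dtype via the defaultdict.
from feast_spark_offline_store.spark_type_map import spark_schema_to_np_dtypes

spark_dtypes = [
    ("driver_id", "bigint"),
    ("conv_rate", "double"),
    ("is_active", "boolean"),
    ("event_timestamp", "timestamp"),
    ("notes", "string"),
]

print(list(spark_schema_to_np_dtypes(spark_dtypes)))
# -> [dtype('int64'), dtype('float64'), dtype('bool'), dtype('<M8[ns]'), dtype('O')]
```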


23 changes: 17 additions & 6 deletions setup.py
@@ -9,15 +9,17 @@
"numpy",
"pandas",
"pytz>=2021.3",
"pydantic>=1.6",
"pydantic>=1.6"
]

DEV_REQUIRES = INSTALL_REQUIRES + [
"wheel",
"black",
"flake8",
"black"
]

TEST_REQUIRES = INSTALL_REQUIRES + [
"pytest>=6.2.5",
"google",
"google"
]

setup(
@@ -29,8 +31,17 @@
long_description_content_type="text/markdown",
url="https://github.com/Adyen/feast-spark-offline-store",
license="MIT",
python_requires=">=3.8.0",
python_requires=">=3.7.0",
packages=find_packages(include=["feast_spark_offline_store"]),
test_requires=TEST_REQUIRES,
install_requires=INSTALL_REQUIRES,
extras_require={"dev": DEV_REQUIRES},
extras_require={
"dev": DEV_REQUIRES + TEST_REQUIRES,
"test": TEST_REQUIRES,
},
package_data={
"feast_spark_offline_store": [
"multiple_feature_view_point_in_time_join.sql",
],
},
)
