diff --git a/aligned/data_file.py b/aligned/data_file.py index 7df0488..e384a30 100644 --- a/aligned/data_file.py +++ b/aligned/data_file.py @@ -14,7 +14,7 @@ async def read_pandas(self) -> pd.DataFrame: raise NotImplementedError() async def to_pandas(self) -> pd.DataFrame: - await self.read_pandas() + return await self.read_pandas() async def to_polars(self) -> pl.LazyFrame: raise NotImplementedError() diff --git a/aligned/feature_store.py b/aligned/feature_store.py index e1b08d4..1690adc 100644 --- a/aligned/feature_store.py +++ b/aligned/feature_store.py @@ -5,7 +5,7 @@ from dataclasses import dataclass, field from datetime import datetime, timedelta from importlib import import_module -from typing import Any +from typing import Any, Union from prometheus_client import Histogram @@ -44,6 +44,8 @@ labelnames=['feature_view'], ) +FeatureSourceable = Union[FeatureSource, FeatureSourceFactory, None] + @dataclass class SourceRequest: @@ -493,12 +495,12 @@ def add_model(self, model: ModelContract) -> None: compiled_model = type(model).compile() self.models[compiled_model.name] = compiled_model - def with_source(self, source: FeatureSource | FeatureSourceFactory | None = None) -> FeatureStore: + def with_source(self, source: FeatureSourceable = None) -> FeatureStore: """ Creates a new instance of a feature store, but changes where to fetch the features from ``` - store = # Load the store + store = await FeatureStore.from_dir(".") redis_store = store.with_source(redis) batch_source = redis_store.with_source() ``` @@ -511,7 +513,7 @@ def with_source(self, source: FeatureSource | FeatureSourceFactory | None = None """ if isinstance(source, FeatureSourceFactory): feature_source = source.feature_source() - else: + elif source is None: sources = { FeatureLocation.feature_view(view.name).identifier: view.batch_data_source for view in set(self.feature_views.values()) @@ -521,6 +523,13 @@ def with_source(self, source: FeatureSource | FeatureSourceFactory | None = None if model.predictions_view.source is not None } feature_source = source or BatchFeatureSource(sources=sources) + elif isinstance(source, FeatureSource): + feature_source = source + else: + raise ValueError( + 'Setting a dedicated source needs to be either a FeatureSource, ' + f'or FeatureSourceFactory. Got: {type(source)}' + ) return FeatureStore( feature_views=self.feature_views, @@ -781,11 +790,9 @@ def all_predictions(self, limit: int | None = None) -> RetrivalJob: request = pred_view.request(self.model.name) return pred_view.source.all_data(request, limit=limit) - def using_source( - self, source: FeatureSource | FeatureSourceFactory | BatchDataSource - ) -> ModelFeatureStore: + def using_source(self, source: FeatureSourceable | BatchDataSource) -> ModelFeatureStore: - model_source: FeatureSource | FeatureSourceFactory + model_source: FeatureSourceable if isinstance(source, BatchDataSource): model_source = BatchFeatureSource({FeatureLocation.model(self.model.name).identifier: source}) @@ -1038,9 +1045,7 @@ def request(self) -> RetrivalRequest: def source(self) -> FeatureSource: return self.store.feature_source - def using_source( - self, source: FeatureSource | FeatureSourceFactory | BatchDataSource - ) -> FeatureViewStore: + def using_source(self, source: FeatureSourceable | BatchDataSource) -> FeatureViewStore: """ Sets the source to load features from. @@ -1061,7 +1066,7 @@ def using_source( Returns: A new `FeatureViewStore` that sends queries to the passed source """ - view_source: FeatureSource | FeatureSourceFactory + view_source: FeatureSourceable if isinstance(source, BatchDataSource): view_source = BatchFeatureSource( diff --git a/aligned/feature_view/combined_view.py b/aligned/feature_view/combined_view.py index 4d6a144..eb7fd63 100644 --- a/aligned/feature_view/combined_view.py +++ b/aligned/feature_view/combined_view.py @@ -44,9 +44,8 @@ def query(self) -> 'FeatureViewStore': """Makes it possible to query the feature view for features ```python - class SomeView(FeatureView): - - metadata = ... + @feature_view(...) + class SomeView: id = Int32().as_entity() diff --git a/aligned/retrival_job.py b/aligned/retrival_job.py index 7c51762..ed3ca06 100644 --- a/aligned/retrival_job.py +++ b/aligned/retrival_job.py @@ -340,6 +340,9 @@ def fill_missing_columns(self) -> RetrivalJob: def rename(self, mappings: dict[str, str]) -> RetrivalJob: return RenameJob(self, mappings) + def drop_duplicate_entities(self) -> RetrivalJob: + return DropDuplicateEntities(self) + def ignore_event_timestamp(self) -> RetrivalJob: if isinstance(self, ModificationJob): return self.copy_with(self.job.ignore_event_timestamp()) @@ -433,6 +436,24 @@ async def to_polars(self) -> pl.LazyFrame: return df.rename(self.mappings) +@dataclass +class DropDuplicateEntities(RetrivalJob, ModificationJob): + + job: RetrivalJob + + @property + def entity_columns(self) -> list[str]: + return self.job.request_result.entity_columns + + async def to_polars(self) -> pl.LazyFrame: + df = await self.job.to_polars() + return df.unique(subset=self.entity_columns) + + async def to_pandas(self) -> pd.DataFrame: + df = await self.job.to_pandas() + return df.drop_duplicates(subset=self.entity_columns) + + @dataclass class UpdateVectorIndexJob(RetrivalJob, ModificationJob): diff --git a/aligned/schemas/model.py b/aligned/schemas/model.py index db6627f..9948c4b 100644 --- a/aligned/schemas/model.py +++ b/aligned/schemas/model.py @@ -69,13 +69,10 @@ def full_schema(self) -> set[Feature]: return schema def request(self, name: str) -> RetrivalRequest: - entities = self.entities - if self.model_version_column: - entities.add(self.model_version_column) return RetrivalRequest( name=name, location=FeatureLocation.model(name), - entities=entities, + entities=self.entities, features=self.features, derived_features=self.derived_features, event_timestamp=self.event_timestamp, @@ -84,9 +81,6 @@ def request(self, name: str) -> RetrivalRequest: def request_for(self, features: set[str], name: str) -> RetrivalRequest: entities = self.entities - # if self.model_version_column: - # entities.add(self.model_version_column) - return RetrivalRequest( name=name, location=FeatureLocation.model(name), diff --git a/aligned/validation/pandera.py b/aligned/validation/pandera.py index c98951a..aae7758 100644 --- a/aligned/validation/pandera.py +++ b/aligned/validation/pandera.py @@ -58,27 +58,13 @@ def _column_for(self, feature: Feature) -> Column: ) def _build_schema(self, features: list[Feature]) -> DataFrameSchema: - return DataFrameSchema(columns={feature.name: self._column_for(feature) for feature in features}) + return DataFrameSchema( + columns={feature.name: self._column_for(feature) for feature in features}, drop_invalid_rows=True + ) async def validate_pandas(self, features: list[Feature], df: pd.DataFrame) -> pd.DataFrame: - from pandera.errors import SchemaError - schema = self._build_schema(features) - try: - return schema.validate(df) - except SchemaError as error: - # Will only return one error at a time, so will remove - # errors and then run it recrusive - - if error.failure_cases.shape[0] == df.shape[0]: - raise ValueError('Validation is removing all the data.') - - if error.failure_cases['index'].iloc[0] is None: - raise ValueError(error) - - return await self.validate_pandas( - features, df.loc[df.index.delete(error.failure_cases['index'])].reset_index() - ) + return schema.validate(df, lazy=True) async def validate_polars(self, features: list[Feature], df: pl.LazyFrame) -> pl.LazyFrame: input_df = df.collect().to_pandas() diff --git a/poetry.lock b/poetry.lock index 5217751..86929dc 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1106,6 +1106,17 @@ pyparsing = ">=2.3.1" python-dateutil = ">=2.7" setuptools_scm = ">=7" +[[package]] +name = "multimethod" +version = "1.10" +description = "Multiple argument dispatching." +optional = true +python-versions = ">=3.8" +files = [ + {file = "multimethod-1.10-py3-none-any.whl", hash = "sha256:afd84da9c3d0445c84f827e4d63ad42d17c6d29b122427c6dee9032ac2d2a0d4"}, + {file = "multimethod-1.10.tar.gz", hash = "sha256:daa45af3fe257f73abb69673fd54ddeaf31df0eb7363ad6e1251b7c9b192d8c5"}, +] + [[package]] name = "mypy-extensions" version = "1.0.0" @@ -1300,34 +1311,36 @@ xml = ["lxml (>=4.8.0)"] [[package]] name = "pandera" -version = "0.13.4" +version = "0.17.2" description = "A light-weight and flexible data validation and testing tool for statistical data objects." optional = true python-versions = ">=3.7" files = [ - {file = "pandera-0.13.4-py3-none-any.whl", hash = "sha256:9e91687861406284270add1d467f204630377892e7a4b45809bb7546f0013153"}, - {file = "pandera-0.13.4.tar.gz", hash = "sha256:6ef2b7ee00d3439ac815d4347984421a08502da1020cec60c06dd0135e8aee2f"}, + {file = "pandera-0.17.2-py3-none-any.whl", hash = "sha256:8e4e7b279c62f6d4b5109801544bf8d46e1c9fdf7ceceb8fedd5f3dad0c1bea1"}, + {file = "pandera-0.17.2.tar.gz", hash = "sha256:67515984f855ba14d12443f893b5ff90ae6796f613d5f3df43abad406a48c373"}, ] [package.dependencies] +multimethod = "*" numpy = ">=1.19.0" packaging = ">=20.0" pandas = ">=1.2.0" pydantic = "*" +typeguard = ">=3.0.2" typing-inspect = ">=0.6.0" wrapt = "*" [package.extras] -all = ["black", "dask", "fastapi", "frictionless", "geopandas", "hypothesis (>=5.41.1)", "modin", "pandas-stubs (<=1.4.3.220807)", "pyspark (>=3.2.0)", "pyyaml (>=5.1)", "ray (<=1.7.0)", "scipy", "shapely"] +all = ["black", "dask", "fastapi", "frictionless (<=4.40.8)", "geopandas", "hypothesis (>=5.41.1)", "modin", "pandas-stubs", "pyspark (>=3.2.0)", "pyyaml (>=5.1)", "ray", "scipy", "shapely"] dask = ["dask"] fastapi = ["fastapi"] geopandas = ["geopandas", "shapely"] hypotheses = ["scipy"] -io = ["black", "frictionless", "pyyaml (>=5.1)"] -modin = ["dask", "modin", "ray (<=1.7.0)"] +io = ["black", "frictionless (<=4.40.8)", "pyyaml (>=5.1)"] +modin = ["dask", "modin", "ray"] modin-dask = ["dask", "modin"] -modin-ray = ["modin", "ray (<=1.7.0)"] -mypy = ["pandas-stubs (<=1.4.3.220807)"] +modin-ray = ["modin", "ray"] +mypy = ["pandas-stubs"] pyspark = ["pyspark (>=3.2.0)"] strategies = ["hypothesis (>=5.41.1)"] @@ -2076,6 +2089,24 @@ files = [ {file = "tomli-2.0.1.tar.gz", hash = "sha256:de526c12914f0c550d15924c62d72abc48d6fe7364aa87328337a31007fe8a4f"}, ] +[[package]] +name = "typeguard" +version = "4.1.5" +description = "Run-time type checker for Python" +optional = true +python-versions = ">=3.8" +files = [ + {file = "typeguard-4.1.5-py3-none-any.whl", hash = "sha256:8923e55f8873caec136c892c3bed1f676eae7be57cdb94819281b3d3bc9c0953"}, + {file = "typeguard-4.1.5.tar.gz", hash = "sha256:ea0a113bbc111bcffc90789ebb215625c963411f7096a7e9062d4e4630c155fd"}, +] + +[package.dependencies] +typing-extensions = {version = ">=4.7.0", markers = "python_version < \"3.12\""} + +[package.extras] +doc = ["Sphinx (>=7)", "packaging", "sphinx-autodoc-typehints (>=1.2.0)"] +test = ["coverage[toml] (>=7)", "mypy (>=1.2.0)", "pytest (>=7)"] + [[package]] name = "types-pyopenssl" version = "23.2.0.2" @@ -2279,4 +2310,4 @@ server = ["asgi-correlation-id", "fastapi", "prometheus-fastapi-instrumentator", [metadata] lock-version = "2.0" python-versions = "^3.10" -content-hash = "b44c29471b4d57bb29508c4cf59de347a6279a762b0cab93ff647451ad90fd11" +content-hash = "7920f2f5aacee01ea109a63d13fb11116c3d26a889314e56cd030d81f868c02d" diff --git a/pyproject.toml b/pyproject.toml index 2f0d689..2d6af25 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "aligned" -version = "0.0.30" +version = "0.0.31" description = "A scalable feature store that makes it easy to align offline and online ML systems" authors = ["Mats E. Mollestad "] license = "Apache-2.0" @@ -55,7 +55,7 @@ nest-asyncio = "^1.5.5" pydantic = "^2.0.0" prometheus_client = "^0.16.0" asgi-correlation-id = { version = "^3.0.0", optional = true } -pandera = { version = "^0.13.3", optional = true} +pandera = { version = "^0.17.0", optional = true} httpx = "^0.23.0" polars = { version = "^0.17.15", extras = ["all"] } pillow = { version = "^9.4.0", optional = true }