From dc4c879753beab8081b66b595d4ed536cca3e225 Mon Sep 17 00:00:00 2001
From: "Mats E. Mollestad" 
Date: Thu, 9 Nov 2023 22:18:34 +0100
Subject: [PATCH] chore: restructured a lot of the sources

---
 aligned/compiler/feature_factory.py        | 69 ++++++++++++++++++
 aligned/compiler/model.py                  | 28 +++----
 aligned/compiler/tests/features.py         |  2 +-
 aligned/compiler/tests/test_repo_reader.py |  2 +-
 aligned/data_source/batch_data_source.py   |  9 +++
 aligned/feature_store.py                   | 19 ++---
 aligned/feature_view/feature_view.py       | 50 +++++++------
 .../tests/test_hidden_variable.py          | 23 +++++-
 aligned/local/job.py                       | 28 ++++---
 aligned/psql/jobs.py                       | 25 ++++++-
 aligned/redis/job.py                       |  4 +-
 aligned/redshift/jobs.py                   |  4 +-
 aligned/request/retrival_request.py        |  8 ++
 aligned/retrival_job.py                    | 20 -----
 aligned/schemas/feature_view.py            |  6 +-
 aligned/schemas/model.py                   |  2 +-
 aligned/sources/local.py                   | 16 ++--
 aligned/sources/psql.py                    |  7 +-
 aligned/sources/s3.py                      |  6 +-
 aligned/tests/test_model_target.py         |  4 +-
 aligned/tests/test_source_validation.py    | 38 +++++-----
 conftest.py                                | 14 ++--
 pyproject.toml                             |  2 +-
 test_data/feature-store.json               |  2 +-
 test_data/test_model.parquet               | Bin 624 -> 624 bytes
 25 files changed, 250 insertions(+), 138 deletions(-)

diff --git a/aligned/compiler/feature_factory.py b/aligned/compiler/feature_factory.py
index 00bc800..b35066e 100644
--- a/aligned/compiler/feature_factory.py
+++ b/aligned/compiler/feature_factory.py
@@ -105,6 +105,75 @@ def feature_referance(self) -> FeatureReferance:
         pass
 
 
+def compile_hidden_features(
+    feature: FeatureFactory,
+    location: FeatureLocation,
+    hidden_features: int,
+    var_name: str,
+    entities: set[Feature],
+):
+    aggregations = []
+
+    features = set()
+    derived_features = set()
+
+    if feature.transformation:
+        # Adding features that are not stored in the view
+        # e.g.:
+        # class SomeView(FeatureView):
+        #     ...
+        #     x, y = Bool(), Bool()
+        #     z = (x & y) | x
+        #
+        # Here the result of (x & y) will be a 'hidden' feature
+        feature_deps = [(feat.depth(), feat) for feat in feature.feature_dependencies()]
+
+        # Sorting by key in order to instantiate the "core" features first
+        # And then making it possible for other features to reference them
+        def sort_key(x: tuple[int, FeatureFactory]) -> int:
+            return x[0]
+
+        for depth, feature_dep in sorted(feature_deps, key=sort_key):
+
+            if not feature_dep._location:
+                feature_dep._location = location
+
+            if feature_dep._name:
+                feat_dep = feature_dep.feature()
+                if feat_dep in features or feat_dep in entities:
+                    continue
+
+            if depth == 0:
+                # The raw value and the transformed have the same name
+                if not feature_dep._name:
+                    feature_dep._name = var_name
+                feat_dep = feature_dep.feature()
+                features.add(feat_dep)
+                continue
+
+            if not feature_dep._name:
+                feature_dep._name = str(hidden_features)
+                hidden_features += 1
+
+            if isinstance(feature_dep.transformation, AggregationTransformationFactory):
+                aggregations.append(feature_dep)
+            else:
+                feature_graph = feature_dep.compile()  # Should decide on which payload to send
+                if feature_graph in derived_features:
+                    continue
+
+                derived_features.add(feature_dep.compile())
+
+    if not feature._name:
+        feature._name = 'ephemoral'
+    if isinstance(feature.transformation, AggregationTransformationFactory):
+        aggregations.append(feature)
+    else:
+        derived_features.add(feature.compile())  # Should decide on which payload to send
+
+    return features, derived_features
+
+
 @dataclass
 class RegressionLabel(FeatureReferencable):
     feature: FeatureFactory
diff --git a/aligned/compiler/model.py b/aligned/compiler/model.py
index 1f852b0..1f4d1f6 100644
--- a/aligned/compiler/model.py
+++ b/aligned/compiler/model.py
@@ -67,9 +67,9 @@ class ModelMetadata:
     contacts: list[str] | None = field(default=None)
     tags: dict[str, str] | None = field(default=None)
     description: str | None = field(default=None)
-    predictions_source: BatchDataSource | None = field(default=None)
-    predictions_stream: StreamDataSource | None = field(default=None)
-    historical_source: BatchDataSource | None = field(default=None)
+    prediction_source: BatchDataSource | None = field(default=None)
+    prediction_stream: StreamDataSource | None = field(default=None)
+    application_source: BatchDataSource | None = field(default=None)
     dataset_folder: Folder | None = field(default=None)
 
 
@@ -94,9 +94,9 @@ def model_contract(
     contacts: list[str] | None = None,
     tags: dict[str, str] | None = None,
     description: str | None = None,
-    predictions_source: BatchDataSource | None = None,
-    predictions_stream: StreamDataSource | None = None,
-    historical_source: BatchDataSource | None = None,
+    prediction_source: BatchDataSource | None = None,
+    prediction_stream: StreamDataSource | None = None,
+    application_source: BatchDataSource | None = None,
     dataset_folder: Folder | None = None,
 ) -> Callable[[Type[T]], ModelContractWrapper[T]]:
     def decorator(cls: Type[T]) -> ModelContractWrapper[T]:
@@ -106,9 +106,9 @@ def decorator(cls: Type[T]) -> ModelContractWrapper[T]:
             contacts=contacts,
             tags=tags,
             description=description,
-            predictions_source=predictions_source,
-            predictions_stream=predictions_stream,
-            historical_source=historical_source,
+            prediction_source=prediction_source,
+            prediction_stream=prediction_stream,
+            application_source=application_source,
             dataset_folder=dataset_folder,
         )
         return ModelContractWrapper(metadata, cls)
@@ -126,7 +126,7 @@ def metadata_with(
     tags: dict[str, str] | None = None,
    predictions_source: BatchDataSource | None = None,
    predictions_stream: StreamDataSource | None = None,
-    historical_source: BatchDataSource | None = None,
+    application_source: BatchDataSource | None = None,
    dataset_folder: Folder | None = None,
 ) -> ModelMetadata:
     return ModelMetadata(
@@ -137,7 +137,7 @@ def metadata_with(
         description,
         predictions_source,
         predictions_stream,
-        historical_source=historical_source,
+        application_source=application_source,
         dataset_folder=dataset_folder,
     )
 
@@ -174,9 +174,9 @@ class MyModel(ModelContract):
         features=set(),
         derived_features=set(),
         model_version_column=None,
-        source=metadata.predictions_source,
-        historical_source=metadata.historical_source,
-        stream_source=metadata.predictions_stream,
+        source=metadata.prediction_source,
+        application_source=metadata.application_source,
+        stream_source=metadata.prediction_stream,
         classification_targets=set(),
         regression_targets=set(),
     )
diff --git a/aligned/compiler/tests/features.py b/aligned/compiler/tests/features.py
index 14705e6..1fdba38 100644
--- a/aligned/compiler/tests/features.py
+++ b/aligned/compiler/tests/features.py
@@ -3,7 +3,7 @@
 source = PostgreSQLConfig.localhost('test')
 
 
-@feature_view(name='test', description='test', batch_source=source.table('test'))
+@feature_view(name='test', description='test', source=source.table('test'))
 class Test:
 
     id = UUID().as_entity()
diff --git a/aligned/compiler/tests/test_repo_reader.py b/aligned/compiler/tests/test_repo_reader.py
index cc2c76b..ac485a7 100644
--- a/aligned/compiler/tests/test_repo_reader.py
+++ b/aligned/compiler/tests/test_repo_reader.py
@@ -14,7 +14,7 @@ async def test_repo_reader() -> None:
     view = list(definitions.feature_views)[0]
 
     assert view.name == 'test'
-    assert view.batch_data_source.type_name == 'psql'
+    assert view.source.type_name == 'psql'
     assert len(view.derived_features) == 1
     assert len(view.features) == 2
     assert len(view.entities) == 1
diff --git a/aligned/data_source/batch_data_source.py b/aligned/data_source/batch_data_source.py
index 828dcb6..e3f45ca 100644
--- a/aligned/data_source/batch_data_source.py
+++ b/aligned/data_source/batch_data_source.py
@@ -75,8 +75,17 @@ class BatchDataSource(ABC, Codable, SerializableType):
 
     @abstractmethod
     def job_group_key(self) -> str:
+        """
+        A key defining which sources can be grouped together in one request.
+        """
         pass
 
+    def source_id(self) -> str:
+        """
+        An id that distinguishes a source from other sources.
+ """ + return self.job_group_key() + def _serialize(self) -> dict: assert ( self.type_name in BatchDataSourceFactory.shared().supported_data_sources diff --git a/aligned/feature_store.py b/aligned/feature_store.py index bf673ed..b81475c 100644 --- a/aligned/feature_store.py +++ b/aligned/feature_store.py @@ -182,7 +182,9 @@ def from_definition(repo: RepoDefinition) -> FeatureStore: FeatureStore.register_enrichers(repo.enrichers) sources = { - FeatureLocation.feature_view(view.name).identifier: view.batch_data_source + FeatureLocation.feature_view(view.name).identifier: view.materialized_source + if view.materialized_source + else view.source for view in repo.feature_views } | { FeatureLocation.model(model.name).identifier: model.predictions_view.source @@ -468,9 +470,7 @@ class MyFeatureView: """ self.feature_views[view.name] = view if isinstance(self.feature_source, BatchFeatureSource): - self.feature_source.sources[ - FeatureLocation.feature_view(view.name).identifier - ] = view.batch_data_source + self.feature_source.sources[FeatureLocation.feature_view(view.name).identifier] = view.source def add_feature_view(self, feature_view: FeatureView) -> None: self.add_compiled_view(feature_view.compile_instance()) @@ -519,7 +519,7 @@ def with_source(self, source: FeatureSourceable = None) -> FeatureStore: feature_source = source.feature_source() elif source is None: sources = { - FeatureLocation.feature_view(view.name).identifier: view.batch_data_source + FeatureLocation.feature_view(view.name).identifier: view.source for view in set(self.feature_views.values()) } | { FeatureLocation.model(model.name).identifier: model.predictions_view.source @@ -560,8 +560,7 @@ def use_application_sources(self) -> FeatureStore: FeatureStore: A new feature store that loads features from the application source """ sources = { - FeatureLocation.feature_view(view.name).identifier: view.application_source - or view.batch_data_source + FeatureLocation.feature_view(view.name).identifier: view.application_source or view.source for view in set(self.feature_views.values()) } | { FeatureLocation.model(model.name).identifier: model.predictions_view.source @@ -601,10 +600,8 @@ def views_with_config(self, config: Any) -> list[SourceRequest]: views: list[SourceRequest] = [] for view in self.feature_views.values(): request = view.request_all.needed_requests[0] - if view.batch_data_source.contains_config(config): - views.append( - SourceRequest(FeatureLocation.feature_view(view.name), view.batch_data_source, request) - ) + if view.source.contains_config(config): + views.append(SourceRequest(FeatureLocation.feature_view(view.name), view.source, request)) if view.application_source and view.application_source.contains_config(config): views.append( diff --git a/aligned/feature_view/feature_view.py b/aligned/feature_view/feature_view.py index df6527b..4adfc78 100644 --- a/aligned/feature_view/feature_view.py +++ b/aligned/feature_view/feature_view.py @@ -38,11 +38,11 @@ @dataclass class FeatureViewMetadata: name: str - batch_source: BatchDataSource + source: BatchDataSource description: str | None = field(default=None) stream_source: StreamDataSource | None = field(default=None) application_source: BatchDataSource | None = field(default=None) - staging_source: BatchDataSource | None = field(default=None) + materialized_source: BatchDataSource | None = field(default=None) contacts: list[str] | None = field(default=None) tags: dict[str, str] = field(default_factory=dict) @@ -52,31 +52,31 @@ def from_compiled(view: 
CompiledFeatureView) -> FeatureViewMetadata: name=view.name, description=view.description, tags=view.tags, - batch_source=view.batch_data_source, + source=view.source, stream_source=view.stream_data_source, application_source=view.application_source, - staging_source=view.staging_source, + materialized_source=view.materialized_source, ) def feature_view( name: str, - batch_source: BatchDataSource, + source: BatchDataSource, description: str | None = None, stream_source: StreamDataSource | None = None, application_source: BatchDataSource | None = None, - staging_source: BatchDataSource | None = None, + materialized_source: BatchDataSource | None = None, contacts: list[str] | None = None, tags: dict[str, str] | None = None, ) -> Callable[[Type[T]], FeatureViewWrapper[T]]: def decorator(cls: Type[T]) -> FeatureViewWrapper[T]: metadata = FeatureViewMetadata( name, - batch_source, + source, description=description, stream_source=stream_source, application_source=application_source, - staging_source=staging_source, + materialized_source=materialized_source, contacts=contacts, tags=tags or {}, ) @@ -95,7 +95,7 @@ def __call__(self) -> T: # Needs to compiile the model to set the location for the view features _ = self.compile() view = self.view() - setattr(view, "__view_wrapper__", self) + setattr(view, '__view_wrapper__', self) return view def compile(self) -> CompiledFeatureView: @@ -103,7 +103,7 @@ def compile(self) -> CompiledFeatureView: return FeatureView.compile_with_metadata(self.view(), self.metadata) def with_filter( - self, named: str, where: Callable[[T], Bool], stored_at: BatchDataSource | None = None + self, named: str, where: Callable[[T], Bool], materialize_source: BatchDataSource | None = None ) -> FeatureViewWrapper[T]: from aligned.data_source.batch_data_source import FilteredDataSource @@ -113,14 +113,16 @@ def with_filter( condition = where(self.__call__()) + main_source = meta.materialized_source if meta.materialized_source else meta.source + if condition.transformation: - meta.batch_source = FilteredDataSource(self.metadata.batch_source, condition.compile()) + meta.source = FilteredDataSource(main_source, condition.compile()) else: - meta.batch_source = FilteredDataSource(self.metadata.batch_source, condition.feature()) + meta.source = FilteredDataSource(main_source, condition.feature()) - if stored_at: - meta.staging_source = meta.batch_source - meta.batch_source = stored_at + if materialize_source: + meta.materialized_source = materialize_source + meta.source = main_source return FeatureViewWrapper(metadata=meta, view=self.view) @@ -139,8 +141,8 @@ def with_joined(self, view: Any, join_on: str, method: str = 'inner') -> BatchDa request = compiled_view.request_all return JoinDataSource( - source=self.metadata.batch_source, - right_source=compiled_view.batch_data_source, + source=self.metadata.source, + right_source=compiled_view.source, right_request=request.needed_requests[0], left_on=join_on, right_on=join_on, @@ -157,8 +159,8 @@ def with_entity_renaming(self, named: str, renames: dict[str, str] | str) -> Fea meta.name = named all_data_sources = [ - meta.batch_source, - meta.staging_source, + meta.source, + meta.materialized_source, meta.application_source, meta.stream_source, ] @@ -279,7 +281,7 @@ class MyView: ``` """ compiled = self.compile() - return await FeatureView.freshness_in_source(compiled, compiled.batch_data_source) + return await FeatureView.freshness_in_source(compiled, compiled.source) class FeatureView(ABC): @@ -312,7 +314,7 @@ def metadata_with( 
description, stream_source or HttpStreamSource(name), application_source=application_source, - staging_source=staging_source, + materialized_source=staging_source, contacts=contacts, tags=tags or {}, ) @@ -330,7 +332,7 @@ async def batch_source_freshness(cls) -> datetime | None: Returns the freshest datetime for the batch data source """ compiled = cls().compile_instance() - return await FeatureView.freshness_in_source(compiled, compiled.batch_data_source) + return await FeatureView.freshness_in_source(compiled, compiled.source) @staticmethod async def freshness_in_source(view: CompiledFeatureView, source: BatchDataSource) -> datetime | None: @@ -354,7 +356,7 @@ def compile_with_metadata(feature_view: Any, metadata: FeatureViewMetadata) -> C name=metadata.name, description=metadata.description, tags=metadata.tags, - batch_data_source=metadata.batch_source, + source=metadata.source, entities=set(), features=set(), derived_features=set(), @@ -362,7 +364,7 @@ def compile_with_metadata(feature_view: Any, metadata: FeatureViewMetadata) -> C event_timestamp=None, stream_data_source=metadata.stream_source, application_source=metadata.application_source, - staging_source=metadata.staging_source, + materialized_source=metadata.materialized_source, indexes=[], ) aggregations: list[FeatureFactory] = [] diff --git a/aligned/feature_view/tests/test_hidden_variable.py b/aligned/feature_view/tests/test_hidden_variable.py index 20ae96d..e4386df 100644 --- a/aligned/feature_view/tests/test_hidden_variable.py +++ b/aligned/feature_view/tests/test_hidden_variable.py @@ -1,13 +1,15 @@ import pytest from aligned import Bool, Entity, FeatureView, FeatureViewMetadata, Float, PostgreSQLConfig, String +from aligned.compiler.feature_factory import compile_hidden_features +from aligned.schemas.feature import FeatureLocation source = PostgreSQLConfig.localhost('test') class TestView(FeatureView): - metadata = FeatureViewMetadata(name='test', description='test', tags={}, batch_source=source) + metadata = FeatureViewMetadata(name='test', description='test', tags={}, source=source.table('test')) test_id = Entity(String()) @@ -44,3 +46,22 @@ async def test_select_variables() -> None: assert len(request.needed_requests) == 1 needed_req = request.needed_requests[0] assert len(needed_req.derived_features) == 2 + + +def test_hidden_variable_condition() -> None: + class Test: + x, y = Bool(), Bool() + z = (x & y) | x + + test = Test() + + features, derived_features = compile_hidden_features( + test.z | test.y, + FeatureLocation.feature_view('view'), + hidden_features=0, + var_name='test', + entities=set(), + ) + + assert len(features) == 2 + assert len(derived_features) == 3 diff --git a/aligned/local/job.py b/aligned/local/job.py index 11f32f3..5df1d3e 100644 --- a/aligned/local/job.py +++ b/aligned/local/job.py @@ -1,14 +1,18 @@ from dataclasses import dataclass, field -from datetime import datetime + +from typing import TYPE_CHECKING import pandas as pd import polars as pl from aligned.request.retrival_request import AggregatedFeature, AggregateOver, RetrivalRequest -from aligned.retrival_job import DateRangeJob, FactualRetrivalJob, FullExtractJob, RequestResult, RetrivalJob +from aligned.retrival_job import RequestResult, RetrivalJob from aligned.schemas.feature import Feature from aligned.sources.local import DataFileReference +if TYPE_CHECKING: + from datetime import datetime + class LiteralRetrivalJob(RetrivalJob): @@ -38,7 +42,7 @@ async def to_polars(self) -> pl.LazyFrame: @dataclass -class 
FileFullJob(FullExtractJob):
+class FileFullJob(RetrivalJob):
 
     source: DataFileReference
     request: RetrivalRequest
@@ -58,7 +62,7 @@ def file_transformations(self, df: pd.DataFrame) -> pd.DataFrame:
         if isinstance(self.source, ColumnFeatureMappable):
             request_features = self.source.feature_identifier_for(all_names)
 
-        columns = {org_name: wanted_name for org_name, wanted_name in zip(request_features, all_names)}
+        columns = dict(zip(request_features, all_names))
         df = df.rename(
             columns=columns,
         )
@@ -99,7 +103,7 @@ async def to_polars(self) -> pl.LazyFrame:
 
 
 @dataclass
-class FileDateJob(DateRangeJob):
+class FileDateJob(RetrivalJob):
 
     source: DataFileReference
     request: RetrivalRequest
@@ -121,10 +125,13 @@ def file_transformations(self, df: pd.DataFrame) -> pd.DataFrame:
             request_features = self.source.feature_identifier_for(all_names)
 
         df.rename(
-            columns={org_name: wanted_name for org_name, wanted_name in zip(request_features, all_names)},
+            columns=dict(zip(request_features, all_names)),
             inplace=True,
         )
 
+        if self.request.event_timestamp is None:
+            raise ValueError(f'Source {self.source} has no event timestamp to filter on')
+
         event_timestamp_column = self.request.event_timestamp.name
         # Making sure it is in the correct format
         df[event_timestamp_column] = pd.to_datetime(
@@ -141,13 +148,14 @@ def file_transform_polars(self, df: pl.LazyFrame) -> pl.LazyFrame:
         entity_names = self.request.entity_names
         all_names = list(self.request.all_required_feature_names.union(entity_names))
 
+        if self.request.event_timestamp is None:
+            raise ValueError(f'Source {self.source} has no event timestamp to filter on')
+
         request_features = all_names
         if isinstance(self.source, ColumnFeatureMappable):
             request_features = self.source.feature_identifier_for(all_names)
 
-        df = df.rename(
-            mapping={org_name: wanted_name for org_name, wanted_name in zip(request_features, all_names)}
-        )
+        df = df.rename(mapping=dict(zip(request_features, all_names)))
 
         event_timestamp_column = self.request.event_timestamp.name
         return df.filter(pl.col(event_timestamp_column).is_between(self.start_date, self.end_date))
@@ -162,7 +170,7 @@ async def to_polars(self) -> pl.LazyFrame:
 
 
 @dataclass
-class FileFactualJob(FactualRetrivalJob):
+class FileFactualJob(RetrivalJob):
 
     source: DataFileReference
     requests: list[RetrivalRequest]
diff --git a/aligned/psql/jobs.py b/aligned/psql/jobs.py
index 79bb4ed..f2c0d2f 100644
--- a/aligned/psql/jobs.py
+++ b/aligned/psql/jobs.py
@@ -8,9 +8,9 @@
 import polars as pl
 
 from aligned.request.retrival_request import RequestResult, RetrivalRequest
-from aligned.retrival_job import FactualRetrivalJob, RetrivalJob
-from aligned.schemas.derivied_feature import AggregatedFeature, AggregateOver
-from aligned.schemas.feature import FeatureLocation, FeatureType
+from aligned.retrival_job import RetrivalJob
+from aligned.schemas.derivied_feature import AggregatedFeature, AggregateOver, DerivedFeature
+from aligned.schemas.feature import FeatureLocation, FeatureType, Feature
 from aligned.schemas.transformation import PsqlTransformation
 from aligned.sources.psql import PostgreSQLConfig, PostgreSQLDataSource
 
@@ -141,6 +141,23 @@ async def to_polars(self) -> pl.LazyFrame:
     def describe(self) -> str:
         return f'PostgreSQL Job: \n{self.query}\n'
 
+    def filter(self, condition: str | Feature | DerivedFeature) -> RetrivalJob:
+
+        query = f'SELECT * FROM ({self.query}) as values WHERE '
+
+        if isinstance(condition, str):
+            query += condition
+        elif isinstance(condition, DerivedFeature) and isinstance(
+            condition.transformation,
PsqlTransformation + ): + query += condition.transformation.as_psql() + elif isinstance(condition, Feature): + query += condition.name + else: + raise ValueError(f'Unable to filter on psql job with {condition}') + + return PostgreSqlJob(self.config, query, self.requests) + def build_full_select_query_psql( source: PostgreSQLDataSource, request: RetrivalRequest, limit: int | None @@ -206,7 +223,7 @@ def to_sql(self) -> str: @dataclass -class FactPsqlJob(FactualRetrivalJob): +class FactPsqlJob(RetrivalJob): """Fetches features for defined facts within a postgres DB It is supported to fetch from different tables, in one request diff --git a/aligned/redis/job.py b/aligned/redis/job.py index b95d533..f60a7e8 100644 --- a/aligned/redis/job.py +++ b/aligned/redis/job.py @@ -4,13 +4,13 @@ import polars as pl from aligned.request.retrival_request import RetrivalRequest -from aligned.retrival_job import FactualRetrivalJob, RequestResult, RetrivalJob +from aligned.retrival_job import RequestResult, RetrivalJob from aligned.schemas.feature import FeatureType from aligned.sources.redis import RedisConfig @dataclass -class FactualRedisJob(FactualRetrivalJob): +class FactualRedisJob(RetrivalJob): config: RedisConfig requests: list[RetrivalRequest] diff --git a/aligned/redshift/jobs.py b/aligned/redshift/jobs.py index 6437990..e0d9398 100644 --- a/aligned/redshift/jobs.py +++ b/aligned/redshift/jobs.py @@ -9,7 +9,7 @@ from aligned.psql.jobs import PostgreSqlJob from aligned.redshift.sql_job import SqlColumn, TableFetch from aligned.request.retrival_request import RequestResult, RetrivalRequest -from aligned.retrival_job import FactualRetrivalJob, RetrivalJob +from aligned.retrival_job import RetrivalJob from aligned.schemas.derivied_feature import AggregatedFeature, AggregateOver, DerivedFeature from aligned.schemas.feature import FeatureLocation, FeatureType from aligned.schemas.transformation import RedshiftTransformation @@ -32,7 +32,7 @@ def to_sql(self) -> str: @dataclass -class FactRedshiftJob(FactualRetrivalJob): +class FactRedshiftJob(RetrivalJob): """Fetches features for defined facts within a postgres DB It is supported to fetch from different tables, in one request diff --git a/aligned/request/retrival_request.py b/aligned/request/retrival_request.py index 2cc4a35..e62d98c 100644 --- a/aligned/request/retrival_request.py +++ b/aligned/request/retrival_request.py @@ -254,6 +254,14 @@ class RequestResult(Codable): features: set[Feature] event_timestamp: str | None + @property + def all_returned_columns(self) -> list[str]: + columns = [entity.name for entity in self.entities] + columns.extend([feat.name for feat in self.features]) + if self.event_timestamp: + columns.append(self.event_timestamp) + return columns + @property def feature_columns(self) -> list[str]: return sorted(feature.name for feature in self.features) diff --git a/aligned/retrival_job.py b/aligned/retrival_job.py index e9af0ae..c6c3428 100644 --- a/aligned/retrival_job.py +++ b/aligned/retrival_job.py @@ -1058,26 +1058,6 @@ def remove_derived_features(self) -> RetrivalJob: return self.job.remove_derived_features() -class FullExtractJob(RetrivalJob): - limit: int | None - - -class DateRangeJob(RetrivalJob): - start_date: datetime - end_date: datetime - - """ - ``` - psql_config = PsqlConfig(...) 
- entites = psql_config.fetch("SELECT * FROM entities WHERE ...") - ``` - """ - - -class FactualRetrivalJob(RetrivalJob): - facts: RetrivalJob - - @dataclass class WithRequests(RetrivalJob, ModificationJob): diff --git a/aligned/schemas/feature_view.py b/aligned/schemas/feature_view.py index 5fe4972..d1aa396 100644 --- a/aligned/schemas/feature_view.py +++ b/aligned/schemas/feature_view.py @@ -16,7 +16,7 @@ class CompiledFeatureView(Codable): name: str tags: dict[str, str] - batch_data_source: BatchDataSource + source: BatchDataSource entities: set[Feature] features: set[Feature] @@ -27,7 +27,7 @@ class CompiledFeatureView(Codable): event_timestamp: EventTimestamp | None = field(default=None) stream_data_source: StreamDataSource | None = field(default=None) application_source: BatchDataSource | None = field(default=None) - staging_source: BatchDataSource | None = field(default=None) + materialized_source: BatchDataSource | None = field(default=None) event_triggers: set[EventTrigger] | None = field(default=None) @@ -38,7 +38,7 @@ def __pre_serialize__(self) -> CompiledFeatureView: assert isinstance(self.name, str) assert isinstance(self.description, str) assert isinstance(self.tags, dict) - assert isinstance(self.batch_data_source, BatchDataSource) + assert isinstance(self.source, BatchDataSource) for entity in self.entities: assert isinstance(entity, Feature) for feature in self.features: diff --git a/aligned/schemas/model.py b/aligned/schemas/model.py index f10aa46..ac10ca7 100644 --- a/aligned/schemas/model.py +++ b/aligned/schemas/model.py @@ -38,7 +38,7 @@ class PredictionsView(Codable): model_version_column: Feature | None = field(default=None) event_timestamp: EventTimestamp | None = field(default=None) source: BatchDataSource | None = field(default=None) - historical_source: BatchDataSource | None = field(default=None) + application_source: BatchDataSource | None = field(default=None) stream_source: StreamDataSource | None = field(default=None) regression_targets: set[RegressionTarget] | None = field(default=None) diff --git a/aligned/sources/local.py b/aligned/sources/local.py index b031e9c..7bdddf0 100644 --- a/aligned/sources/local.py +++ b/aligned/sources/local.py @@ -16,7 +16,7 @@ from aligned.exceptions import UnableToFindFileException from aligned.local.job import FileDateJob, FileFactualJob, FileFullJob from aligned.request.retrival_request import RetrivalRequest -from aligned.retrival_job import DateRangeJob, FactualRetrivalJob, FullExtractJob, RetrivalJob +from aligned.retrival_job import RetrivalJob from aligned.s3.storage import FileStorage, HttpStorage from aligned.schemas.codable import Codable from aligned.schemas.feature import EventTimestamp, FeatureType @@ -67,7 +67,7 @@ async def data_file_freshness(reference: DataFileReference, column_name: str) -> return None -def create_parent_dir(path: str): +def create_parent_dir(path: str) -> None: Path(path).parent.mkdir(exist_ok=True) @@ -161,18 +161,18 @@ def mean( def enricher(self) -> CsvFileEnricher: return CsvFileEnricher(file=self.path) - def all_data(self, request: RetrivalRequest, limit: int | None) -> FullExtractJob: + def all_data(self, request: RetrivalRequest, limit: int | None) -> RetrivalJob: return FileFullJob(self, request, limit) def all_between_dates( self, request: RetrivalRequest, start_date: datetime, end_date: datetime - ) -> DateRangeJob: + ) -> RetrivalJob: return FileDateJob(source=self, request=request, start_date=start_date, end_date=end_date) @classmethod def multi_source_features_for( cls, 
facts: RetrivalJob, requests: list[tuple[CsvFileSource, RetrivalRequest]] - ) -> FactualRetrivalJob: + ) -> RetrivalJob: sources = {source for source, _ in requests} if len(sources) != 1: raise ValueError(f'Only able to load one {requests} at a time') @@ -264,18 +264,18 @@ async def write_polars(self, df: pl.LazyFrame) -> None: create_parent_dir(self.path) df.collect().write_parquet(self.path, compression=self.config.compression) - def all_data(self, request: RetrivalRequest, limit: int | None) -> FullExtractJob: + def all_data(self, request: RetrivalRequest, limit: int | None) -> RetrivalJob: return FileFullJob(self, request, limit) def all_between_dates( self, request: RetrivalRequest, start_date: datetime, end_date: datetime - ) -> DateRangeJob: + ) -> RetrivalJob: return FileDateJob(source=self, request=request, start_date=start_date, end_date=end_date) @classmethod def multi_source_features_for( cls, facts: RetrivalJob, requests: list[tuple[ParquetFileSource, RetrivalRequest]] - ) -> FactualRetrivalJob: + ) -> RetrivalJob: source = requests[0][0] if not isinstance(source, cls): diff --git a/aligned/sources/psql.py b/aligned/sources/psql.py index 866a979..42a7735 100644 --- a/aligned/sources/psql.py +++ b/aligned/sources/psql.py @@ -6,7 +6,7 @@ from aligned.data_source.batch_data_source import BatchDataSource, ColumnFeatureMappable from aligned.feature_source import WritableFeatureSource from aligned.request.retrival_request import RetrivalRequest -from aligned.retrival_job import FactualRetrivalJob, RetrivalJob +from aligned.retrival_job import RetrivalJob from aligned.schemas.codable import Codable from datetime import datetime @@ -70,6 +70,9 @@ class PostgreSQLDataSource(BatchDataSource, ColumnFeatureMappable, WritableFeatu type_name = 'psql' + def source_id(self) -> str: + return f'{self.config.env_var}/{self.table}' + def job_group_key(self) -> str: return self.config.env_var @@ -103,7 +106,7 @@ def all_between_dates( @classmethod def multi_source_features_for( cls, facts: RetrivalJob, requests: list[tuple[PostgreSQLDataSource, RetrivalRequest]] - ) -> FactualRetrivalJob: + ) -> RetrivalJob: # Group based on config from aligned.psql.jobs import FactPsqlJob diff --git a/aligned/sources/s3.py b/aligned/sources/s3.py index a847b85..6607491 100644 --- a/aligned/sources/s3.py +++ b/aligned/sources/s3.py @@ -9,7 +9,7 @@ from aligned.data_source.batch_data_source import BatchDataSource, ColumnFeatureMappable from aligned.exceptions import UnableToFindFileException from aligned.local.job import FileDateJob, FileFullJob -from aligned.retrival_job import DateRangeJob, FullExtractJob, RetrivalRequest +from aligned.retrival_job import RetrivalRequest, RetrivalJob from aligned.s3.storage import AwsS3Storage from aligned.schemas.codable import Codable from aligned.sources.local import CsvConfig, DataFileReference, ParquetConfig, StorageFileReference @@ -149,12 +149,12 @@ async def write_polars(self, df: pl.LazyFrame) -> None: buffer.seek(0) await self.storage.write(self.path, buffer.read()) - def all_data(self, request: RetrivalRequest, limit: int | None) -> FullExtractJob: + def all_data(self, request: RetrivalRequest, limit: int | None) -> RetrivalJob: return FileFullJob(self, request=request, limit=limit) def all_between_dates( self, request: RetrivalRequest, start_date: datetime, end_date: datetime - ) -> DateRangeJob: + ) -> RetrivalJob: return FileDateJob(self, request, start_date, end_date) diff --git a/aligned/tests/test_model_target.py b/aligned/tests/test_model_target.py index 
f378fbe..7f6ac23 100644 --- a/aligned/tests/test_model_target.py +++ b/aligned/tests/test_model_target.py @@ -108,7 +108,7 @@ async def test_model_insert_predictions() -> None: path = 'test_data/test_model.parquet' - @model_contract(name='test_model', features=[], predictions_source=FileSource.parquet_at(path)) + @model_contract(name='test_model', features=[], prediction_source=FileSource.parquet_at(path)) class TestModel: id = Int32().as_entity() @@ -137,7 +137,7 @@ async def test_model_upsert_predictions() -> None: path = 'test_data/test_model.parquet' - @model_contract(name='test_model', features=[], predictions_source=FileSource.parquet_at(path)) + @model_contract(name='test_model', features=[], prediction_source=FileSource.parquet_at(path)) class TestModel: id = Int32().as_entity() diff --git a/aligned/tests/test_source_validation.py b/aligned/tests/test_source_validation.py index 339ebfa..fab7f96 100644 --- a/aligned/tests/test_source_validation.py +++ b/aligned/tests/test_source_validation.py @@ -1,10 +1,8 @@ import pytest -from os import environ -from aligned import FeatureStore, FileSource, FeatureView +from aligned import FeatureStore, FileSource from aligned.schemas.feature import FeatureType, FeatureLocation from aligned.source_validation import validate_sources_in -from aligned.sources.psql import PostgreSQLConfig @pytest.mark.asyncio @@ -20,23 +18,23 @@ async def test_source_validation(titanic_feature_store: FeatureStore) -> None: assert {FeatureLocation.feature_view('titanic_parquet'): True} == validation -@pytest.mark.asyncio -async def test_source_validation_psql(titanic_feature_view: FeatureView) -> None: - - if 'PSQL_DATABASE_TEST' not in environ: - environ['PSQL_DATABASE_TEST'] = 'postgresql://postgres:postgres@localhost:5433/aligned-test' - - psql_config = PostgreSQLConfig('PSQL_DATABASE_TEST') - titanic_feature_view.metadata.batch_source = psql_config.table('titanic') - - store = FeatureStore.experimental() - store.add_feature_view(titanic_feature_view) - views = store.views_with_config(psql_config) - - assert len(views) == 1 - validation = await validate_sources_in(views) - - assert {FeatureLocation.feature_view('titanic'): False} == validation +# @pytest.mark.asyncio +# async def test_source_validation_psql(titanic_feature_view: FeatureView) -> None: +# +# if 'PSQL_DATABASE_TEST' not in environ: +# environ['PSQL_DATABASE_TEST'] = 'postgresql://postgres:postgres@localhost:5433/aligned-test' +# +# psql_config = PostgreSQLConfig('PSQL_DATABASE_TEST') +# titanic_feature_view.metadata.source = psql_config.table('titanic') +# +# store = FeatureStore.experimental() +# store.add_feature_view(titanic_feature_view) +# views = store.views_with_config(psql_config) +# +# assert len(views) == 1 +# validation = await validate_sources_in(views) +# +# assert {FeatureLocation.feature_view('titanic'): False} == validation @pytest.mark.asyncio diff --git a/conftest.py b/conftest.py index d6b0f6c..fd01497 100644 --- a/conftest.py +++ b/conftest.py @@ -160,7 +160,7 @@ class BreastDiagnoseFeatureView(FeatureView): name='breast_features', description='Features defining a scan and diagnose of potential cancer cells', tags={}, - batch_source=scan_without_datetime, + source=scan_without_datetime, ) scan_id = Entity(dtype=Int32()) @@ -232,7 +232,7 @@ class BreastDiagnoseFeatureView(FeatureView): name='breast_features', description='Features defining a scan and diagnose of potential cancer cells', tags={}, - batch_source=scan_with_datetime, + source=scan_with_datetime, ) scan_id = 
Entity(dtype=Int32()) @@ -293,7 +293,7 @@ class BreastDiagnoseFeatureView(FeatureView): name='breast_features', description='Features defining a scan and diagnose of potential cancer cells', tags={}, - batch_source=scan_with_datetime, + source=scan_with_datetime, ) scan_id = Entity(dtype=Int32()) @@ -405,7 +405,7 @@ def titanic_feature_view(titanic_source: CsvFileSource) -> FeatureView: class TitanicPassenger(FeatureView): metadata = FeatureViewMetadata( - name='titanic', description='Some features from the titanic dataset', batch_source=titanic_source + name='titanic', description='Some features from the titanic dataset', source=titanic_source ) passenger_id = Entity(dtype=Int32()) @@ -461,7 +461,7 @@ class TitanicPassenger(FeatureView): metadata = FeatureViewMetadata( name='titanic_parquet', description='Some features from the titanic dataset', - batch_source=titanic_source_parquet, + source=titanic_source_parquet, ) passenger_id = Entity(dtype=Int32()) @@ -503,7 +503,7 @@ def alot_of_transforations_feature_view(titanic_source: CsvFileSource) -> Featur class TitanicPassenger(FeatureView): metadata = FeatureViewMetadata( - name='titanic', description='Some features from the titanic dataset', batch_source=titanic_source + name='titanic', description='Some features from the titanic dataset', source=titanic_source ) passenger_id = Entity(dtype=Int32()) @@ -586,7 +586,7 @@ class TitanicPassenger(FeatureView): metadata = FeatureViewMetadata( name='titanic', description='Some features from the titanic dataset', - batch_source=titanic_source_scd, + source=titanic_source_scd, stream_source=redis.stream(topic='titanic_stream').with_coder(JsonRecordCoder('json')), ) diff --git a/pyproject.toml b/pyproject.toml index 468a396..385b04a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "aligned" -version = "0.0.37" +version = "0.0.38" description = "A scalable feature store that makes it easy to align offline and online ML systems" authors = ["Mats E. 
Mollestad "] license = "Apache-2.0" diff --git a/test_data/feature-store.json b/test_data/feature-store.json index 8bbd12e..7e7d32e 100644 --- a/test_data/feature-store.json +++ b/test_data/feature-store.json @@ -1 +1 @@ -{"metadata": {"created_at": "2023-11-09T07:04:23.290410", "name": "feature_store_location.py", "github_url": null}, "feature_views": [{"name": "titanic", "tags": {}, "batch_data_source": {"mapping_keys": {"PassengerId": "passenger_id", "Age": "age", "Sex": "sex", "Survived": "survived", "SibSp": "sibsp", "UpdatedAt": "updated_at"}, "type_name": "csv", "path": "test_data/titanic_scd_data.csv", "csv_config": {"seperator": ",", "compression": "infer", "should_write_index": false}}, "entities": [{"name": "passenger_id", "dtype": {"name": "int32"}, "description": null, "tags": null, "constraints": null}], "features": [{"name": "age", "dtype": {"name": "float"}, "description": "A float as some have decimals", "tags": null, "constraints": [{"name": "requierd"}, {"name": "lower_bound_inc", "value": 0.0}, {"name": "upper_bound_inc", "value": 100.0}]}, {"name": "name", "dtype": {"name": "string"}, "description": null, "tags": null, "constraints": null}, {"name": "sibsp", "dtype": {"name": "int32"}, "description": "Number of siblings on titanic", "tags": null, "constraints": [{"name": "upper_bound_inc", "value": 20.0}, {"name": "lower_bound_inc", "value": 0.0}]}, {"name": "sex", "dtype": {"name": "string"}, "description": null, "tags": null, "constraints": [{"name": "in_domain", "values": ["male", "female"]}]}, {"name": "updated_at", "dtype": {"name": "datetime"}, "description": null, "tags": null, "constraints": null}, {"name": "cabin", "dtype": {"name": "string"}, "description": null, "tags": null, "constraints": null}, {"name": "survived", "dtype": {"name": "bool"}, "description": "If the passenger survived", "tags": null, "constraints": null}], "derived_features": [{"name": "is_male", "dtype": {"name": "bool"}, "description": null, "tags": null, "constraints": null, "depending_on": [{"name": "sex", "location": {"name": "titanic", "location": "feature_view"}, "dtype": {"name": "string"}}], "transformation": {"name": "equals", "dtype": {"name": "bool"}, "key": "sex", "value": {"name": "string", "value": "male"}}, "depth": 1}, {"name": "is_female", "dtype": {"name": "bool"}, "description": null, "tags": null, "constraints": null, "depending_on": [{"name": "sex", "location": {"name": "titanic", "location": "feature_view"}, "dtype": {"name": "string"}}], "transformation": {"name": "equals", "dtype": {"name": "bool"}, "key": "sex", "value": {"name": "string", "value": "female"}}, "depth": 1}, {"name": "is_mr", "dtype": {"name": "bool"}, "description": null, "tags": null, "constraints": null, "depending_on": [{"name": "name", "location": {"name": "titanic", "location": "feature_view"}, "dtype": {"name": "string"}}], "transformation": {"name": "contains", "dtype": {"name": "bool"}, "key": "name", "value": "Mr."}, "depth": 1}, {"name": "square_sibsp", "dtype": {"name": "float"}, "description": null, "tags": null, "constraints": null, "depending_on": [{"name": "sibsp", "location": {"name": "titanic", "location": "feature_view"}, "dtype": {"name": "int32"}}, {"name": "sibsp", "location": {"name": "titanic", "location": "feature_view"}, "dtype": {"name": "int32"}}], "transformation": {"name": "mul", "dtype": {"name": "float"}, "front": "sibsp", "behind": "sibsp"}, "depth": 1}, {"name": "has_siblings", "dtype": {"name": "bool"}, "description": null, "tags": null, "constraints": null, 
"depending_on": [{"name": "sibsp", "location": {"name": "titanic", "location": "feature_view"}, "dtype": {"name": "int32"}}], "transformation": {"name": "not-equals", "dtype": {"name": "bool"}, "key": "sibsp", "value": {"name": "int", "value": 0}}, "depth": 1}, {"name": "double_sibsp", "dtype": {"name": "float"}, "description": null, "tags": null, "constraints": null, "depending_on": [{"name": "sibsp", "location": {"name": "titanic", "location": "feature_view"}, "dtype": {"name": "int32"}}], "transformation": {"name": "mul_val", "dtype": {"name": "float"}, "key": "sibsp", "value": {"name": "int", "value": 2}}, "depth": 1}, {"name": "name_embedding", "dtype": {"name": "embedding"}, "description": null, "tags": null, "constraints": null, "depending_on": [{"name": "name", "location": {"name": "titanic", "location": "feature_view"}, "dtype": {"name": "string"}}], "transformation": {"name": "word_vectoriser", "dtype": {"name": "embedding"}, "key": "name", "model": {"name": "gensim", "model_name": "glove-wiki-gigaword-50", "config": {"to_lowercase": false, "deaccent": false, "encoding": "utf8", "errors": "strict"}, "loaded_model": null}}, "depth": 1}], "description": "Some features from the titanic dataset", "aggregated_features": [], "event_timestamp": {"name": "updated_at", "ttl": null, "description": null, "tags": null, "dtype": {"name": "datetime"}}, "stream_data_source": {"mapping_keys": {}, "name": "redis", "topic_name": "titanic_stream", "config": {"env_var": "REDIS_URL"}, "record_coder": {"coder_type": "json", "key": "json"}}, "application_source": null, "staging_source": null, "event_triggers": null, "contacts": null, "indexes": [{"location": {"name": "titanic", "location": "feature_view"}, "vector": {"name": "name_embedding", "dtype": {"name": "embedding"}, "description": null, "tags": null, "constraints": null}, "vector_dim": 50, "metadata": [{"name": "age", "dtype": {"name": "float"}, "description": "A float as some have decimals", "tags": null, "constraints": [{"name": "requierd"}, {"name": "lower_bound_inc", "value": 0.0}, {"name": "upper_bound_inc", "value": 100.0}]}, {"name": "sex", "dtype": {"name": "string"}, "description": null, "tags": null, "constraints": [{"name": "in_domain", "values": ["male", "female"]}]}], "storage": {"type_name": "redis", "config": {"env_var": "REDIS_URL"}, "name": "name_embedding_index", "initial_cap": 10000, "distance_metric": "COSINE", "index_alogrithm": "FLAT", "embedding_type": "FLOAT32"}, "entities": [{"name": "passenger_id", "dtype": {"name": "int32"}, "description": null, "tags": null, "constraints": null}]}]}, {"name": "titanic_parquet", "tags": {}, "batch_data_source": {"mapping_keys": {}, "type_name": "parquet", "path": "test_data/titanic.parquet", "config": {"engine": "auto", "compression": "snappy", "should_write_index": false}}, "entities": [{"name": "passenger_id", "dtype": {"name": "int32"}, "description": null, "tags": null, "constraints": null}], "features": [{"name": "age", "dtype": {"name": "float"}, "description": "A float as some have decimals", "tags": null, "constraints": [{"name": "requierd"}, {"name": "lower_bound_inc", "value": 0.0}, {"name": "upper_bound_inc", "value": 100.0}]}, {"name": "name", "dtype": {"name": "string"}, "description": null, "tags": null, "constraints": null}, {"name": "sibsp", "dtype": {"name": "int32"}, "description": "Number of siblings on titanic", "tags": null, "constraints": [{"name": "upper_bound_inc", "value": 20.0}, {"name": "lower_bound_inc", "value": 0.0}]}, {"name": "sex", "dtype": {"name": 
"string"}, "description": null, "tags": null, "constraints": [{"name": "in_domain", "values": ["male", "female"]}]}, {"name": "cabin", "dtype": {"name": "string"}, "description": null, "tags": null, "constraints": null}, {"name": "survived", "dtype": {"name": "bool"}, "description": "If the passenger survived", "tags": null, "constraints": null}], "derived_features": [{"name": "has_siblings", "dtype": {"name": "bool"}, "description": null, "tags": null, "constraints": null, "depending_on": [{"name": "sibsp", "location": {"name": "titanic_parquet", "location": "feature_view"}, "dtype": {"name": "int32"}}], "transformation": {"name": "not-equals", "dtype": {"name": "bool"}, "key": "sibsp", "value": {"name": "int", "value": 0}}, "depth": 1}, {"name": "is_mr", "dtype": {"name": "bool"}, "description": null, "tags": null, "constraints": null, "depending_on": [{"name": "name", "location": {"name": "titanic_parquet", "location": "feature_view"}, "dtype": {"name": "string"}}], "transformation": {"name": "contains", "dtype": {"name": "bool"}, "key": "name", "value": "Mr."}, "depth": 1}, {"name": "is_male", "dtype": {"name": "bool"}, "description": null, "tags": null, "constraints": null, "depending_on": [{"name": "sex", "location": {"name": "titanic_parquet", "location": "feature_view"}, "dtype": {"name": "string"}}], "transformation": {"name": "equals", "dtype": {"name": "bool"}, "key": "sex", "value": {"name": "string", "value": "male"}}, "depth": 1}, {"name": "is_female", "dtype": {"name": "bool"}, "description": null, "tags": null, "constraints": null, "depending_on": [{"name": "sex", "location": {"name": "titanic_parquet", "location": "feature_view"}, "dtype": {"name": "string"}}], "transformation": {"name": "equals", "dtype": {"name": "bool"}, "key": "sex", "value": {"name": "string", "value": "female"}}, "depth": 1}], "description": "Some features from the titanic dataset", "aggregated_features": [], "event_timestamp": null, "stream_data_source": null, "application_source": null, "staging_source": null, "event_triggers": null, "contacts": null, "indexes": []}], "combined_feature_views": [], "models": [{"name": "titanic", "features": [{"name": "age", "location": {"name": "titanic", "location": "feature_view"}, "dtype": {"name": "float"}}, {"name": "sibsp", "location": {"name": "titanic", "location": "feature_view"}, "dtype": {"name": "int32"}}, {"name": "has_siblings", "location": {"name": "titanic", "location": "feature_view"}, "dtype": {"name": "bool"}}, {"name": "is_male", "location": {"name": "titanic", "location": "feature_view"}, "dtype": {"name": "bool"}}], "predictions_view": {"entities": [{"name": "passenger_id", "dtype": {"name": "int32"}, "description": null, "tags": null, "constraints": null}], "features": [{"name": "probability", "dtype": {"name": "float"}, "description": "The probability of target named will_survive being 'True'.", "tags": null, "constraints": null}], "derived_features": [{"name": "will_survive", "dtype": {"name": "bool"}, "description": null, "tags": null, "constraints": null, "depending_on": [{"name": "probability", "location": {"name": "titanic", "location": "model"}, "dtype": {"name": "float"}}], "transformation": {"name": "map_arg_max", "dtype": {"name": "bool"}, "column_mappings": {"probability": {"name": "bool", "value": true}}}, "depth": 1}], "model_version_column": null, "event_timestamp": null, "source": null, "historical_source": null, "stream_source": null, "regression_targets": [], "classification_targets": [{"estimating": {"name": "survived", 
"location": {"name": "titanic", "location": "feature_view"}, "dtype": {"name": "bool"}}, "feature": {"name": "will_survive", "dtype": {"name": "bool"}, "description": null, "tags": null, "constraints": null}, "on_ground_truth_event": null, "event_trigger": null, "class_probabilities": [{"outcome": {"name": "bool", "value": true}, "feature": {"name": "probability", "dtype": {"name": "float"}, "description": null, "tags": null, "constraints": null}}], "confidence": null}]}, "description": "A model predicting if a passenger will survive", "contacts": null, "tags": null, "dataset_folder": null}], "enrichers": []} +{"metadata": {"created_at": "2023-11-09T21:08:19.701866", "name": "feature_store_location.py", "github_url": null}, "feature_views": [{"name": "titanic", "tags": {}, "source": {"mapping_keys": {"PassengerId": "passenger_id", "Age": "age", "Sex": "sex", "Survived": "survived", "SibSp": "sibsp", "UpdatedAt": "updated_at"}, "type_name": "csv", "path": "test_data/titanic_scd_data.csv", "csv_config": {"seperator": ",", "compression": "infer", "should_write_index": false}}, "entities": [{"name": "passenger_id", "dtype": {"name": "int32"}, "description": null, "tags": null, "constraints": null}], "features": [{"name": "survived", "dtype": {"name": "bool"}, "description": "If the passenger survived", "tags": null, "constraints": null}, {"name": "updated_at", "dtype": {"name": "datetime"}, "description": null, "tags": null, "constraints": null}, {"name": "name", "dtype": {"name": "string"}, "description": null, "tags": null, "constraints": null}, {"name": "age", "dtype": {"name": "float"}, "description": "A float as some have decimals", "tags": null, "constraints": [{"name": "upper_bound_inc", "value": 100.0}, {"name": "requierd"}, {"name": "lower_bound_inc", "value": 0.0}]}, {"name": "cabin", "dtype": {"name": "string"}, "description": null, "tags": null, "constraints": null}, {"name": "sibsp", "dtype": {"name": "int32"}, "description": "Number of siblings on titanic", "tags": null, "constraints": [{"name": "upper_bound_inc", "value": 20.0}, {"name": "lower_bound_inc", "value": 0.0}]}, {"name": "sex", "dtype": {"name": "string"}, "description": null, "tags": null, "constraints": [{"name": "in_domain", "values": ["male", "female"]}]}], "derived_features": [{"name": "is_mr", "dtype": {"name": "bool"}, "description": null, "tags": null, "constraints": null, "depending_on": [{"name": "name", "location": {"name": "titanic", "location": "feature_view"}, "dtype": {"name": "string"}}], "transformation": {"name": "contains", "dtype": {"name": "bool"}, "key": "name", "value": "Mr."}, "depth": 1}, {"name": "has_siblings", "dtype": {"name": "bool"}, "description": null, "tags": null, "constraints": null, "depending_on": [{"name": "sibsp", "location": {"name": "titanic", "location": "feature_view"}, "dtype": {"name": "int32"}}], "transformation": {"name": "not-equals", "dtype": {"name": "bool"}, "key": "sibsp", "value": {"name": "int", "value": 0}}, "depth": 1}, {"name": "name_embedding", "dtype": {"name": "embedding"}, "description": null, "tags": null, "constraints": null, "depending_on": [{"name": "name", "location": {"name": "titanic", "location": "feature_view"}, "dtype": {"name": "string"}}], "transformation": {"name": "word_vectoriser", "dtype": {"name": "embedding"}, "key": "name", "model": {"name": "gensim", "model_name": "glove-wiki-gigaword-50", "config": {"to_lowercase": false, "deaccent": false, "encoding": "utf8", "errors": "strict"}, "loaded_model": null}}, "depth": 1}, {"name": 
"double_sibsp", "dtype": {"name": "float"}, "description": null, "tags": null, "constraints": null, "depending_on": [{"name": "sibsp", "location": {"name": "titanic", "location": "feature_view"}, "dtype": {"name": "int32"}}], "transformation": {"name": "mul_val", "dtype": {"name": "float"}, "key": "sibsp", "value": {"name": "int", "value": 2}}, "depth": 1}, {"name": "is_female", "dtype": {"name": "bool"}, "description": null, "tags": null, "constraints": null, "depending_on": [{"name": "sex", "location": {"name": "titanic", "location": "feature_view"}, "dtype": {"name": "string"}}], "transformation": {"name": "equals", "dtype": {"name": "bool"}, "key": "sex", "value": {"name": "string", "value": "female"}}, "depth": 1}, {"name": "square_sibsp", "dtype": {"name": "float"}, "description": null, "tags": null, "constraints": null, "depending_on": [{"name": "sibsp", "location": {"name": "titanic", "location": "feature_view"}, "dtype": {"name": "int32"}}, {"name": "sibsp", "location": {"name": "titanic", "location": "feature_view"}, "dtype": {"name": "int32"}}], "transformation": {"name": "mul", "dtype": {"name": "float"}, "front": "sibsp", "behind": "sibsp"}, "depth": 1}, {"name": "is_male", "dtype": {"name": "bool"}, "description": null, "tags": null, "constraints": null, "depending_on": [{"name": "sex", "location": {"name": "titanic", "location": "feature_view"}, "dtype": {"name": "string"}}], "transformation": {"name": "equals", "dtype": {"name": "bool"}, "key": "sex", "value": {"name": "string", "value": "male"}}, "depth": 1}], "description": "Some features from the titanic dataset", "aggregated_features": [], "event_timestamp": {"name": "updated_at", "ttl": null, "description": null, "tags": null, "dtype": {"name": "datetime"}}, "stream_data_source": {"mapping_keys": {}, "name": "redis", "topic_name": "titanic_stream", "config": {"env_var": "REDIS_URL"}, "record_coder": {"coder_type": "json", "key": "json"}}, "application_source": null, "materialized_source": null, "event_triggers": null, "contacts": null, "indexes": [{"location": {"name": "titanic", "location": "feature_view"}, "vector": {"name": "name_embedding", "dtype": {"name": "embedding"}, "description": null, "tags": null, "constraints": null}, "vector_dim": 50, "metadata": [{"name": "age", "dtype": {"name": "float"}, "description": "A float as some have decimals", "tags": null, "constraints": [{"name": "upper_bound_inc", "value": 100.0}, {"name": "requierd"}, {"name": "lower_bound_inc", "value": 0.0}]}, {"name": "sex", "dtype": {"name": "string"}, "description": null, "tags": null, "constraints": [{"name": "in_domain", "values": ["male", "female"]}]}], "storage": {"type_name": "redis", "config": {"env_var": "REDIS_URL"}, "name": "name_embedding_index", "initial_cap": 10000, "distance_metric": "COSINE", "index_alogrithm": "FLAT", "embedding_type": "FLOAT32"}, "entities": [{"name": "passenger_id", "dtype": {"name": "int32"}, "description": null, "tags": null, "constraints": null}]}]}, {"name": "titanic_parquet", "tags": {}, "source": {"mapping_keys": {}, "type_name": "parquet", "path": "test_data/titanic.parquet", "config": {"engine": "auto", "compression": "snappy", "should_write_index": false}}, "entities": [{"name": "passenger_id", "dtype": {"name": "int32"}, "description": null, "tags": null, "constraints": null}], "features": [{"name": "survived", "dtype": {"name": "bool"}, "description": "If the passenger survived", "tags": null, "constraints": null}, {"name": "name", "dtype": {"name": "string"}, "description": null, "tags": 
null, "constraints": null}, {"name": "age", "dtype": {"name": "float"}, "description": "A float as some have decimals", "tags": null, "constraints": [{"name": "upper_bound_inc", "value": 100.0}, {"name": "requierd"}, {"name": "lower_bound_inc", "value": 0.0}]}, {"name": "cabin", "dtype": {"name": "string"}, "description": null, "tags": null, "constraints": null}, {"name": "sibsp", "dtype": {"name": "int32"}, "description": "Number of siblings on titanic", "tags": null, "constraints": [{"name": "upper_bound_inc", "value": 20.0}, {"name": "lower_bound_inc", "value": 0.0}]}, {"name": "sex", "dtype": {"name": "string"}, "description": null, "tags": null, "constraints": [{"name": "in_domain", "values": ["male", "female"]}]}], "derived_features": [{"name": "is_mr", "dtype": {"name": "bool"}, "description": null, "tags": null, "constraints": null, "depending_on": [{"name": "name", "location": {"name": "titanic_parquet", "location": "feature_view"}, "dtype": {"name": "string"}}], "transformation": {"name": "contains", "dtype": {"name": "bool"}, "key": "name", "value": "Mr."}, "depth": 1}, {"name": "is_male", "dtype": {"name": "bool"}, "description": null, "tags": null, "constraints": null, "depending_on": [{"name": "sex", "location": {"name": "titanic_parquet", "location": "feature_view"}, "dtype": {"name": "string"}}], "transformation": {"name": "equals", "dtype": {"name": "bool"}, "key": "sex", "value": {"name": "string", "value": "male"}}, "depth": 1}, {"name": "is_female", "dtype": {"name": "bool"}, "description": null, "tags": null, "constraints": null, "depending_on": [{"name": "sex", "location": {"name": "titanic_parquet", "location": "feature_view"}, "dtype": {"name": "string"}}], "transformation": {"name": "equals", "dtype": {"name": "bool"}, "key": "sex", "value": {"name": "string", "value": "female"}}, "depth": 1}, {"name": "has_siblings", "dtype": {"name": "bool"}, "description": null, "tags": null, "constraints": null, "depending_on": [{"name": "sibsp", "location": {"name": "titanic_parquet", "location": "feature_view"}, "dtype": {"name": "int32"}}], "transformation": {"name": "not-equals", "dtype": {"name": "bool"}, "key": "sibsp", "value": {"name": "int", "value": 0}}, "depth": 1}], "description": "Some features from the titanic dataset", "aggregated_features": [], "event_timestamp": null, "stream_data_source": null, "application_source": null, "materialized_source": null, "event_triggers": null, "contacts": null, "indexes": []}], "combined_feature_views": [], "models": [{"name": "titanic", "features": [{"name": "is_male", "location": {"name": "titanic", "location": "feature_view"}, "dtype": {"name": "bool"}}, {"name": "sibsp", "location": {"name": "titanic", "location": "feature_view"}, "dtype": {"name": "int32"}}, {"name": "age", "location": {"name": "titanic", "location": "feature_view"}, "dtype": {"name": "float"}}, {"name": "has_siblings", "location": {"name": "titanic", "location": "feature_view"}, "dtype": {"name": "bool"}}], "predictions_view": {"entities": [{"name": "passenger_id", "dtype": {"name": "int32"}, "description": null, "tags": null, "constraints": null}], "features": [{"name": "probability", "dtype": {"name": "float"}, "description": "The probability of target named will_survive being 'True'.", "tags": null, "constraints": null}], "derived_features": [{"name": "will_survive", "dtype": {"name": "bool"}, "description": null, "tags": null, "constraints": null, "depending_on": [{"name": "probability", "location": {"name": "titanic", "location": "model"}, "dtype": 
{"name": "float"}}], "transformation": {"name": "map_arg_max", "dtype": {"name": "bool"}, "column_mappings": {"probability": {"name": "bool", "value": true}}}, "depth": 1}], "model_version_column": null, "event_timestamp": null, "source": null, "application_source": null, "stream_source": null, "regression_targets": [], "classification_targets": [{"estimating": {"name": "survived", "location": {"name": "titanic", "location": "feature_view"}, "dtype": {"name": "bool"}}, "feature": {"name": "will_survive", "dtype": {"name": "bool"}, "description": null, "tags": null, "constraints": null}, "on_ground_truth_event": null, "event_trigger": null, "class_probabilities": [{"outcome": {"name": "bool", "value": true}, "feature": {"name": "probability", "dtype": {"name": "float"}, "description": null, "tags": null, "constraints": null}}], "confidence": null}]}, "description": "A model predicting if a passenger will survive", "contacts": null, "tags": null, "dataset_folder": null}], "enrichers": []} diff --git a/test_data/test_model.parquet b/test_data/test_model.parquet index 7f1d063b0d6e325da223bbef8fac9e78d0c31339..ea8717b4f3cb078fd24f3e6d655e23f68790ff5d 100644 GIT binary patch delta 53 scmeys@_}W7oG=#yCnJLhC%Xd!3j+l3O*AwUW&{Z{BMUQatop_X0K{1a#Q*>R delta 53 ucmeys@_}W7oUjN3CnEzNC%Xd!7Xt*aOf)nTW(El}frJ@h!W*l;F#-U@?gqsG