diff --git a/aligned/compiler/repo_reader.py b/aligned/compiler/repo_reader.py index 9b6fdc2..ba4474e 100644 --- a/aligned/compiler/repo_reader.py +++ b/aligned/compiler/repo_reader.py @@ -5,9 +5,8 @@ from typing import Any from aligned.compiler.model import ModelContractWrapper -from aligned.enricher import Enricher from aligned.feature_view.feature_view import FeatureViewWrapper -from aligned.schemas.repo_definition import EnricherReference, RepoDefinition, RepoMetadata, RepoReference +from aligned.schemas.repo_definition import RepoDefinition, RepoMetadata, RepoReference from pathlib import Path @@ -109,11 +108,7 @@ async def definition_from_files(files: list[Path], root_path: Path) -> RepoDefin obj = getattr(module, attribute) - if isinstance(obj, Enricher): - repo.enrichers.append( - EnricherReference(module=module_path, attribute_name=attribute, enricher=obj) - ) - elif isinstance(obj, FeatureViewWrapper): + if isinstance(obj, FeatureViewWrapper): repo.feature_views.add(obj.compile()) elif isinstance(obj, ModelContractWrapper): repo.models.add(obj.compile()) diff --git a/aligned/enricher.py b/aligned/enricher.py deleted file mode 100644 index ef541bb..0000000 --- a/aligned/enricher.py +++ /dev/null @@ -1,227 +0,0 @@ -from __future__ import annotations - -import logging -from abc import ABC, abstractmethod -from dataclasses import dataclass, field -from datetime import datetime, timedelta -from pathlib import Path - -from aligned.lazy_imports import pandas as pd -from mashumaro.types import SerializableType - -from aligned.schemas.codable import Codable -from aligned.sources.redis import RedisConfig - -logger = logging.getLogger(__name__) - - -@dataclass -class TimespanSelector(Codable): - timespand: timedelta - time_column: str - - -class StatisticEricher: - def std( - self, columns: set[str], time: TimespanSelector | None = None, limit: int | None = None - ) -> Enricher: - raise NotImplementedError() - - def mean( - self, columns: set[str], time: TimespanSelector | None = None, limit: int | None = None - ) -> Enricher: - raise NotImplementedError() - - -class Enricher(ABC, Codable, SerializableType): - - name: str - - def _serialize(self) -> dict: - return self.to_dict() - - @classmethod - def _deserialize(cls, value: dict) -> Enricher: - name_type = value['name'] - del value['name'] - data_class = SupportedEnrichers.shared().types[name_type] - return data_class.from_dict(value) - - def lock(self, lock_name: str, redis_config: RedisConfig, timeout: int = 60) -> Enricher: - return RedisLockEnricher(lock_name=lock_name, enricher=self, config=redis_config, timeout=timeout) - - def cache(self, ttl: timedelta, cache_key: str) -> Enricher: - return FileCacheEnricher(ttl, cache_key, self) - - @abstractmethod - async def as_df(self) -> pd.DataFrame: - pass - - -class SupportedEnrichers: - - types: dict[str, type[Enricher]] - - _shared: SupportedEnrichers | None = None - - def __init__(self) -> None: - self.types = {} - - default_types: list[type[Enricher]] = [RedisLockEnricher, FileCacheEnricher, SqlDatabaseEnricher] - for enrich_type in default_types: - self.add(enrich_type) - - def add(self, enrich_type: type[Enricher]) -> None: - self.types[enrich_type.name] = enrich_type - - @classmethod - def shared(cls) -> SupportedEnrichers: - if cls._shared: - return cls._shared - cls._shared = SupportedEnrichers() - return cls._shared - - -@dataclass -class RedisLockEnricher(Enricher): - - enricher: Enricher - config: RedisConfig - lock_name: str - timeout: int - name: str = 'redis_lock' - - def __init__(self, lock_name: str, enricher: Enricher, config: RedisConfig, timeout: int): - self.lock_name = lock_name - self.config = config - self.enricher = enricher - self.timeout = timeout - - async def as_df(self) -> pd.DataFrame: - redis = self.config.redis() - async with redis.lock(self.lock_name, timeout=self.timeout) as _: - return await self.enricher.as_df() - - -@dataclass -class CsvFileSelectedEnricher(Enricher): - file: str - time: TimespanSelector | None = field(default=None) - limit: int | None = field(default=None) - name: str = 'selective_file' - - async def as_df(self) -> pd.DataFrame: - dates_to_parse = None - if self.time: - dates_to_parse = [self.time.time_column] - - uri = self.file - path = Path(self.file) - if 'http' not in path.parts[0]: - uri = str(path.absolute()) - - if self.limit: - file = pd.read_csv(uri, nrows=self.limit, parse_dates=dates_to_parse) - else: - file = pd.read_csv(uri, nrows=self.limit, parse_dates=dates_to_parse) - - if not self.time: - return file - - date = datetime.now() - self.time.timespand - selector = file[self.time.time_column] >= date - return file.loc[selector] - - -@dataclass -class CsvFileEnricher(Enricher): - - file: str - name: str = 'file' - - def selector( - self, time: TimespanSelector | None = None, limit: int | None = None - ) -> CsvFileSelectedEnricher: - return CsvFileSelectedEnricher(self.file, time=time, limit=limit) - - async def as_df(self) -> pd.DataFrame: - return pd.read_csv(self.file) - - -@dataclass -class LoadedStatEnricher(Enricher): - - stat: str - columns: list[str] - enricher: Enricher - mapping_keys: dict[str, str] = field(default_factory=dict) - - async def as_df(self) -> pd.DataFrame: - data = await self.enricher.as_df() - renamed = data.rename(columns=self.mapping_keys) - if self.stat == 'mean': - return renamed[self.columns].mean() - elif self.stat == 'std': - return renamed[self.columns].std() - else: - raise ValueError(f'Not supporting stat: {self.stat}') - - -@dataclass -class FileCacheEnricher(Enricher): - - ttl: timedelta - file_path: str - enricher: Enricher - name: str = 'file_cache' - - def is_out_of_date_cache(self) -> bool: - file_uri = Path(self.file_path).absolute() - try: - # Checks last modified metadata field - modified_at = datetime.fromtimestamp(file_uri.stat().st_mtime) - compare = datetime.now() - self.ttl - return modified_at < compare - except FileNotFoundError: - return True - - async def as_df(self) -> pd.DataFrame: - file_uri = Path(self.file_path).absolute() - - if self.is_out_of_date_cache(): - logger.info('Fetching from source') - data: pd.DataFrame = await self.enricher.as_df() - file_uri.parent.mkdir(exist_ok=True, parents=True) - logger.info(f'Storing cache at file {file_uri.as_uri()}') - data.to_parquet(file_uri) - else: - logger.info('Loading cache') - data = pd.read_parquet(file_uri) - return data - - -@dataclass -class SqlDatabaseEnricher(Enricher): - - query: str - values: dict | None - url_env: str - name: str = 'sql' - - def __init__(self, url_env: str, query: str, values: dict | None = None) -> None: - self.query = query - self.values = values - self.url_env = url_env - - async def as_df(self) -> pd.DataFrame: - import os - - import connectorx as cx - - df = cx.read_sql(os.environ[self.url_env], self.query, return_type='pandas') - - for name, dtype in df.dtypes.iteritems(): - if dtype == 'object': # Need to convert the databases UUID type - df[name] = df[name].astype('str') - - return df diff --git a/aligned/feature_store.py b/aligned/feature_store.py index 44135ad..f9b76d9 100644 --- a/aligned/feature_store.py +++ b/aligned/feature_store.py @@ -7,7 +7,6 @@ from collections import defaultdict from dataclasses import dataclass, field from datetime import datetime, timedelta -from importlib import import_module from typing import Union, TypeVar, Callable from prometheus_client import Histogram @@ -19,7 +18,6 @@ ColumnFeatureMappable, BatchDataSource, ) -from aligned.enricher import Enricher from aligned.exceptions import UnableToFindFileException from aligned.feature_source import ( BatchFeatureSource, @@ -44,7 +42,7 @@ from aligned.schemas.folder import DatasetStore from aligned.schemas.model import EventTrigger from aligned.schemas.model import Model as ModelSchema -from aligned.schemas.repo_definition import EnricherReference, RepoDefinition, RepoMetadata +from aligned.schemas.repo_definition import RepoDefinition, RepoMetadata from aligned.sources.vector_index import VectorIndex logger = logging.getLogger(__name__) @@ -150,42 +148,6 @@ def empty() -> ContractStore: def experimental() -> ContractStore: return ContractStore.empty() - @staticmethod - def register_enrichers(enrichers: list[EnricherReference]) -> None: - from types import ModuleType - - class DynamicEnricher(ModuleType): - def __init__(self, values: dict[str, Enricher]) -> None: - for key, item in values.items(): - self.__setattr__(key, item) - - def set_module(path: str, module_class: DynamicEnricher) -> None: - import sys - - components = path.split('.') - cum_path = '' - - for component in components: - cum_path += f'.{component}' - if cum_path.startswith('.'): - cum_path = cum_path[1:] - - try: - sys.modules[cum_path] = import_module(cum_path) - except Exception: - logger.info(f'Setting enricher at {cum_path}') - sys.modules[cum_path] = module_class - - grouped_enrichers: dict[str, list[EnricherReference]] = defaultdict(list) - - for enricher in enrichers: - grouped_enrichers[enricher.module].append(enricher) - - for module, values in grouped_enrichers.items(): - set_module( - module, DynamicEnricher({enricher.attribute_name: enricher.enricher for enricher in values}) - ) - @staticmethod def from_definition(repo: RepoDefinition) -> ContractStore: """Creates a feature store based on a repo definition @@ -204,8 +166,6 @@ def from_definition(repo: RepoDefinition) -> ContractStore: Returns: FeatureStore: A ready to use feature store """ - ContractStore.register_enrichers(repo.enrichers) - store = ContractStore( feature_views={}, models={}, diff --git a/aligned/feature_view/feature_view.py b/aligned/feature_view/feature_view.py index 2b2e0c7..0456f41 100644 --- a/aligned/feature_view/feature_view.py +++ b/aligned/feature_view/feature_view.py @@ -874,7 +874,9 @@ def func_wrapper(*args, **kwargs) -> Any: # type: ignore from typing import _AnnotatedAlias # type: ignore params_to_check = { - name: value for name, value in func.__annotations__.items() if type(value) is _AnnotatedAlias + name: value + for name, value in func.__annotations__.items() + if type(value) == _AnnotatedAlias # noqa: E721 } function_args = func.__code__.co_varnames diff --git a/aligned/feature_view/tests/test_check_schema.py b/aligned/feature_view/tests/test_check_schema.py index 27db523..f2d7bfc 100644 --- a/aligned/feature_view/tests/test_check_schema.py +++ b/aligned/feature_view/tests/test_check_schema.py @@ -1,5 +1,3 @@ -from __future__ import annotations - import pytest from aligned import Bool, Float, String, feature_view, FileSource from aligned.feature_view.feature_view import check_schema diff --git a/aligned/schemas/repo_definition.py b/aligned/schemas/repo_definition.py index a37eaba..4670feb 100644 --- a/aligned/schemas/repo_definition.py +++ b/aligned/schemas/repo_definition.py @@ -7,7 +7,6 @@ from pathlib import Path from typing import TYPE_CHECKING -from aligned.enricher import Enricher from aligned.feature_source import FeatureSource, FeatureSourceFactory from aligned.schemas.codable import Codable from aligned.schemas.feature_view import CompiledFeatureView @@ -120,13 +119,6 @@ def from_reference( return FastAPIServer.app(feature_store.with_source(online_source)) -@dataclass -class EnricherReference(Codable): - module: str - attribute_name: str - enricher: Enricher - - @dataclass class RepoMetadata(Codable): created_at: datetime @@ -142,7 +134,6 @@ class RepoDefinition(Codable): feature_views: set[CompiledFeatureView] = field(default_factory=set) models: set[Model] = field(default_factory=set) - enrichers: list[EnricherReference] = field(default_factory=list) def to_dict(self, **kwargs: dict) -> dict: # type: ignore for view in self.feature_views: @@ -151,8 +142,6 @@ def to_dict(self, **kwargs: dict) -> dict: # type: ignore for model in self.models: assert isinstance(model, Model) - for enricher in self.enrichers: - assert isinstance(enricher, EnricherReference) return super().to_dict(**kwargs) @staticmethod diff --git a/aligned/sources/local.py b/aligned/sources/local.py index 660342f..e8eb84f 100644 --- a/aligned/sources/local.py +++ b/aligned/sources/local.py @@ -13,7 +13,6 @@ from aligned.data_file import DataFileReference, upsert_on_column from aligned.data_source.batch_data_source import CodableBatchDataSource, ColumnFeatureMappable -from aligned.enricher import CsvFileEnricher, Enricher, LoadedStatEnricher, TimespanSelector from aligned.exceptions import UnableToFindFileException from aligned.local.job import FileDateJob, FileFactualJob, FileFullJob from aligned.request.retrival_request import RetrivalRequest @@ -318,29 +317,6 @@ async def write_polars(self, df: pl.LazyFrame) -> None: create_parent_dir(self.path) await self.write_pandas(df.collect().to_pandas()) - def std( - self, columns: set[str], time: TimespanSelector | None = None, limit: int | None = None - ) -> Enricher: - return LoadedStatEnricher( - stat='std', - columns=list(columns), - enricher=self.enricher().selector(time, limit), - mapping_keys=self.mapping_keys, - ) - - def mean( - self, columns: set[str], time: TimespanSelector | None = None, limit: int | None = None - ) -> Enricher: - return LoadedStatEnricher( - stat='mean', - columns=list(columns), - enricher=self.enricher().selector(time, limit), - mapping_keys=self.mapping_keys, - ) - - def enricher(self) -> CsvFileEnricher: - return CsvFileEnricher(file=self.path) - def all_data(self, request: RetrivalRequest, limit: int | None) -> RetrivalJob: with_schema = CsvFileSource( path=self.path, @@ -1020,11 +996,15 @@ class LiteralReference(DataFileReference): file: pl.LazyFrame - def __init__(self, file: pl.LazyFrame | pd.DataFrame) -> None: - if isinstance(file, pd.DataFrame): + def __init__(self, file: pl.LazyFrame | pd.DataFrame | pl.DataFrame) -> None: + if isinstance(file, pl.DataFrame): + self.file = file.lazy() + elif isinstance(file, pl.LazyFrame): + self.file = file + elif isinstance(file, pd.DataFrame): self.file = pl.from_pandas(file).lazy() else: - self.file = file + raise ValueError(f"Unsupported type {type(file)}") def job_group_key(self) -> str: return str(uuid4()) diff --git a/aligned/sources/psql.py b/aligned/sources/psql.py index eda9abd..72f85be 100644 --- a/aligned/sources/psql.py +++ b/aligned/sources/psql.py @@ -13,7 +13,6 @@ from aligned.schemas.feature import FeatureType if TYPE_CHECKING: - from aligned.enricher import Enricher from aligned.schemas.feature import EventTimestamp @@ -45,11 +44,6 @@ def localhost(db: str, credentials: tuple[str, str] | None = None) -> PostgreSQL def table(self, table: str, mapping_keys: dict[str, str] | None = None) -> PostgreSQLDataSource: return PostgreSQLDataSource(config=self, table=table, mapping_keys=mapping_keys or {}) - def data_enricher(self, sql: str, values: dict | None = None) -> Enricher: - from aligned.enricher import SqlDatabaseEnricher - - return SqlDatabaseEnricher(self.env_var, sql, values) - def fetch(self, query: str) -> RetrivalJob: from aligned.psql.jobs import PostgreSqlJob diff --git a/aligned/sources/redshift.py b/aligned/sources/redshift.py index c79b098..e45f887 100644 --- a/aligned/sources/redshift.py +++ b/aligned/sources/redshift.py @@ -1,12 +1,10 @@ from __future__ import annotations from dataclasses import dataclass, field -from datetime import datetime, timedelta +from datetime import datetime from typing import Any, TYPE_CHECKING -from aligned import RedisConfig from aligned.data_source.batch_data_source import CodableBatchDataSource, ColumnFeatureMappable -from aligned.enricher import Enricher from aligned.request.retrival_request import RetrivalRequest from aligned.retrival_job import RetrivalJob from aligned.schemas.codable import Codable @@ -64,19 +62,6 @@ def table( config=self, table=table, mapping_keys=mapping_keys or {}, list_references=list_references or {} ) - def data_enricher( - self, name: str, sql: str, redis: RedisConfig, values: dict | None = None, lock_timeout: int = 60 - ) -> Enricher: - from aligned.enricher import FileCacheEnricher, RedisLockEnricher, SqlDatabaseEnricher - - return FileCacheEnricher( - timedelta(days=1), - file_path=f'./cache/{name}.parquet', - enricher=RedisLockEnricher( - name, SqlDatabaseEnricher(self.url, sql, values), redis, timeout=lock_timeout - ), - ) - def with_schema(self, name: str) -> RedshiftSQLConfig: return RedshiftSQLConfig(env_var=self.env_var, schema=name) diff --git a/aligned/tests/test_cache_enricher.py b/aligned/tests/test_cache_enricher.py deleted file mode 100644 index 0099c87..0000000 --- a/aligned/tests/test_cache_enricher.py +++ /dev/null @@ -1,22 +0,0 @@ -from datetime import timedelta - -import pytest - -from aligned import FileSource - - -@pytest.mark.asyncio -async def test_cache_enricher(mocker) -> None: # type: ignore - cache_time = timedelta(hours=1) - source = FileSource.csv_at(path='test_data/data-with-datetime.csv', mapping_keys={}).enricher() - enricher = source.cache(ttl=cache_time, cache_key='cache/cached_data') - - file = await enricher.as_df() - expected = await source.as_df() - assert file.equals(expected) - - pandas_mock = mocker.patch('pandas.read_parquet', return_value=file.iloc[0:3]) - new_file = await enricher.as_df() - - assert file.iloc[0:3].equals(new_file) - pandas_mock.assert_called_once() diff --git a/test_data/credit_history.csv b/test_data/credit_history.csv new file mode 100644 index 0000000..7b784fd --- /dev/null +++ b/test_data/credit_history.csv @@ -0,0 +1,7 @@ +bankruptcies,credit_card_due,dob_ssn,due_sum,event_timestamp,student_loan_due +0,8419,19530219_5179,30747,1587924064746575,22328 +0,2944,19520816_8737,5459,1587924064746575,2515 +0,833,19860413_2537,33833,1587924064746575,33000 +0,5936,19530219_5179,54891,1588010464746575,48955 +0,1575,19520816_8737,11076,1588010464746575,9501 +0,6263,19860413_2537,41773,1588010464746575,35510 diff --git a/test_data/credit_history_mater.parquet b/test_data/credit_history_mater.parquet index ee67e4f..63e120f 100644 Binary files a/test_data/credit_history_mater.parquet and b/test_data/credit_history_mater.parquet differ diff --git a/test_data/data/csv_iso.csv b/test_data/data/csv_iso.csv index f4de443..fe3a987 100644 --- a/test_data/data/csv_iso.csv +++ b/test_data/data/csv_iso.csv @@ -1,4 +1,4 @@ id,other,et,timestamp -1,foo,2024-09-11T16:38:14.734554UTC,2024-09-11T16:38:14.734562UTC -2,bar,2024-09-10T16:38:14.734560UTC,2024-09-12T16:38:14.734562UTC -3,baz,2024-09-09T16:38:14.734561UTC,2024-09-13T16:38:14.734563UTC +1,foo,2024-09-11T20:12:11.546844UTC,2024-09-11T20:12:11.546848UTC +2,bar,2024-09-10T20:12:11.546846UTC,2024-09-12T20:12:11.546849UTC +3,baz,2024-09-09T20:12:11.546848UTC,2024-09-13T20:12:11.546849UTC diff --git a/test_data/data/csv_unix.csv b/test_data/data/csv_unix.csv index 8d23fa3..7276a67 100644 --- a/test_data/data/csv_unix.csv +++ b/test_data/data/csv_unix.csv @@ -1,4 +1,4 @@ id,other,et,timestamp -1,foo,1726072694734554,1726072694734562 -2,bar,1725986294734560,1726159094734562 -3,baz,1725899894734561,1726245494734563 +1,foo,1726085531546844,1726085531546848 +2,bar,1725999131546846,1726171931546849 +3,baz,1725912731546848,1726258331546849 diff --git a/test_data/data/parquet_iso.parquet b/test_data/data/parquet_iso.parquet index ca94ede..7a4df20 100644 Binary files a/test_data/data/parquet_iso.parquet and b/test_data/data/parquet_iso.parquet differ diff --git a/test_data/data/parquet_unix.parquet b/test_data/data/parquet_unix.parquet index 5784f07..0e36692 100644 Binary files a/test_data/data/parquet_unix.parquet and b/test_data/data/parquet_unix.parquet differ diff --git a/test_data/test_model.csv b/test_data/test_model.csv index 285ec36..decf326 100644 --- a/test_data/test_model.csv +++ b/test_data/test_model.csv @@ -1,4 +1,7 @@ -some_id,a +a,some_id +10,1 +14,2 +20,3 1,1 2,2 3,3 diff --git a/test_data/test_model.parquet b/test_data/test_model.parquet index 2c0640f..5fb669a 100644 Binary files a/test_data/test_model.parquet and b/test_data/test_model.parquet differ