From facd0f219707d24c5b21ae16784e93ee16ab1806 Mon Sep 17 00:00:00 2001
From: "Mats E. Mollestad"
Date: Tue, 1 Oct 2024 23:34:04 +0200
Subject: [PATCH] Pyright changes and minor bug fixes
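Type-checking fixes: add `# type: ignore` where dill and pandas stubs are
too loose, guard optional values (stream sources, probability names) before
use, and move fixtures and tests from FeatureView subclasses to the
@feature_view decorator and FeatureViewWrapper.

Bug fixes: upserts on partitioned sources now write back only the requested
columns, random_values_for derives a value range from int/uint dtypes, and
Timestamp gains a defines_freshness() helper.

A rough usage sketch of the new helper (the view below is hypothetical, not
part of this diff):

    @feature_view(name='example', description='', source=FileSource.parquet_at('example.parquet'))
    class Example:
        example_id = Int32().as_entity()
        updated_at = Timestamp().defines_freshness()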
---
 .gitignore                                    |   1 +
 aligned/compiler/aggregation_factory.py       |   2 +-
 aligned/compiler/feature_factory.py           |   3 +
 aligned/compiler/model.py                     |   6 +-
 aligned/data_source/batch_data_source.py      |  12 ++
 aligned/feature_store.py                      |   3 +-
 .../tests/test_feature_request_generation.py  |  10 +-
 aligned/retrival_job.py                       |  15 +-
 aligned/sources/azure_blob_storage.py         |   5 +-
 aligned/sources/local.py                      |   7 +-
 aligned/sources/tests/test_parquet.py         |   2 +-
 aligned/worker.py                             |  20 +-
 conftest.py                                   | 187 ++++++++----------
 pyproject.toml                                |   2 +-
 test_data/credit_history_mater.parquet        | Bin 1356 -> 1359 bytes
 test_data/data/csv_iso.csv                    |   6 +-
 test_data/data/csv_unix.csv                   |   6 +-
 test_data/data/parquet_iso.parquet            | Bin 1862 -> 1866 bytes
 test_data/data/parquet_unix.parquet           | Bin 1479 -> 1479 bytes
 test_data/test_model.parquet                  | Bin 818 -> 818 bytes
 20 files changed, 141 insertions(+), 146 deletions(-)

diff --git a/.gitignore b/.gitignore
index 61d8d83c..35737086 100644
--- a/.gitignore
+++ b/.gitignore
@@ -7,6 +7,7 @@ mlruns
 test_data/feature-store.json
 test_data/mlruns
 test_data/temp
+test_data

 # Byte-compiled / optimized / DLL files
 __pycache__/
diff --git a/aligned/compiler/aggregation_factory.py b/aligned/compiler/aggregation_factory.py
index 0f8aa3ce..4ea9bc04 100644
--- a/aligned/compiler/aggregation_factory.py
+++ b/aligned/compiler/aggregation_factory.py
@@ -376,6 +376,6 @@ def compile(self) -> Transformation:
         else:
             return PolarsFunctionTransformation(
                 code=code,
-                function_name=dill.source.getname(self.method),
+                function_name=dill.source.getname(self.method),  # type: ignore
                 dtype=self.dtype.dtype,
             )
diff --git a/aligned/compiler/feature_factory.py b/aligned/compiler/feature_factory.py
index 99c22442..dc83aff6 100644
--- a/aligned/compiler/feature_factory.py
+++ b/aligned/compiler/feature_factory.py
@@ -1354,6 +1354,9 @@ class Timestamp(DateFeature, ArithmeticFeature):
     def __init__(self, time_zone: str | None = 'UTC') -> None:
         self.time_zone = time_zone

+    def defines_freshness(self) -> Timestamp:
+        return self.with_tag('freshness_timestamp')
+
     @property
     def dtype(self) -> FeatureType:
         from zoneinfo import ZoneInfo
diff --git a/aligned/compiler/model.py b/aligned/compiler/model.py
index a5afca4f..73a9c95c 100644
--- a/aligned/compiler/model.py
+++ b/aligned/compiler/model.py
@@ -559,7 +559,11 @@ def sort_key(x: tuple[int, FeatureFactory]) -> int:
             from aligned.schemas.transformation import MapArgMax

             transformation = MapArgMax(
-                {probs._name: LiteralValue.from_value(probs.of_value) for probs in probabilities}
+                {
+                    probs._name: LiteralValue.from_value(probs.of_value)
+                    for probs in probabilities
+                    if probs._name is not None
+                }
             )

             arg_max_feature = DerivedFeature(
diff --git a/aligned/data_source/batch_data_source.py b/aligned/data_source/batch_data_source.py
index 91a0d951..51c101d0 100644
--- a/aligned/data_source/batch_data_source.py
+++ b/aligned/data_source/batch_data_source.py
@@ -1166,6 +1166,18 @@ def random_values_for(feature: Feature, size: int, seed: int | None = None) -> pl.Series:
     else:
         values = np.random.random(size)

+        if max_value is None and dtype.name.startswith('uint'):
+            bits = dtype.name.lstrip('uint')
+            if bits.isdigit():
+                max_value = 2 ** int(bits)
+                min_value = 0
+        elif max_value is None and dtype.name.startswith('int'):
+            bits = dtype.name.lstrip('int')
+            if bits.isdigit():
+                value_range = 2 ** int(bits) / 2
+                max_value = value_range
+                min_value = -value_range
+
     if max_value and min_value:
         values = values * (max_value - min_value) + min_value
     elif max_value is not None:
diff --git a/aligned/feature_store.py b/aligned/feature_store.py
index bf2a820f..45182a62 100644
--- a/aligned/feature_store.py
+++ b/aligned/feature_store.py
@@ -94,7 +94,8 @@ def feature_names(self) -> set[str]:
 def unpack_feature(feature: str) -> tuple[FeatureLocation, str]:
     splits = feature.split(':')
     if len(splits) == 3:
-        return (FeatureLocation(splits[1], splits[0]), splits[2])
+        assert splits[0]
+        return (FeatureLocation(splits[1], splits[0]), splits[2])  # type: ignore
     if len(splits) == 2:
         return (FeatureLocation(splits[0], 'feature_view'), splits[1])
     else:
diff --git a/aligned/request/tests/test_feature_request_generation.py b/aligned/request/tests/test_feature_request_generation.py
index 8b2b908d..3158c151 100644
--- a/aligned/request/tests/test_feature_request_generation.py
+++ b/aligned/request/tests/test_feature_request_generation.py
@@ -1,12 +1,12 @@
 import pytest

-from aligned.feature_view.feature_view import FeatureView
+from aligned.feature_view.feature_view import FeatureViewWrapper


 @pytest.mark.asyncio
-async def test_fetch_all_request(titanic_feature_view: FeatureView) -> None:
+async def test_fetch_all_request(titanic_feature_view: FeatureViewWrapper) -> None:

-    compiled_view = type(titanic_feature_view).compile()
+    compiled_view = titanic_feature_view.compile()
     request = compiled_view.request_all

     expected_features = {
@@ -35,9 +35,9 @@ async def test_fetch_all_request(titanic_feature_view: FeatureView) -> None:


 @pytest.mark.asyncio
-async def test_fetch_features_request(titanic_feature_view: FeatureView) -> None:
+async def test_fetch_features_request(titanic_feature_view: FeatureViewWrapper) -> None:

-    compiled_view = type(titanic_feature_view).compile()
+    compiled_view = titanic_feature_view.compile()
     wanted_features = {'cabin', 'is_male'}
     request = compiled_view.request_for(wanted_features)
     expected_features = {'sex', 'cabin', 'is_male'}
diff --git a/aligned/retrival_job.py b/aligned/retrival_job.py
index 059359f0..4eaeb473 100644
--- a/aligned/retrival_job.py
+++ b/aligned/retrival_job.py
@@ -1739,15 +1739,8 @@ async def compute_derived_features_pandas(self, df: pd.DataFrame) -> pd.DataFrame:
                 logger.debug(f'Computing feature with pandas: {feature.name}')

                 df[feature.name] = await feature.transformation.transform_pandas(
-                    df[feature.depending_on_names]
+                    df[feature.depending_on_names]  # type: ignore
                 )
-                # if df[feature.name].dtype != feature.dtype.pandas_type:
-                #     if feature.dtype.is_numeric:
-                #         df[feature.name] = pd.to_numeric(df[feature.name], errors='coerce').astype(
-                #             feature.dtype.pandas_type
-                #         )
-                #     else:
-                #         df[feature.name] = df[feature.name].astype(feature.dtype.pandas_type)
         return df

     async def to_pandas(self) -> pd.DataFrame:
@@ -1989,7 +1982,7 @@ async def to_polars(self) -> AsyncIterator[pl.LazyFrame]:
             df = raw_files[start:end, :]

             chunked_job = (
-                LiteralRetrivalJob(df.lazy(), RequestResult.from_request_list(needed_requests))
+                LiteralRetrivalJob(df.lazy(), needed_requests)
                 .derive_features(needed_requests)
                 .select_columns(features_to_include_names)
             )
@@ -2312,7 +2305,7 @@ async def combine_data(self, df: pd.DataFrame) -> pd.DataFrame:
                 continue
             logger.debug(f'Computing feature: {feature.name}')
             df[feature.name] = await feature.transformation.transform_pandas(
-                df[feature.depending_on_names]
+                df[feature.depending_on_names]  # type: ignore
             )
         return df

@@ -2406,7 +2399,7 @@ async def to_pandas(self) -> pd.DataFrame:
         df = await self.job.to_pandas()
         if self.include_features:
             total_list = list({ent.name for ent in self.request_result.entities}.union(self.include_features))
-            return df[total_list]
+            return df[total_list]  # type: ignore
         else:
             return df
diff --git a/aligned/sources/azure_blob_storage.py b/aligned/sources/azure_blob_storage.py
index bb3a198e..46bb6269 100644
--- a/aligned/sources/azure_blob_storage.py
+++ b/aligned/sources/azure_blob_storage.py
@@ -571,8 +571,9 @@ def delete_directory_recursively(directory_path: str) -> None:
             fs.rmdir(directory_path)

         upsert_on = sorted(request.entity_names)
+        returned_columns = request.all_returned_columns

-        df = await job.select(request.all_returned_columns).to_polars()
+        df = await job.select(returned_columns).to_polars()
         unique_partitions = df.select(self.partition_keys).unique()

         filters: list[pl.Expr] = []
@@ -590,7 +591,7 @@ def delete_directory_recursively(directory_path: str) -> None:

         try:
             existing_df = (await self.to_lazy_polars()).filter(*filters)
-            write_df = upsert_on_column(upsert_on, df.lazy(), existing_df).collect()
+            write_df = upsert_on_column(upsert_on, df.lazy(), existing_df).select(returned_columns).collect()
         except (UnableToFindFileException, pl.ComputeError):
             write_df = df.lazy()

diff --git a/aligned/sources/local.py b/aligned/sources/local.py
index e8eb84f7..7c17b37d 100644
--- a/aligned/sources/local.py
+++ b/aligned/sources/local.py
@@ -230,7 +230,7 @@ async def to_lazy_polars(self) -> pl.LazyFrame:
             schema = {  # type: ignore
                 name: dtype.polars_type
                 for name, dtype in self.expected_schema.items()
-                if not dtype.is_datetime
+                if not dtype.is_datetime and dtype.name != 'bool'
             }

             if self.mapping_keys:
@@ -506,7 +506,8 @@ async def upsert(self, job: RetrivalJob, request: RetrivalRequest) -> None:

         upsert_on = sorted(request.entity_names)

-        df = await job.select(request.all_returned_columns).to_polars()
+        returned_columns = request.all_returned_columns
+        df = await job.select(returned_columns).to_polars()
         unique_partitions = df.select(self.partition_keys).unique()

         filters: list[pl.Expr] = []
@@ -524,7 +525,7 @@ async def upsert(self, job: RetrivalJob, request: RetrivalRequest) -> None:

         try:
             existing_df = (await self.to_lazy_polars()).filter(*filters)
-            write_df = upsert_on_column(upsert_on, df.lazy(), existing_df).collect()
+            write_df = upsert_on_column(upsert_on, df.lazy(), existing_df).select(returned_columns).collect()
         except (UnableToFindFileException, pl.ComputeError):
             write_df = df.lazy()

diff --git a/aligned/sources/tests/test_parquet.py b/aligned/sources/tests/test_parquet.py
index c52ed1eb..17079769 100644
--- a/aligned/sources/tests/test_parquet.py
+++ b/aligned/sources/tests/test_parquet.py
@@ -209,7 +209,7 @@ async def test_read_csv(point_in_time_data_test: DataTest) -> None:
             description=view.metadata.description,
             batch_source=file_source,
         )
-        compiled = view.compile_instance()
+        compiled = view.compile()
         assert compiled.source.path == file_source.path

         store.add_compiled_view(compiled)
diff --git a/aligned/worker.py b/aligned/worker.py
index eb454604..122c14ad 100644
--- a/aligned/worker.py
+++ b/aligned/worker.py
@@ -187,7 +187,8 @@ async def start(self) -> None:
         processes = []
         for topic_name, views in feature_views.items():
             process_views = views
-            stream: StreamDataSource = views[0].view.stream_data_source
+            stream: StreamDataSource | None = views[0].view.stream_data_source
+            assert stream is not None
             stream_consumer = stream.consumer(
                 self.read_timestamps.get(topic_name, self.default_start_timestamp)
             )
@@ -204,10 +205,12 @@ async def start(self) -> None:

             if not source:
                 logger.debug(f'Skipping to setup active learning set for {model_name}')
-
-            processes.append(
-                process_predictions(source.consumer(), store.model(model_name), active_learning_config)
-            )
+            else:
+                processes.append(
+                    process_predictions(
+                        source.consumer(), store.model(model_name), active_learning_config
+                    )
+                )

         if self.metric_logging_port:
             start_http_server(self.metric_logging_port)
@@ -229,9 +232,6 @@ async def process_predictions(
         logger.debug('No active learning config found, will not listen to predictions')
         return

-    topic_name = model.model.predictions_view.stream_source.topic_name
-    logger.debug(f'Started listning to {topic_name}')
-
     while True:
         records = await stream_source.read()

@@ -240,7 +240,7 @@ async def process_predictions(
         start_time = timeit.default_timer()

         request = model.model.request_all_predictions.needed_requests[0]
-        job = RetrivalJob.from_dict(records, request).ensure_types([request])
+        job = RetrivalJob.from_dict(records, request).ensure_types([request])  # type: ignore
         job = ActiveLearningJob(
             job,
             model.model,
@@ -264,7 +264,7 @@ def stream_job(values: list[dict], feature_view: FeatureViewStore) -> RetrivalJob:
     if isinstance(feature_view.view.stream_data_source, ColumnFeatureMappable):
         mappings = feature_view.view.stream_data_source.mapping_keys

-    value_job = RetrivalJob.from_dict(values, request)
+    value_job = RetrivalJob.from_dict(values, request)  # type: ignore

     if mappings:
         value_job = value_job.rename(mappings)
diff --git a/conftest.py b/conftest.py
index 00a2091a..aaaacb35 100644
--- a/conftest.py
+++ b/conftest.py
@@ -17,8 +17,9 @@
     String,
     Int8,
     EmbeddingModel,
+    feature_view,
 )
-from aligned.feature_view.feature_view import FeatureView, FeatureViewMetadata
+from aligned.feature_view.feature_view import FeatureView, FeatureViewWrapper
 from aligned.compiler.model import model_contract, ModelContractWrapper
 from aligned.feature_store import ContractStore
 from aligned.retrival_job import DerivedFeatureJob, RetrivalJob, RetrivalRequest
@@ -150,14 +151,13 @@ def scan_without_datetime() -> CsvFileSource:


 @pytest.fixture
-def breast_scan_feature_viewout_with_datetime(scan_without_datetime: CsvFileSource) -> FeatureView:
-    class BreastDiagnoseFeatureView(FeatureView):
-
-        metadata = FeatureViewMetadata(
-            name='breast_features',
-            description='Features defining a scan and diagnose of potential cancer cells',
-            source=scan_without_datetime,
-        )
+def breast_scan_feature_viewout_with_datetime(scan_without_datetime: CsvFileSource) -> FeatureViewWrapper:
+    @feature_view(
+        name='breast_features',
+        description='Features defining a scan and diagnose of potential cancer cells',
+        source=scan_without_datetime,
+    )
+    class BreastDiagnoseFeatureView:

         scan_id = Entity(dtype=Int32())
         diagnosis = String().description('The given diagnose. M for malignant, and B for benigne')
@@ -203,7 +203,7 @@ class BreastDiagnoseFeatureView(FeatureView):
         fractal_dimension_se = Float()
         fractal_dimension_worst = Float()

-    return BreastDiagnoseFeatureView()
+    return BreastDiagnoseFeatureView


 @pytest.fixture
@@ -224,14 +224,13 @@ def scan_with_datetime() -> CsvFileSource:


 @pytest.fixture
-def breast_scan_feature_view_with_datetime(scan_with_datetime: CsvFileSource) -> FeatureView:
-    class BreastDiagnoseFeatureView(FeatureView):
-
-        metadata = FeatureViewMetadata(
-            name='breast_features',
-            description='Features defining a scan and diagnose of potential cancer cells',
-            source=scan_with_datetime,
-        )
+def breast_scan_feature_view_with_datetime(scan_with_datetime: CsvFileSource) -> FeatureViewWrapper:
+    @feature_view(
+        name='breast_features',
+        description='Features defining a scan and diagnose of potential cancer cells',
+        source=scan_with_datetime,
+    )
+    class BreastDiagnoseFeatureView:

         scan_id = Int32().as_entity()

@@ -280,18 +279,19 @@ class BreastDiagnoseFeatureView(FeatureView):
         fractal_dimension_se = Float()
         fractal_dimension_worst = Float()

-    return BreastDiagnoseFeatureView()
+    return BreastDiagnoseFeatureView


 @pytest.fixture
-def breast_scan_feature_view_with_datetime_and_aggregation(scan_with_datetime: CsvFileSource) -> FeatureView:
-    class BreastDiagnoseFeatureView(FeatureView):
-
-        metadata = FeatureViewMetadata(
-            name='breast_features',
-            description='Features defining a scan and diagnose of potential cancer cells',
-            source=scan_with_datetime,
-        )
+def breast_scan_feature_view_with_datetime_and_aggregation(
+    scan_with_datetime: CsvFileSource,
+) -> FeatureViewWrapper:
+    @feature_view(
+        name='breast_features',
+        description='Features defining a scan and diagnose of potential cancer cells',
+        source=scan_with_datetime,
+    )
+    class BreastDiagnoseFeatureView:

         scan_id = Entity(dtype=Int32())

@@ -340,7 +340,7 @@ class BreastDiagnoseFeatureView(FeatureView):
         fractal_dimension_se = Float()
         fractal_dimension_worst = Float()

-    return BreastDiagnoseFeatureView()
+    return BreastDiagnoseFeatureView


 @pytest.fixture
@@ -393,17 +393,14 @@ def titanic_source_scd() -> CsvFileSource:


 @pytest.fixture
-def titanic_source_parquet() -> CsvFileSource:
+def titanic_source_parquet() -> ParquetFileSource:
     return FileSource.parquet_at('test_data/titanic.parquet')


 @pytest.fixture
-def titanic_feature_view(titanic_source: CsvFileSource) -> FeatureView:
-    class TitanicPassenger(FeatureView):
-
-        metadata = FeatureViewMetadata(
-            name='titanic', description='Some features from the titanic dataset', source=titanic_source
-        )
+def titanic_feature_view(titanic_source: CsvFileSource) -> FeatureViewWrapper:
+    @feature_view(name='titanic', description='Some features from the titanic dataset', source=titanic_source)
+    class TitanicPassenger:

         passenger_id = Int32().as_entity()

@@ -425,13 +422,13 @@ class TitanicPassenger(FeatureView):
         is_male, is_female = sex.one_hot_encode(['male', 'female'])
         is_mr = name.contains('Mr.')

-    return TitanicPassenger()
+    return TitanicPassenger


 @pytest.fixture
-def titanic_model(titanic_feature_view: FeatureView) -> ModelContractWrapper:
+def titanic_model(titanic_feature_view: FeatureViewWrapper) -> ModelContractWrapper:

-    features = titanic_feature_view
+    features = titanic_feature_view()

     @model_contract(
         name='titanic',
@@ -453,15 +450,13 @@ class Titanic:


 @pytest.fixture
-def titanic_feature_view_parquet(titanic_source_parquet: ParquetFileSource) -> FeatureView:
-    class TitanicPassenger(FeatureView):
-
-        metadata = FeatureViewMetadata(
-            name='titanic_parquet',
-            description='Some features from the titanic dataset',
-            source=titanic_source_parquet,
-        )
-
+def titanic_feature_view_parquet(titanic_source_parquet: ParquetFileSource) -> FeatureViewWrapper:
+    @feature_view(
+        name='titanic_parquet',
+        description='Some features from the titanic dataset',
+        source=titanic_source_parquet,
+    )
+    class TitanicPassenger:
         passenger_id = Entity(dtype=Int32())

         # Input values
@@ -482,7 +477,7 @@ class TitanicPassenger(FeatureView):
         is_male, is_female = sex.one_hot_encode(['male', 'female'])
         is_mr = name.contains('Mr.')

-    return TitanicPassenger()
+    return TitanicPassenger


 @pytest.fixture
@@ -499,12 +494,9 @@ def titanic_feature_store(


 @pytest.fixture
-def alot_of_transforations_feature_view(titanic_source: CsvFileSource) -> FeatureView:
-    class TitanicPassenger(FeatureView):
-
-        metadata = FeatureViewMetadata(
-            name='titanic', description='Some features from the titanic dataset', source=titanic_source
-        )
+def alot_of_transforations_feature_view(titanic_source: CsvFileSource) -> FeatureViewWrapper:
+    @feature_view(name='titanic', description='Some features from the titanic dataset', source=titanic_source)
+    class TitanicPassenger:

         passenger_id = Entity(dtype=Int32())

@@ -512,7 +504,7 @@ class TitanicPassenger(FeatureView):
         age = Float()
         name = String()
         sex = String()
-        survived = Int8()
+        survived = Bool()
         sibsp = Int32()
         cabin = String().fill_na('Nada')

@@ -533,12 +525,12 @@ class TitanicPassenger(FeatureView):
         logical_and = is_mr & survived
         logical_or = is_mr | survived

-    return TitanicPassenger()
+    return TitanicPassenger


 @pytest.fixture
 def alot_of_transforation_feature_store(
-    alot_of_transforations_feature_view: FeatureView,
+    alot_of_transforations_feature_view: FeatureViewWrapper,
 ) -> ContractStore:
     feature_store = ContractStore.empty()
     feature_store.add_feature_view(alot_of_transforations_feature_view)
@@ -547,8 +539,8 @@ def alot_of_transforation_feature_store(

 @pytest.fixture
 def combined_feature_store(
-    titanic_feature_view: FeatureView,
-    breast_scan_feature_viewout_with_datetime: FeatureView,
+    titanic_feature_view: FeatureViewWrapper,
+    breast_scan_feature_viewout_with_datetime: FeatureViewWrapper,
 ) -> ContractStore:
     feature_store = ContractStore.empty()
     feature_store.add_feature_view(titanic_feature_view)
@@ -557,17 +549,16 @@ def combined_feature_store(


 @pytest.fixture
-def titanic_feature_view_scd(titanic_source_scd: CsvFileSource) -> FeatureView:
+def titanic_feature_view_scd(titanic_source_scd: CsvFileSource) -> FeatureViewWrapper:

     redis = RedisConfig.localhost()

-    class TitanicPassenger(FeatureView):
-
-        metadata = FeatureViewMetadata(
-            name='titanic',
-            description='Some features from the titanic dataset',
-            source=titanic_source_scd,
-            stream_source=redis.stream(topic='titanic_stream').with_coder(JsonRecordCoder('json')),
-        )
+    @feature_view(
+        name='titanic',
+        description='Some features from the titanic dataset',
+        source=titanic_source_scd,
+        stream_source=redis.stream(topic='titanic_stream').with_coder(JsonRecordCoder('json')),
+    )
+    class TitanicPassenger:

         passenger_id = Entity(dtype=Int32())
@@ -597,13 +588,13 @@ class TitanicPassenger(FeatureView):
         is_male, is_female = sex.one_hot_encode(['male', 'female'])
         is_mr = name.contains('Mr.')

-    return TitanicPassenger()
+    return TitanicPassenger


 @pytest.fixture
-def titanic_model_scd(titanic_feature_view_scd: FeatureView) -> ModelContractWrapper:
+def titanic_model_scd(titanic_feature_view_scd: FeatureViewWrapper) -> ModelContractWrapper:

-    features = titanic_feature_view_scd
+    features = titanic_feature_view_scd()

     @model_contract(
         name='titanic',
@@ -627,8 +618,8 @@ class Titanic:

 @pytest.fixture
 def titanic_feature_store_scd(
-    titanic_feature_view_scd: FeatureView,
-    titanic_feature_view_parquet: FeatureView,
+    titanic_feature_view_scd: FeatureViewWrapper,
+    titanic_feature_view_parquet: FeatureViewWrapper,
     titanic_model_scd: ModelContractWrapper,
 ) -> ContractStore:
     feature_store = ContractStore.empty()
@@ -641,7 +632,7 @@ def titanic_feature_store_scd(
 @dataclass
 class FeatureData:
     data: pl.DataFrame
-    view: FeatureView
+    view: FeatureViewWrapper


 @dataclass
@@ -658,9 +649,8 @@ def point_in_time_data_test() -> DataTest:

     placeholder_ds = FileSource.parquet_at('placeholder')

-    class CreditHistory(FeatureView):
-
-        metadata = FeatureView.metadata_with('credit_history', description='', batch_source=placeholder_ds)
+    @feature_view(name='credit_history', description='', source=placeholder_ds)
+    class CreditHistory:

         dob_ssn = String().as_entity()
         event_timestamp = EventTimestamp()
@@ -671,21 +661,16 @@ class CreditHistory(FeatureView):
         credit_card_due = Int64()

         bankruptcies = Int32()

-    class CreditHistoryAggregate(FeatureView):
-
-        metadata = FeatureView.metadata_with(
-            'credit_history_agg', description='', batch_source=placeholder_ds
-        )
-
+    @feature_view(name='credit_history_agg', description='', source=placeholder_ds)
+    class CreditHistoryAggregate:
         dob_ssn = String().as_entity()
         event_timestamp = EventTimestamp()

         credit_card_due = Int64()
         credit_sum = credit_card_due.aggregate().over(weeks=1).sum()

-    class Loan(FeatureView):
-
-        metadata = FeatureView.metadata_with('loan', description='', batch_source=placeholder_ds)
+    @feature_view(name='loan', description='', source=placeholder_ds)
+    class Loan:

         loan_id = Int32().as_entity()
         event_timestamp = EventTimestamp()
@@ -760,9 +745,9 @@ class Loan(FeatureView):

     return DataTest(
         sources=[
-            FeatureData(data=credit_data, view=CreditHistory()),
-            FeatureData(data=loan_data, view=Loan()),
-            FeatureData(data=credit_data, view=CreditHistoryAggregate()),
+            FeatureData(data=credit_data, view=CreditHistory),
+            FeatureData(data=loan_data, view=Loan),
+            FeatureData(data=credit_data, view=CreditHistoryAggregate),
         ],
         entities=entities,
         feature_reference=[
@@ -782,9 +767,8 @@ def point_in_time_data_test_wituout_event_timestamp() -> DataTest:

     placeholder_ds = FileSource.parquet_at('placeholder')

-    class CreditHistory(FeatureView):
-
-        metadata = FeatureView.metadata_with('credit_history', description='', batch_source=placeholder_ds)
+    @feature_view(name='credit_history', description='', source=placeholder_ds)
+    class CreditHistory:

         dob_ssn = String().as_entity()
         event_timestamp = EventTimestamp()
@@ -795,21 +779,16 @@ class CreditHistory(FeatureView):
         credit_card_due = Int64()

         bankruptcies = Int32()

-    class CreditHistoryAggregate(FeatureView):
-
-        metadata = FeatureView.metadata_with(
-            'credit_history_agg', description='', batch_source=placeholder_ds
-        )
-
+    @feature_view(name='credit_history_agg', description='', source=placeholder_ds)
+    class CreditHistoryAggregate:
         dob_ssn = String().as_entity()
         event_timestamp = EventTimestamp()

         credit_card_due = Int64()
         credit_sum = credit_card_due.aggregate().over(weeks=1).sum()

-    class Loan(FeatureView):
-
-        metadata = FeatureView.metadata_with('loan', description='', batch_source=placeholder_ds)
+    @feature_view(name='loan', description='', source=placeholder_ds)
+    class Loan:

         loan_id = Int32().as_entity()
         event_timestamp = EventTimestamp()
@@ -882,9 +861,9 @@ class Loan(FeatureView):

     return DataTest(
         sources=[
-            FeatureData(data=credit_data, view=CreditHistory()),
-            FeatureData(data=loan_data, view=Loan()),
-            FeatureData(data=credit_data, view=CreditHistoryAggregate()),
+            FeatureData(data=credit_data, view=CreditHistory),
+            FeatureData(data=loan_data, view=Loan),
+            FeatureData(data=credit_data, view=CreditHistoryAggregate),
         ],
         entities=entities,
         feature_reference=[
diff --git a/pyproject.toml b/pyproject.toml
index da145eed..309e4931 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "aligned"
-version = "0.0.100"
+version = "0.0.101"
 description = "A data managment and lineage tool for ML applications."
 authors = ["Mats E. Mollestad "]
 license = "Apache-2.0"
diff --git a/test_data/credit_history_mater.parquet b/test_data/credit_history_mater.parquet
index 5d09461370afdd5560b4d388f2a54a37f5f42711..dff7a83350a15932963880afa321b88426847edb 100644
GIT binary patch
delta 641
delta 593

diff --git a/test_data/data/parquet_unix.parquet b/test_data/data/parquet_unix.parquet
index b91a3f17f42d6faf76d61c2197ddeb26c81a93b9..b6f126ec3bc818af71da5c1daaaabea24638eda2 100644
GIT binary patch
delta 278
delta 278

diff --git a/test_data/test_model.parquet b/test_data/test_model.parquet
index 
5902b9a9f155dd21fe214adacc36f442ab6b99ad..7ed436f551a1f7e5317575c151e5e415454ef9ff 100644
GIT binary patch
delta 52
delta 52