Skip to content

Commit

Permalink
Fixed mapping value in file source
Browse files Browse the repository at this point in the history
  • Loading branch information
MatsMoll committed Mar 4, 2024
1 parent 0aa5364 commit 3e8912d
Show file tree
Hide file tree
Showing 9 changed files with 38 additions and 17 deletions.
5 changes: 2 additions & 3 deletions aligned/compiler/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class ModelMetadata:
# Will log the feature inputs to a model. Therefore, enabling log and wait etc.
# feature_logger: WritableBatchSource | None = field(default=None)
contacts: list[str] | None = field(default=None)
tags: dict[str, str] | None = field(default=None)
tags: list[str] | None = field(default=None)
description: str | None = field(default=None)
prediction_source: BatchDataSource | None = field(default=None)
prediction_stream: StreamDataSource | None = field(default=None)
Expand Down Expand Up @@ -101,7 +101,6 @@ def as_view(self) -> CompiledFeatureView | None:

return CompiledFeatureView(
name=self.metadata.name,
tags={},
source=view.source,
entities=view.entities,
features=view.features,
Expand Down Expand Up @@ -227,7 +226,7 @@ def model_contract(
name: str,
features: list[FeatureReferencable] | FeatureInputVersions,
contacts: list[str] | None = None,
tags: dict[str, str] | None = None,
tags: list[str] | None = None,
description: str | None = None,
prediction_source: BatchDataSource | None = None,
prediction_stream: StreamDataSource | None = None,
Expand Down
14 changes: 12 additions & 2 deletions aligned/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

from aligned.compiler.model import ModelContractWrapper
from aligned.data_file import DataFileReference, upsert_on_column
from aligned.data_source.batch_data_source import BatchDataSource
from aligned.data_source.batch_data_source import BatchDataSource, ColumnFeatureMappable
from aligned.enricher import Enricher
from aligned.exceptions import UnableToFindFileException
from aligned.feature_source import (
Expand Down Expand Up @@ -676,9 +676,19 @@ async def insert_into(
import polars as pl

columns = write_request.all_returned_columns

if isinstance(source, ColumnFeatureMappable):
new_cols = source.feature_identifier_for(columns)

mappings = dict(zip(columns, new_cols))
values = values.rename(mappings)
columns = new_cols
existing_df = (await source.to_lazy_polars()).rename(mappings)
else:
existing_df = await source.to_lazy_polars()

new_df = (await values.to_lazy_polars()).select(columns)
try:
existing_df = await source.to_lazy_polars()
write_df = pl.concat([new_df, existing_df.select(columns)], how='vertical_relaxed')
except UnableToFindFileException:
write_df = new_df
Expand Down
10 changes: 5 additions & 5 deletions aligned/feature_view/feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ class FeatureViewMetadata:
application_source: BatchDataSource | None = field(default=None)
materialized_source: BatchDataSource | None = field(default=None)
contacts: list[str] | None = field(default=None)
tags: dict[str, str] = field(default_factory=dict)
tags: list[str] | None = field(default=None)
acceptable_freshness: timedelta | None = field(default=None)
unacceptable_freshness: timedelta | None = field(default=None)

Expand Down Expand Up @@ -98,7 +98,7 @@ def feature_view(
application_source: BatchDataSource | None = None,
materialized_source: BatchDataSource | None = None,
contacts: list[str] | None = None,
tags: dict[str, str] | None = None,
tags: list[str] | None = None,
acceptable_freshness: timedelta | None = None,
unacceptable_freshness: timedelta | None = None,
) -> Callable[[Type[T]], FeatureViewWrapper[T]]:
Expand All @@ -112,7 +112,7 @@ def decorator(cls: Type[T]) -> FeatureViewWrapper[T]:
application_source=application_source,
materialized_source=materialized_source,
contacts=contacts,
tags=tags or {},
tags=tags,
acceptable_freshness=acceptable_freshness,
unacceptable_freshness=unacceptable_freshness,
)
Expand Down Expand Up @@ -418,7 +418,7 @@ def metadata_with(
application_source: BatchDataSource | None = None,
staging_source: BatchDataSource | None = None,
contacts: list[str] | None = None,
tags: dict[str, str] | None = None,
tags: list[str] | None = None,
) -> FeatureViewMetadata:
from aligned import HttpStreamSource

Expand All @@ -430,7 +430,7 @@ def metadata_with(
application_source=application_source,
materialized_source=staging_source,
contacts=contacts,
tags=tags or {},
tags=tags,
)

@classmethod
Expand Down
4 changes: 3 additions & 1 deletion aligned/feature_view/tests/test_hidden_variable.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@

class TestView(FeatureView):

metadata = FeatureViewMetadata(name='test', description='test', tags={}, source=source.table('test'))
metadata = FeatureViewMetadata(
name='test', description='test', tags=['Test'], source=source.table('test')
)

test_id = Entity(String())

Expand Down
6 changes: 4 additions & 2 deletions aligned/schemas/feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,13 @@
@dataclass
class CompiledFeatureView(Codable):
name: str
tags: dict[str, str]
source: BatchDataSource

entities: set[Feature]
features: set[Feature]
derived_features: set[DerivedFeature]

tags: list[str] | None = field(default=None)
description: str | None = field(default=None)
aggregated_features: set[AggregatedFeature] = field(default_factory=set)

Expand All @@ -46,7 +47,6 @@ class CompiledFeatureView(Codable):

def __pre_serialize__(self) -> CompiledFeatureView:
assert isinstance(self.name, str)
assert isinstance(self.tags, dict)
assert isinstance(self.source, BatchDataSource)

for entity in self.entities:
Expand All @@ -58,6 +58,8 @@ def __pre_serialize__(self) -> CompiledFeatureView:
for aggregated_feature in self.aggregated_features:
assert isinstance(aggregated_feature, AggregatedFeature)

if self.tags is not None:
assert isinstance(self.tags, list)
if self.description is not None:
assert isinstance(self.description, str)
if self.event_timestamp is not None:
Expand Down
2 changes: 1 addition & 1 deletion aligned/schemas/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ class Model(Codable):
predictions_view: PredictionsView
description: str | None = field(default=None)
contacts: list[str] | None = field(default=None)
tags: dict[str, str] | None = field(default=None)
tags: list[str] | None = field(default=None)
dataset_store: DatasetStore | None = field(default=None)
exposed_at_url: str | None = field(default=None)

Expand Down
12 changes: 10 additions & 2 deletions aligned/tests/test_model_target.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,11 @@ async def test_model_insert_predictions() -> None:

path = 'test_data/test_model.parquet'

@model_contract(name='test_model', features=[], prediction_source=FileSource.parquet_at(path))
@model_contract(
name='test_model',
features=[],
prediction_source=FileSource.parquet_at(path).with_renames({'some_id': 'id'}),
)
class TestModel:
id = Int32().as_entity()

Expand All @@ -124,9 +128,13 @@ class TestModel:

await store.insert_into(FeatureLocation.model('test_model'), {'id': [1, 2, 3], 'a': [10, 14, 20]})

stored_data = pl.read_parquet(path).select(expected_frame.columns)
preds = await store.model('test_model').all_predictions().to_polars()

stored_data = pl.read_parquet(path).select(id=pl.col('some_id'), a=pl.col('a'))
assert stored_data.equals(expected_frame)

assert preds.select(expected_frame.columns).equals(expected_frame)


@pytest.mark.asyncio
async def test_model_upsert_predictions() -> None:
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "aligned"
version = "0.0.74"
version = "0.0.75"
description = "A data managment and lineage tool for ML applications."
authors = ["Mats E. Mollestad <mats@mollestad.no>"]
license = "Apache-2.0"
Expand Down
Binary file modified test_data/test_model.parquet
Binary file not shown.

0 comments on commit 3e8912d

Please sign in to comment.