feat: added some convenience methods for feature_wrappers #23

Merged · 1 commit · Dec 9, 2023
28 changes: 26 additions & 2 deletions aligned/compiler/feature_factory.py
@@ -642,10 +642,10 @@ def __floordiv__(self, other: FeatureFactory | Any) -> Float:
feature.transformation = RatioFactory(self, LiteralValue.from_value(other))
return feature

def __abs__(self) -> Float:
def __abs__(self) -> Int64:
from aligned.compiler.transformation_factory import AbsoluteFactory

feature = Float()
feature = Int64()
feature.transformation = AbsoluteFactory(self)
return feature

@@ -850,6 +850,30 @@ def aggregate(self) -> ArithmeticAggregation:
return ArithmeticAggregation(self)


class Int8(ArithmeticFeature, CouldBeEntityFeature, CouldBeModelVersion):
def copy_type(self) -> Int8:
return Int8()

@property
def dtype(self) -> FeatureType:
return FeatureType.int8()

def aggregate(self) -> ArithmeticAggregation:
return ArithmeticAggregation(self)


class Int16(ArithmeticFeature, CouldBeEntityFeature, CouldBeModelVersion):
def copy_type(self) -> Int16:
return Int16()

@property
def dtype(self) -> FeatureType:
return FeatureType.int16()

def aggregate(self) -> ArithmeticAggregation:
return ArithmeticAggregation(self)


class Int32(ArithmeticFeature, CouldBeEntityFeature, CouldBeModelVersion):
def copy_type(self) -> Int32:
return Int32()
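The new Int8 and Int16 factories follow the same shape as the existing Int32/Int64 ones. Below is a minimal sketch of declaring them in a view, using the import style of the tests later in this PR; the view and column names are made up, and importing directly from aligned.compiler.feature_factory matches the file changed above, while a top-level export is not shown in this diff.

from aligned import feature_view, FileSource, Int32
from aligned.compiler.feature_factory import Int8, Int16  # may also be exported at the top level

@feature_view(name='sensor_readings', source=FileSource.csv_at('readings.csv'))
class SensorReadings:

    sensor_id = Int32().as_entity()   # entity key, as in the tests below
    status_code = Int8()              # small-range value stored as int8
    reading_count = Int16()           # counts that fit comfortably in int16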
4 changes: 3 additions & 1 deletion aligned/data_source/batch_data_source.py
@@ -180,7 +180,9 @@ def multi_source_features_for(

source, _ = requests[0]
if isinstance(source, BatchSourceModification):
return source.wrap_job(type(source.source).multi_source_features_for(facts, requests))
return source.wrap_job(
type(source.source).multi_source_features_for(facts, requests) # type: ignore
)
elif isinstance(source, DataFileReference):
from aligned.local.job import FileFactualJob

35 changes: 35 additions & 0 deletions aligned/feature_view/feature_view.py
@@ -2,6 +2,8 @@

import copy
import logging
import polars as pl
import pandas as pd

from abc import ABC, abstractproperty
from dataclasses import dataclass, field
@@ -24,6 +26,7 @@
resolve_keys,
)
from aligned.data_source.stream_data_source import StreamDataSource
from aligned.retrival_job import ConvertableToRetrivalJob, RetrivalJob
from aligned.schemas.derivied_feature import (
AggregatedFeature,
)
@@ -34,10 +37,13 @@
if TYPE_CHECKING:
from aligned.feature_store import FeatureViewStore
from datetime import datetime
from aligned.validation.interface import Validator

# Enables code completion in the select method
T = TypeVar('T')

ConvertableData = TypeVar('ConvertableData', dict, pl.DataFrame, pd.DataFrame)


logger = logging.getLogger(__name__)

@@ -330,6 +336,35 @@ class MyView:
compiled = self.compile()
return await FeatureView.freshness_in_source(compiled, compiled.source)

def from_data(self, data: ConvertableToRetrivalJob) -> RetrivalJob:
request = self.compile().request_all
return RetrivalJob.from_convertable(data, request)

def drop_invalid(self, data: ConvertableData, validator: Validator | None = None) -> ConvertableData:
from aligned.retrival_job import DropInvalidJob

if not validator:
from aligned.validation.pandera import PanderaValidator

validator = PanderaValidator()

features = list(DropInvalidJob.features_to_validate(self.compile().request_all.needed_requests))

if isinstance(data, dict):
validate_data = pd.DataFrame(data)
else:
validate_data = data

if isinstance(validate_data, pl.DataFrame):
return validator.validate_polars(features, validate_data.lazy()).collect()
elif isinstance(validate_data, pd.DataFrame):
validated = validator.validate_pandas(features, validate_data)
if isinstance(data, dict):
return validated.to_dict(orient='list')
return validated # type: ignore
else:
raise ValueError(f'Invalid data type: {type(data)}')


class FeatureView(ABC):
"""
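A rough usage sketch for the two new wrapper helpers, mirroring the class-level call pattern of the tests below; the view definition is illustrative, and drop_invalid falls back to the PanderaValidator when no validator is passed.

import pandas as pd
from aligned import feature_view, FileSource, Int32

@feature_view(name='my_view', source=FileSource.csv_at('my_view.csv'))
class MyView:

    some_id = Int32().as_entity()
    feature = Int32()

# Wrap in-memory data in a RetrivalJob so it can be joined or materialized later.
job = MyView.from_data({'some_id': [1, 2], 'feature': [10, 7]})  # type: ignore

# Drop rows that fail validation; dicts, pandas frames and polars frames are accepted,
# and the return type matches the input type.
raw = pd.DataFrame({'some_id': [1, 2, 3], 'feature': [10, None, 7]})
clean = MyView.drop_invalid(raw)  # type: ignore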
63 changes: 63 additions & 0 deletions aligned/feature_view/tests/test_joined_source.py
@@ -0,0 +1,63 @@
import pytest
from aligned import feature_view, Int32, FileSource
import polars as pl


@feature_view(name='left', source=FileSource.csv_at('some_file.csv'))
class LeftData:

some_id = Int32().as_entity()

feature = Int32()


@feature_view(name='right', source=FileSource.csv_at('some_file.csv'))
class RightData:

some_id = Int32().as_entity()

other_feature = Int32()


@pytest.mark.asyncio
async def test_join_different_types_polars() -> None:

left_data = LeftData.from_data( # type: ignore
pl.DataFrame(
{'some_id': [1, 2, 3], 'feature': [2, 3, 4]}, schema={'some_id': pl.Int8, 'feature': pl.Int32}
)
)

right_data = RightData.from_data( # type: ignore
pl.DataFrame(
{'some_id': [1, 3, 2], 'other_feature': [3, 4, 5]},
schema={'some_id': pl.Int16, 'other_feature': pl.Int32},
)
)

expected_df = pl.DataFrame(
data={'some_id': [1, 2, 3], 'feature': [2, 3, 4], 'other_feature': [3, 5, 4]},
schema={
'some_id': pl.Int32,
'feature': pl.Int32,
'other_feature': pl.Int32,
},
)

new_data = left_data.join(right_data, 'inner', left_on='some_id', right_on='some_id')
result = await new_data.to_polars()

joined = result.collect().sort('some_id', descending=False)
assert joined.frame_equal(expected_df.select(joined.columns))


@pytest.mark.asyncio
async def test_unique_entities() -> None:

left_data = LeftData.from_data( # type: ignore
pl.DataFrame(
{'some_id': [1, 3, 3], 'feature': [2, 3, 4]}, schema={'some_id': pl.Int8, 'feature': pl.Int32}
)
)

left_data.unique_entities()
76 changes: 65 additions & 11 deletions aligned/retrival_job.py
@@ -371,7 +371,10 @@ def derive_features(self, requests: list[RetrivalRequest] | None = None) -> Retr
def combined_features(self, requests: list[RetrivalRequest] | None = None) -> RetrivalJob:
return CombineFactualJob([self], requests or self.retrival_requests)

def ensure_types(self, requests: list[RetrivalRequest]) -> RetrivalJob:
def ensure_types(self, requests: list[RetrivalRequest] | None = None) -> RetrivalJob:
if not requests:
requests = self.retrival_requests

return EnsureTypesJob(job=self, requests=requests)

def select_columns(self, include_features: set[str]) -> RetrivalJob:
@@ -392,6 +395,14 @@ def update_vector_index(self, indexes: list[VectorIndex]) -> RetrivalJob:
def validate_entites(self) -> RetrivalJob:
return ValidateEntitiesJob(self)

def unique_on(self, unique_on: list[str], sort_key: str | None = None) -> RetrivalJob:
return UniqueRowsJob(job=self, unique_on=unique_on, sort_key=sort_key)

def unique_entities(self) -> RetrivalJob:
request = self.request_result

return self.unique_on(unique_on=request.entity_columns, sort_key=request.event_timestamp)

def fill_missing_columns(self) -> RetrivalJob:
return FillMissingColumnsJob(self)

@@ -585,6 +596,29 @@ async def to_polars(self) -> pl.LazyFrame:
left = await self.left_job.to_polars()
right = await self.right_job.to_polars()

return_request = self.left_job.request_result

# Need to ensure that the data types are the same. Otherwise the join will fail
for left_col, right_col in zip(self.left_on, self.right_on):
polars_type = [
feature
for feature in return_request.features.union(return_request.entities)
if feature.name == left_col
]
if not polars_type:
raise ValueError(f'Unable to find {left_col} in left request {return_request}.')

polars_type = polars_type[0].dtype.polars_type

left_column_dtypes = dict(zip(left.columns, left.dtypes))
right_column_dtypes = dict(zip(right.columns, right.dtypes))

if not left_column_dtypes[left_col].is_(polars_type):
left = left.with_columns(pl.col(left_col).cast(polars_type))

if not right_column_dtypes[right_col].is_(polars_type):
right = right.with_columns(pl.col(right_col).cast(polars_type))

return left.join(right, left_on=self.left_on, right_on=self.right_on, how=self.method)

def log_each_job(self) -> RetrivalJob:
@@ -816,20 +850,21 @@ def request_result(self) -> RequestResult:
def retrival_requests(self) -> list[RetrivalRequest]:
return self.job.retrival_requests

@property
def features_to_validate(self) -> set[Feature]:
return RequestResult.from_request_list(
[request for request in self.retrival_requests if not request.aggregated_features]
).features
@staticmethod
def features_to_validate(retrival_requests: list[RetrivalRequest]) -> set[Feature]:
result = RequestResult.from_request_list(
[request for request in retrival_requests if not request.aggregated_features]
)
return result.features.union(result.entities)

async def to_pandas(self) -> pd.DataFrame:
return await self.validator.validate_pandas(
list(self.features_to_validate), await self.job.to_pandas()
return self.validator.validate_pandas(
list(DropInvalidJob.features_to_validate(self.retrival_requests)), await self.job.to_pandas()
)

async def to_polars(self) -> pl.LazyFrame:
return await self.validator.validate_polars(
list(self.features_to_validate), await self.job.to_polars()
return self.validator.validate_polars(
list(DropInvalidJob.features_to_validate(self.retrival_requests)), await self.job.to_polars()
)

def with_subfeatures(self) -> RetrivalJob:
@@ -918,6 +953,25 @@ def remove_derived_features(self) -> RetrivalJob:
return self.job.remove_derived_features()


@dataclass
class UniqueRowsJob(RetrivalJob, ModificationJob):

job: RetrivalJob
unique_on: list[str]
sort_key: str | None = field(default=None)

async def to_pandas(self) -> pd.DataFrame:
return (await self.to_polars()).collect().to_pandas()

async def to_polars(self) -> pl.LazyFrame:
data = await self.job.to_polars()

if self.sort_key:
data = data.sort(self.sort_key, descending=True)

return data.unique(self.unique_on, keep='first').lazy()


@dataclass
class ValidateEntitiesJob(RetrivalJob, ModificationJob):

@@ -932,7 +986,7 @@ async def to_pandas(self) -> pd.DataFrame:

return data

async def to_polars(self) -> pl.DataFrame:
async def to_polars(self) -> pl.LazyFrame:
data = await self.job.to_polars()

for request in self.retrival_requests:
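unique_entities is simply unique_on over the request's entity columns with the event timestamp as the sort key. The polars snippet below mirrors the UniqueRowsJob logic added above as a standalone sketch; the column names are made up.

import polars as pl

data = pl.DataFrame({
    'some_id': [1, 1, 2],
    'feature': [10, 20, 30],
    'updated_at': ['2023-12-02', '2023-12-01', '2023-12-01'],
}).lazy()

# Sort newest first, then keep the first row per entity key.
deduped = data.sort('updated_at', descending=True).unique(['some_id'], keep='first')
print(deduped.collect())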
24 changes: 23 additions & 1 deletion aligned/schemas/feature.py
@@ -96,7 +96,15 @@ class FeatureType(Codable):

@property
def is_numeric(self) -> bool:
return self.name in {'bool', 'int32', 'int64', 'float', 'double'} # Can be represented as an int
return self.name in {
'bool',
'int8',
'int16',
'int32',
'int64',
'float',
'double',
} # Can be represented as an int

@property
def python_type(self) -> type:
@@ -107,6 +115,8 @@ def python_type(self) -> type:

return {
'string': str,
'int8': int,
'int16': int,
'int32': int,
'int64': int,
'float': float,
@@ -127,6 +137,8 @@ def pandas_type(self) -> str | type:

return {
'string': str,
'int8': 'Int8',
'int16': 'Int16',
'int32': 'Int32',
'int64': 'Int64',
'float': np.float64,
@@ -149,6 +161,8 @@ def polars_type(self) -> type:
def feature_factory(self) -> ff.FeatureFactory:
return {
'string': ff.String(),
'int8': ff.Int8(),
'int16': ff.Int16(),
'int32': ff.Int32(),
'int64': ff.Int64(),
'float': ff.Float(),
@@ -186,6 +200,14 @@ def from_polars(polars_type: pl.DataType) -> FeatureType:
def string() -> FeatureType:
return FeatureType(name='string')

@staticmethod
def int8() -> FeatureType:
return FeatureType(name='int8')

@staticmethod
def int16() -> FeatureType:
return FeatureType(name='int16')

@staticmethod
def int32() -> FeatureType:
return FeatureType(name='int32')
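The type additions above thread through the existing FeatureType helpers. A small sketch of what the new mappings imply, asserting only what this diff shows (the hidden polars_type entries are left out):

from aligned.schemas.feature import FeatureType

int8 = FeatureType.int8()
assert int8.is_numeric                # now part of the numeric set
assert int8.python_type is int
assert int8.pandas_type == 'Int8'     # nullable pandas integer dtype

int16 = FeatureType.int16()
assert type(int16.feature_factory).__name__ == 'Int16'  # maps back to the new factory class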