Skip to content

Commit

Permalink
Merge pull request #20 from MatsMoll/matsei/agg-update
Browse files: browse the repository at this point in the history
Chore: more fixes and better support for aggregations
  • Loading branch information
MatsMoll authored Nov 19, 2023
2 parents 1272d94 + 0cc12c6 commit 79426ad
Show file tree
Hide file tree
Showing 15 changed files with 436 additions and 194 deletions.
133 changes: 77 additions & 56 deletions aligned/compiler/aggregation_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,37 @@
String,
TransformationFactory,
)
from aligned.schemas.derivied_feature import AggregateOver, AggregationTimeWindow, DerivedFeature
from aligned.schemas.transformation import Transformation


def aggregate_over(
    group_by: list[FeatureReferance],
    time_column: FeatureReferance | None,
    time_window: timedelta | None,
    every_interval: timedelta | None,
    condition: DerivedFeature | None,
) -> AggregateOver:
    """Build the `AggregateOver` description shared by the aggregation factories.

    Args:
        group_by: The features to group the aggregation by.
        time_column: The feature holding the event timestamp. Required whenever
            `time_window` is set.
        time_window: The window to aggregate over. When falsy (`None` or a zero
            `timedelta`) the result groups by `group_by` only.
        every_interval: Forwarded unchanged to `AggregationTimeWindow`.
        condition: Forwarded to `AggregateOver` for windowed aggregations.

    Returns:
        `AggregateOver(group_by)` when there is no window, otherwise an
        `AggregateOver` wrapping an `AggregationTimeWindow`.

    Raises:
        ValueError: If a time window is given without a time column.
    """
    if not time_window:
        # NOTE(review): `condition` is not forwarded on this path -- confirm that
        # un-windowed aggregations are meant to ignore it.
        return AggregateOver(group_by)

    if not time_column:
        # `time_column` is always empty in this branch, so report the window
        # that required it instead of interpolating a meaningless `None`.
        raise ValueError(
            f'Aggregation over {group_by} has a time window of {time_window}, '
            'but no event timestamp to use'
        )

    window = AggregationTimeWindow(time_window, time_column, every_interval)
    return AggregateOver(group_by, window, condition=condition)


@dataclass
class ConcatStringsAggrigationFactory(TransformationFactory, AggregationTransformationFactory):

feature: String
group_by: list[FeatureReferance]
separator: str | None = None
time_window: timedelta | None = None
every_interval: timedelta | None = None

@property
def using_features(self) -> list[FeatureFactory]:
Expand All @@ -28,21 +49,21 @@ def compile(self) -> Transformation:

return ConcatStringAggregation(
key=self.feature.feature_referance().name,
group_keys=[feature.name for feature in self.group_by],
separator=self.separator,
separator=self.separator or '',
)

def with_group_by(self, values: list[FeatureReferance]) -> TransformationFactory:
self.group_by = values
return self
def aggregate_over(
self, group_by: list[FeatureReferance], time_column: FeatureReferance | None
) -> AggregateOver:
return aggregate_over(group_by, time_column, self.time_window, self.every_interval, None)


@dataclass
class SumAggregationFactory(TransformationFactory, AggregationTransformationFactory):

feature: FeatureFactory
group_by: list[FeatureReferance]
time_window: timedelta | None = None
every_interval: timedelta | None = None

@property
def using_features(self) -> list[FeatureFactory]:
Expand All @@ -53,20 +74,20 @@ def compile(self) -> Transformation:

return SumAggregation(
key=self.feature.feature_referance().name,
group_keys=[feature.name for feature in self.group_by],
)

def with_group_by(self, values: list[FeatureReferance]) -> TransformationFactory:
self.group_by = values
return self
def aggregate_over(
self, group_by: list[FeatureReferance], time_column: FeatureReferance | None
) -> AggregateOver:
return aggregate_over(group_by, time_column, self.time_window, self.every_interval, None)


@dataclass
class MeanAggregationFactory(TransformationFactory, AggregationTransformationFactory):

feature: FeatureFactory
group_by: list[FeatureReferance]
time_window: timedelta | None = None
every_interval: timedelta | None = None

@property
def using_features(self) -> list[FeatureFactory]:
Expand All @@ -77,20 +98,20 @@ def compile(self) -> Transformation:

return MeanAggregation(
key=self.feature.feature_referance().name,
group_keys=[feature.name for feature in self.group_by],
)

def with_group_by(self, values: list[FeatureReferance]) -> TransformationFactory:
self.group_by = values
return self
def aggregate_over(
self, group_by: list[FeatureReferance], time_column: FeatureReferance | None
) -> AggregateOver:
return aggregate_over(group_by, time_column, self.time_window, self.every_interval, None)


@dataclass
class MinAggregationFactory(TransformationFactory, AggregationTransformationFactory):

feature: FeatureFactory
group_by: list[FeatureReferance]
time_window: timedelta | None = None
every_interval: timedelta | None = None

@property
def using_features(self) -> list[FeatureFactory]:
Expand All @@ -101,20 +122,20 @@ def compile(self) -> Transformation:

return MinAggregation(
key=self.feature.feature_referance().name,
group_keys=[feature.name for feature in self.group_by],
)

def with_group_by(self, values: list[FeatureReferance]) -> TransformationFactory:
self.group_by = values
return self
def aggregate_over(
self, group_by: list[FeatureReferance], time_column: FeatureReferance | None
) -> AggregateOver:
return aggregate_over(group_by, time_column, self.time_window, self.every_interval, None)


@dataclass
class MaxAggregationFactory(TransformationFactory, AggregationTransformationFactory):

feature: FeatureFactory
group_by: list[FeatureReferance]
time_window: timedelta | None = None
every_interval: timedelta | None = None

@property
def using_features(self) -> list[FeatureFactory]:
Expand All @@ -125,20 +146,20 @@ def compile(self) -> Transformation:

return MaxAggregation(
key=self.feature.feature_referance().name,
group_keys=[feature.name for feature in self.group_by],
)

def with_group_by(self, values: list[FeatureReferance]) -> TransformationFactory:
self.group_by = values
return self
def aggregate_over(
self, group_by: list[FeatureReferance], time_column: FeatureReferance | None
) -> AggregateOver:
return aggregate_over(group_by, time_column, self.time_window, self.every_interval, None)


@dataclass
class CountAggregationFactory(TransformationFactory, AggregationTransformationFactory):

feature: FeatureFactory
group_by: list[FeatureReferance]
time_window: timedelta | None = None
every_interval: timedelta | None = None

@property
def using_features(self) -> list[FeatureFactory]:
Expand All @@ -149,20 +170,20 @@ def compile(self) -> Transformation:

return CountAggregation(
key=self.feature.feature_referance().name,
group_keys=[feature.name for feature in self.group_by],
)

def with_group_by(self, values: list[FeatureReferance]) -> TransformationFactory:
self.group_by = values
return self
def aggregate_over(
self, group_by: list[FeatureReferance], time_column: FeatureReferance | None
) -> AggregateOver:
return aggregate_over(group_by, time_column, self.time_window, self.every_interval, None)


@dataclass
class CountDistinctAggregationFactory(TransformationFactory, AggregationTransformationFactory):

feature: FeatureFactory
group_by: list[FeatureReferance]
time_window: timedelta | None = None
every_interval: timedelta | None = None

@property
def using_features(self) -> list[FeatureFactory]:
Expand All @@ -173,20 +194,20 @@ def compile(self) -> Transformation:

return CountDistinctAggregation(
key=self.feature.feature_referance().name,
group_keys=[feature.name for feature in self.group_by],
)

def with_group_by(self, values: list[FeatureReferance]) -> TransformationFactory:
self.group_by = values
return self
def aggregate_over(
self, group_by: list[FeatureReferance], time_column: FeatureReferance | None
) -> AggregateOver:
return aggregate_over(group_by, time_column, self.time_window, self.every_interval, None)


@dataclass
class StdAggregationFactory(TransformationFactory, AggregationTransformationFactory):

feature: FeatureFactory
group_by: list[FeatureReferance]
time_window: timedelta | None = None
every_interval: timedelta | None = None

@property
def using_features(self) -> list[FeatureFactory]:
Expand All @@ -197,20 +218,20 @@ def compile(self) -> Transformation:

return StdAggregation(
key=self.feature.feature_referance().name,
group_keys=[feature.name for feature in self.group_by],
)

def with_group_by(self, values: list[FeatureReferance]) -> TransformationFactory:
self.group_by = values
return self
def aggregate_over(
self, group_by: list[FeatureReferance], time_column: FeatureReferance | None
) -> AggregateOver:
return aggregate_over(group_by, time_column, self.time_window, self.every_interval, None)


@dataclass
class VarianceAggregationFactory(TransformationFactory, AggregationTransformationFactory):

feature: FeatureFactory
group_by: list[FeatureReferance]
time_window: timedelta | None = None
every_interval: timedelta | None = None

@property
def using_features(self) -> list[FeatureFactory]:
Expand All @@ -221,20 +242,20 @@ def compile(self) -> Transformation:

return VarianceAggregation(
key=self.feature.feature_referance().name,
group_keys=[feature.name for feature in self.group_by],
)

def with_group_by(self, values: list[FeatureReferance]) -> TransformationFactory:
self.group_by = values
return self
def aggregate_over(
self, group_by: list[FeatureReferance], time_column: FeatureReferance | None
) -> AggregateOver:
return aggregate_over(group_by, time_column, self.time_window, self.every_interval, None)


@dataclass
class MedianAggregationFactory(TransformationFactory, AggregationTransformationFactory):

feature: FeatureFactory
group_by: list[FeatureReferance]
time_window: timedelta | None = None
every_interval: timedelta | None = None

@property
def using_features(self) -> list[FeatureFactory]:
Expand All @@ -245,21 +266,21 @@ def compile(self) -> Transformation:

return MedianAggregation(
key=self.feature.feature_referance().name,
group_keys=[feature.name for feature in self.group_by],
)

def with_group_by(self, values: list[FeatureReferance]) -> TransformationFactory:
self.group_by = values
return self
def aggregate_over(
self, group_by: list[FeatureReferance], time_column: FeatureReferance | None
) -> AggregateOver:
return aggregate_over(group_by, time_column, self.time_window, self.every_interval, None)


@dataclass
class PercentileAggregationFactory(TransformationFactory, AggregationTransformationFactory):

feature: FeatureFactory
percentile: float
group_by: list[FeatureReferance]
time_window: timedelta | None = None
every_interval: timedelta | None = None

@property
def using_features(self) -> list[FeatureFactory]:
Expand All @@ -271,9 +292,9 @@ def compile(self) -> Transformation:
return PercentileAggregation(
key=self.feature.feature_referance().name,
percentile=self.percentile,
group_keys=[feature.name for feature in self.group_by],
)

def with_group_by(self, values: list[FeatureReferance]) -> TransformationFactory:
self.group_by = values
return self
def aggregate_over(
self, group_by: list[FeatureReferance], time_column: FeatureReferance | None
) -> AggregateOver:
return aggregate_over(group_by, time_column, self.time_window, self.every_interval, None)
18 changes: 0 additions & 18 deletions aligned/compiler/constraint_factory.py

This file was deleted.

Loading

0 comments on commit 79426ad

Please sign in to comment.