Skip to content

Commit

Permalink
advance to shape score aggregations into operators
Browse files Browse the repository at this point in the history
Signed-off-by: dafnapension <dafnashein@yahoo.com>
  • Loading branch information
dafnapension committed May 12, 2024
1 parent a7326e5 commit 57d9a0a
Showing 1 changed file with 60 additions and 1 deletion.
61 changes: 60 additions & 1 deletion src/unitxt/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
StreamingOperator,
StreamInstanceOperator,
)
from .operators import CopyFields
from .operators import CopyFields, FilterByCondition
from .random_utils import get_seed
from .settings_utils import get_settings
from .stream import MultiStream, Stream
Expand Down Expand Up @@ -1119,6 +1119,65 @@ def compute(self, references: List[Any], prediction: Any, task_data: Dict) -> di
pass


class Aggregator(SingleStreamOperator):
    """Compute stream-global scores from individually scored instances.

    For every requested score name, each instance is expected to hold a value at
    instance["score"]["instance"][score_name]. The operator reads the whole
    stream, aggregates these per-instance values into one global value per score
    name, and stores the result (together with the name of the aggregation) in
    the dict found at instance["score"]["global"].

    The names to aggregate are resolved in this order: the ``score_names``
    argument of ``process``, then the ``score_names`` field of the operator, then
    the single name "score".
    """

    # Score names aggregated when process() receives no explicit score_names.
    # None means: fall back to ["score"].
    score_names: Optional[List[str]] = None
    # Label written under "<score_name>_agg_name" in the global score. The default
    # reflects the averaging done by aggregate_instance_scores_to_a_global_score;
    # subclasses that aggregate differently should override it.
    aggregator_name: str = "mean"

    def process(
        self,
        stream: Stream,
        stream_name: Optional[str] = None,
        score_names: Optional[List[str]] = None,
    ) -> Generator:
        """Aggregate the instance scores of ``stream`` and yield its instances.

        Args:
            stream: the scored instances to aggregate over.
            stream_name: accepted for interface compatibility; not used here.
            score_names: names of the scores to aggregate. When None, the
                operator's ``score_names`` field is used, and when that is
                None too, ["score"].

        Yields:
            The instances of ``stream``, in their original order, after the
            global score dict has been updated.
        """
        if score_names is None:
            score_names = (
                self.score_names if self.score_names is not None else ["score"]
            )

        global_score = {}
        instances = []
        for instance in stream:
            if "score" not in instance:
                instance["score"] = {"global": global_score, "instance": {}}
            else:
                # Adopt the global dict the instance already carries.
                # NOTE(review): this assumes all pre-scored instances of a stream
                # share one global dict -- confirm against the upstream metrics.
                global_score = instance["score"]["global"]
            instances.append(instance)

        if not instances:
            # Nothing to aggregate over and nothing to yield.
            return

        for score_name in score_names:
            gs = self.aggregate_instance_scores_to_a_global_score(
                instances, score_name
            )
            global_score.update(
                {score_name: gs, score_name + "_agg_name": self.aggregator_name}
            )
        # Instances that reference this global_score object all see the update.
        yield from instances

    def aggregate_instance_scores_to_a_global_score(
        self, instances: List[Dict[str, Any]], score_name: str
    ) -> float:
        """Return the global value of ``score_name`` over ``instances``.

        Delegates to MetricWithConfidenceInterval.average_item_scores. Subclasses
        may override this method to aggregate differently.
        """
        # Kept as a function-scope import, exactly as in the original code.
        from .metrics import MetricWithConfidenceInterval

        return MetricWithConfidenceInterval.average_item_scores(instances, score_name)


class FilterAggregator(Aggregator):
    """Aggregate only over the instances that pass an optional filter.

    When ``filter_by_condition`` is None, the whole stream is aggregated, exactly
    as the parent class does.
    """

    # Filter applied to the stream before aggregation; None disables filtering.
    filter_by_condition: FilterByCondition = None

    def process(
        self,
        stream: Stream,
        stream_name: Optional[str] = None,
        score_names: Optional[List[str]] = None,
    ) -> Generator:
        """Filter ``stream`` (if a filter is set) and delegate to the parent.

        Only the retained instances are passed on to the parent's ``process``,
        so only they are aggregated over and yielded.
        """
        condition = self.filter_by_condition
        # NOTE(review): the filter is invoked directly on the stream -- confirm
        # that FilterByCondition supports being called this way.
        retained = stream if condition is None else condition(stream)
        yield from super().process(stream=retained, score_names=score_names)


class Accuracy(InstanceMetric):
main_score = "accuracy"
ci_scores = ["accuracy"]
Expand Down

0 comments on commit 57d9a0a

Please sign in to comment.