save work in progress for further dev
Signed-off-by: dafnapension <dafnashein@yahoo.com>
dafnapension committed Apr 29, 2024
1 parent 622c80e commit 0ef5cdf
145 changes: 139 additions & 6 deletions src/unitxt/metrics.py
@@ -3,7 +3,7 @@
 import uuid
 import warnings
 from abc import ABC, abstractmethod
-from collections import Counter
+from collections import Counter, defaultdict
 from copy import deepcopy
 from dataclasses import field
 from statistics import mean
@@ -17,6 +17,7 @@
 from .artifact import Artifact
 from .dataclass import AbstractField, InternalField, NonPositionalField, OptionalField
+from .dict_utils import dict_get
 from .logging_utils import get_logger
 from .metric_utils import InstanceInput, MetricRequest, MetricResponse
 from .operator import (
@@ -622,12 +623,24 @@ def compute(
         pass


+def scores_dict_from_instances_dict(
+    instances_dict: Dict[str, List[Dict[str, Any]]], score_name: str
+):
+    to_ret = {}
+    for key, instances in instances_dict.items():
+        to_ret[key] = [
+            instance["score"]["instance"][score_name] for instance in instances
+        ]
+    return to_ret
+
+
 class InstanceMetric(SingleStreamOperator, MetricWithConfidenceInterval):
     """Class for metrics for which a global score can be calculated by aggregating the instance scores (possibly with additional instance inputs).

-    InstanceMetric currently allows two reductions:
+    InstanceMetric currently allows three reductions:
     1. 'mean', which calculates the mean of instance scores,
-    2. 'group_mean', which first applies an aggregation function specified in the reduction_map
+    2. 'max', which finds the maximal instance score,
+    3. 'group_mean', which first applies an aggregation function specified in the reduction_map
     to instance scores grouped by the field grouping_field (which must not be None), and returns the mean
     of the group scores; if grouping_field is None, grouping is disabled.
     See _validate_group_mean_reduction for formatting instructions.
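For orientation, here is a minimal sketch of the three reduction_map shapes this docstring describes. The agg_func triplet layout ([name, callable, optional flag for computing the confidence interval over group scores]) is assumed from how the code below indexes reduction_params["agg_func"]; the score-field names are hypothetical.

    from statistics import mean

    # hypothetical reduction_map configurations, one per supported reduction:
    reduction_map_mean = {"mean": ["f1"]}        # average the instance scores
    reduction_map_max = {"max": ["accuracy"]}    # take the best instance score
    reduction_map_group = {
        "group_mean": {
            # [aggregation name, callable over the group's scores,
            #  assumed flag: compute the CI over group scores ("fixed_" prefix)]
            "agg_func": ["mean", mean, False],
            "score_fields": ["accuracy"],
        }
    }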
@@ -747,9 +760,131 @@ def accuracy_diff(subgroup_scores_dict, expected_subgroup_types=['original', 'pa
             self.subgroup_column in instance["task_data"] for instance in instances
         ), f"each instance task_data dict must have a key {self.subgroup_column}"

+    # flake8: noqa: C901
     def process(self, stream: Stream, stream_name: Optional[str] = None) -> Generator:
         instances, global_score = self.compute_instance_scores(stream)

+        # It is not clear why all types of aggregation (whose name and callable are
+        # delivered via "agg_func" in "reduction_map") are only allowed for groups
+        # and not over the whole list of instances. The code below tries to unify
+        # the two cases.
+        assert (
+            len(self.reduction_map) == 1
+        ), f"reduction_map must specify exactly one reduction; offending metric: {type(self)}"
+        reduction_type, reduction_params = next(iter(self.reduction_map.items()))
+        assert (
+            reduction_type in ["max", "mean", "group_mean"]
+        ), f"Reduction {reduction_type} is not supported, please specify a valid reduction method in reduction_map {self.reduction_map}."
+
+        if self.subgroup_column is not None:
+            # checked here, and not only within grouping: subgroups are allowed
+            # also over the whole stream of instances
+            assert all(
+                self.subgroup_column in instance["task_data"] for instance in instances
+            ), f"each instance task_data dict must have a key {self.subgroup_column}"
+            # TODO: also assert that an aggregate-function name and callable are
+            # given; currently these arrive within reduction_params, and only for
+            # group_mean, and need to be factored out
+
+        if reduction_type == "group_mean" or self.subgroup_column is not None:
+            self._validate_group_mean_reduction(instances=instances)
+
+        reduction_fields = (  # reduction fields are factored out of grouping
+            [self.main_score]
+            if "score_fields" not in reduction_params
+            else list(set(reduction_params["score_fields"]))
+        )
+
+        if reduction_type != "group_mean":
+            grouped_instances = {"all": instances}
+        else:  # for the future: make grouping a separate arg, not inferred from
+            # the name of the aggregation
+            grouped_instances = defaultdict(list)
+            group_by = "task_data/group_id"
+            for instance in instances:
+                try:
+                    group_name = dict_get(instance, group_by)
+                except Exception as e:
+                    raise ValueError(
+                        f"Reduction type is group_mean, however instance {instance} does not contain subfield 'task_data/group_id'"
+                    ) from e
+                grouped_instances[group_name].append(instance)
+        # instances are now grouped by task_data/group_id when reduction_type ==
+        # 'group_mean'; otherwise all instances form one group, named 'all'
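As a toy illustration of this grouping step (group ids and scores hypothetical; unitxt's dict_get resolves the same '/'-separated path as the plain dict access below):

    # hypothetical instances, showing only the fields the grouping step reads:
    instances = [
        {"task_data": {"group_id": "g1"}, "score": {"instance": {"accuracy": 1.0}}},
        {"task_data": {"group_id": "g1"}, "score": {"instance": {"accuracy": 0.0}}},
        {"task_data": {"group_id": "g2"}, "score": {"instance": {"accuracy": 1.0}}},
    ]

    grouped = {}
    for inst in instances:
        # equivalent to dict_get(inst, "task_data/group_id")
        grouped.setdefault(inst["task_data"]["group_id"], []).append(inst)

    assert sorted(grouped) == ["g1", "g2"] and len(grouped["g1"]) == 2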

+        # Continue to calculate the global score for each group (!) first:
+        # If reduction_type == 'group_mean', apply the aggregation specified by
+        # reduction_params; when subgroup_column is not None, any subgroup
+        # considerations are already coded into that aggregation function.
+        # If reduction_type != 'group_mean', aggregate with either
+        # self.average_item_scores or self.max_item_scores, as indicated by
+        # reduction_type (== either 'mean' or 'max').
+
+        aggregation_function = None
+        if reduction_type == "mean":
+            aggregation_function = self.average_item_scores
+        elif reduction_type == "max":
+            aggregation_function = self.max_item_scores
+        else:  # reduction_type == 'group_mean': reduction_params specifies the
+            # aggregation to employ (over scores, not over instances -- to be unified)
+            aggregation_function = reduction_params["agg_func"][1]
+
+        # Currently, subgroups are only applicable for reduction_type == 'group_mean',
+        # but in the future this should be general: if self.subgroup_column is not
+        # None, replace each group's list of instances by a dict that splits that
+        # list by the subgroup_column value of the instances.
+        if self.subgroup_column is not None:
+            # in the current code, this also signals grouping; the two should be
+            # separated
+            for group_name, group in grouped_instances.items():
+                sub_grouped_instances = defaultdict(list)
+                sub_group_by = "task_data/" + self.subgroup_column
+                for instance in group:
+                    try:
+                        sub_group_name = dict_get(instance, sub_group_by)
+                    except Exception as e:
+                        raise ValueError(
+                            f"subgroup_column is {self.subgroup_column}, however instance {instance} does not contain subfield '{sub_group_by}'"
+                        ) from e
+                    sub_grouped_instances[sub_group_name].append(instance)
+                # replace the list by a dict of lists, split per subgroup value
+                grouped_instances[group_name] = sub_grouped_instances
+
+        # Compute each group's global score: when reduction_type == 'group_mean',
+        # apply the aggregation from reduction_params to the group's scores (a dict
+        # split by subgroup when subgroup_column is not None, a flat list otherwise);
+        # for 'mean' and 'max', average or maximize the flat list of scores.
+        groups_global_scores = {}
+        for group_name, group in grouped_instances.items():
+            groups_global_scores[group_name] = {}
+            # wrap an unsplit list of instances so the helper applies uniformly
+            instances_dict = group if isinstance(group, dict) else {"all": group}
+            for score_name in reduction_fields:
+                scores_dict = scores_dict_from_instances_dict(
+                    instances_dict, score_name
+                )
+                if reduction_type == "group_mean":
+                    group_score = reduction_params["agg_func"][1](
+                        scores_dict
+                        if self.subgroup_column is not None
+                        else scores_dict["all"]
+                    )
+                else:
+                    group_score = (nan_mean if reduction_type == "mean" else nan_max)(
+                        scores_dict["all"]
+                    )
+                groups_global_scores[group_name][score_name] = group_score
+        # For each score_name in reduction_fields, each group now has a score
+        # (computed through its subgroups, if any). It sits in the group's own
+        # global score, under the same score_name used in the ["score"]["instance"]
+        # section of the instances.
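A toy illustration of the per-group computation just described, using a hypothetical subgroup-aware aggregation in the spirit of the accuracy_diff example named in this hunk's header (all instance values hypothetical):

    from statistics import mean

    # one group's instances, already split by a hypothetical subgroup_column:
    sub_grouped = {
        "original": [{"score": {"instance": {"accuracy": 1.0}}}],
        "paraphrase": [
            {"score": {"instance": {"accuracy": 0.0}}},
            {"score": {"instance": {"accuracy": 1.0}}},
        ],
    }

    # same extraction as scores_dict_from_instances_dict above:
    scores = {
        key: [inst["score"]["instance"]["accuracy"] for inst in insts]
        for key, insts in sub_grouped.items()
    }
    # scores == {"original": [1.0], "paraphrase": [0.0, 1.0]}

    # a subgroup-aware agg_func, e.g. the gap between original and paraphrase:
    group_score = mean(scores["original"]) - mean(scores["paraphrase"])
    # group_score == 0.5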

+        # Now compute the global score of the whole stream: average over the group
+        # scores if there was real grouping (reduction_type == 'group_mean'), or
+        # adopt the global score of the sole group, 'all', as the global score of
+        # the whole stream. In the former case, also prefix the score name by
+        # "group_" + str(reduction_params["agg_func"][0]) + "_",
+        # further prefixed by "fixed_" when the confidence interval is computed
+        # over the group scores just computed, rather than over the whole stream.
+
+        aggregation_function_name = reduction_type
+        # aggregation_func operates over scores, not over instances
+        if reduction_type != "group_mean":
+            aggregation_func = nan_mean if reduction_type == "mean" else nan_max
+        else:
+            aggregation_function_name = reduction_params["agg_func"][0]
+            aggregation_func = reduction_params["agg_func"][1]
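A sketch of the naming scheme these comments describe; the prefix strings come from the comments above, while the ci_over_group_scores flag and the helper's name are assumptions:

    def global_score_name(score_name, reduction_type, agg_name, ci_over_group_scores=False):
        # plain score name for ungrouped reductions; "group_<agg>_" for group_mean,
        # with an extra "fixed_" when the CI is computed over the group scores
        if reduction_type != "group_mean":
            return score_name
        name = f"group_{agg_name}_{score_name}"
        return f"fixed_{name}" if ci_over_group_scores else name

    assert global_score_name("accuracy", "mean", "mean") == "accuracy"
    assert global_score_name("accuracy", "group_mean", "mean") == "group_mean_accuracy"
    assert (
        global_score_name("accuracy", "group_mean", "mean", ci_over_group_scores=True)
        == "fixed_group_mean_accuracy"
    )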

+        # Remaining WIP: make the subgroup split independent of grouping, and use
+        # aggregation_function_name and aggregation_func to fill the stream-level
+        # global score under the (possibly prefixed) score names.

         for reduction_type, reduction_params in self.reduction_map.items():
             assert (
                 reduction_type in self.implemented_reductions
@@ -1844,9 +1979,7 @@ class TokenOverlap(InstanceMetric):
     single_reference_per_prediction = False
     prediction_type = "str"

-    def compute(
-        self, references: List[Any], prediction: Any, task_data: List[Dict]
-    ) -> dict:
+    def compute(self, references: List[Any], prediction: Any, task_data: Dict) -> dict:
         results = [
             self._compute_single_ref(str(reference), str(prediction))
             for reference in references
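The last hunk narrows the annotation: compute receives the task_data dict of a single instance, not a list of dicts. A minimal sketch of the per-instance calling convention this implies (toy scoring logic, hypothetical values):

    from typing import Any, Dict, List

    def compute(references: List[Any], prediction: Any, task_data: Dict) -> dict:
        # stand-in for an instance metric: one prediction, its references,
        # and the task_data dict of that single instance
        return {"f1": float(str(prediction) in map(str, references))}

    assert compute(["cat", "dog"], "dog", {"group_id": "g1"}) == {"f1": 1.0}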
