diff --git a/src/unitxt/metrics.py b/src/unitxt/metrics.py
index b3defb570a..c6f457fbda 100644
--- a/src/unitxt/metrics.py
+++ b/src/unitxt/metrics.py
@@ -3,7 +3,7 @@
 import uuid
 import warnings
 from abc import ABC, abstractmethod
-from collections import Counter
+from collections import Counter, defaultdict
 from copy import deepcopy
 from dataclasses import field
 from statistics import mean
@@ -17,6 +17,7 @@
 from .artifact import Artifact
 from .dataclass import AbstractField, InternalField, NonPositionalField, OptionalField
+from .dict_utils import dict_get
 from .logging_utils import get_logger
 from .metric_utils import InstanceInput, MetricRequest, MetricResponse
 from .operator import (
@@ -622,12 +623,24 @@ def compute(
         pass
 
 
+def scores_dict_from_instances_dict(
+    instances_dict: Dict[str, List[Dict[str, Any]]], score_name: str
+):
+    """Map each key of instances_dict to the list of score_name values of its instances."""
+    to_ret = {}
+    for key, instances in instances_dict.items():
+        to_ret[key] = [
+            instance["score"]["instance"][score_name] for instance in instances
+        ]
+    return to_ret
+
+
 class InstanceMetric(SingleStreamOperator, MetricWithConfidenceInterval):
     """Class for metrics for which a global score can be calculated by aggregating the instance scores (possibly with additional instance inputs).
 
-    InstanceMetric currently allows two reductions:
+    InstanceMetric currently allows three reductions:
     1. 'mean', which calculates the mean of instance scores,
-    2. 'group_mean', which first applies an aggregation function specified in the reduction_map
+    2. 'max', which finds the maximal instance score,
+    3. 'group_mean', which first applies an aggregation function specified in the reduction_map
     to instance scores grouped by the field grouping_field (which must not be None), and returns the mean of the group scores; if grouping_field is None, grouping is disabled.
     See _validate_group_mean_reduction for formatting instructions.
@@ -747,9 +760,131 @@ def accuracy_diff(subgroup_scores_dict, expected_subgroup_types=['original', 'pa
             self.subgroup_column in instance["task_data"] for instance in instances
         ), f"each instance task_data dict must have a key {self.subgroup_column}"
 
+    # flake8: noqa: C901
     def process(self, stream: Stream, stream_name: Optional[str] = None) -> Generator:
         instances, global_score = self.compute_instance_scores(stream)
 
+        # It is not clear why aggregations whose name and callable are delivered via
+        # "agg_func" in "reduction_map" are only allowed for groups and not over the
+        # whole list of instances; the code below starts to unify the two cases.
+        assert (
+            len(self.reduction_map) == 1
+        ), f"reduction_map must specify exactly one reduction, got {self.reduction_map} in {type(self)}"
+        reduction_type, reduction_params = next(iter(self.reduction_map.items()))
+        assert reduction_type in [
+            "max",
+            "mean",
+            "group_mean",
+        ], f"Reduction {reduction_type} is not supported, please specify a valid reduction method in reduction_map {self.reduction_map}."
+
+        if self.subgroup_column is not None:
+            # checked here, and not only in the grouped case: subgroups are also
+            # allowed over the whole stream of instances
+            assert all(
+                self.subgroup_column in instance["task_data"] for instance in instances
+            ), f"each instance task_data dict must have a key {self.subgroup_column}"
+            # TODO: also assert that an aggregation function name and callable are
+            # available. Currently these arrive within reduction_params, and only for
+            # the case of group_mean; they should be taken out of there.
+
+        if reduction_type == "group_mean" or self.subgroup_column is not None:
+            self._validate_group_mean_reduction(instances=instances)
+
+        reduction_fields = (  # the reduction fields are determined independently of the grouping
+            [self.main_score]
+            if "score_fields" not in reduction_params
+            else list(set(reduction_params["score_fields"]))
+        )
+
+        if reduction_type != "group_mean":
+            grouped_instances = {"all": instances}
+        else:  # for the future: make grouping a separate arg, not inferred from the name of the aggregation
+            grouped_instances = defaultdict(list)
+            group_by = "task_data/group_id"
+            for instance in instances:
+                try:
+                    group_name = dict_get(instance, group_by)
+                except Exception as e:
+                    raise ValueError(
+                        f"Reduction type is group_mean, however instance {instance} does not contain subfield 'task_data/group_id'"
+                    ) from e
+                grouped_instances[group_name].append(instance)
+        # instances are now grouped by task_data/group_id if reduction_type == 'group_mean';
+        # otherwise, all instances make up one group, named 'all'
+
+        # continue by calculating the global score for each group (!) first:
+        # if reduction_type == 'group_mean', apply the aggregation specified by
+        # reduction_params, which in turn accounts for subgroups when subgroup_column
+        # is not None (such considerations, if any, are already coded into the
+        # aggregation function that reduction_params specifies);
+        # if reduction_type != 'group_mean', aggregate with either
+        # self.average_item_scores or self.max_item_scores, as indicated by
+        # reduction_type ('mean' or 'max')
+
+        if reduction_type == "mean":
+            aggregation_function = self.average_item_scores
+        elif reduction_type == "max":
+            aggregation_function = self.max_item_scores
+        else:
+            # reduction_type == 'group_mean', and reduction_params specifies the
+            # aggregation function to employ (over scores, not over instances --
+            # this is dealt with below)
+            aggregation_function = reduction_params["agg_func"][1]
+
+        # currently, subgroups are only applicable for reduction_type == 'group_mean';
+        # in the future, make them general. If self.subgroup_column is not None, replace
+        # each group's list of instances by a dict that splits those instances into
+        # lists keyed by their value of the subgroup_column.
+        if self.subgroup_column is not None:
+            # in the current code, a non-None subgroup_column also indicates grouping;
+            # the two should be separated
+            for group_name, group in grouped_instances.items():
+                sub_grouped_instances = defaultdict(list)
+                sub_group_by = "task_data/" + self.subgroup_column
+                for instance in group:
+                    try:
+                        sub_group_name = dict_get(instance, sub_group_by)
+                    except Exception as e:
+                        raise ValueError(
+                            f"subgroup_column is {self.subgroup_column}, however instance {instance} does not contain subfield '{sub_group_by}'"
+                        ) from e
+                    sub_grouped_instances[sub_group_name].append(instance)
+                # replace the group's list by a dict of lists, split per sub_group value
+                grouped_instances[group_name] = sub_grouped_instances
+
+        # where applicable (reduction_type == 'group_mean', so reduction_params indicates
+        # an aggregation to apply), compute each group's score through its subgroups, per
+        # their aggregation function (typically a lambda over the subgroup scores dict)
+        groups_global_scores = {}
+        if self.subgroup_column is not None:
+            for group_name in grouped_instances.keys():
+                groups_global_scores[group_name] = {}
+                for score_name in reduction_fields:
+                    scores_dict = scores_dict_from_instances_dict(
+                        grouped_instances[group_name], score_name
+                    )
+                    groups_global_scores[group_name][score_name] = reduction_params[
+                        "agg_func"
+                    ][1](scores_dict)
+        # for each score_name in reduction_fields, each group now has a score, computed
+        # through its subgroups; it sits in the group's own global score, under score_name
+        # (the name of the score in the ["score"]["instance"] section of the instances)
+
+        # next, compute the global score of the whole stream: by averaging over the groups,
+        # if there were any (reduction_type == 'group_mean'), or by making the global score
+        # of the sole group (named 'all') the global score of the whole stream. In the
+        # former case, also prefix score_name with
+        # "group_" + str(reduction_params["agg_func"][0]) + "_",
+        # further prefixed by "fixed_" when the CI is computed over the group scores just
+        # computed, rather than over the whole input stream, as is done in the current code.
+
+        aggregation_function_name = reduction_type
+        # aggregation_func operates over scores, not over instances
+        if reduction_type != "group_mean":
+            aggregation_func = nan_mean if reduction_type == "mean" else nan_max
+        else:
+            aggregation_function_name = reduction_params["agg_func"][0]
+            aggregation_func = reduction_params["agg_func"][1]
+
+        # still to decide: whether to (further) split by subfield; this sub-grouping, too,
+        # should be independent of the grouping.
+        # the following assert only keeps ruff from flagging the variables above as
+        # unused until the refactor is completed
+        assert aggregation_func != aggregation_function_name
+
         for reduction_type, reduction_params in self.reduction_map.items():
             assert (
                 reduction_type in self.implemented_reductions
@@ -1844,9 +1979,7 @@ class TokenOverlap(InstanceMetric):
     single_reference_per_prediction = False
     prediction_type = "str"
 
-    def compute(
-        self, references: List[Any], prediction: Any, task_data: List[Dict]
-    ) -> dict:
+    def compute(self, references: List[Any], prediction: Any, task_data: Dict) -> dict:
         results = [
             self._compute_single_ref(str(reference), str(prediction))
             for reference in references
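
For reference, the two reduction_map shapes the assertions above accept, following the docstring and the _validate_group_mean_reduction conventions it points to. A minimal sketch only: the score-field names and the lambda are illustrative, and the third agg_func entry (a flag for computing the CI over per-group scores, hence the "fixed_" naming) is assumed from the comments in this diff rather than spelled out by it.

# simple reduction: the value lists the score fields to aggregate
reduction_map = {"mean": ["f1", "recall"]}

# grouped reduction: "agg_func" carries the aggregation's name, its callable
# (applied to each group's scores), and the assumed CI-over-groups flag
reduction_map = {
    "group_mean": {
        "agg_func": ["my_agg", lambda scores: sum(scores) / len(scores), False],
        "score_fields": ["f1", "recall"],
    }
}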
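
A self-contained sketch of the grouping and sub-grouping the new process code performs, and of the shape scores_dict_from_instances_dict consumes. Plain dicts stand in for real unitxt instances, and toy_agg is a hypothetical agg_func callable, not part of the library:

from collections import defaultdict
from statistics import mean

# toy instances: each carries its instance score plus the task_data fields used for grouping
instances = [
    {"score": {"instance": {"accuracy": 1.0}},
     "task_data": {"group_id": "g1", "variant": "original"}},
    {"score": {"instance": {"accuracy": 0.0}},
     "task_data": {"group_id": "g1", "variant": "paraphrase"}},
    {"score": {"instance": {"accuracy": 1.0}},
     "task_data": {"group_id": "g2", "variant": "original"}},
]

# group by task_data/group_id, then sub-group by the subgroup column ("variant" here)
grouped = defaultdict(lambda: defaultdict(list))
for inst in instances:
    task_data = inst["task_data"]
    grouped[task_data["group_id"]][task_data["variant"]].append(inst)

def scores_dict_from_instances_dict(instances_dict, score_name):
    return {
        key: [inst["score"]["instance"][score_name] for inst in insts]
        for key, insts in instances_dict.items()
    }

def toy_agg(subgroup_scores_dict):
    # hypothetical agg_func callable: mean over all subgroup scores
    return mean(s for scores in subgroup_scores_dict.values() for s in scores)

for group_name, sub_groups in grouped.items():
    scores_dict = scores_dict_from_instances_dict(sub_groups, "accuracy")
    print(group_name, scores_dict, toy_agg(scores_dict))
# g1 {'original': [1.0], 'paraphrase': [0.0]} 0.5
# g2 {'original': [1.0]} 1.0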
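
And the score-naming scheme spelled out in the comments above, condensed into a small helper for clarity; global_score_name and the ci_over_groups flag are hypothetical names for illustration, not part of unitxt:

def global_score_name(score_name, reduction_type, agg_func_name, ci_over_groups):
    # the stream-level score name is untouched for 'mean'/'max'; for 'group_mean'
    # it records the aggregation used, and is marked "fixed_" when the CI is
    # computed over the per-group scores rather than over the whole stream
    if reduction_type != "group_mean":
        return score_name
    name = "group_" + str(agg_func_name) + "_" + score_name
    return "fixed_" + name if ci_over_groups else name

assert global_score_name("f1", "mean", "mean", False) == "f1"
assert global_score_name("f1", "group_mean", "mean", False) == "group_mean_f1"
assert global_score_name("f1", "group_mean", "pdr", True) == "fixed_group_pdr_f1"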