From b7a541ec83fc4860fb8d5e6c49fc2a99858d8b18 Mon Sep 17 00:00:00 2001 From: dafnapension Date: Sat, 27 Apr 2024 21:00:38 +0300 Subject: [PATCH 01/12] save work in progress for further dev Signed-off-by: dafnapension --- src/unitxt/metrics.py | 145 ++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 139 insertions(+), 6 deletions(-) diff --git a/src/unitxt/metrics.py b/src/unitxt/metrics.py index 701d2c323..c68c7150a 100644 --- a/src/unitxt/metrics.py +++ b/src/unitxt/metrics.py @@ -3,7 +3,7 @@ import uuid import warnings from abc import ABC, abstractmethod -from collections import Counter +from collections import Counter, defaultdict from copy import deepcopy from dataclasses import field from statistics import mean @@ -17,6 +17,7 @@ from .artifact import Artifact from .dataclass import AbstractField, InternalField, NonPositionalField, OptionalField +from .dict_utils import dict_get from .logging_utils import get_logger from .metric_utils import InstanceInput, MetricRequest, MetricResponse from .operator import ( @@ -622,12 +623,24 @@ def compute( pass +def scores_dict_from_instances_dict( + instances_dict: Dict[str, List[Dict[str, Any]]], score_name: str +): + to_ret = {} + for key, instances in instances_dict.items(): + to_ret[key] = [ + instance["score"]["instance"][score_name] for instance in instances + ] + return to_ret + + class InstanceMetric(SingleStreamOperator, MetricWithConfidenceInterval): """Class for metrics for which a global score can be calculated by aggregating the instance scores (possibly with additional instance inputs). - InstanceMetric currently allows two reductions: + InstanceMetric currently allows three reductions: 1. 'mean', which calculates the mean of instance scores, - 2. 'group_mean', which first applies an aggregation function specified in the reduction_map + 2. 'max', which finds the maximal instance score, + 3. 'group_mean', which first applies an aggregation function specified in the reduction_map to instance scores grouped by the field grouping_field (which must not be None), and returns the mean of the group scores; if grouping_field is None, grouping is disabled. See _validate_group_mean_reduction for formatting instructions. @@ -747,9 +760,131 @@ def accuracy_diff(subgroup_scores_dict, expected_subgroup_types=['original', 'pa self.subgroup_column in instance["task_data"] for instance in instances ), f"each instance task_data dict must have a key {self.subgroup_column}" + # flake8: noqa: C901 def process(self, stream: Stream, stream_name: Optional[str] = None) -> Generator: instances, global_score = self.compute_instance_scores(stream) + # not clear to me why all types of aggregations (of which name and actual callable are delivered via "agg_func" + # in "reduction_map") are only allowed for groups and not over the whole list of instances. + # I am trying to unify this here + assert ( + len(self.reduction_map) == 1 + ), f"@@@@@ @@@@@ @@@@@@@@@@@@@@@ offending is: {type(self)}" + reduction_type, reduction_params = next(iter(self.reduction_map.items())) + assert ( + reduction_type in ["max", "mean", "group_mean"] + ), f"Reduction {reduction_type} is not supported, please specify a valid reduction method in reduction_map {self.reduction_map}." + + if self.subgroup_column is not None: + # this we check here, not necessarily within grouped. 
We allow subgroups also for the whole stream of instances + assert all( + self.subgroup_column in instance["task_data"] for instance in instances + ), f"each instance task_data dict must have a key {self.subgroup_column}" + # and assert that there is an aggregate_function_name and aggregate_function. Currently, these arrive + # within reduction_params, and only for the case of grouped_mean. Need to take them out + + if reduction_type == "group_mean" or self.subgroup_column is not None: + self._validate_group_mean_reduction(instances=instances) + + reduction_fields = ( # we get reduction fields out of grouping + [self.main_score] + if "score_fields" not in reduction_params + else list(set(reduction_params["score_fields"])) + ) + + if reduction_type != "group_mean": + grouped_instances = {"all": instances} + else: # for future: make grouping a separate arg, not to be inferred from the name of the aggregation + grouped_instances = defaultdict(list) + group_by = "task_data/group_id" + for instance in instances: + try: + group_name = dict_get(instance, group_by) + except Exception as e: + raise ValueError( + f"Reduction type is group_mean, however instance {instance} does not contain subfield 'task_data/group_id'" + ) from e + grouped_instances[group_name].append(instance) + # instances are now grouped by task_data/group_id, if reduction_type == 'group_mean', else - all instance make one group named 'all' + + # continue to calculate the global score for each group (!) first: + # If reduction_type == 'group_mean', apply the aggregation specified by reduction_params, which, in turn, + # also applies considerations of subgroups, when subgroup_column is not None (these considerations, if any, + # are already coded in the aggregation function specified by the reduction_params). + # If reduction_type != 'group_mean', aggregate with either self.average_item_scores or self.max_item_scores, + # as indicated by reduction_type (== either 'mean' or 'max') + + aggregation_function = None + if reduction_type == "mean": + aggregation_function = self.average_item_scores + elif reduction_type == "max": + aggregation_function = self.max_item_scores + else: # reduction_type == 'group_mean' and reduction_param specifies the aggregation function to employ (over + # scores, not over instances, but we will see to it). + aggregation_function = reduction_params[1] + # currently, sub_group is only applicable for reduction_type == 'group_mean', but in future: make it general + # if self.subgroup_column is not None, generate a dict, associated with each group, where the instances + # are grouped to lists by the subgroup_column of their instance. + if self.subgroup_column is not None: + # in the current code, this is an indication of grouping. 
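            # (Illustrative aside, added by the editor and not part of the original patch: the loop below
            # replaces each group's plain instance list with a dict keyed by the subgroup value, e.g. for
            # subgroup_column == "variant_type" and hypothetical instances inst_0..inst_2:
            #   {"original": [inst_0], "paraphrase": [inst_1, inst_2]}
            # so that comparison-style aggregators can look up each sub-list by its subgroup type.)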
Should be separated + for group_name, group in grouped_instances.items(): + sub_grouped_instances = defaultdict(list) + sub_group_by = "task_data/" + self.subgroup_column + for instance in group: + try: + sub_group_name = dict_get(instance, sub_group_by) + except Exception as e: + raise ValueError( + f"subgroup_column is {self.subgroup_column}, however instance {instance} does not contain subfield '{sub_group_by}'" + ) from e + sub_grouped_instances[sub_group_name].append(instance) + grouped_instances[ + group_name + ] = sub_grouped_instances # replaced the list by dict of split lists, per sub_group value + + # if applicable ( or reduction_type == 'group_mean', and hence reduction_params indicates an aggregation to apply) + # -- compute score by the sub_groups, per their aggregation function (lambda..) + # otherwise + groups_global_scores = {} + for group_name in grouped_instances.keys(): + groups_global_scores[group_name] = {} + for score_name in reduction_fields: + scores_dict = scores_dict_from_instances_dict( + grouped_instances[group_name], score_name + ) + groups_global_scores[group_name][score_name] = reduction_params[ + "agg_func" + ][1](scores_dict) + # for each score_name in reduction_fields, each group now has a score, computed through its subgroups, the score sits in + # the group's global_score (only of the group), named score_name (as the name of the score in the ["score"]["instance"] + # section of the instances + + # we now turn to compute the global score of the whole stream, by averaging over groups, if there were any + # (if reduction_type == 'group_mean'), or make the global_score of the sole group (called 'all') - the global_score + # of the whole stream. In the former case, we also prefix the score_name by + # "group_" + str(reduction_params["agg_func"][0]) + "_"+ + # and further prefix the above by "fixed_" in case that CI is done over the group-scores (as just computed) + # and not over the whole input stream + + # , prefixed + # as is done in the current code. + # we now turn to deal with ci, and accordingly, prefix (or not) the names of these global scores by "fixed_" + + aggregation_function_name = reduction_type + ## aggregation_func: over scores, not instances + if reduction_type != "group_mean": + aggregation_func = nan_mean if reduction_type == "mean" else nan_max + else: + aggregation_function_name = reduction_params["agg_func"][0] + aggregation_func = reduction_params["agg_func"][1] + + # now see if (further) to split by subfield. 
This sub_group should also be independent of the grouping + # the following is just for ruff + assert aggregation_func != aggregation_function_name + + # for field_name in reduction_fields: + # print() + for reduction_type, reduction_params in self.reduction_map.items(): assert ( reduction_type in self.implemented_reductions @@ -1898,9 +2033,7 @@ class TokenOverlap(InstanceMetric): single_reference_per_prediction = False prediction_type = "str" - def compute( - self, references: List[Any], prediction: Any, task_data: List[Dict] - ) -> dict: + def compute(self, references: List[Any], prediction: Any, task_data: Dict) -> dict: results = [ self._compute_single_ref(str(reference), str(prediction)) for reference in references From e8b957c36b177451d726d9b76b8d6b6cccf8bf71 Mon Sep 17 00:00:00 2001 From: dafnapension Date: Mon, 29 Apr 2024 13:02:41 +0300 Subject: [PATCH 02/12] restructure aggregation of InstanceMetric to independent components Signed-off-by: dafnapension --- src/unitxt/metrics.py | 1309 +++++++++++++++------------------ tests/library/test_metrics.py | 87 +-- 2 files changed, 622 insertions(+), 774 deletions(-) diff --git a/src/unitxt/metrics.py b/src/unitxt/metrics.py index c68c7150a..6be255304 100644 --- a/src/unitxt/metrics.py +++ b/src/unitxt/metrics.py @@ -623,341 +623,330 @@ def compute( pass -def scores_dict_from_instances_dict( - instances_dict: Dict[str, List[Dict[str, Any]]], score_name: str -): - to_ret = {} - for key, instances in instances_dict.items(): - to_ret[key] = [ - instance["score"]["instance"][score_name] for instance in instances - ] - return to_ret - - class InstanceMetric(SingleStreamOperator, MetricWithConfidenceInterval): """Class for metrics for which a global score can be calculated by aggregating the instance scores (possibly with additional instance inputs). - InstanceMetric currently allows three reductions: - 1. 'mean', which calculates the mean of instance scores, - 2. 'max', which finds the maximal instance score, - 3. 'group_mean', which first applies an aggregation function specified in the reduction_map - to instance scores grouped by the field grouping_field (which must not be None), and returns the mean - of the group scores; if grouping_field is None, grouping is disabled. - See _validate_group_mean_reduction for formatting instructions. + Class InstanceMetric has a couple of aggregating functions implemented, + average_item_scores and max_item_scores, + each accepting a list of instances, and a score_name, and computes an aggregation over the scores (each being a float) + already stored in the instances, in instance["score"]["instance"][score_name] of each instance. + InstanceMetric stores the aggregated score in the analogous position: + in instance["score"]["global"][score_name] of each instance of the stream. + A different name (perhaps more informative) for that global score can be specified by the user. + + User can specify one of these already implemented aggregating function, or introduce a new one + per their need, and specify it via input argument 'aggregating' as detailed below. + + InstanceMetric facilitates a grouped aggregation: + The instances are split to groups according to the value sitting in a field whose name is specified + by the user, typically: "task_data/group_id". Then, the input aggregating function is applied + to each group separately, yielding group_score for each group, and the global score that is stored in + each instance of the stream, is the average over these group_score. 
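    For example (a minimal sketch that simply mirrors GroupMeanAccuracy, defined further
    below in this patch; the class name here is hypothetical):

        class MyGroupMeanAccuracy(Accuracy):
            grouping = {
                "group_by_field": "task_data/group_id",
                "ci_samples_from_groups_scores": False,
            }

    With the default 'aggregating' (mean), the resulting global score is reported under
    "group_mean_accuracy" (or "fixed_group_mean_accuracy" when
    "ci_samples_from_groups_scores" is True), unless 'to_score_names' overrides the name.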
+ To this end, the user specifies the 'grouping' input argument, as detailed below. + + Facilitating a special variation of aggregation, InstanceMetric offers an easy expression of an aggregation + (over the whole stream or each group, in the latter case the final score is averaged over the groups) + driven by first (further) splitting the list to be aggregated over (again: the whole stream or a group) + to sub-lists, by the value sitting in another instance field specified by the user, and then either + the aggregation is only carried over a specific set of sublists (because the user is only interested in them), + or first carried over one set of sublists, and then over a second set of these sublists, and the final score + of the group-or-whole-stream is set to be the ratio between these two results. + The expression of such type aggregating functions is detailed below, for input argument 'aggregating' + + Users are encouraged to write an extension of InstanceMetric and add to it any aggregating + function they see fit, as demonstrated, for example, in class MinAccuracy. + """ + # for confidence_interval n_resamples: int = OptionalField( default_factory=lambda: settings.num_resamples_for_instance_metrics ) - # some group_mean aggregation functions (3rd element of "agg_func" list in the reduction) - # only require a list of instance scores (e.g., mean, median, etc.). Others aggregation functions - # require an additional column (e.g., a subgroup identifier) by which the instance scores will be grouped - # if subgroup_column is not None, a column by the specified name will be required in task_data - subgroup_column = None - implemented_reductions: List[str] = field( - default_factory=lambda: ["mean", "group_mean", "max"] - ) - - reduction_map: Dict[str, List[str]] = AbstractField() + # list of names of scores to be aggregated over. each sitting in instance["score"]["instance"]. + # if None, [self.main_score] is used to aggregate over + score_names: List[str] = None + + # if not None, must be of same length as score_names. + # specifies one to one the name of the field in "score/global" to hold the aggregated value from + # going over the respective score_name. if to_score_names is None - a name is computed via backward + # compatibility -- reflecting the other input args for aggregating. + to_score_names: List[str] = None + + # when grouping is not None, aggregation is done over groups -- splits of the stream of the instance, + # and then averaged over the groups aggregated results. + # when not None, it must consist of two fields: + # "group_by_field" which specifies the field in the instance whose value determines the group to which the instance belongs. + # example: "task_data/group_id" + # the second field of grouping, "ci_samples_from_groups_scores", is a boolean specifying whether resampling should be + # done from the individual groups' scores (True), as if each group is represented by one instance whose score instance + # is the group's aggregated score, or from the whole stream (False), where each resample is then split to + # groups, the score of which is then computed, and finally averaged with the other groups' scores. + grouping: dict = None + + # how to aggregate over the scores in the instances. Each and every score_name in score_names is aggregated (over + # the instances in the stream or group) by this aggregating function. + # Potentially, to be overridden by the subclasses. 
+ aggregating: dict = None + # if not set by subclasses, it is set here to { + # "aggregating_function_name": "mean", + # "aggregating_function": MetricWithConfidenceInterval.average_item_scores, + # } + + # another example: the user specifies a callable that aggregates a list of floats into one float. + # also specified is a name of a field, hereafter named subgroup_column, whose value indicates the + # sub-list into which each instance (of the stream or group) belongs. Finally, one or two sets of values + # of that subgroup_columns are specified, by which one or two sets of instances (from the stream or group) + # are identified. If one set, named the subgroup, the aggregation is only carried over that subgroup (of interest + # to the user), and the result of that aggregation becomes the score of the group-or-stream. + # If two sets, named control and comparison, the ratio between these two aggregations is set to be the score + # of the group-or-stream. reference_field: str = NonPositionalField(default="references") prediction_field: str = NonPositionalField(default="prediction") - def _validate_group_mean_reduction(self, instances: List[dict]): - """Ensure that group_mean reduction_map is properly formatted. - - Example: Apply the variance (np.var) to group Accuracy instance scores. This class would be specified as follows: - - class GroupVarianceAccuracy(Accuracy): - reduction_map = {'group_mean': {'agg_func': ['variance', np.var, True]}} - - reduction_map must be a dict with values containing - - an 'agg_func' field with value being a 3-element list where - - 1st element is a string name of the aggregation function (used in naming the CI report) - - 2nd element is the callable aggregation function - - 3rd element is a Boolean indicator of whether, during bootstrap CI calculation, the groups are to be sampled as single units. - If True, the group scores are calculated and then resampled. This treats the group units as the unit of - interest for which the CI is being compared. - If False, the instances are resampled individually, and the groups determined - (meaning the groups may be of slightly different size or composition from the original - depending on the resampling of the instances). - - Optional: 'score_fields' key with list value containing the string names of fields to apply the aggregation to - - If not present, the parent class main_score is used. - - The aggregation function (2nd element of agg_func) can be one of two types: - 1. simple: calculate a summary statistic from a single group of values (e.g. mean, median, etc.). - This is best suited for cases where the instances are independent of each other, other than belonging to the same group - 2. comparison: requires subgroup_column to be specified. This function conducts - a comparison between scores for differing values of subgroup_column (e.g., 'original' vs 'paraphrase'). - An example is where the original instance is a question, and the others are various paraphrases - or perturbations of this question. Here, the function would return, say, a comparison of the instance accuracies - rather than, say, the average instance accuracy. - In these cases, we recommend setting the 3rd parameter to be True so that the groups are resampled together. 
- - Example: - class GroupVsBaselineDiffAccuracy(Accuracy): - subgroup_column = 'variant_type' - reduction_map = {'group_mean': {'agg_func': ['accuracy_diff', accuracy_diff, True],}} - - # where the function is defined as - def accuracy_diff(subgroup_scores_dict, expected_subgroup_types=['original', 'paraphrase']): - validate_subgroup_types(subgroup_scores_dict, expected_subgroup_types) - from statistics import mean - return mean(subgroup_scores_dict['paraphrase']) - mean(subgroup_scores_dict['original']) - The input dataset should look like: - - 'group_id' 'question' 'variant_type' - 1 'How do you fix a car engine?' 'original' - 1 'What is the best way to fix an engine?' 'paraphrase' - 1 'How do you repair a car engine?' 'paraphrase' - 1 'How do I repair my engine?' 'paraphrase' - 2 'Why are ants eating my food?' 'original' - """ - # instances need to all have task_data field with field group_id - assert all( - "task_data" in instance for instance in instances - ), "each instance must have an task_data field" - assert all( - isinstance(instance["task_data"], dict) for instance in instances - ), "each instance must have an task_data field that is a dict" - assert all( - "group_id" in instance["task_data"] for instance in instances - ), "each instance task_data dict must have a key group_id" - - # validate the reduction_map - assert ( - "group_mean" in self.reduction_map - ), "reduction_map must have a 'group_mean' key" - fields = self.reduction_map["group_mean"] - # for group_mean, expects a dict - assert isinstance(fields, dict) - assert ( - "agg_func" in fields - ), "fields should have a key 'agg_func' whose value is a 3-element list of a function name, function definition, and a boolean indicator" - assert isinstance( - fields["agg_func"], list - ), "fields['agg_func'] should be a list" + def verify(self): + assert isinstance(self.aggregating, dict), "aggregating must be a dict" + assert len(self.aggregating) == 2, "aggregating must consist of two fields" assert ( - len(fields["agg_func"]) == 3 - ), "fields['agg_func'] should be a 3-element list" - assert isinstance( - fields["agg_func"][0], str - ), "first item in fields['agg_func'] should be a string name of a function" + "aggregating_function_name" in self.aggregating + and "aggregating_function" in self.aggregating + ), "aggregating must contain both fields: 'aggregating_function_name' and 'aggregating_function'" assert callable( - fields["agg_func"][1] - ), "second item in fields['agg_func'] should be a callable function" - assert isinstance( - fields["agg_func"][2], bool - ), "third item in fields['agg_func'] should be a boolean value" - if "score_fields" in fields: - assert isinstance(fields["score_fields"], list) - - # for aggregation functions that use the subgroup_column (expect a dict of lists), check that - # this field exists - if self.subgroup_column is not None: - assert all( - self.subgroup_column in instance["task_data"] for instance in instances - ), f"each instance task_data dict must have a key {self.subgroup_column}" - - # flake8: noqa: C901 - def process(self, stream: Stream, stream_name: Optional[str] = None) -> Generator: - instances, global_score = self.compute_instance_scores(stream) + self.aggregating["aggregating_function"] + ), "self.aggregating['aggregating_function'] must be a callable" + + if self.grouping is not None: + assert isinstance( + self.grouping, dict + ), "if specified, grouping must be a dict" + assert len(self.grouping) == 2, "grouping must consist of two fields" + assert ( + "group_by_field" 
in self.grouping + and "ci_samples_from_groups_scores" in self.grouping + ), "grouping must consist of both fields 'group_by_field' and 'ci_samples_from_groups_scores'" + assert isinstance( + self.grouping["ci_samples_from_groups_scores"], bool + ), "grouping['ci_samples_from_groups_scores'] must be boolean" - # not clear to me why all types of aggregations (of which name and actual callable are delivered via "agg_func" - # in "reduction_map") are only allowed for groups and not over the whole list of instances. - # I am trying to unify this here assert ( - len(self.reduction_map) == 1 - ), f"@@@@@ @@@@@ @@@@@@@@@@@@@@@ offending is: {type(self)}" - reduction_type, reduction_params = next(iter(self.reduction_map.items())) + self.score_names is not None + ), "score_names should have been set by prepare, if not by a subclass" assert ( - reduction_type in ["max", "mean", "group_mean"] - ), f"Reduction {reduction_type} is not supported, please specify a valid reduction method in reduction_map {self.reduction_map}." - - if self.subgroup_column is not None: - # this we check here, not necessarily within grouped. We allow subgroups also for the whole stream of instances - assert all( - self.subgroup_column in instance["task_data"] for instance in instances - ), f"each instance task_data dict must have a key {self.subgroup_column}" - # and assert that there is an aggregate_function_name and aggregate_function. Currently, these arrive - # within reduction_params, and only for the case of grouped_mean. Need to take them out - - if reduction_type == "group_mean" or self.subgroup_column is not None: - self._validate_group_mean_reduction(instances=instances) - - reduction_fields = ( # we get reduction fields out of grouping - [self.main_score] - if "score_fields" not in reduction_params - else list(set(reduction_params["score_fields"])) - ) + self.to_score_names is not None + ), "to_score_names should have been set by prepare, if not by a subclass" + assert len(self.score_names) == len( + self.to_score_names + ), "'score_names' and 'to_score_names' must have the same length" + + def prepare(self): + if self.aggregating is None: + self.aggregating = { + "aggregating_function_name": "mean", + "aggregating_function": MetricWithConfidenceInterval.average_item_scores, + } + + if self.score_names is None: + self.score_names = [self.main_score] + self.prefix = "" + if self.to_score_names is None: + if self.grouping is not None: + self.prefix = "group_" + if self.grouping["ci_samples_from_groups_scores"]: + self.prefix = "fixed_group_" + self.prefix += self.aggregating["aggregating_function_name"] + self.prefix += "_" + # for backward compatibility, only when grouping do we note the aggregation function name + # we suggest to always add it + self.to_score_names = [ + self.prefix + score_name for score_name in self.score_names + ] + super().prepare() - if reduction_type != "group_mean": + def score_groups_globally( + self, instances: List[Dict[str, Any]], score_names: List[str] + ) -> dict: + if self.grouping is None: grouped_instances = {"all": instances} - else: # for future: make grouping a separate arg, not to be inferred from the name of the aggregation + else: grouped_instances = defaultdict(list) - group_by = "task_data/group_id" for instance in instances: try: - group_name = dict_get(instance, group_by) + group_name = dict_get(instance, self.grouping["group_by_field"]) except Exception as e: raise ValueError( - f"Reduction type is group_mean, however instance {instance} does not contain subfield 
'task_data/group_id'" + f"Reduction type is group_mean, grouping is to be empoloyed, however instance {instance} does not contain subfield '{self.grouping['group_by_field']}'" ) from e grouped_instances[group_name].append(instance) - # instances are now grouped by task_data/group_id, if reduction_type == 'group_mean', else - all instance make one group named 'all' - + # instances are now grouped by task_data/group_id (generally: by self.grouping["by field"]), + # if self.grouping is not None, else - all instance make one group named 'all' # continue to calculate the global score for each group (!) first: - # If reduction_type == 'group_mean', apply the aggregation specified by reduction_params, which, in turn, - # also applies considerations of subgroups, when subgroup_column is not None (these considerations, if any, - # are already coded in the aggregation function specified by the reduction_params). - # If reduction_type != 'group_mean', aggregate with either self.average_item_scores or self.max_item_scores, - # as indicated by reduction_type (== either 'mean' or 'max') - - aggregation_function = None - if reduction_type == "mean": - aggregation_function = self.average_item_scores - elif reduction_type == "max": - aggregation_function = self.max_item_scores - else: # reduction_type == 'group_mean' and reduction_param specifies the aggregation function to employ (over - # scores, not over instances, but we will see to it). - aggregation_function = reduction_params[1] - # currently, sub_group is only applicable for reduction_type == 'group_mean', but in future: make it general - # if self.subgroup_column is not None, generate a dict, associated with each group, where the instances - # are grouped to lists by the subgroup_column of their instance. - if self.subgroup_column is not None: - # in the current code, this is an indication of grouping. Should be separated - for group_name, group in grouped_instances.items(): - sub_grouped_instances = defaultdict(list) - sub_group_by = "task_data/" + self.subgroup_column - for instance in group: - try: - sub_group_name = dict_get(instance, sub_group_by) - except Exception as e: - raise ValueError( - f"subgroup_column is {self.subgroup_column}, however instance {instance} does not contain subfield '{sub_group_by}'" - ) from e - sub_grouped_instances[sub_group_name].append(instance) - grouped_instances[ - group_name - ] = sub_grouped_instances # replaced the list by dict of split lists, per sub_group value - - # if applicable ( or reduction_type == 'group_mean', and hence reduction_params indicates an aggregation to apply) - # -- compute score by the sub_groups, per their aggregation function (lambda..) 
- # otherwise + # build the global score for each group, (potentially the only group called 'all') groups_global_scores = {} - for group_name in grouped_instances.keys(): + for group_name, group in grouped_instances.items(): groups_global_scores[group_name] = {} - for score_name in reduction_fields: - scores_dict = scores_dict_from_instances_dict( - grouped_instances[group_name], score_name - ) - groups_global_scores[group_name][score_name] = reduction_params[ - "agg_func" - ][1](scores_dict) - # for each score_name in reduction_fields, each group now has a score, computed through its subgroups, the score sits in - # the group's global_score (only of the group), named score_name (as the name of the score in the ["score"]["instance"] - # section of the instances - - # we now turn to compute the global score of the whole stream, by averaging over groups, if there were any - # (if reduction_type == 'group_mean'), or make the global_score of the sole group (called 'all') - the global_score - # of the whole stream. In the former case, we also prefix the score_name by - # "group_" + str(reduction_params["agg_func"][0]) + "_"+ - # and further prefix the above by "fixed_" in case that CI is done over the group-scores (as just computed) - # and not over the whole input stream - - # , prefixed - # as is done in the current code. - # we now turn to deal with ci, and accordingly, prefix (or not) the names of these global scores by "fixed_" - - aggregation_function_name = reduction_type - ## aggregation_func: over scores, not instances - if reduction_type != "group_mean": - aggregation_func = nan_mean if reduction_type == "mean" else nan_max - else: - aggregation_function_name = reduction_params["agg_func"][0] - aggregation_func = reduction_params["agg_func"][1] - - # now see if (further) to split by subfield. This sub_group should also be independent of the grouping - # the following is just for ruff - assert aggregation_func != aggregation_function_name + for score_name in score_names: + groups_global_scores[group_name][score_name] = self.aggregating[ + "aggregating_function" + ](instances=group, score_name=score_name) + + # for each score_name in score_names, each group now has a score, computed through its subgroups, if applicable. 
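        # Illustration (editorial addition, not part of the original patch): with two groups
        # "g1" and "g2" and score_names == ["accuracy"], the dict returned below might look like
        #   {"g1": {"accuracy": 0.5}, "g2": {"accuracy": 1.0}},
        # each value being the output of self.aggregating["aggregating_function"] over that
        # group's instances.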
+ # the score sits in the group's own global_score (only of the group), named score_name (as the name of the score in + # the ["score"]["instance"] section of the instances + return groups_global_scores + + def average_groups_global_scores( + self, instances: List[Dict[str, Any]], score_name: str + ) -> float: + groups_global_scores = self.score_groups_globally( + instances=instances, score_names=[score_name] + ) + return nan_mean( + [ + groups_global_scores[group_name][score_name] + for group_name in groups_global_scores + ] + ) - # for field_name in reduction_fields: - # print() + # flake8: noqa: C901 + # flake8: noqa: C408 + # flake8: noqa: C416 + def process(self, stream: Stream, stream_name: Optional[str] = None) -> Generator: + instances, global_score = self.compute_instance_scores(stream) - for reduction_type, reduction_params in self.reduction_map.items(): - assert ( - reduction_type in self.implemented_reductions - ), f"Reduction {reduction_type} is not implemented, use one of {self.implemented_reductions}" - - field_name_full_prefix = "" - # used for passing to the bootstrapping, depends on whether the groups are fixed or not - aggregation_function = None - if reduction_type == "mean": - aggregation_function = self.average_item_scores - reduction_fields = list(set(reduction_params)) - # no group reduction, so resample instances individually - scores_to_resample = instances - elif reduction_type == "max": - aggregation_function = self.max_item_scores - reduction_fields = list(set(reduction_params)) - # no group reduction, so resample instances individually - scores_to_resample = instances - elif reduction_type == "group_mean": - aggregation_function = self.average_item_scores - self._validate_group_mean_reduction(instances=instances) - reduction_fields = ( - [self.main_score] - if "score_fields" not in reduction_params - else list(set(reduction_params["score_fields"])) - ) - aggregation_function_name = str(reduction_params["agg_func"][0]) - field_name_full_prefix = "group_" + aggregation_function_name + "_" - do_resample_as_group = reduction_params["agg_func"][2] - if do_resample_as_group: - # append fixed_ to name because resamples the groups as fixed units - field_name_full_prefix = "fixed_" + field_name_full_prefix - ( - scores_to_resample, - aggregation_function, - ) = self._set_up_group_mean_aggregation( - instances, reduction_params, reduction_fields + # each instance now has, in its "score/instance" field, a dict mattping each + # score name from self.score_names (at least these) to the instance's score for + # that score name. + # We now proceed to calculate (update) global score. + # to generalize the process, we say this global score is calculated by groups, + # since also when self.grouping is None, we say we deal with groups, a single group + # in this case, being the whole input stream. + + # calculate global scores for each score_name, for each group + groups_global_scores = self.score_groups_globally(instances, self.score_names) + + # and update the overall global score, of the whole stream from them. + for score_name, to_score_name in zip(self.score_names, self.to_score_names): + if self.grouping is None: + # there is only one group here + global_score.update( + {to_score_name: groups_global_scores["all"][score_name]} ) + else: - raise ValueError( - f"Reduction {reduction_type} is not supported, please specify a valid reduction method in reduction_map {self.reduction_map}." 
+ global_score.update( + { + to_score_name: nan_mean( + [ + groups_global_scores[group_name][score_name] + for group_name in groups_global_scores.keys() + ] + ) + } ) - # calculate global scores for each reduction field - for field_name in reduction_fields: - field_name_full = field_name_full_prefix + field_name - # if group resampling (3rd element of agg_func parameter) is True, then - # 1. scores_to_resample are the group scores, and - # 2. aggregation_function is to take the raw mean - # if no group resampling (3rd element of agg_func parameter) is False, then - # 1. scores_to_resample are the original instance scores, and - # 2. aggregation_function is to apply the group aggregation from the instance scores - # either way, the application of aggregation_function to scores_to_resample yields the global score - global_score[field_name_full] = aggregation_function( - scores_to_resample, field_name + if self.main_score in self.score_names: + position = self.score_names.index(self.main_score) + global_score["score"] = global_score[self.to_score_names[position]] + global_score["score_name"] = self.to_score_names[position] + + # finally: the CI: + # if no grouping, or grouping["ci_samples_from_groups_scores"] is false: + # ci as usual, over the whole input stream, with aggregation function that + # was used above for the whole stream or the individual groups + # need to specify which fields should have CIs calculated for them through ci_scores + # (will not automatically calculate CIs for fields in reduction map) + if self.ci_scores is not None: + if ( + self.grouping is None + or not self.grouping["ci_samples_from_groups_scores"] + ): + confidence_interval = self.score_based_confidence_interval( + instances=instances, + score_names=list(set(self.ci_scores)), + ci_score_prefix=self.prefix, + aggregation_func=self.aggregating["aggregating_function"] + if self.grouping is None + else self.average_groups_global_scores, ) - if field_name == self.main_score: - global_score["score"] = global_score[field_name_full] - global_score["score_name"] = field_name_full - - # need to specify which fields should have CIs calculated for them through ci_scores - # (will not automatically calculate CIs for fields in reduction map) - if self.ci_scores is not None: + else: + # dress the individual groups's score like instance scores: for each group generate + # a dict having just the "score" field, and in it -- just the "instance" section, + # and in that section: all the score_names whose values is the aggregation over that group. 
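                # (Editorial illustration, not from the original patch: with two groups whose
                # aggregated "accuracy" came out 0.5 and 1.0, the list built below would look like
                #   [{"score": {"instance": {"accuracy": 0.5}}},
                #    {"score": {"instance": {"accuracy": 1.0}}}].)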
+ # then sample from them, aggregating by simple average: + to_sample_from = [ + {"score": {"instance": groups_global_scores[group_name]}} + for group_name in groups_global_scores.keys() + ] confidence_interval = self.score_based_confidence_interval( - instances=scores_to_resample, + instances=to_sample_from, score_names=list(set(self.ci_scores)), - ci_score_prefix=field_name_full_prefix, - aggregation_func=aggregation_function, + ci_score_prefix=self.prefix, + aggregation_func=self.average_item_scores, ) - global_score.update(confidence_interval) + + global_score.update(confidence_interval) + + # finally, update all the instances with the global score now all computed: + for instance in instances: + instance["score"]["global"] = global_score yield from instances + @staticmethod + def prepare_for_subgroup_score( + subgroup_column: str, subgroup_types: List[str], score_aggregator: callable + ) -> callable: + def subgroup_column_score(instances: List[Dict[str, Any]], score_name: str): + needed_scores = [] + for instance in instances: + try: + subgroup_value = dict_get(instance, subgroup_column) + except ValueError as ve: + raise ValueError( + f"subgroup_column, {subgroup_column}, is specified, but is not found in instance {instance}" + ) from ve + if subgroup_value in subgroup_types: + needed_scores.append(instance["score"]["instance"][score_name]) + + return score_aggregator(needed_scores) if len(needed_scores) > 0 else np.nan + + return subgroup_column_score + + @staticmethod + # score_calculator receives two lists of float scores, named control_subgroup, and comparison_subgroup, + # and returns one float from them + # the method does not check the mutual exclusion of control_subgroup_types vs comparison_subgroup_types + # and arbitrarily append to the control group an instance whose type belong to both (if any such instance) + def prepare_for_control_reference_score( + subgroup_column: str, + control_subgroup_types: List[str], + comparison_subgroup_types: List[str], + score_calculator: callable, + ) -> callable: + def subgroup_column_control_comparison_score( + instances: List[Dict[str, Any]], score_name: str + ): + needed_controls = [] + needed_comparisons = [] + for instance in instances: + try: + subgroup_value = dict_get(instance, subgroup_column) + except ValueError as ve: + raise ValueError( + f"subgroup_column, {subgroup_column}, is specified, but is not found in instance {instance}" + ) from ve + if subgroup_value in control_subgroup_types: + needed_controls.append(instance["score"]["instance"][score_name]) + elif subgroup_value in comparison_subgroup_types: + needed_comparisons.append(instance["score"]["instance"][score_name]) + + return score_calculator( + control_subgroup=needed_controls, comparison_subgroup=needed_comparisons + ) + + return subgroup_column_control_comparison_score + def compute_instance_scores( self, stream: Stream, stream_name: Optional[str] = None ): @@ -997,103 +986,14 @@ def compute_instance_scores( return instances, global_score - def get_group_scores( - self, instances: List[dict], score_names: List[str], group_aggregation_func - ): - """Group scores by the group_id and subgroup_type fields of each instance, and compute group_aggregation_func by group. - - Args: - instances: List of observation instances with instance-level scores (fields) computed. - score_names: List of instance score names in each instance to apply the aggregation function. 
- group_aggregation_func: Callable aggregation function accepting a list of numeric scores; - or, if self.subgroup_column is not None, a dict of subgroup types scores by subgroup_column value. - callable function returns a single score for the group - - Returns: - List of dicts, each corresponding to a group of instances (defined by 'group_id'), - with an aggregate group score for each score_name - """ - from collections import defaultdict - - # three-level defaultdict: - # first is the grouping, second is the field name, the third is the subgroup_type (by default 'default') - group_to_instance_scores = defaultdict( - lambda: defaultdict(lambda: defaultdict(list)) - ) - - # check if function has fields for subgroup_column - uses_subgroups = self.subgroup_column is not None - default_subgroup_name = "default" - # loop through the instances and group the scores - for instance in instances: - task_data = instance["task_data"] - group_key = task_data["group_id"] - # for functions that do comparisons between subgroup_column groups - # if function doesn't use subgroup_column, or none is present, set "default" as default value, and pass all scores - subgroup_type = ( - task_data[self.subgroup_column] - if uses_subgroups - else default_subgroup_name - ) - for score_name in score_names: - group_to_instance_scores[group_key][score_name][subgroup_type].append( - instance["score"]["instance"][score_name] - ) - - # if group_aggregation_func expects a subgroup-types score dict, pass it; otherwise pass the default type list of scores - return [ - { - "score": { - "instance": { - score_name: group_aggregation_func( - score_dict - if uses_subgroups - else score_dict[default_subgroup_name] - ) - for score_name, score_dict in group_scores.items() - } - } - } - for group_scores in group_to_instance_scores.values() - ] - - def _set_up_group_mean_aggregation( - self, instances, reduction_params, reduction_fields - ): - group_aggregation_func = reduction_params["agg_func"][1] - # if treat groups as units - do_resample_as_group = reduction_params["agg_func"][2] - if do_resample_as_group: - # pass the group aggregate---not instance---scores to resample as usual - aggregation_function = self.average_item_scores - scores_to_resample = self.get_group_scores( - instances, reduction_fields, group_aggregation_func - ) - else: - # pass the instance scores to resample, and calculate the group aggregation on the resamplings - scores_to_resample = instances - - def aggregation_function( - instances, - field_name, - group_aggregation_func=group_aggregation_func, - ): - group_scores = self.get_group_scores( - instances, [field_name], group_aggregation_func - ) - return nan_mean( - [group["score"]["instance"][field_name] for group in group_scores] - ) - - return scores_to_resample, aggregation_function - @abstractmethod def compute(self, references: List[Any], prediction: Any, task_data: Dict) -> dict: pass class Accuracy(InstanceMetric): - reduction_map = {"mean": ["accuracy"]} + grouping = None + score_names = ["accuracy"] main_score = "accuracy" ci_scores = ["accuracy"] @@ -1149,11 +1049,33 @@ def compute( class MaxAccuracy(Accuracy): """Calculate the maximal accuracy over all instances as the global score.""" - reduction_map = {"max": ["accuracy"]} + aggregating = { + "aggregating_function_name": "max", + "aggregating_function": MetricWithConfidenceInterval.max_item_scores, + } + + +class MinAccuracy(Accuracy): + """Calculate the minimal accuracy over all instances as the global score.""" + + def min_item_score(self, 
instances: List[Dict[str, Any]], score_name: str) -> float: + raw_scores = [ + instance["score"]["instance"][score_name] for instance in instances + ] + non_nan_raw_scores = [score for score in raw_scores if not np.isnan(score)] + if len(non_nan_raw_scores) == 0: + return np.nan + return np.min(non_nan_raw_scores) + + def prepare(self): + self.aggregating = { + "aggregating_function_name": "min", + "aggregating_function": self.min_item_score, + } + super().prepare() class UnsortedListExactMatch(InstanceMetric): - reduction_map = {"mean": ["unsorted_list_exact_match"]} main_score = "unsorted_list_exact_match" ci_scores = ["unsorted_list_exact_match"] @@ -1167,7 +1089,6 @@ def compute( class StringContainment(InstanceMetric): - reduction_map = {"mean": ["string_containment"]} main_score = "string_containment" ci_scores = ["string_containment"] @@ -1655,7 +1576,6 @@ def compute(self, references, predictions, task_data: List[Dict]): # Computes char edit distance, ignoring whitespace class CharEditDistance(InstanceMetric): main_score = "char_edit_distance" - reduction_map = {"mean": [main_score]} ci_scores = [main_score] prediction_type = "str" single_reference_per_prediction = True @@ -1686,7 +1606,7 @@ def compute(self, references, prediction: str, task_data: List[Dict]) -> dict: class CharEditDistanceAccuracy(CharEditDistance): main_score = "char_edit_dist_accuracy" - reduction_map = {"mean": [main_score]} + ci_scores = [main_score] accuracy_metric = True @@ -2027,7 +1947,8 @@ def lower(text): class TokenOverlap(InstanceMetric): - reduction_map = {"mean": ["f1", "precision", "recall"]} + score_names = ["f1", "precision", "recall"] + main_score = "f1" ci_scores = ["f1", "precision", "recall"] single_reference_per_prediction = False @@ -2209,11 +2130,11 @@ class LlamaIndexCorrectness(InstanceMetric): model_name: str = "" main_score: str = "" prediction_type: str = "str" - reduction_map: Dict[str, List[str]] = None + aggregating: dict = None + openai_models: List[str] = ["gpt-3.5-turbo"] - anthropic_models: List[ - str - ] = [] # this is here for the sake of documentation for future models + # anthropic_models is here for the sake of documentation for future models: + anthropic_models: List[str] = [] mock_models: List[str] = ["mock"] external_api_models = openai_models + anthropic_models @@ -2247,14 +2168,12 @@ def _model_using_extrnal_api(self): def prepare(self): """Initialization method for the metric. 
Initializes the CorrectnessEvaluator with the OpenAI model.""" - super().prepare() - self.model_name_normalized = self.model_name.replace(".", "_").replace("-", "_") self.main_score: str = ( f"correctness_llama_index_by_{self.model_name_normalized}_judge" ) - self.reduction_map: Dict[str, List[str]] = {"mean": [self.main_score]} + super().prepare() from llama_index.core.evaluation import CorrectnessEvaluator @@ -2758,7 +2677,6 @@ def _compute( class MRR(RetrievalMetric): - reduction_map = {"mean": ["mrr"]} main_score = "mrr" ci_scores = ["mrr"] @@ -2775,7 +2693,6 @@ def _compute( class MAP(RetrievalMetric): - reduction_map = {"mean": ["map"]} main_score = "map" ci_scores = ["map"] @@ -2800,17 +2717,17 @@ def _compute( class RetrievalAtK(RetrievalMetric): k_list: List[int] main_score: str = None - reduction_map: Dict[str, List[str]] = None + aggregating: dict = None def prepare(self): - super().prepare() self.main_score = self.score_name("match", self.k_list[0]) self.ci_scores = [ self.score_name(measure, k) for measure in ["precision", "recall", "match"] for k in self.k_list ] - self.reduction_map = {"mean": self.ci_scores} + self.score_names = self.ci_scores + super().prepare() @staticmethod def score_name(measure: str, k: int): @@ -2938,78 +2855,34 @@ def set_n_resamples(self, n_resample): pass -def validate_subgroup_types( - subgroup_scores_dict: Dict[str, List], - control_subgroup_types: List[str], - comparison_subgroup_types: List[str], -): - """Validate a dict of subgroup type instance score lists, and subgroup type lists. - - Args: - subgroup_scores_dict: dict where keys are subgroup types and values are lists of instance scores. - control_subgroup_types: list of subgroup types (potential keys of subgroup_scores_dict) that are the control (baseline) group - comparison_subgroup_types: list of subgroup types (potential keys of subgroup_scores_dict) that are the group - to be compared to the control group. - - Returns: - dict with all NaN scores removed; control_subgroup_types and comparison_subgroup_types will have non-unique elements removed - """ - # note: subgroup_scores_dict is already a defaultdict of lists, so don't need to check that keys in control_ and comparison_subgroup_types exist in it - # remove any NaNs - subgroup_scores_dict.update( - { - subgroup_name: [score for score in score_list if not np.isnan(score)] - for subgroup_name, score_list in subgroup_scores_dict.items() - } - ) - assert isinstance( - control_subgroup_types, list - ), "control_subgroup_types must be a list" - assert isinstance( - comparison_subgroup_types, list - ), "comparison_subgroup_types must be a list" - # make sure each list is unique, so that labels aren't double-counted - control_subgroup_types = list(set(control_subgroup_types)) - comparison_subgroup_types = list(set(comparison_subgroup_types)) - - return subgroup_scores_dict, control_subgroup_types, comparison_subgroup_types - - def performance_drop_rate( - subgroup_scores_dict: Dict[str, List], - control_subgroup_types: List[str], - comparison_subgroup_types: List[str], + control_subgroup: List[float], + comparison_subgroup: List[float], ): """Percentage decrease of mean performance on test elements relative to that on a baseline (control). from https://arxiv.org/pdf/2306.04528.pdf. Args: - subgroup_scores_dict: dict where keys are subgroup types and values are lists of instance scores. 
- control_subgroup_types: list of subgroup types (potential keys of subgroup_scores_dict) that are the control (baseline) group - comparison_subgroup_types: list of subgroup types (potential keys of subgroup_scores_dict) that are the group - to be compared to the control group. + control_subgroup: list of scores of the instances that belong to the control (baseline) subgroup + comparison_subgroup: list of scores of the instances that belong to the subgroup + to be compared to the control subgroup. Returns: numeric PDR metric. If only one element (no test set) or the first is 0 (percentage change is undefined) return NaN otherwise, calculate PDR """ - ( - subgroup_scores_dict, - control_subgroup_types, - comparison_subgroup_types, - ) = validate_subgroup_types( - subgroup_scores_dict, control_subgroup_types, comparison_subgroup_types - ) + no_nan_control_subgroup = [ + score for score in control_subgroup if not np.isnan(score) + ] + no_nan_comparison_subgroup = [ + score for score in comparison_subgroup if not np.isnan(score) + ] # combine all scores from each label (if there are more than 1 in each group) into a list - group_scores_list = [ - np.concatenate( - [subgroup_scores_dict[subgroup_name] for subgroup_name in name_list] - ) - for name_list in [control_subgroup_types, comparison_subgroup_types] - ] + group_scores_list = [no_nan_control_subgroup, no_nan_comparison_subgroup] + if any(len(scores) == 0 for scores in group_scores_list): # no comparison can be made since there is not at least one score per type return np.nan @@ -3065,10 +2938,21 @@ def interpret_effect_size(x: float): )[0] +def abs_normalized_cohens_h( + control_subgroup: List[float], + comparison_subgroup: List[float], + interpret=False, +): + return np.abs( + normalized_cohens_h( + control_subgroup=control_subgroup, comparison_subgroup=comparison_subgroup + ) + ) + + def normalized_cohens_h( - subgroup_scores_dict: Dict[str, List], - control_subgroup_types: List[str], - comparison_subgroup_types: List[str], + control_subgroup: List[float], + comparison_subgroup: List[float], interpret=False, ): """Cohen's h effect size between two proportions, normalized to interval [-1,1]. 
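    An illustrative calculation (editorial addition, not part of the original docstring):
    if the control subgroup scores average 0.2 and the comparison subgroup scores average 0.8,
    then h = 2 * (arcsin(sqrt(0.8)) - arcsin(sqrt(0.2))), which is about 2 * (1.1071 - 0.4636) = 1.287,
    and the normalized value returned is h / pi, about 0.41 (clipped to [-1, 1]).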
@@ -3101,34 +2985,28 @@ def normalized_cohens_h( Returns: float score between -1 and 1, and a string interpretation if interpret=True """ - ( - subgroup_scores_dict, - control_subgroup_types, - comparison_subgroup_types, - ) = validate_subgroup_types( - subgroup_scores_dict, control_subgroup_types, comparison_subgroup_types - ) + no_nan_control_subgroup = [ + score for score in control_subgroup if not np.isnan(score) + ] + no_nan_comparison_subgroup = [ + score for score in comparison_subgroup if not np.isnan(score) + ] # requires scores to be in [0,1] - for subgroup_name, score_list in subgroup_scores_dict.items(): - assert all( - 0 <= score <= 1 for score in score_list - ), f"all {subgroup_name} scores must be in [0,1]" + assert all( + 0 <= score <= 1 for score in no_nan_control_subgroup + ), "all control scores must be in [0,1]" - # combine all scores from each label (if there are more than 1 in each group) into a list - group_scores_list = [ - np.concatenate( - [subgroup_scores_dict[subgroup_name] for subgroup_name in name_list] - ) - for name_list in [control_subgroup_types, comparison_subgroup_types] - ] + assert all( + 0 <= score <= 1 for score in no_nan_comparison_subgroup + ), "all comparison scores must be in [0,1]" - if any(len(scores) == 0 for scores in group_scores_list): + if len(no_nan_control_subgroup) == 0 or len(no_nan_comparison_subgroup) == 0: # no comparison can be made since there is not at least one score per type h, norm_h = np.nan, np.nan else: - control_mean = mean(group_scores_list[0]) - comparison_mean = mean(group_scores_list[1]) + control_mean = mean(no_nan_control_subgroup) + comparison_mean = mean(no_nan_comparison_subgroup) h = 2 * (np.arcsin(np.sqrt(comparison_mean)) - np.arcsin(np.sqrt(control_mean))) norm_h = np.clip(a=h / np.pi, a_min=-1, a_max=1) @@ -3138,10 +3016,21 @@ def normalized_cohens_h( return norm_h, interpret_effect_size(h) +def abs_normalized_hedges_g( + control_subgroup: List[float], + comparison_subgroup: List[float], + interpret=False, +): + return np.abs( + normalized_hedges_g( + control_subgroup=control_subgroup, comparison_subgroup=comparison_subgroup + ) + ) + + def normalized_hedges_g( - subgroup_scores_dict: Dict[str, List[float]], - control_subgroup_types: List[str], - comparison_subgroup_types: List[str], + control_subgroup: List[float], + comparison_subgroup: List[float], interpret=False, ): """Hedge's g effect size between mean of two samples, normalized to interval [-1,1]. Better than Cohen's d for small sample sizes. @@ -3149,31 +3038,23 @@ def normalized_hedges_g( Takes into account the variances within the samples, not just the means. Args: - subgroup_scores_dict: dict where keys are subgroup types and values are lists of instance scores. - control_subgroup_types: list of subgroup types (potential keys of subgroup_scores_dict) that are the control (baseline) group - comparison_subgroup_types: list of subgroup types (potential keys of subgroup_scores_dict) that are the group + control_subgroup: list of scores of instances that belong to the control (baseline) subgroup + comparison_subgroup: list of scoresof the instances that belong to the comparison subgroup -- the subgroup to be compared to the control group. 
interpret: boolean, whether to interpret the significance of the score or not Returns: float score between -1 and 1, and a string interpretation if interpret=True """ - ( - subgroup_scores_dict, - control_subgroup_types, - comparison_subgroup_types, - ) = validate_subgroup_types( - subgroup_scores_dict, control_subgroup_types, comparison_subgroup_types - ) - - # combine all scores from each label (if there are more than 1 in each group) into a list - group_scores_list = [ - np.concatenate( - [subgroup_scores_dict[subgroup_name] for subgroup_name in name_list] - ) - for name_list in [control_subgroup_types, comparison_subgroup_types] + no_nan_control_subgroup = [ + score for score in control_subgroup if not np.isnan(score) + ] + no_nan_comparison_subgroup = [ + score for score in comparison_subgroup if not np.isnan(score) ] - group_n = [len(scores) for scores in group_scores_list] + group_scores_list = [no_nan_control_subgroup, no_nan_comparison_subgroup] + + group_n = [len(no_nan_control_subgroup), len(no_nan_comparison_subgroup)] if any(nn == 0 for nn in group_n) or all(nn <= 1 for nn in group_n): # if at least one sample size is 0 for one type, no comparison can be made at all # if both sample sizes are 1, then the denominator is undefined since divide by n1 + n2 - 2 @@ -3181,7 +3062,7 @@ def normalized_hedges_g( g, norm_g = np.nan, np.nan else: # otherwise, calculate the variances - group_mean = [mean(scores) for scores in group_scores_list] + group_mean = [mean(no_nan_control_subgroup), mean(no_nan_comparison_subgroup)] # sample variance with 1 degree of freedom (denominator n-1); if n=1, return 0 since otherwise throws an error group_var = [ 0.0 if nn == 1 else np.var(scores, ddof=1) @@ -3217,306 +3098,269 @@ def normalized_hedges_g( return norm_g, interpret_effect_size(g) -def mean_subgroup_score( - subgroup_scores_dict: Dict[str, List], subgroup_types: List[str] -): - """Return the mean instance score for a subset (possibly a single type) of variants (not a comparison). - - Args: - subgroup_scores_dict: dict where keys are subgroup types and values are lists of instance scores. - subgroup_types: the keys (subgroup types) for which the average will be computed. 
- - Returns: - float score - """ - subgroup_scores_dict, subgroup_types, _ = validate_subgroup_types( - subgroup_scores_dict, subgroup_types, [] - ) - - # combine all desired subgroup scores - score_list = np.concatenate( - [subgroup_scores_dict[subgroup_name] for subgroup_name in subgroup_types] - ) - if len(score_list) == 0: - # no scores to use - return np.nan - return mean(score_list) - - # metrics using mean reduction class GroupMeanAccuracy(Accuracy): - reduction_map = {"group_mean": {"agg_func": ["mean", nan_mean, False]}} + grouping = { + "group_by_field": "task_data/group_id", + "ci_samples_from_groups_scores": False, + } class FixedGroupMeanAccuracy(Accuracy): # the same as GroupMeanAccuracy, except the groups are fixed and are resampled together - reduction_map = {"group_mean": {"agg_func": ["mean", nan_mean, True]}} + grouping = { + "group_by_field": "task_data/group_id", + "ci_samples_from_groups_scores": True, + } # same as above, now using StringContainment class GroupMeanStringContainment(StringContainment): - reduction_map = {"group_mean": {"agg_func": ["mean", nan_mean, False]}} + grouping = { + "group_by_field": "task_data/group_id", + "ci_samples_from_groups_scores": False, + } class FixedGroupMeanStringContainment(StringContainment): # the same as GroupMeanStringContainment, except the groups are fixed and are resampled together - reduction_map = {"group_mean": {"agg_func": ["mean", nan_mean, True]}} + grouping = { + "group_by_field": "task_data/group_id", + "ci_samples_from_groups_scores": True, + } # take only the (fixed) group mean of baseline or other (paraphrases) scores class FixedGroupMeanBaselineAccuracy(Accuracy): - subgroup_column = "variant_type" - # take mean of "original" variants only - reduction_map = { - "group_mean": { - "agg_func": [ - "mean_baseline", - lambda scd: mean_subgroup_score( - subgroup_scores_dict=scd, subgroup_types=["original"] - ), - True, - ], - } + grouping = { + "group_by_field": "task_data/group_id", + "ci_samples_from_groups_scores": True, + } + aggregating = { + "aggregating_function_name": "mean_baseline", + "aggregating_function": InstanceMetric.prepare_for_subgroup_score( + subgroup_column="task_data/variant_type", + score_aggregator=nan_mean, + subgroup_types=["original"], + ), } class FixedGroupMeanParaphraseAccuracy(Accuracy): - subgroup_column = "variant_type" - # take mean of "paraphrase" variants only - reduction_map = { - "group_mean": { - "agg_func": [ - "mean_paraphrase", - lambda scd: mean_subgroup_score( - subgroup_scores_dict=scd, subgroup_types=["paraphrase"] - ), - True, - ], - } + grouping = { + "group_by_field": "task_data/group_id", + "ci_samples_from_groups_scores": True, + } + aggregating = { + "aggregating_function_name": "mean_paraphrase", + "aggregating_function": InstanceMetric.prepare_for_subgroup_score( + subgroup_column="task_data/variant_type", + score_aggregator=nan_mean, + subgroup_types=["paraphrase"], + ), } # same as above but using StringContainment class FixedGroupMeanBaselineStringContainment(StringContainment): - subgroup_column = "variant_type" - # take mean of "original" variants only - reduction_map = { - "group_mean": { - "agg_func": [ - "mean_baseline", - lambda scd: mean_subgroup_score( - subgroup_scores_dict=scd, subgroup_types=["original"] - ), - True, - ], - } + grouping = { + "group_by_field": "task_data/group_id", + "ci_samples_from_groups_scores": True, + } + aggregating = { + "aggregating_function_name": "mean_baseline", + "aggregating_function": 
InstanceMetric.prepare_for_subgroup_score( + subgroup_column="task_data/variant_type", + score_aggregator=nan_mean, + subgroup_types=["original"], + ), } class FixedGroupMeanParaphraseStringContainment(StringContainment): - subgroup_column = "variant_type" - # take mean of "paraphrase" variants only - reduction_map = { - "group_mean": { - "agg_func": [ - "mean_paraphrase", - lambda scd: mean_subgroup_score( - subgroup_scores_dict=scd, subgroup_types=["paraphrase"] - ), - True, - ], - } + grouping = { + "group_by_field": "task_data/group_id", + "ci_samples_from_groups_scores": True, + } + aggregating = { + "aggregating_function_name": "mean_paraphrase", + "aggregating_function": InstanceMetric.prepare_for_subgroup_score( + subgroup_column="task_data/variant_type", + score_aggregator=nan_mean, + subgroup_types=["paraphrase"], + ), } # using PDR class FixedGroupPDRParaphraseAccuracy(Accuracy): - subgroup_column = "variant_type" - reduction_map = { - "group_mean": { - "agg_func": [ - "pdr_paraphrase", - lambda scd: performance_drop_rate( - subgroup_scores_dict=scd, - control_subgroup_types=["original"], - comparison_subgroup_types=["paraphrase"], - ), - True, - ], - } + grouping = { + "group_by_field": "task_data/group_id", + "ci_samples_from_groups_scores": True, + } + aggregating = { + "aggregating_function_name": "pdr_paraphrase", + "aggregating_function": InstanceMetric.prepare_for_control_reference_score( + subgroup_column="task_data/variant_type", + control_subgroup_types=["original"], + comparison_subgroup_types=["paraphrase"], + score_calculator=performance_drop_rate, + ), } class FixedGroupPDRParaphraseStringContainment(StringContainment): - subgroup_column = "variant_type" - reduction_map = { - "group_mean": { - "agg_func": [ - "pdr_paraphrase", - lambda scd: performance_drop_rate( - subgroup_scores_dict=scd, - control_subgroup_types=["original"], - comparison_subgroup_types=["paraphrase"], - ), - True, - ], - } + grouping = { + "group_by_field": "task_data/group_id", + "ci_samples_from_groups_scores": True, + } + aggregating = { + "aggregating_function_name": "pdr_paraphrase", + "aggregating_function": InstanceMetric.prepare_for_control_reference_score( + subgroup_column="task_data/variant_type", + control_subgroup_types=["original"], + comparison_subgroup_types=["paraphrase"], + score_calculator=performance_drop_rate, + ), } class GroupMeanTokenOverlap(TokenOverlap): - reduction_map = { - "group_mean": { - "agg_func": ["mean", nan_mean, False], - "score_fields": ["f1", "precision", "recall"], - } + score_names = ["f1", "precision", "recall"] + grouping = { + "group_by_field": "task_data/group_id", + "ci_samples_from_groups_scores": False, } # using Cohens's h for proportions class FixedGroupNormCohensHParaphraseAccuracy(Accuracy): - subgroup_column = "variant_type" - reduction_map = { - "group_mean": { - "agg_func": [ - "norm_cohens_h_paraphrase", - lambda scd: normalized_cohens_h( - subgroup_scores_dict=scd, - control_subgroup_types=["original"], - comparison_subgroup_types=["paraphrase"], - ), - True, - ], - } + grouping = { + "group_by_field": "task_data/group_id", + "ci_samples_from_groups_scores": True, + } + aggregating = { + "aggregating_function_name": "norm_cohens_h_paraphrase", + "aggregating_function": InstanceMetric.prepare_for_control_reference_score( + subgroup_column="task_data/variant_type", + control_subgroup_types=["original"], + comparison_subgroup_types=["paraphrase"], + score_calculator=normalized_cohens_h, + ), } class 
FixedGroupNormCohensHParaphraseStringContainment(StringContainment): - subgroup_column = "variant_type" - reduction_map = { - "group_mean": { - "agg_func": [ - "norm_cohens_h_paraphrase", - lambda scd: normalized_cohens_h( - subgroup_scores_dict=scd, - control_subgroup_types=["original"], - comparison_subgroup_types=["paraphrase"], - ), - True, - ], - } + grouping = { + "group_by_field": "task_data/group_id", + "ci_samples_from_groups_scores": True, + } + aggregating = { + "aggregating_function_name": "norm_cohens_h_paraphrase", + "aggregating_function": InstanceMetric.prepare_for_control_reference_score( + subgroup_column="task_data/variant_type", + control_subgroup_types=["original"], + comparison_subgroup_types=["paraphrase"], + score_calculator=normalized_cohens_h, + ), } # using Hedges' g (takes into account internal variation in group scores) class FixedGroupNormHedgesGParaphraseAccuracy(Accuracy): - subgroup_column = "variant_type" - reduction_map = { - "group_mean": { - "agg_func": [ - "norm_hedges_g_paraphrase", - lambda scd: normalized_hedges_g( - subgroup_scores_dict=scd, - control_subgroup_types=["original"], - comparison_subgroup_types=["paraphrase"], - ), - True, - ], - } + grouping = { + "group_by_field": "task_data/group_id", + "ci_samples_from_groups_scores": True, + } + aggregating = { + "aggregating_function_name": "norm_hedges_g_paraphrase", + "aggregating_function": InstanceMetric.prepare_for_control_reference_score( + subgroup_column="task_data/variant_type", + control_subgroup_types=["original"], + comparison_subgroup_types=["paraphrase"], + score_calculator=normalized_hedges_g, + ), } class FixedGroupNormHedgesGParaphraseStringContainment(StringContainment): - subgroup_column = "variant_type" - reduction_map = { - "group_mean": { - "agg_func": [ - "norm_hedges_g_paraphrase", - lambda scd: normalized_hedges_g( - subgroup_scores_dict=scd, - control_subgroup_types=["original"], - comparison_subgroup_types=["paraphrase"], - ), - True, - ], - } + grouping = { + "group_by_field": "task_data/group_id", + "ci_samples_from_groups_scores": True, + } + aggregating = { + "aggregating_function_name": "norm_hedges_g_paraphrase", + "aggregating_function": InstanceMetric.prepare_for_control_reference_score( + subgroup_column="task_data/variant_type", + control_subgroup_types=["original"], + comparison_subgroup_types=["paraphrase"], + score_calculator=normalized_hedges_g, + ), } # for above metrics, take absolute value of group score first; this measures variation in either direction class FixedGroupAbsvalNormCohensHParaphraseAccuracy(Accuracy): - subgroup_column = "variant_type" - reduction_map = { - "group_mean": { - "agg_func": [ - "absval_norm_cohens_h_paraphrase", - lambda scd: np.abs( - normalized_cohens_h( - subgroup_scores_dict=scd, - control_subgroup_types=["original"], - comparison_subgroup_types=["paraphrase"], - ) - ), - True, - ], - } + grouping = { + "group_by_field": "task_data/group_id", + "ci_samples_from_groups_scores": True, + } + aggregating = { + "aggregating_function_name": "absval_norm_cohens_h_paraphrase", + "aggregating_function": InstanceMetric.prepare_for_control_reference_score( + subgroup_column="task_data/variant_type", + control_subgroup_types=["original"], + comparison_subgroup_types=["paraphrase"], + score_calculator=abs_normalized_cohens_h, + ), } class FixedGroupAbsvalNormCohensHParaphraseStringContainment(StringContainment): - subgroup_column = "variant_type" - reduction_map = { - "group_mean": { - "agg_func": [ - 
"absval_norm_cohens_h_paraphrase", - lambda scd: np.abs( - normalized_cohens_h( - subgroup_scores_dict=scd, - control_subgroup_types=["original"], - comparison_subgroup_types=["paraphrase"], - ) - ), - True, - ], - } + grouping = { + "group_by_field": "task_data/group_id", + "ci_samples_from_groups_scores": True, + } + aggregating = { + "aggregating_function_name": "absval_norm_cohens_h_paraphrase", + "aggregating_function": InstanceMetric.prepare_for_control_reference_score( + subgroup_column="task_data/variant_type", + control_subgroup_types=["original"], + comparison_subgroup_types=["paraphrase"], + score_calculator=abs_normalized_cohens_h, + ), } class FixedGroupAbsvalNormHedgesGParaphraseAccuracy(Accuracy): - subgroup_column = "variant_type" - reduction_map = { - "group_mean": { - "agg_func": [ - "absval_norm_hedges_g_paraphrase", - lambda scd: np.abs( - normalized_hedges_g( - subgroup_scores_dict=scd, - control_subgroup_types=["original"], - comparison_subgroup_types=["paraphrase"], - ) - ), - True, - ], - } + grouping = { + "group_by_field": "task_data/group_id", + "ci_samples_from_groups_scores": True, + } + aggregating = { + "aggregating_function_name": "absval_norm_hedges_g_paraphrase", + "aggregating_function": InstanceMetric.prepare_for_control_reference_score( + subgroup_column="task_data/variant_type", + control_subgroup_types=["original"], + comparison_subgroup_types=["paraphrase"], + score_calculator=abs_normalized_hedges_g, + ), } class FixedGroupAbsvalNormHedgesGParaphraseStringContainment(StringContainment): - subgroup_column = "variant_type" - reduction_map = { - "group_mean": { - "agg_func": [ - "absval_norm_hedges_g_paraphrase", - lambda scd: np.abs( - normalized_hedges_g( - subgroup_scores_dict=scd, - control_subgroup_types=["original"], - comparison_subgroup_types=["paraphrase"], - ) - ), - True, - ], - } + grouping = { + "group_by_field": "task_data/group_id", + "ci_samples_from_groups_scores": True, + } + aggregating = { + "aggregating_function_name": "absval_norm_hedges_g_paraphrase", + "aggregating_function": InstanceMetric.prepare_for_control_reference_score( + subgroup_column="task_data/variant_type", + control_subgroup_types=["original"], + comparison_subgroup_types=["paraphrase"], + score_calculator=abs_normalized_hedges_g, + ), } @@ -3565,14 +3409,18 @@ def compute( class BinaryAccuracy(InstanceMetric): """Calculate accuracy for a binary task, using 0.5 as the threshold in the case of float predictions.""" - reduction_map = {"mean": ["accuracy_binary"]} + grouping = None + score_names = ["accuracy_binary"] main_score = "accuracy_binary" ci_scores = ["accuracy_binary"] threshold = 0.5 prediction_type = "Union[float,int]" single_reference_per_prediction = True - + aggregating = { + "aggregating_function_name": "mean", + "aggregating_function": MetricWithConfidenceInterval.average_item_scores, + } def _validate_reference(self, reference): super()._validate_reference(reference) assert reference[0] in [ @@ -3580,6 +3428,13 @@ def _validate_reference(self, reference): 1, ], f"all references of {self.main_score} must by 0 or 1" + def compute( + self, references: List[Any], prediction: Any, task_data: List[Dict] + ) -> dict: + float_prediction = to_float_or_default(prediction) + prediction = str(int(float_prediction > self.threshold)) + references = ["1"] if references[0].lower() in self.pos_classes else ["0"] + def compute( self, references: List[float], prediction: float, task_data: List[Dict] ) -> dict: diff --git a/tests/library/test_metrics.py 
b/tests/library/test_metrics.py index 966d2418a..67404339d 100644 --- a/tests/library/test_metrics.py +++ b/tests/library/test_metrics.py @@ -38,6 +38,7 @@ KendallTauMetric, LlamaIndexCorrectness, MaxAccuracy, + MinAccuracy, NormalizedSacrebleu, Perplexity, PrecisionBinary, @@ -191,38 +192,37 @@ def test_accuracy(self): for output, target in zip(outputs, instance_targets): self.assertDictEqual(output["score"]["instance"], target) - def test_accuracy_max_aggregation(self): - metric = MaxAccuracy() - + def test_accuracy_max_min_aggregation(self): predictions = ["A", "B", "C"] references = [["B", "C"], ["A"], ["B", "C"]] - outputs = apply_metric( - metric=metric, predictions=predictions, references=references - ) + for number, metric in enumerate([MaxAccuracy(), MinAccuracy()]): + outputs = apply_metric( + metric=metric, predictions=predictions, references=references + ) - expected_global_result = { - "accuracy": 1, - "score": 1, - "score_name": "accuracy", - } + expected_global_result = { + "accuracy": 1 if number == 0 else 0, + "score": 1 if number == 0 else 0, + "score_name": "accuracy", + } - global_result = outputs[0]["score"]["global"].copy() - # Only check the keys that are expected, i.e. exist in expected_global_result - global_result = { - key: value - for key, value in global_result.items() - if key in expected_global_result - } - self.assertDictEqual(global_result, expected_global_result) + global_result = outputs[0]["score"]["global"].copy() + # Only check the keys that are expected, i.e. exist in expected_global_result + global_result = { + key: value + for key, value in global_result.items() + if key in expected_global_result + } + self.assertDictEqual(global_result, expected_global_result) - instance_targets = [ - {"accuracy": 0.0, "score": 0.0, "score_name": "accuracy"}, - {"accuracy": 0.0, "score": 0.0, "score_name": "accuracy"}, - {"accuracy": 1.0, "score": 1.0, "score_name": "accuracy"}, - ] - for output, target in zip(outputs, instance_targets): - self.assertDictEqual(output["score"]["instance"], target) + instance_targets = [ + {"accuracy": 0.0, "score": 0.0, "score_name": "accuracy"}, + {"accuracy": 0.0, "score": 0.0, "score_name": "accuracy"}, + {"accuracy": 1.0, "score": 1.0, "score_name": "accuracy"}, + ] + for output, target in zip(outputs, instance_targets): + self.assertDictEqual(output["score"]["instance"], target) def test_f1_micro(self): metric = F1Micro() @@ -923,17 +923,11 @@ def test_grouped_instance_metrics(self): def test_grouped_instance_metric_errors(self): """Test certain value and assertion error raises for grouped instance metrics (with group_mean reduction).""" - from dataclasses import field - from statistics import mean - from typing import List class NoAggFuncReduction(Accuracy): - implemented_reductions: List[str] = field( - default_factory=lambda: ["mean", "group_mean", "some_other_func"] - ) - reduction_map = {"some_other_func": {"agg_func": ["mean", mean, False]}} + aggregating = {"aggregating_function_name": "unknown"} - with self.assertRaises(ValueError): + with self.assertRaises(AssertionError): # should raise error because no aggregation_function will be defined, since only mean and group_mean are implemented metric = NoAggFuncReduction() apply_metric( @@ -944,7 +938,7 @@ class NoAggFuncReduction(Accuracy): ) class NoAggFunc(Accuracy): - reduction_map = {"group_mean": {"func": ["mean", mean]}} + aggregating = 9 with self.assertRaises(AssertionError): # should raise error because no "agg_func" field in group_mean @@ -957,7 +951,10 @@ class 
NoAggFunc(Accuracy): ) class NoCallableAggFunc(Accuracy): - reduction_map = {"group_mean": {"agg_func": ["mean", "some string", False]}} + aggregating = { + "aggregating_function_name": "no_callable", + "aggregating_function": 9, + } with self.assertRaises(AssertionError): # should raise error because second field of agg_func should be callable @@ -970,7 +967,10 @@ class NoCallableAggFunc(Accuracy): ) class NoBooleanGrouping(Accuracy): - reduction_map = {"group_mean": {"agg_func": ["mean", mean, 1]}} + grouping = { + "group_by_field": "task_data/group_id", + "ci_samples_from_groups_scores": 12.5, + } with self.assertRaises(AssertionError): # should raise error because third field in agg_func is not boolean @@ -1259,15 +1259,8 @@ def _test_grouped_instance_confidence_interval( task_data=GROUPED_INSTANCE_ADDL_INPUTS, ) # get first element of reduction_map values - reduction_params = next(iter(metric.reduction_map.values())) - prefix = "fixed_group" if reduction_params["agg_func"][2] else "group" - group_score_name = "_".join( - [ - prefix, - metric.reduction_map["group_mean"]["agg_func"][0], - metric.main_score, - ] - ) + prefix = metric.prefix + group_score_name = prefix + metric.main_score if expected_global_result is None: expected_global_result = { @@ -1285,7 +1278,7 @@ def _test_grouped_instance_confidence_interval( score_value, expected_global_result[score_name], places=5, - msg=f"{group_score_name} score mismatch for {metric.__class__.__name__}, got {expected_global_result[score_name]} but expected {score_value}", + msg=f"{score_name} score mismatch for {metric.__class__.__name__}, expected {expected_global_result[score_name]} but got {score_value}", ) else: # An output score that is not expected From e79e0022e888d3bd2af59d50dbeec968031a9a3f Mon Sep 17 00:00:00 2001 From: dafnapension Date: Wed, 1 May 2024 23:25:16 +0300 Subject: [PATCH 03/12] further componentized grouping, filtering and control-comparison, and moved to MetricWithConfidence on the way to expand to global Signed-off-by: dafnapension --- src/unitxt/metrics.py | 436 +++++++++++++++++++++++------------------- 1 file changed, 236 insertions(+), 200 deletions(-) diff --git a/src/unitxt/metrics.py b/src/unitxt/metrics.py index 6be255304..ea7e6c7aa 100644 --- a/src/unitxt/metrics.py +++ b/src/unitxt/metrics.py @@ -207,6 +207,41 @@ class MetricWithConfidenceInterval(Metric): confidence_level: float = 0.95 ci_scores: List[str] = None + grouping: dict = None + # when grouping is not None, aggregation is done over groups -- splits of the stream of the instance, + # and then averaged over the groups aggregated results. + # when not None, it must consist of two fields: + # "group_by_field" which specifies the field in the instance whose value determines the group to which the instance belongs. + # example: "task_data/group_id" + # the second field of grouping, "ci_samples_from_groups_scores", is a boolean specifying whether resampling should be + # done from the individual groups' scores (True), as if each group is represented by one instance whose score instance + # is the group's aggregated score, or from the whole stream (False), where each resample is then split to + # groups, the score of which is then computed, and finally averaged with the other groups' scores. + + aggregating: dict = None + # How to yield one score, float, from a list of instances: either the whole stream or one group. 
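(Illustrative sketch, added for clarity and not part of the patch: an "aggregating_function" maps (instances, score_name) to a single float, matching how it is invoked as aggregating_function(instances=..., score_name=...). For example, a subclass that scores a stream-or-group by its best instance could hypothetically set:

    aggregating = {
        "aggregating_function_name": "max",
        "aggregating_function": lambda instances, score_name: max(
            instance["score"]["instance"][score_name] for instance in instances
        ),
    }
)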
+ # For InstanceMetric, this aggregation is over the instance scores, already sitting in each instance, in subfield + # instance["score"]["instance"], which is a dict mapping score_name to (instance) score value. + # Tyically, to be overridden by the subclasses. If None, then for InstanceMetric - the default of average_item_scores, + # and for GlobalMetric -- this is the metric itself. + # If not set by subclasses, it is set here to { + # "aggregating_function_name": "mean", + # "aggregating_function": MetricWithConfidenceInterval.average_item_scores, + # } + + subgroup_filtering: dict = ( + None # {"subgroup_column": str, "subgroup_types": List[str]} + ) + # The stream-or-group to be aggregated over, is first filtered, maintaining only instances in which the + # field specified as "subgroup_column" contain a value that is a member of "subgroup_types". + # Useful when the user is only interested in these. + + control_comparison: dict = None # {"subgroup_column": str, "control_subgroup_types": List[str], "comparison_subgroup_types": List[str], "control_comparison_score_calculator": callable[List[float], List[float] -> float]} + # The scores from the instances, of the whole-stream-or-group in which the value sitting in field "subgroup_column" + # belongs to "countrol_subgroup_types" are gathered into a list of floats, and similarly for the scores from the + # instances that belong to the comparison group, and then these two lists are fed into the callable specified in + # "control_comparison_score_calculator", which returns the (global) score for the whole-stream-or-group. + @staticmethod def new_random_generator(): # The np.random.default_rng expects a 32-bit int, while hash(..) can return a 64-bit integer. @@ -421,6 +456,111 @@ def metric(sample_refs, sample_preds, sample_task_data): result[f"{score_name}_ci_high"] = ci.high return result + def score_groups_globally( + self, instances: List[Dict[str, Any]], score_names: Optional[List[str]] = None + ) -> dict: + if self.grouping is None: + grouped_instances = {"all": instances} + else: + grouped_instances = defaultdict(list) + for instance in instances: + try: + group_name = dict_get(instance, self.grouping["group_by_field"]) + except Exception as e: + raise ValueError( + f"grouping input arg is not None, grouping is to be empoloyed, however instance {instance} does not contain subfield '{self.grouping['group_by_field']}'" + ) from e + grouped_instances[group_name].append(instance) + # instances are now grouped by task_data/group_id (generally: by self.grouping["by field"]), + # if self.grouping is not None, else - all instance make one group named 'all' + # continue to calculate the global score for each group (!) 
first: + # build the global score for each group, (potentially the only group called 'all') + + if self.subgroup_filtering: + for group_name, group in grouped_instances.items(): + filtered_group = [] + for instance in group: + try: + subgroup_type = dict_get( + instance, self.subgroup_filtering["subgroup_column"] + ) + except Exception as e: + raise ValueError( + f"subgroup_filtering input arg is not None, however instance {instance} does not contain subfield '{self.subgroup_filtering['subgroup_column']}'" + ) from e + if subgroup_type in self.subgroup_filtering["subgroup_types"]: + filtered_group.append(instance) + grouped_instances[group_name] = filtered_group + + # control_comparison: dict = None #{"subgroup_column": str, "control_subgroup_types": List[str], "comparison_subgroup_types": List[str], "control_comparison_score_calculator": callable[float, float -> float]} + if self.control_comparison: + subgroup_column = self.control_comparison["subgroup_column"] + for group_name, group in grouped_instances.items(): + control_group = [] + comparison_group = [] + for instance in group: + try: + subgroup_type = dict_get(instance, subgroup_column) + except Exception as e: + raise ValueError( + f"control_comparison input arg is not None, however instance {instance} does not contain subfield '{subgroup_column}'" + ) from e + if ( + subgroup_type + in self.control_comparison["control_subgroup_types"] + ): + control_group.append(instance) + elif ( + subgroup_type + in self.control_comparison["comparison_subgroup_types"] + ): + comparison_group.append(instance) + grouped_instances[group_name] = { + "control": control_group, + "comparison": comparison_group, + } + + groups_global_scores = {} + for group_name, group in grouped_instances.items(): + groups_global_scores[group_name] = {} + if isinstance(self, InstanceMetric): + for score_name in score_names: + if isinstance(group, list): # not split to control and comparison + groups_global_scores[group_name][score_name] = self.aggregating[ + "aggregating_function" + ](instances=group, score_name=score_name) + else: + control_scores = [ + instance["score"]["instance"][score_name] + for instance in group["control"] + ] + comparison_scores = [ + instance["score"]["instance"][score_name] + for instance in group["comparison"] + ] + groups_global_scores[group_name][ + score_name + ] = self.control_comparison[ + "control_comparison_score_calculator" + ](control_scores, comparison_scores) + elif isinstance(self, GlobalMetric): + raise ValueError( + "What are you doing here, nowhere in GlobalMetric is this method invoked" + ) + elif isinstance(self, BulkInstanceMetric): + raise ValueError( + "What are you doing here, nowhere in BulkInstanceMetric is this method invoked" + ) + else: + raise ValueError( + f"Unrecognized extension of MetricWithConfidence: {type(self)}" + ) + + # for each score_name in score_names, each group now has a score, computed through its subgroups, if applicable. + # the score sits in the group's own global_score (only of the group), named score_name (as the name of the score in + # the ["score"]["instance"] section of the instances + return groups_global_scores + class GlobalMetric(SingleStreamOperator, MetricWithConfidenceInterval): """A class for computing metrics that require joint calculations over all instances and are not just aggregation of scores of individuals instances. 
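To make the grouping and control-vs-comparison flow of score_groups_globally concrete, here is a minimal, self-contained editorial sketch in plain Python. It is not unitxt code: grouped_control_comparison_score is a hypothetical helper, dict_get is a simplified stand-in for the library's dict_get, and toy_pdr only approximates the role of performance_drop_rate; the instance layout (task_data/group_id, task_data/variant_type, instance["score"]["instance"]) follows the structure used throughout this patch.

from collections import defaultdict
from typing import Any, Callable, Dict, List

import numpy as np


def dict_get(instance: Dict[str, Any], path: str) -> Any:
    # simplified stand-in for unitxt's dict_get: follow a "/"-separated path
    value = instance
    for key in path.split("/"):
        value = value[key]
    return value


def toy_pdr(control_subgroup: List[float], comparison_subgroup: List[float]) -> float:
    # illustrative drop rate: relative drop of the comparison mean below the control mean
    if not control_subgroup or not comparison_subgroup:
        return np.nan
    control_mean = float(np.mean(control_subgroup))
    if control_mean == 0.0:
        return np.nan
    return (control_mean - float(np.mean(comparison_subgroup))) / control_mean


def grouped_control_comparison_score(
    instances: List[Dict[str, Any]],
    group_by_field: str,
    subgroup_column: str,
    control_subgroup_types: List[str],
    comparison_subgroup_types: List[str],
    score_calculator: Callable[[List[float], List[float]], float],
    score_name: str,
) -> float:
    # 1. split the stream into groups by the value found under group_by_field
    groups = defaultdict(list)
    for instance in instances:
        groups[dict_get(instance, group_by_field)].append(instance)

    # 2. within each group, split the instance scores into control vs. comparison
    #    by subgroup_column, and let score_calculator turn the two lists into one group score
    group_scores = []
    for group in groups.values():
        control, comparison = [], []
        for instance in group:
            score = instance["score"]["instance"][score_name]
            subgroup_type = dict_get(instance, subgroup_column)
            if subgroup_type in control_subgroup_types:
                control.append(score)
            elif subgroup_type in comparison_subgroup_types:
                comparison.append(score)
        group_scores.append(score_calculator(control, comparison))

    # 3. the global score is the nan-ignoring mean over the per-group scores
    return float(np.nanmean(group_scores))


instances = [
    {"task_data": {"group_id": "g1", "variant_type": "original"},
     "score": {"instance": {"accuracy": 1.0}}},
    {"task_data": {"group_id": "g1", "variant_type": "paraphrase"},
     "score": {"instance": {"accuracy": 0.0}}},
    {"task_data": {"group_id": "g2", "variant_type": "original"},
     "score": {"instance": {"accuracy": 1.0}}},
    {"task_data": {"group_id": "g2", "variant_type": "paraphrase"},
     "score": {"instance": {"accuracy": 1.0}}},
]

print(
    grouped_control_comparison_score(
        instances,
        group_by_field="task_data/group_id",
        subgroup_column="task_data/variant_type",
        control_subgroup_types=["original"],
        comparison_subgroup_types=["paraphrase"],
        score_calculator=toy_pdr,
        score_name="accuracy",
    )
)  # -> 0.5: group g1 drops fully (1.0), group g2 not at all (0.0)

The actual score_groups_globally in the hunks above additionally covers the plain aggregation path (no control/comparison split) and subgroup_filtering; this sketch only traces the control-comparison branch.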
@@ -637,21 +777,21 @@ class InstanceMetric(SingleStreamOperator, MetricWithConfidenceInterval): User can specify one of these already implemented aggregating function, or introduce a new one per their need, and specify it via input argument 'aggregating' as detailed below. - InstanceMetric facilitates a grouped aggregation: + Aggregation can be subject any of the following variations, or both (or none, of course) + When grouping input arg is not none, the aggregation is done in a grouped manner: The instances are split to groups according to the value sitting in a field whose name is specified by the user, typically: "task_data/group_id". Then, the input aggregating function is applied to each group separately, yielding group_score for each group, and the global score that is stored in each instance of the stream, is the average over these group_score. To this end, the user specifies the 'grouping' input argument, as detailed below. - Facilitating a special variation of aggregation, InstanceMetric offers an easy expression of an aggregation - (over the whole stream or each group, in the latter case the final score is averaged over the groups) - driven by first (further) splitting the list to be aggregated over (again: the whole stream or a group) + Aggregation over the whole stream, or any group (as applicable) can be driven by + first (further) splitting the list to be aggregated over (again: the whole stream or a group) to sub-lists, by the value sitting in another instance field specified by the user, and then either the aggregation is only carried over a specific set of sublists (because the user is only interested in them), or first carried over one set of sublists, and then over a second set of these sublists, and the final score of the group-or-whole-stream is set to be the ratio between these two results. - The expression of such type aggregating functions is detailed below, for input argument 'aggregating' + The expression of such type aggregating functions is detailed for input args subgroup_filtering, and control_comparison Users are encouraged to write an extension of InstanceMetric and add to it any aggregating function they see fit, as demonstrated, for example, in class MinAccuracy. @@ -673,35 +813,6 @@ class InstanceMetric(SingleStreamOperator, MetricWithConfidenceInterval): # compatibility -- reflecting the other input args for aggregating. to_score_names: List[str] = None - # when grouping is not None, aggregation is done over groups -- splits of the stream of the instance, - # and then averaged over the groups aggregated results. - # when not None, it must consist of two fields: - # "group_by_field" which specifies the field in the instance whose value determines the group to which the instance belongs. - # example: "task_data/group_id" - # the second field of grouping, "ci_samples_from_groups_scores", is a boolean specifying whether resampling should be - # done from the individual groups' scores (True), as if each group is represented by one instance whose score instance - # is the group's aggregated score, or from the whole stream (False), where each resample is then split to - # groups, the score of which is then computed, and finally averaged with the other groups' scores. - grouping: dict = None - - # how to aggregate over the scores in the instances. Each and every score_name in score_names is aggregated (over - # the instances in the stream or group) by this aggregating function. - # Potentially, to be overridden by the subclasses. 
- aggregating: dict = None - # if not set by subclasses, it is set here to { - # "aggregating_function_name": "mean", - # "aggregating_function": MetricWithConfidenceInterval.average_item_scores, - # } - - # another example: the user specifies a callable that aggregates a list of floats into one float. - # also specified is a name of a field, hereafter named subgroup_column, whose value indicates the - # sub-list into which each instance (of the stream or group) belongs. Finally, one or two sets of values - # of that subgroup_columns are specified, by which one or two sets of instances (from the stream or group) - # are identified. If one set, named the subgroup, the aggregation is only carried over that subgroup (of interest - # to the user), and the result of that aggregation becomes the score of the group-or-stream. - # If two sets, named control and comparison, the ratio between these two aggregations is set to be the score - # of the group-or-stream. - reference_field: str = NonPositionalField(default="references") prediction_field: str = NonPositionalField(default="prediction") @@ -763,38 +874,6 @@ def prepare(self): ] super().prepare() - def score_groups_globally( - self, instances: List[Dict[str, Any]], score_names: List[str] - ) -> dict: - if self.grouping is None: - grouped_instances = {"all": instances} - else: - grouped_instances = defaultdict(list) - for instance in instances: - try: - group_name = dict_get(instance, self.grouping["group_by_field"]) - except Exception as e: - raise ValueError( - f"Reduction type is group_mean, grouping is to be empoloyed, however instance {instance} does not contain subfield '{self.grouping['group_by_field']}'" - ) from e - grouped_instances[group_name].append(instance) - # instances are now grouped by task_data/group_id (generally: by self.grouping["by field"]), - # if self.grouping is not None, else - all instance make one group named 'all' - # continue to calculate the global score for each group (!) first: - # build the global score for each group, (potentially the only group called 'all') - groups_global_scores = {} - for group_name, group in grouped_instances.items(): - groups_global_scores[group_name] = {} - for score_name in score_names: - groups_global_scores[group_name][score_name] = self.aggregating[ - "aggregating_function" - ](instances=group, score_name=score_name) - - # for each score_name in score_names, each group now has a score, computed through its subgroups, if applicable. 
- # the score sits in the group's own global_score (only of the group), named score_name (as the name of the score in - # the ["score"]["instance"] section of the instances - return groups_global_scores - def average_groups_global_scores( self, instances: List[Dict[str, Any]], score_name: str ) -> float: @@ -893,60 +972,6 @@ def process(self, stream: Stream, stream_name: Optional[str] = None) -> Generato yield from instances - @staticmethod - def prepare_for_subgroup_score( - subgroup_column: str, subgroup_types: List[str], score_aggregator: callable - ) -> callable: - def subgroup_column_score(instances: List[Dict[str, Any]], score_name: str): - needed_scores = [] - for instance in instances: - try: - subgroup_value = dict_get(instance, subgroup_column) - except ValueError as ve: - raise ValueError( - f"subgroup_column, {subgroup_column}, is specified, but is not found in instance {instance}" - ) from ve - if subgroup_value in subgroup_types: - needed_scores.append(instance["score"]["instance"][score_name]) - - return score_aggregator(needed_scores) if len(needed_scores) > 0 else np.nan - - return subgroup_column_score - - @staticmethod - # score_calculator receives two lists of float scores, named control_subgroup, and comparison_subgroup, - # and returns one float from them - # the method does not check the mutual exclusion of control_subgroup_types vs comparison_subgroup_types - # and arbitrarily append to the control group an instance whose type belong to both (if any such instance) - def prepare_for_control_reference_score( - subgroup_column: str, - control_subgroup_types: List[str], - comparison_subgroup_types: List[str], - score_calculator: callable, - ) -> callable: - def subgroup_column_control_comparison_score( - instances: List[Dict[str, Any]], score_name: str - ): - needed_controls = [] - needed_comparisons = [] - for instance in instances: - try: - subgroup_value = dict_get(instance, subgroup_column) - except ValueError as ve: - raise ValueError( - f"subgroup_column, {subgroup_column}, is specified, but is not found in instance {instance}" - ) from ve - if subgroup_value in control_subgroup_types: - needed_controls.append(instance["score"]["instance"][score_name]) - elif subgroup_value in comparison_subgroup_types: - needed_comparisons.append(instance["score"]["instance"][score_name]) - - return score_calculator( - control_subgroup=needed_controls, comparison_subgroup=needed_comparisons - ) - - return subgroup_column_control_comparison_score - def compute_instance_scores( self, stream: Stream, stream_name: Optional[str] = None ): @@ -3138,11 +3163,11 @@ class FixedGroupMeanBaselineAccuracy(Accuracy): } aggregating = { "aggregating_function_name": "mean_baseline", - "aggregating_function": InstanceMetric.prepare_for_subgroup_score( - subgroup_column="task_data/variant_type", - score_aggregator=nan_mean, - subgroup_types=["original"], - ), + "aggregating_function": MetricWithConfidenceInterval.average_item_scores, + } + subgroup_filtering = { + "subgroup_column": "task_data/variant_type", + "subgroup_types": ["original"], } @@ -3153,11 +3178,11 @@ class FixedGroupMeanParaphraseAccuracy(Accuracy): } aggregating = { "aggregating_function_name": "mean_paraphrase", - "aggregating_function": InstanceMetric.prepare_for_subgroup_score( - subgroup_column="task_data/variant_type", - score_aggregator=nan_mean, - subgroup_types=["paraphrase"], - ), + "aggregating_function": MetricWithConfidenceInterval.average_item_scores, + } + subgroup_filtering = { + "subgroup_column": 
"task_data/variant_type", + "subgroup_types": ["paraphrase"], } @@ -3167,13 +3192,14 @@ class FixedGroupMeanBaselineStringContainment(StringContainment): "group_by_field": "task_data/group_id", "ci_samples_from_groups_scores": True, } + aggregating = { "aggregating_function_name": "mean_baseline", - "aggregating_function": InstanceMetric.prepare_for_subgroup_score( - subgroup_column="task_data/variant_type", - score_aggregator=nan_mean, - subgroup_types=["original"], - ), + "aggregating_function": MetricWithConfidenceInterval.average_item_scores, + } + subgroup_filtering = { + "subgroup_column": "task_data/variant_type", + "subgroup_types": ["original"], } @@ -3184,11 +3210,11 @@ class FixedGroupMeanParaphraseStringContainment(StringContainment): } aggregating = { "aggregating_function_name": "mean_paraphrase", - "aggregating_function": InstanceMetric.prepare_for_subgroup_score( - subgroup_column="task_data/variant_type", - score_aggregator=nan_mean, - subgroup_types=["paraphrase"], - ), + "aggregating_function": MetricWithConfidenceInterval.average_item_scores, + } + subgroup_filtering = { + "subgroup_column": "task_data/variant_type", + "subgroup_types": ["paraphrase"], } @@ -3200,12 +3226,13 @@ class FixedGroupPDRParaphraseAccuracy(Accuracy): } aggregating = { "aggregating_function_name": "pdr_paraphrase", - "aggregating_function": InstanceMetric.prepare_for_control_reference_score( - subgroup_column="task_data/variant_type", - control_subgroup_types=["original"], - comparison_subgroup_types=["paraphrase"], - score_calculator=performance_drop_rate, - ), + "aggregating_function": MetricWithConfidenceInterval.average_item_scores, + } + control_comparison = { + "subgroup_column": "task_data/variant_type", + "control_subgroup_types": ["original"], + "comparison_subgroup_types": ["paraphrase"], + "control_comparison_score_calculator": performance_drop_rate, } @@ -3216,12 +3243,13 @@ class FixedGroupPDRParaphraseStringContainment(StringContainment): } aggregating = { "aggregating_function_name": "pdr_paraphrase", - "aggregating_function": InstanceMetric.prepare_for_control_reference_score( - subgroup_column="task_data/variant_type", - control_subgroup_types=["original"], - comparison_subgroup_types=["paraphrase"], - score_calculator=performance_drop_rate, - ), + "aggregating_function": MetricWithConfidenceInterval.average_item_scores, + } + control_comparison = { + "subgroup_column": "task_data/variant_type", + "control_subgroup_types": ["original"], + "comparison_subgroup_types": ["paraphrase"], + "control_comparison_score_calculator": performance_drop_rate, } @@ -3241,12 +3269,13 @@ class FixedGroupNormCohensHParaphraseAccuracy(Accuracy): } aggregating = { "aggregating_function_name": "norm_cohens_h_paraphrase", - "aggregating_function": InstanceMetric.prepare_for_control_reference_score( - subgroup_column="task_data/variant_type", - control_subgroup_types=["original"], - comparison_subgroup_types=["paraphrase"], - score_calculator=normalized_cohens_h, - ), + "aggregating_function": MetricWithConfidenceInterval.average_item_scores, + } + control_comparison = { + "subgroup_column": "task_data/variant_type", + "control_subgroup_types": ["original"], + "comparison_subgroup_types": ["paraphrase"], + "control_comparison_score_calculator": normalized_cohens_h, } @@ -3257,12 +3286,13 @@ class FixedGroupNormCohensHParaphraseStringContainment(StringContainment): } aggregating = { "aggregating_function_name": "norm_cohens_h_paraphrase", - "aggregating_function": 
InstanceMetric.prepare_for_control_reference_score( - subgroup_column="task_data/variant_type", - control_subgroup_types=["original"], - comparison_subgroup_types=["paraphrase"], - score_calculator=normalized_cohens_h, - ), + "aggregating_function": MetricWithConfidenceInterval.average_item_scores, + } + control_comparison = { + "subgroup_column": "task_data/variant_type", + "control_subgroup_types": ["original"], + "comparison_subgroup_types": ["paraphrase"], + "control_comparison_score_calculator": normalized_cohens_h, } @@ -3274,12 +3304,13 @@ class FixedGroupNormHedgesGParaphraseAccuracy(Accuracy): } aggregating = { "aggregating_function_name": "norm_hedges_g_paraphrase", - "aggregating_function": InstanceMetric.prepare_for_control_reference_score( - subgroup_column="task_data/variant_type", - control_subgroup_types=["original"], - comparison_subgroup_types=["paraphrase"], - score_calculator=normalized_hedges_g, - ), + "aggregating_function": MetricWithConfidenceInterval.average_item_scores, + } + control_comparison = { + "subgroup_column": "task_data/variant_type", + "control_subgroup_types": ["original"], + "comparison_subgroup_types": ["paraphrase"], + "control_comparison_score_calculator": normalized_hedges_g, } @@ -3290,12 +3321,13 @@ class FixedGroupNormHedgesGParaphraseStringContainment(StringContainment): } aggregating = { "aggregating_function_name": "norm_hedges_g_paraphrase", - "aggregating_function": InstanceMetric.prepare_for_control_reference_score( - subgroup_column="task_data/variant_type", - control_subgroup_types=["original"], - comparison_subgroup_types=["paraphrase"], - score_calculator=normalized_hedges_g, - ), + "aggregating_function": MetricWithConfidenceInterval.average_item_scores, + } + control_comparison = { + "subgroup_column": "task_data/variant_type", + "control_subgroup_types": ["original"], + "comparison_subgroup_types": ["paraphrase"], + "control_comparison_score_calculator": normalized_hedges_g, } @@ -3307,12 +3339,13 @@ class FixedGroupAbsvalNormCohensHParaphraseAccuracy(Accuracy): } aggregating = { "aggregating_function_name": "absval_norm_cohens_h_paraphrase", - "aggregating_function": InstanceMetric.prepare_for_control_reference_score( - subgroup_column="task_data/variant_type", - control_subgroup_types=["original"], - comparison_subgroup_types=["paraphrase"], - score_calculator=abs_normalized_cohens_h, - ), + "aggregating_function": MetricWithConfidenceInterval.average_item_scores, + } + control_comparison = { + "subgroup_column": "task_data/variant_type", + "control_subgroup_types": ["original"], + "comparison_subgroup_types": ["paraphrase"], + "control_comparison_score_calculator": abs_normalized_cohens_h, } @@ -3323,12 +3356,13 @@ class FixedGroupAbsvalNormCohensHParaphraseStringContainment(StringContainment): } aggregating = { "aggregating_function_name": "absval_norm_cohens_h_paraphrase", - "aggregating_function": InstanceMetric.prepare_for_control_reference_score( - subgroup_column="task_data/variant_type", - control_subgroup_types=["original"], - comparison_subgroup_types=["paraphrase"], - score_calculator=abs_normalized_cohens_h, - ), + "aggregating_function": MetricWithConfidenceInterval.average_item_scores, + } + control_comparison = { + "subgroup_column": "task_data/variant_type", + "control_subgroup_types": ["original"], + "comparison_subgroup_types": ["paraphrase"], + "control_comparison_score_calculator": abs_normalized_cohens_h, } @@ -3339,12 +3373,13 @@ class FixedGroupAbsvalNormHedgesGParaphraseAccuracy(Accuracy): } aggregating = 
{ "aggregating_function_name": "absval_norm_hedges_g_paraphrase", - "aggregating_function": InstanceMetric.prepare_for_control_reference_score( - subgroup_column="task_data/variant_type", - control_subgroup_types=["original"], - comparison_subgroup_types=["paraphrase"], - score_calculator=abs_normalized_hedges_g, - ), + "aggregating_function": MetricWithConfidenceInterval.average_item_scores, + } + control_comparison = { + "subgroup_column": "task_data/variant_type", + "control_subgroup_types": ["original"], + "comparison_subgroup_types": ["paraphrase"], + "control_comparison_score_calculator": abs_normalized_hedges_g, } @@ -3355,12 +3390,13 @@ class FixedGroupAbsvalNormHedgesGParaphraseStringContainment(StringContainment): } aggregating = { "aggregating_function_name": "absval_norm_hedges_g_paraphrase", - "aggregating_function": InstanceMetric.prepare_for_control_reference_score( - subgroup_column="task_data/variant_type", - control_subgroup_types=["original"], - comparison_subgroup_types=["paraphrase"], - score_calculator=abs_normalized_hedges_g, - ), + "aggregating_function": MetricWithConfidenceInterval.average_item_scores, + } + control_comparison = { + "subgroup_column": "task_data/variant_type", + "control_subgroup_types": ["original"], + "comparison_subgroup_types": ["paraphrase"], + "control_comparison_score_calculator": abs_normalized_hedges_g, } From 3da94a2700315be85522ef4b01ca8ae6e2419f37 Mon Sep 17 00:00:00 2001 From: dafnapension Date: Thu, 2 May 2024 01:50:59 +0300 Subject: [PATCH 04/12] Expand grouping on to global metrics Signed-off-by: dafnapension --- src/unitxt/metrics.py | 124 +++++++++++++++++++++++----------- tests/library/test_metrics.py | 64 +++++++++++++++++- 2 files changed, 146 insertions(+), 42 deletions(-) diff --git a/src/unitxt/metrics.py b/src/unitxt/metrics.py index ea7e6c7aa..da8a9d4f1 100644 --- a/src/unitxt/metrics.py +++ b/src/unitxt/metrics.py @@ -7,7 +7,7 @@ from copy import deepcopy from dataclasses import field from statistics import mean -from typing import Any, Dict, Generator, List, Optional, Tuple +from typing import Any, Dict, Generator, List, Optional, Tuple, Union import evaluate import numpy @@ -218,17 +218,6 @@ class MetricWithConfidenceInterval(Metric): # is the group's aggregated score, or from the whole stream (False), where each resample is then split to # groups, the score of which is then computed, and finally averaged with the other groups' scores. - aggregating: dict = None - # How to yield one score, float, from a list of instances: either the whole stream or one group. - # For InstanceMetric, this aggregation is over the instance scores, already sitting in each instance, in subfield - # instance["score"]["instance"], which is a dict mapping score_name to (instance) score value. - # Tyically, to be overridden by the subclasses. If None, then for InstanceMetric - the default of average_item_scores, - # and for GlobalMetric -- this is the metric itself. 
- # If not set by subclasses, it is set here to { - # "aggregating_function_name": "mean", - # "aggregating_function": MetricWithConfidenceInterval.average_item_scores, - # } - subgroup_filtering: dict = ( None # {"subgroup_column": str, "subgroup_types": List[str]} ) @@ -468,7 +457,7 @@ def score_groups_globally( group_name = dict_get(instance, self.grouping["group_by_field"]) except Exception as e: raise ValueError( - f"grouping input arg is not None, grouping is to be empoloyed, however instance {instance} does not contain subfield '{self.grouping['group_by_field']}'" + f"grouping input arg is not None, grouping is to be empoloyed, however instance {instance} does not contain subfield '{group_name}'" ) from e grouped_instances[group_name].append(instance) # instances are now grouped by task_data/group_id (generally: by self.grouping["by field"]), @@ -522,8 +511,8 @@ def score_groups_globally( groups_global_scores = {} for group_name, group in grouped_instances.items(): - groups_global_scores[group_name] = {} if isinstance(self, InstanceMetric): + groups_global_scores[group_name] = {} for score_name in score_names: if isinstance(group, list): # not split to control and comparison groups_global_scores[group_name][score_name] = self.aggregating[ @@ -544,12 +533,24 @@ def score_groups_globally( "control_comparison_score_calculator" ](control_scores, comparison_scores) elif isinstance(self, GlobalMetric): - raise ValueError( - "What are you doing here, nowhere in GlobalMetric is this method invoked" - ) + if isinstance(group, list): + if len(group) == 0: + groups_global_scores[group_name] = np.nan + else: + predictions, references, task_data, group = self.consume_stream( + group + ) + self._validate_references_and_prediction( + references, predictions + ) + groups_global_scores[group_name] = self._compute( + references=references, + predictions=predictions, + task_data=task_data, + ) elif isinstance(self, BulkInstanceMetric): raise ValueError( - "What are you doing here, nowhere in BulkInstanceMetric is this method invoked" + "What are you doing here? nowhere in BulkInstanceMetric is this method invoked" ) else: raise ValueError( @@ -561,6 +562,52 @@ def score_groups_globally( # the ["score"]["instance"] section of the instances return groups_global_scores + # currently: if invoked from InstanceMetric, score_name is not None, and result is float + # and if invoked from GlobalMetric, score_name is None, and result is dict + def average_groups_global_scores( + self, instances: List[Dict[str, Any]], score_name: Optional[str] = None + ) -> Union[float, Dict]: + groups_global_scores = self.score_groups_globally( + instances=instances, + score_names=[score_name] if score_name is not None else None, + ) + assert len(groups_global_scores) > 0, "Where have all the groups gone?" + if len(groups_global_scores) == 1: + return next(iter(groups_global_scores.values())) + + if score_name is not None: + return nan_mean( + [ + groups_global_scores[group_name][score_name] + for group_name in groups_global_scores + ] + ) + # score_name is None + result = defaultdict(list) + # average over the groups. Each group global score there is a dict, being the global_score + # computed for the group, or nan (if the group nullified or something). + # nan-s are excluded, because typically the averaging is via nan_mean + # so hereunder we average over the different fields of the dict, each field separately. 
+ # for generatily we prepare a recursive averaging here, because some of the fields in that + # global score may have a value being a list (like rouge with use_aggregator = False) + for _, group_global_score in groups_global_scores.items(): + if isinstance(group_global_score, dict): + for k, v in group_global_score.items(): + if isinstance(v, str): + result[k] = v + else: + result[k].append(v) + else: + assert np.isnan( + group_global_score + ), "group global score should be either a dict or np.nan" + for k, v in result.items(): + if isinstance(v, list): + # v should be either a str or a list, either a list of float, or a list of lists of floats + result[k] = np.array(result[k]) + result[k] = np.nanmean(result[k], axis=0) + return result + class GlobalMetric(SingleStreamOperator, MetricWithConfidenceInterval): """A class for computing metrics that require joint calculations over all instances and are not just aggregation of scores of individuals instances. @@ -627,7 +674,10 @@ def process(self, stream: Stream, stream_name: Optional[str] = None) -> Generato instance["score"]["instance"].update(instance_score) self._validate_references_and_prediction(references, predictions) - result = self._compute(references, predictions, task_data) + if self.grouping or self.subgroup_filtering or self.control_comparison: + result = self.average_groups_global_scores(instances) + else: + result = self._compute(references, predictions, task_data) global_score.update(result) @@ -637,9 +687,9 @@ def process(self, stream: Stream, stream_name: Optional[str] = None) -> Generato ) global_score.update(confidence_interval) - for instance in instances: - instance["score"]["global"] = global_score - yield instance + # all instances link to same global_score dictionary object, + # no need to update each individually + yield from instances def _compute( self, @@ -813,6 +863,16 @@ class InstanceMetric(SingleStreamOperator, MetricWithConfidenceInterval): # compatibility -- reflecting the other input args for aggregating. to_score_names: List[str] = None + # How to yield one score, float, from a list of instances: either the whole stream or one group. + # For InstanceMetric, this aggregation is over the instance scores, already sitting in each instance, in subfield + # instance["score"]["instance"], which is a dict mapping score_name to (instance) score value. + # Tyically, to be overridden by the subclasses. 
If None, then for InstanceMetric - the default of average_item_scores, + # If not set by subclasses, it is set by InstanceMetric to { + # "aggregating_function_name": "mean", + # "aggregating_function": MetricWithConfidenceInterval.average_item_scores, + # } + aggregating: dict = None + reference_field: str = NonPositionalField(default="references") prediction_field: str = NonPositionalField(default="prediction") @@ -874,22 +934,7 @@ def prepare(self): ] super().prepare() - def average_groups_global_scores( - self, instances: List[Dict[str, Any]], score_name: str - ) -> float: - groups_global_scores = self.score_groups_globally( - instances=instances, score_names=[score_name] - ) - return nan_mean( - [ - groups_global_scores[group_name][score_name] - for group_name in groups_global_scores - ] - ) - # flake8: noqa: C901 - # flake8: noqa: C408 - # flake8: noqa: C416 def process(self, stream: Stream, stream_name: Optional[str] = None) -> Generator: instances, global_score = self.compute_instance_scores(stream) @@ -966,10 +1011,7 @@ def process(self, stream: Stream, stream_name: Optional[str] = None) -> Generato global_score.update(confidence_interval) - # finally, update all the instances with the global score now all computed: - for instance in instances: - instance["score"]["global"] = global_score - + # all instances point to this global_score, so no need to update anything in them yield from instances def compute_instance_scores( diff --git a/tests/library/test_metrics.py b/tests/library/test_metrics.py index 67404339d..482c1cef7 100644 --- a/tests/library/test_metrics.py +++ b/tests/library/test_metrics.py @@ -720,10 +720,72 @@ def test_rouge(self): global_target = 5 / 6 self.assertAlmostEqual(global_target, outputs[0]["score"]["global"]["score"]) + def test_rouge_grouping_and_filtering(self): + metric = Rouge() + references = [ + ["hello", "there"], + ["general kenobi", "general yoda"], + ["general kenobi", "general yoda"], + ["general kenobi", "general yoda"], + ["lieutenant dan", "lieutenant colonel"], + ] + predictions = [ + "hello there", + "general kenobi", + "general kenobi", + "general kenobi", + "forrest gump", + ] + task_data = [ + {"group_id": "group1"}, + {"group_id": "group2"}, + {"group_id": "group2"}, + {"group_id": "group2"}, + {"group_id": "group3"}, + ] + outputs = apply_metric( + metric=metric, + predictions=predictions, + references=references, + task_data=task_data, + ) + + self.assertAlmostEqual( + ((2 / 3) + 3 + 0) / 5, outputs[0]["score"]["global"]["score"] + ) + + metric.grouping = { + "group_by_field": "task_data/group_id", + "ci_samples_from_groups_scores": False, + } + outputs = apply_metric( + metric=metric, + predictions=predictions, + references=references, + task_data=task_data, + ) + self.assertAlmostEqual( + (1 + (2 / 3) + 0) / 3, outputs[0]["score"]["global"]["score"] + ) + + metric.subgroup_filtering = { + "subgroup_column": "task_data/group_id", + "subgroup_types": ["group1", "group2"], + } + outputs = apply_metric( + metric=metric, + predictions=predictions, + references=references, + task_data=task_data, + ) + self.assertAlmostEqual( + (1 + (2 / 3)) / 2, outputs[0]["score"]["global"]["score"] + ) + def test_rouge_l(self): metric = Rouge( n_resamples=None, # disable confidence interval calculation which fails for this metric configuration - use_aggregator=False, + use_aggregator=False, # returns lists for scores, not any aggregation thereof rouge_types=["rougeL"], ) references = [["hello", "there"], ["general kenobi", "general yoda"]] From 
9aa03faccba8278ca3ff5ecd5fabb6d19b2a9c81 Mon Sep 17 00:00:00 2001 From: dafnapension Date: Fri, 3 May 2024 22:48:24 +0300 Subject: [PATCH 05/12] grouped ci for GlobalMetric Signed-off-by: dafnapension --- src/unitxt/metrics.py | 72 +++++++++++++++++++++++++++-------- tests/library/test_metrics.py | 32 ++++++++++++++++ 2 files changed, 88 insertions(+), 16 deletions(-) diff --git a/src/unitxt/metrics.py b/src/unitxt/metrics.py index da8a9d4f1..6bfb5d193 100644 --- a/src/unitxt/metrics.py +++ b/src/unitxt/metrics.py @@ -161,16 +161,24 @@ def get_metric_name(self): return self.__id__ return self.__class__.__name__ - def consume_stream(self, stream: Stream): + def consume_stream( + self, + stream: Stream, + references_field_name="references", + prediction_field_name="prediction", + task_data_field_name="additional_inputs", + ): references = [] predictions = [] additional_inputs = [] instances = [] for instance in stream: - references.append(instance["references"]) - predictions.append(instance["prediction"]) + references.append(instance[references_field_name]) + predictions.append(instance[prediction_field_name]) additional_inputs.append( - instance["additional_inputs"] if "additional_inputs" in instance else {} + instance[task_data_field_name] + if task_data_field_name in instance + else {} ) instances.append(instance) return predictions, references, additional_inputs, instances @@ -538,7 +546,7 @@ def score_groups_globally( groups_global_scores[group_name] = np.nan else: predictions, references, task_data, group = self.consume_stream( - group + stream=group, task_data_field_name="task_data" ) self._validate_references_and_prediction( references, predictions @@ -563,7 +571,7 @@ def score_groups_globally( return groups_global_scores # currently: if invoked from InstanceMetric, score_name is not None, and result is float - # and if invoked from GlobalMetric, score_name is None, and result is dict + # and if invoked from GlobalMetric, score_name is None, and result is dict. def average_groups_global_scores( self, instances: List[Dict[str, Any]], score_name: Optional[str] = None ) -> Union[float, Dict]: @@ -674,18 +682,50 @@ def process(self, stream: Stream, stream_name: Optional[str] = None) -> Generato instance["score"]["instance"].update(instance_score) self._validate_references_and_prediction(references, predictions) - if self.grouping or self.subgroup_filtering or self.control_comparison: - result = self.average_groups_global_scores(instances) - else: - result = self._compute(references, predictions, task_data) - + # if grouping is None, the whole stream is treated as a single group + result = self.average_groups_global_scores(instances=instances) global_score.update(result) - score_name = global_score["score_name"] - confidence_interval = self.compute_global_confidence_intervals( - references, predictions, task_data, score_name - ) - global_score.update(confidence_interval) + if self.ci_scores is not None: + groups_global_scores = self.score_groups_globally(instances=instances) + if ( + self.grouping + and self.grouping["ci_samples_from_groups_scores"] + and all( + ( + group_score is np.nan + or all( + isinstance(group_score[score_name], float) + for score_name in self.ci_scores + ) + ) + for group_score in groups_global_scores.values() + ) + ): + # a dict having just the "score" field, and in it -- just the "instance" section, + # and in that section: all the score_names whose values is the aggregation over that group. 
+ # then sample from them, aggregating, over each sample, by simple average. + # can be done only over scores that are simple float. if a list of float (as with rouge with use_aggregator = False) + # can not then sort the sample by order of their scores, because their scores are lists, and not single float + # in the following exclude groups that score to np.nan because they are empty, rather than a dict + to_sample_from = [ + {"score": {"instance": groups_global_scores[group_name]}} + for group_name in groups_global_scores.keys() + if isinstance(groups_global_scores[group_name], dict) + ] + confidence_interval = self.score_based_confidence_interval( + instances=to_sample_from, + score_names=list(set(self.ci_scores)), + ci_score_prefix="fixed_group_", + aggregation_func=self.average_item_scores, + ) + else: + # todo: change to enable the CI employ the grouped version, and not the + # bare metric._compute + confidence_interval = self.compute_global_confidence_intervals( + references, predictions, task_data, score_name + ) + global_score.update(confidence_interval) # all instances link to same global_score dictionary object, # no need to update each individually diff --git a/tests/library/test_metrics.py b/tests/library/test_metrics.py index 482c1cef7..9c8385dc9 100644 --- a/tests/library/test_metrics.py +++ b/tests/library/test_metrics.py @@ -758,6 +758,7 @@ def test_rouge_grouping_and_filtering(self): "group_by_field": "task_data/group_id", "ci_samples_from_groups_scores": False, } + metric.ci_scores = ["rougeL"] outputs = apply_metric( metric=metric, predictions=predictions, @@ -767,6 +768,30 @@ def test_rouge_grouping_and_filtering(self): self.assertAlmostEqual( (1 + (2 / 3) + 0) / 3, outputs[0]["score"]["global"]["score"] ) + self.assertAlmostEqual( + 0.34900897136393977, outputs[0]["score"]["global"]["rougeL_ci_low"] + ) + self.assertAlmostEqual( + 0.9333333333333332, outputs[0]["score"]["global"]["rougeL_ci_high"] + ) + + metric.grouping["ci_samples_from_groups_scores"] = True + outputs = apply_metric( + metric=metric, + predictions=predictions, + references=references, + task_data=task_data, + ) + self.assertAlmostEqual( + (1 + (2 / 3) + 0) / 3, outputs[0]["score"]["global"]["score"] + ) + self.assertAlmostEqual( + 0.0, outputs[0]["score"]["global"]["fixed_group_rougeL_ci_low"] + ) + self.assertAlmostEqual( + 0.8888888888888888, + outputs[0]["score"]["global"]["fixed_group_rougeL_ci_high"], + ) metric.subgroup_filtering = { "subgroup_column": "task_data/group_id", @@ -781,6 +806,13 @@ def test_rouge_grouping_and_filtering(self): self.assertAlmostEqual( (1 + (2 / 3)) / 2, outputs[0]["score"]["global"]["score"] ) + self.assertAlmostEqual( + 0.6666666666666666, + outputs[0]["score"]["global"]["fixed_group_rougeL_ci_low"], + ) + self.assertAlmostEqual( + 1.0, outputs[0]["score"]["global"]["fixed_group_rougeL_ci_high"] + ) def test_rouge_l(self): metric = Rouge( From a45e3deeb1a7caa3a3b829ccaae14d0d2bbeb385 Mon Sep 17 00:00:00 2001 From: dafnapension Date: Sat, 4 May 2024 00:22:07 +0300 Subject: [PATCH 06/12] run ci for global metric also with ci_scores is None for backward compatibility Signed-off-by: dafnapension --- src/unitxt/metrics.py | 78 ++++++++++++++++++++++--------------------- 1 file changed, 40 insertions(+), 38 deletions(-) diff --git a/src/unitxt/metrics.py b/src/unitxt/metrics.py index 6bfb5d193..9f61c469a 100644 --- a/src/unitxt/metrics.py +++ b/src/unitxt/metrics.py @@ -685,47 +685,49 @@ def process(self, stream: Stream, stream_name: Optional[str] = None) -> Generato # 
if grouping is None, the whole stream is treated as a single group result = self.average_groups_global_scores(instances=instances) global_score.update(result) + + # moving on to ci score_name = global_score["score_name"] - if self.ci_scores is not None: - groups_global_scores = self.score_groups_globally(instances=instances) - if ( - self.grouping - and self.grouping["ci_samples_from_groups_scores"] - and all( - ( - group_score is np.nan - or all( - isinstance(group_score[score_name], float) - for score_name in self.ci_scores - ) + groups_global_scores = self.score_groups_globally(instances=instances) + if ( + self.ci_scores is not None + and self.grouping + and self.grouping["ci_samples_from_groups_scores"] + and all( + ( + group_score is np.nan + or all( + isinstance(group_score[score_name], float) + for score_name in self.ci_scores ) - for group_score in groups_global_scores.values() ) - ): - # a dict having just the "score" field, and in it -- just the "instance" section, - # and in that section: all the score_names whose values is the aggregation over that group. - # then sample from them, aggregating, over each sample, by simple average. - # can be done only over scores that are simple float. if a list of float (as with rouge with use_aggregator = False) - # can not then sort the sample by order of their scores, because their scores are lists, and not single float - # in the following exclude groups that score to np.nan because they are empty, rather than a dict - to_sample_from = [ - {"score": {"instance": groups_global_scores[group_name]}} - for group_name in groups_global_scores.keys() - if isinstance(groups_global_scores[group_name], dict) - ] - confidence_interval = self.score_based_confidence_interval( - instances=to_sample_from, - score_names=list(set(self.ci_scores)), - ci_score_prefix="fixed_group_", - aggregation_func=self.average_item_scores, - ) - else: - # todo: change to enable the CI employ the grouped version, and not the - # bare metric._compute - confidence_interval = self.compute_global_confidence_intervals( - references, predictions, task_data, score_name - ) - global_score.update(confidence_interval) + for group_score in groups_global_scores.values() + ) + ): + # a dict having just the "score" field, and in it -- just the "instance" section, + # and in that section: all the score_names whose values is the aggregation over that group. + # then sample from them, aggregating, over each sample, by simple average. + # can be done only over scores that are simple float. 
if a list of float (as with rouge with use_aggregator = False) + # can not then sort the sample by order of their scores, because their scores are lists, and not single float + # in the following exclude groups that score to np.nan because they are empty, rather than a dict + to_sample_from = [ + {"score": {"instance": groups_global_scores[group_name]}} + for group_name in groups_global_scores.keys() + if isinstance(groups_global_scores[group_name], dict) + ] + confidence_interval = self.score_based_confidence_interval( + instances=to_sample_from, + score_names=list(set(self.ci_scores)), + ci_score_prefix="fixed_group_", + aggregation_func=self.average_item_scores, + ) + else: + # todo: change to enable the CI employ the grouped version, and not the + # bare metric._compute + confidence_interval = self.compute_global_confidence_intervals( + references, predictions, task_data, score_name + ) + global_score.update(confidence_interval) # all instances link to same global_score dictionary object, # no need to update each individually From b63aed6c7513da2d8200efcb619fddc17dcaa319 Mon Sep 17 00:00:00 2001 From: dafnapension Date: Sat, 4 May 2024 11:49:32 +0300 Subject: [PATCH 07/12] tuned CI to run grouped evaluation when non fixed Signed-off-by: dafnapension --- src/unitxt/metrics.py | 38 ++++++++++++++++++++++------------- tests/library/test_metrics.py | 4 ++-- 2 files changed, 26 insertions(+), 16 deletions(-) diff --git a/src/unitxt/metrics.py b/src/unitxt/metrics.py index 9f61c469a..5f9c19471 100644 --- a/src/unitxt/metrics.py +++ b/src/unitxt/metrics.py @@ -403,12 +403,19 @@ def statistic(arr, axis): # arr is a 2d array where each row is a resampling, so we # iterate over the rows and compute the metric on each resampling def metric(sample_refs, sample_preds, sample_task_data): + insts = [ + { + "references": sample_ref, + "prediction": sample_pred, + "task_data": sample_taskd, + } + for (sample_ref, sample_pred, sample_taskd) in zip( + sample_refs, sample_preds, sample_task_data + ) + ] try: - return self._compute( - references=sample_refs, - predictions=sample_preds, - task_data=sample_task_data, - )["score"] + to_ret = self.average_groups_global_scores(instances=insts) + return to_ret["score"] except Exception as e: # this happens in edge cases, for example, when the sampling creates a # sample where all strings are empty and this fails bleu. @@ -682,7 +689,7 @@ def process(self, stream: Stream, stream_name: Optional[str] = None) -> Generato instance["score"]["instance"].update(instance_score) self._validate_references_and_prediction(references, predictions) - # if grouping is None, the whole stream is treated as a single group + # When grouping is None, the whole stream is treated as a single group result = self.average_groups_global_scores(instances=instances) global_score.update(result) @@ -704,12 +711,17 @@ def process(self, stream: Stream, stream_name: Optional[str] = None) -> Generato for group_score in groups_global_scores.values() ) ): - # a dict having just the "score" field, and in it -- just the "instance" section, - # and in that section: all the score_names whose values is the aggregation over that group. - # then sample from them, aggregating, over each sample, by simple average. - # can be done only over scores that are simple float. 
if a list of float (as with rouge with use_aggregator = False) - # can not then sort the sample by order of their scores, because their scores are lists, and not single float - # in the following exclude groups that score to np.nan because they are empty, rather than a dict + # From each group score, generate one dict having just the "score" field, and in it -- just the "instance" section, + # being the groups own global scores: all the score_names the value of each is the result of applying metric + # over the instances of that group. + # Then, sample from these instances, then yield a score for each sample by a simple average of these instances' scores + # (independent of metric, which was only relevant for the group's own global score), via np.nanmean, axis=0, + # and finally, per the CI's roadmap, sort the samples' scores, and returun the percentiles of both ends. + # To this end, the sample's score needs to be a float (to be sortable with its 'colleagues'), and to this end + # (going backward on np.nanmean, axis=0), the score in each group's own global score needs to be a float and not + # a list of floats (as is the case, for example with rouge with use_aggregator = False). + # + # The following excludes groups that score to np.nan because they are empty(due to filtering), rather than a dict to_sample_from = [ {"score": {"instance": groups_global_scores[group_name]}} for group_name in groups_global_scores.keys() @@ -722,8 +734,6 @@ def process(self, stream: Stream, stream_name: Optional[str] = None) -> Generato aggregation_func=self.average_item_scores, ) else: - # todo: change to enable the CI employ the grouped version, and not the - # bare metric._compute confidence_interval = self.compute_global_confidence_intervals( references, predictions, task_data, score_name ) diff --git a/tests/library/test_metrics.py b/tests/library/test_metrics.py index 9c8385dc9..2f3a0d505 100644 --- a/tests/library/test_metrics.py +++ b/tests/library/test_metrics.py @@ -769,10 +769,10 @@ def test_rouge_grouping_and_filtering(self): (1 + (2 / 3) + 0) / 3, outputs[0]["score"]["global"]["score"] ) self.assertAlmostEqual( - 0.34900897136393977, outputs[0]["score"]["global"]["rougeL_ci_low"] + 0.37875517762310357, outputs[0]["score"]["global"]["rougeL_ci_low"] ) self.assertAlmostEqual( - 0.9333333333333332, outputs[0]["score"]["global"]["rougeL_ci_high"] + 0.8809773478805832, outputs[0]["score"]["global"]["rougeL_ci_high"] ) metric.grouping["ci_samples_from_groups_scores"] = True From 58bfbad345c554cac668065125812a20021fb871 Mon Sep 17 00:00:00 2001 From: dafnapension Date: Sun, 5 May 2024 14:31:13 +0300 Subject: [PATCH 08/12] for scores that are lists, like rouge with use_aggregator=False, concatenate when combining groups' scores Signed-off-by: dafnapension --- src/unitxt/metrics.py | 32 +++++++++++++-------- tests/library/test_metrics.py | 52 ++++++++++++++++++++++++++++++++++- 2 files changed, 71 insertions(+), 13 deletions(-) diff --git a/src/unitxt/metrics.py b/src/unitxt/metrics.py index 5f9c19471..7e6217786 100644 --- a/src/unitxt/metrics.py +++ b/src/unitxt/metrics.py @@ -222,9 +222,11 @@ class MetricWithConfidenceInterval(Metric): # "group_by_field" which specifies the field in the instance whose value determines the group to which the instance belongs. 
# example: "task_data/group_id" # the second field of grouping, "ci_samples_from_groups_scores", is a boolean specifying whether resampling should be - # done from the individual groups' scores (True), as if each group is represented by one instance whose score instance - # is the group's aggregated score, or from the whole stream (False), where each resample is then split to - # groups, the score of which is then computed, and finally averaged with the other groups' scores. + # done from the individual groups' scores (True), as if each group is represented by one instance whose + # instance["score"]["instance"][score_name] is the group's aggregated score for score_name, + # Or from the whole stream (False), where each resample is then split to + # groups, the score of which is then computed, and finally averaged with the other groups' scores, as done + # here for the original whole stream. subgroup_filtering: dict = ( None # {"subgroup_column": str, "subgroup_types": List[str]} @@ -599,28 +601,34 @@ def average_groups_global_scores( ) # score_name is None result = defaultdict(list) - # average over the groups. Each group global score there is a dict, being the global_score - # computed for the group, or nan (if the group nullified or something). + # Average over the groups. Each group's global score is a dict, being the global_score + # computed for the group (as if it were a stream), or nan (if the group nullified or something). # nan-s are excluded, because typically the averaging is via nan_mean # so hereunder we average over the different fields of the dict, each field separately. - # for generatily we prepare a recursive averaging here, because some of the fields in that - # global score may have a value being a list (like rouge with use_aggregator = False) + # We prepare for a score being a string (and expected to be the same for all groups, like + # "score_name"), a float (which we average) or a list (which we concatenate, like the lists + # that rouge returns for use_aggregator=False) + fields_to_average = set() for _, group_global_score in groups_global_scores.items(): if isinstance(group_global_score, dict): for k, v in group_global_score.items(): if isinstance(v, str): result[k] = v - else: + elif isinstance(v, float): + fields_to_average.add(k) result[k].append(v) + else: + assert isoftype( + v, List[float] + ), f"unexpected type of score {v} in group's score field {k}" + result[k].extend(v) else: assert np.isnan( group_global_score ), "group global score should be either a dict or np.nan" for k, v in result.items(): - if isinstance(v, list): - # v should be either a str or a list, either a list of float, or a list of lists of floats - result[k] = np.array(result[k]) - result[k] = np.nanmean(result[k], axis=0) + if k in fields_to_average: + result[k] = np.nanmean(v) return result diff --git a/tests/library/test_metrics.py b/tests/library/test_metrics.py index 2f3a0d505..647cfefa8 100644 --- a/tests/library/test_metrics.py +++ b/tests/library/test_metrics.py @@ -816,7 +816,9 @@ def test_rouge_grouping_and_filtering(self): def test_rouge_l(self): metric = Rouge( - n_resamples=None, # disable confidence interval calculation which fails for this metric configuration + n_resamples=None, + # disable confidence interval calculation which fails for this metric configuration, + # since "score" is not a float, but a list of floats use_aggregator=False, # returns lists for scores, not any aggregation thereof rouge_types=["rougeL"], ) @@ -828,6 +830,54 @@ def test_rouge_l(self): 
global_target = [2 / 3, 1.0] self.assertListEqual(global_target, outputs[0]["score"]["global"]["score"]) + def test_rouge_l_grouping(self): + metric = Rouge( + n_resamples=None, + # disable confidence interval calculation which fails for this metric configuration, + # since "score" is not a float, but a list of floats + use_aggregator=False, # returns lists for scores, not any aggregation thereof + rouge_types=["rougeL"], + ) + references = [ + ["general kenobi", "general yoda"], + ["hello", "there"], + ["general kenobi", "general yoda"], + ["lieutenant dan", "lieutenant colonel"], + ["general kenobi", "general yoda"], + ] + predictions = [ + "general kenobi", + "hello there", + "general kenobi", + "forrest gump", + "general kenobi", + ] + task_data = [ + {"group_id": "group1"}, + {"group_id": "group2"}, + {"group_id": "group1"}, + {"group_id": "group3"}, + {"group_id": "group1"}, + ] + outputs = apply_metric( + metric=metric, predictions=predictions, references=references + ) + global_target = [1.0, 2 / 3, 1.0, 0.0, 1.0] + self.assertListEqual(global_target, outputs[0]["score"]["global"]["rougeL"]) + + metric.grouping = { + "group_by_field": "task_data/group_id", + "ci_samples_from_groups_scores": False, + } + outputs = apply_metric( + metric=metric, + predictions=predictions, + references=references, + task_data=task_data, + ) + global_target = [1.0, 1.0, 1.0, 2 / 3, 0.0] + self.assertListEqual(global_target, outputs[0]["score"]["global"]["rougeL"]) + def test_token_overlap(self): metric = TokenOverlap() predictions = ["hello there general dude", "foo bar foobar"] From 1c05e95b96e6d862e2a0decbb8bf6a2e93946f14 Mon Sep 17 00:00:00 2001 From: dafnapension Date: Sun, 5 May 2024 15:04:06 +0300 Subject: [PATCH 09/12] grouped and filtered for rougeL Signed-off-by: dafnapension --- tests/library/test_metrics.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/tests/library/test_metrics.py b/tests/library/test_metrics.py index 647cfefa8..69b976472 100644 --- a/tests/library/test_metrics.py +++ b/tests/library/test_metrics.py @@ -878,6 +878,19 @@ def test_rouge_l_grouping(self): global_target = [1.0, 1.0, 1.0, 2 / 3, 0.0] self.assertListEqual(global_target, outputs[0]["score"]["global"]["rougeL"]) + metric.subgroup_filtering = { + "subgroup_column": "task_data/group_id", + "subgroup_types": ["group1", "group2"], + } + outputs = apply_metric( + metric=metric, + predictions=predictions, + references=references, + task_data=task_data, + ) + global_target = [1.0, 1.0, 1.0, 2 / 3] + self.assertListEqual(global_target, outputs[0]["score"]["global"]["rougeL"]) + def test_token_overlap(self): metric = TokenOverlap() predictions = ["hello there general dude", "foo bar foobar"] From 442a4c16deb374cc9c981776617f2f53cf2554c4 Mon Sep 17 00:00:00 2001 From: dafnapension Date: Sun, 5 May 2024 18:41:19 +0300 Subject: [PATCH 10/12] bulkinstance too Signed-off-by: dafnapension --- src/unitxt/metrics.py | 127 ++++++++++++++++++++---------------------- 1 file changed, 61 insertions(+), 66 deletions(-) diff --git a/src/unitxt/metrics.py b/src/unitxt/metrics.py index 7e6217786..ffc0b0599 100644 --- a/src/unitxt/metrics.py +++ b/src/unitxt/metrics.py @@ -528,14 +528,14 @@ def score_groups_globally( groups_global_scores = {} for group_name, group in grouped_instances.items(): - if isinstance(self, InstanceMetric): + if isinstance(self, (InstanceMetric, BulkInstanceMetric)): groups_global_scores[group_name] = {} for score_name in score_names: if isinstance(group, list): # not split to control and 
comparison groups_global_scores[group_name][score_name] = self.aggregating[ "aggregating_function" ](instances=group, score_name=score_name) - else: + else: # split to control and comparison control_scores = [ instance["score"]["instance"][score_name] for instance in group["control"] @@ -565,10 +565,6 @@ def score_groups_globally( predictions=predictions, task_data=task_data, ) - elif isinstance(self, BulkInstanceMetric): - raise ValueError( - "What are you doing here? nowhere in BulkInstanceMetric is this method invoked" - ) else: raise ValueError( f"Unrecognized extension of MetricWithConfidence: {type(self)}" @@ -786,31 +782,31 @@ class BulkInstanceMetric(SingleStreamOperator, MetricWithConfidenceInterval): n_resamples: int = OptionalField( default_factory=lambda: settings.num_resamples_for_instance_metrics ) - main_score: str - reduction_map: Dict[str, List[str]] + aggregating: dict = None + score_names: List[str] = None + + def prepare(self): + if self.score_names is None: + self.score_names = [self.main_score] + if self.aggregating is None: + self.aggregating = { + "aggregating_function_name": "mean", + "aggregating_function": MetricWithConfidenceInterval.average_item_scores, + } - implemented_reductions: List[str] = field(default_factory=lambda: ["mean"]) + super().prepare() + if self.main_score is None: + self.main_score = "f1" def process(self, stream: Stream, stream_name: Optional[str] = None) -> Generator: global_score = {} - instances = [] # consume the stream - references, predictions = map( - list, - zip( - *[ - (instance["references"], instance["prediction"]) - for instance in stream - ] - ), + predictions, references, task_data, instances = self.consume_stream( + stream=stream, task_data_field_name="task_data" ) - - task_data = [ - instance["task_data"] if "task_data" in instance else {} - for instance in stream - ] self._validate_references_and_prediction(references, predictions) + # compute the metric over all refs and preds instance_scores = self.compute( references=references, @@ -823,7 +819,7 @@ def process(self, stream: Stream, stream_name: Optional[str] = None) -> Generato instance_score["score"] = instance_score[self.main_score] instance_score["score_name"] = self.main_score - for instance, score in zip(stream, instance_scores): + for instance, score in zip(instances, instance_scores): if "score" not in instance: instance["score"] = {"global": global_score, "instance": {}} else: @@ -831,37 +827,45 @@ def process(self, stream: Stream, stream_name: Optional[str] = None) -> Generato instance["score"]["instance"].update(score) - instances.append(instance) - - for reduction, fields in self.reduction_map.items(): - assert ( - reduction in self.implemented_reductions - ), f"Reduction {reduction} is not implemented, use one of {self.implemented_reductions}" - - if reduction == "mean": - for field_name in fields: - global_score[field_name] = mean( - [ - instance["score"]["instance"][field_name] - for instance in instances - ] - ) - if field_name == self.main_score: - global_score["score"] = global_score[field_name] - global_score["score_name"] = self.main_score - - ci_fields = ( - list(set(self.ci_scores)) - if self.ci_scores is not None - else [self.main_score] + # groups also covers for non-grouped, where the whole stream is treated as a single group + groups_global_scores = self.score_groups_globally( + instances=instances, score_names=self.score_names + ) + # no playing with field names here as in InstanceMetric, so we simply average over the groups (one or more) + for 
score_name in self.score_names: + if self.grouping is None: + # there is only one group here + global_score.update( + {score_name: groups_global_scores["all"][score_name]} ) - confidence_interval = self.score_based_confidence_interval( - instances=instances, score_names=ci_fields + else: + global_score.update( + { + score_name: nan_mean( + [ + group_global_scores[score_name] + for group_global_scores in groups_global_scores.values() + if isinstance(groups_global_scores, dict) + ] + ) + } ) - global_score.update(confidence_interval) + if score_name == self.main_score: + global_score["score"] = global_score[self.main_score] + global_score["score_name"] = self.main_score - for instance in instances: - yield instance + ci_fields = ( + list(set(self.ci_scores)) + if self.ci_scores is not None + else [self.main_score] + ) + # working non-grouped, and hence no variation on field names + confidence_interval = self.score_based_confidence_interval( + instances=instances, score_names=ci_fields + ) + global_score.update(confidence_interval) + + yield from instances @abstractmethod def compute( @@ -1022,8 +1026,9 @@ def process(self, stream: Stream, stream_name: Optional[str] = None) -> Generato { to_score_name: nan_mean( [ - groups_global_scores[group_name][score_name] - for group_name in groups_global_scores.keys() + group_global_scores[score_name] + for group_global_scores in groups_global_scores.values() + if isinstance(group_global_scores, dict) ] ) } @@ -2110,7 +2115,7 @@ def _compute_single_ref( class BertScore(HuggingfaceBulkMetric): hf_metric_name = "bertscore" main_score = "f1" - reduction_map = {"mean": ["f1", "precision", "recall"]} + score_names = ["f1", "precision", "recall"] hf_metric_fields = ["f1", "precision", "recall"] ci_scores = ["f1", "precision", "recall"] model_name: str @@ -2128,7 +2133,6 @@ def prepare(self): class SentenceBert(BulkInstanceMetric): - reduction_map = {"mean": ["score"]} main_score = "score" batch_size: int = 32 @@ -2179,7 +2183,6 @@ def compute( class Reward(BulkInstanceMetric): - reduction_map = {"mean": ["score"]} main_score = "score" batch_size: int = 32 @@ -2220,7 +2223,6 @@ def compute( class Detector(BulkInstanceMetric): - reduction_map = {"mean": ["score"]} main_score = "score" batch_size: int = 32 @@ -2377,7 +2379,6 @@ class Perplexity(BulkInstanceMetric): """Computes the likelihood of generating text Y after text X - P(Y|X).""" main_score = "perplexity" - reduction_map = {"mean": ["perplexity"]} prediction_type = "str" source_template: str @@ -3559,6 +3560,7 @@ class BinaryAccuracy(InstanceMetric): "aggregating_function_name": "mean", "aggregating_function": MetricWithConfidenceInterval.average_item_scores, } + def _validate_reference(self, reference): super()._validate_reference(reference) assert reference[0] in [ @@ -3566,13 +3568,6 @@ def _validate_reference(self, reference): 1, ], f"all references of {self.main_score} must by 0 or 1" - def compute( - self, references: List[Any], prediction: Any, task_data: List[Dict] - ) -> dict: - float_prediction = to_float_or_default(prediction) - prediction = str(int(float_prediction > self.threshold)) - references = ["1"] if references[0].lower() in self.pos_classes else ["0"] - def compute( self, references: List[float], prediction: float, task_data: List[Dict] ) -> dict: From a5b0aca126013e09eb8d3c40f42ead7456ebd45a Mon Sep 17 00:00:00 2001 From: dafnapension Date: Wed, 8 May 2024 20:16:13 +0300 Subject: [PATCH 11/12] simplified further more by decouple aggregating_function_name from aggregating_function 
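With aggregating_function_name and aggregating_function now decoupled, a subclass
can override the callable and the name used for score-field naming independently.
An illustrative sketch (a hypothetical subclass, not part of this patch; numpy is
assumed to be imported as np, as it already is in metrics.py):

    class MedianAccuracy(Accuracy):
        # hypothetical example: plug in any aggregation callable,
        # while declaring its display name separately
        aggregating_function_name = "median"

        def prepare(self):
            # aggregate the per-instance scores of a group (or of the whole stream)
            self.aggregating_function = lambda instances, score_name: float(
                np.nanmedian(
                    [inst["score"]["instance"][score_name] for inst in instances]
                )
            )
            super().prepare()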
Signed-off-by: dafnapension --- src/unitxt/metrics.py | 170 ++++++++++++---------------------- tests/library/test_metrics.py | 14 +-- 2 files changed, 65 insertions(+), 119 deletions(-) diff --git a/src/unitxt/metrics.py b/src/unitxt/metrics.py index ffc0b0599..6a6563ff9 100644 --- a/src/unitxt/metrics.py +++ b/src/unitxt/metrics.py @@ -532,9 +532,11 @@ def score_groups_globally( groups_global_scores[group_name] = {} for score_name in score_names: if isinstance(group, list): # not split to control and comparison - groups_global_scores[group_name][score_name] = self.aggregating[ - "aggregating_function" - ](instances=group, score_name=score_name) + groups_global_scores[group_name][ + score_name + ] = self.aggregating_function( + instances=group, score_name=score_name + ) else: # split to control and comparison control_scores = [ instance["score"]["instance"][score_name] @@ -627,6 +629,19 @@ def average_groups_global_scores( result[k] = np.nanmean(v) return result + # for InstanceMetric and BulkInstanceMetric + def prepare_scorenames_aggregating(self): + if self.score_names is None: + assert ( + self.main_score is not None + ), "both score_names and main_score are None" + self.score_names = [self.main_score] + + if self.aggregating_function_name is None: + self.aggregating_function_name = "mean" + if self.aggregating_function is None: + self.aggregating_function = MetricWithConfidenceInterval.average_item_scores + class GlobalMetric(SingleStreamOperator, MetricWithConfidenceInterval): """A class for computing metrics that require joint calculations over all instances and are not just aggregation of scores of individuals instances. @@ -782,21 +797,15 @@ class BulkInstanceMetric(SingleStreamOperator, MetricWithConfidenceInterval): n_resamples: int = OptionalField( default_factory=lambda: settings.num_resamples_for_instance_metrics ) - aggregating: dict = None + # same args as for InstanceMetric score_names: List[str] = None + aggregating_function_name: str = None + aggregating_function: callable = None def prepare(self): - if self.score_names is None: - self.score_names = [self.main_score] - if self.aggregating is None: - self.aggregating = { - "aggregating_function_name": "mean", - "aggregating_function": MetricWithConfidenceInterval.average_item_scores, - } - - super().prepare() if self.main_score is None: self.main_score = "f1" + super().prepare_scorenames_aggregating() def process(self, stream: Stream, stream_name: Optional[str] = None) -> Generator: global_score = {} @@ -927,29 +936,24 @@ class InstanceMetric(SingleStreamOperator, MetricWithConfidenceInterval): # compatibility -- reflecting the other input args for aggregating. to_score_names: List[str] = None - # How to yield one score, float, from a list of instances: either the whole stream or one group. + # How to yield one score, float, for each score_name in score_names, from a list of instances: either the whole stream or one group. # For InstanceMetric, this aggregation is over the instance scores, already sitting in each instance, in subfield # instance["score"]["instance"], which is a dict mapping score_name to (instance) score value. - # Tyically, to be overridden by the subclasses. If None, then for InstanceMetric - the default of average_item_scores, - # If not set by subclasses, it is set by InstanceMetric to { - # "aggregating_function_name": "mean", - # "aggregating_function": MetricWithConfidenceInterval.average_item_scores, + # Tyically, aggregating is to be overridden by the subclasses. 
If None, and not set by subclasses, then for InstanceMetric - + # the defaults set are: + # aggregating_function_name: "mean", + # aggregating_function: MetricWithConfidenceInterval.average_item_scores, # } - aggregating: dict = None + aggregating_function_name: str = None + aggregating_function: callable = None reference_field: str = NonPositionalField(default="references") prediction_field: str = NonPositionalField(default="prediction") def verify(self): - assert isinstance(self.aggregating, dict), "aggregating must be a dict" - assert len(self.aggregating) == 2, "aggregating must consist of two fields" - assert ( - "aggregating_function_name" in self.aggregating - and "aggregating_function" in self.aggregating - ), "aggregating must contain both fields: 'aggregating_function_name' and 'aggregating_function'" assert callable( - self.aggregating["aggregating_function"] - ), "self.aggregating['aggregating_function'] must be a callable" + self.aggregating_function + ), "arg aggregating_function must be a callable" if self.grouping is not None: assert isinstance( @@ -975,28 +979,20 @@ def verify(self): ), "'score_names' and 'to_score_names' must have the same length" def prepare(self): - if self.aggregating is None: - self.aggregating = { - "aggregating_function_name": "mean", - "aggregating_function": MetricWithConfidenceInterval.average_item_scores, - } - - if self.score_names is None: - self.score_names = [self.main_score] + self.prepare_scorenames_aggregating() self.prefix = "" if self.to_score_names is None: if self.grouping is not None: self.prefix = "group_" if self.grouping["ci_samples_from_groups_scores"]: self.prefix = "fixed_group_" - self.prefix += self.aggregating["aggregating_function_name"] + self.prefix += self.aggregating_function_name self.prefix += "_" # for backward compatibility, only when grouping do we note the aggregation function name # we suggest to always add it self.to_score_names = [ self.prefix + score_name for score_name in self.score_names ] - super().prepare() # flake8: noqa: C901 def process(self, stream: Stream, stream_name: Optional[str] = None) -> Generator: @@ -1054,7 +1050,7 @@ def process(self, stream: Stream, stream_name: Optional[str] = None) -> Generato instances=instances, score_names=list(set(self.ci_scores)), ci_score_prefix=self.prefix, - aggregation_func=self.aggregating["aggregating_function"] + aggregation_func=self.aggregating_function if self.grouping is None else self.average_groups_global_scores, ) @@ -1124,11 +1120,12 @@ def compute(self, references: List[Any], prediction: Any, task_data: Dict) -> di class Accuracy(InstanceMetric): - grouping = None - score_names = ["accuracy"] main_score = "accuracy" ci_scores = ["accuracy"] + def prepare(self): + super().prepare() + prediction_type = "Any" # string representation is compared def compute( @@ -1181,15 +1178,18 @@ def compute( class MaxAccuracy(Accuracy): """Calculate the maximal accuracy over all instances as the global score.""" - aggregating = { - "aggregating_function_name": "max", - "aggregating_function": MetricWithConfidenceInterval.max_item_scores, - } + aggregating_function_name = "max" + + def prepare(self): + self.aggregating_function = MetricWithConfidenceInterval.max_item_scores + super().prepare() class MinAccuracy(Accuracy): """Calculate the minimal accuracy over all instances as the global score.""" + aggregating_function_name = "min" + def min_item_score(self, instances: List[Dict[str, Any]], score_name: str) -> float: raw_scores = [ 
instance["score"]["instance"][score_name] for instance in instances @@ -1200,10 +1200,7 @@ def min_item_score(self, instances: List[Dict[str, Any]], score_name: str) -> fl return np.min(non_nan_raw_scores) def prepare(self): - self.aggregating = { - "aggregating_function_name": "min", - "aggregating_function": self.min_item_score, - } + self.aggregating_function = self.min_item_score super().prepare() @@ -2259,7 +2256,6 @@ class LlamaIndexCorrectness(InstanceMetric): model_name: str = "" main_score: str = "" prediction_type: str = "str" - aggregating: dict = None openai_models: List[str] = ["gpt-3.5-turbo"] # anthropic_models is here for the sake of documentation for future models: @@ -2845,7 +2841,6 @@ def _compute( class RetrievalAtK(RetrievalMetric): k_list: List[int] main_score: str = None - aggregating: dict = None def prepare(self): self.main_score = self.score_name("match", self.k_list[0]) @@ -3264,10 +3259,7 @@ class FixedGroupMeanBaselineAccuracy(Accuracy): "group_by_field": "task_data/group_id", "ci_samples_from_groups_scores": True, } - aggregating = { - "aggregating_function_name": "mean_baseline", - "aggregating_function": MetricWithConfidenceInterval.average_item_scores, - } + aggregating_function_name = "mean_baseline" subgroup_filtering = { "subgroup_column": "task_data/variant_type", "subgroup_types": ["original"], @@ -3279,10 +3271,7 @@ class FixedGroupMeanParaphraseAccuracy(Accuracy): "group_by_field": "task_data/group_id", "ci_samples_from_groups_scores": True, } - aggregating = { - "aggregating_function_name": "mean_paraphrase", - "aggregating_function": MetricWithConfidenceInterval.average_item_scores, - } + aggregating_function_name = "mean_paraphrase" subgroup_filtering = { "subgroup_column": "task_data/variant_type", "subgroup_types": ["paraphrase"], @@ -3296,10 +3285,7 @@ class FixedGroupMeanBaselineStringContainment(StringContainment): "ci_samples_from_groups_scores": True, } - aggregating = { - "aggregating_function_name": "mean_baseline", - "aggregating_function": MetricWithConfidenceInterval.average_item_scores, - } + aggregating_function_name = "mean_baseline" subgroup_filtering = { "subgroup_column": "task_data/variant_type", "subgroup_types": ["original"], @@ -3311,10 +3297,7 @@ class FixedGroupMeanParaphraseStringContainment(StringContainment): "group_by_field": "task_data/group_id", "ci_samples_from_groups_scores": True, } - aggregating = { - "aggregating_function_name": "mean_paraphrase", - "aggregating_function": MetricWithConfidenceInterval.average_item_scores, - } + aggregating_function_name = "mean_paraphrase" subgroup_filtering = { "subgroup_column": "task_data/variant_type", "subgroup_types": ["paraphrase"], @@ -3327,10 +3310,7 @@ class FixedGroupPDRParaphraseAccuracy(Accuracy): "group_by_field": "task_data/group_id", "ci_samples_from_groups_scores": True, } - aggregating = { - "aggregating_function_name": "pdr_paraphrase", - "aggregating_function": MetricWithConfidenceInterval.average_item_scores, - } + aggregating_function_name = "pdr_paraphrase" control_comparison = { "subgroup_column": "task_data/variant_type", "control_subgroup_types": ["original"], @@ -3344,10 +3324,7 @@ class FixedGroupPDRParaphraseStringContainment(StringContainment): "group_by_field": "task_data/group_id", "ci_samples_from_groups_scores": True, } - aggregating = { - "aggregating_function_name": "pdr_paraphrase", - "aggregating_function": MetricWithConfidenceInterval.average_item_scores, - } + aggregating_function_name = "pdr_paraphrase" control_comparison = { 
"subgroup_column": "task_data/variant_type", "control_subgroup_types": ["original"], @@ -3370,10 +3347,7 @@ class FixedGroupNormCohensHParaphraseAccuracy(Accuracy): "group_by_field": "task_data/group_id", "ci_samples_from_groups_scores": True, } - aggregating = { - "aggregating_function_name": "norm_cohens_h_paraphrase", - "aggregating_function": MetricWithConfidenceInterval.average_item_scores, - } + aggregating_function_name = "norm_cohens_h_paraphrase" control_comparison = { "subgroup_column": "task_data/variant_type", "control_subgroup_types": ["original"], @@ -3387,10 +3361,7 @@ class FixedGroupNormCohensHParaphraseStringContainment(StringContainment): "group_by_field": "task_data/group_id", "ci_samples_from_groups_scores": True, } - aggregating = { - "aggregating_function_name": "norm_cohens_h_paraphrase", - "aggregating_function": MetricWithConfidenceInterval.average_item_scores, - } + aggregating_function_name = "norm_cohens_h_paraphrase" control_comparison = { "subgroup_column": "task_data/variant_type", "control_subgroup_types": ["original"], @@ -3405,10 +3376,7 @@ class FixedGroupNormHedgesGParaphraseAccuracy(Accuracy): "group_by_field": "task_data/group_id", "ci_samples_from_groups_scores": True, } - aggregating = { - "aggregating_function_name": "norm_hedges_g_paraphrase", - "aggregating_function": MetricWithConfidenceInterval.average_item_scores, - } + aggregating_function_name = "norm_hedges_g_paraphrase" control_comparison = { "subgroup_column": "task_data/variant_type", "control_subgroup_types": ["original"], @@ -3422,10 +3390,7 @@ class FixedGroupNormHedgesGParaphraseStringContainment(StringContainment): "group_by_field": "task_data/group_id", "ci_samples_from_groups_scores": True, } - aggregating = { - "aggregating_function_name": "norm_hedges_g_paraphrase", - "aggregating_function": MetricWithConfidenceInterval.average_item_scores, - } + aggregating_function_name = "norm_hedges_g_paraphrase" control_comparison = { "subgroup_column": "task_data/variant_type", "control_subgroup_types": ["original"], @@ -3440,10 +3405,7 @@ class FixedGroupAbsvalNormCohensHParaphraseAccuracy(Accuracy): "group_by_field": "task_data/group_id", "ci_samples_from_groups_scores": True, } - aggregating = { - "aggregating_function_name": "absval_norm_cohens_h_paraphrase", - "aggregating_function": MetricWithConfidenceInterval.average_item_scores, - } + aggregating_function_name = "absval_norm_cohens_h_paraphrase" control_comparison = { "subgroup_column": "task_data/variant_type", "control_subgroup_types": ["original"], @@ -3457,10 +3419,7 @@ class FixedGroupAbsvalNormCohensHParaphraseStringContainment(StringContainment): "group_by_field": "task_data/group_id", "ci_samples_from_groups_scores": True, } - aggregating = { - "aggregating_function_name": "absval_norm_cohens_h_paraphrase", - "aggregating_function": MetricWithConfidenceInterval.average_item_scores, - } + aggregating_function_name = "absval_norm_cohens_h_paraphrase" control_comparison = { "subgroup_column": "task_data/variant_type", "control_subgroup_types": ["original"], @@ -3474,10 +3433,7 @@ class FixedGroupAbsvalNormHedgesGParaphraseAccuracy(Accuracy): "group_by_field": "task_data/group_id", "ci_samples_from_groups_scores": True, } - aggregating = { - "aggregating_function_name": "absval_norm_hedges_g_paraphrase", - "aggregating_function": MetricWithConfidenceInterval.average_item_scores, - } + aggregating_function_name = "absval_norm_hedges_g_paraphrase" control_comparison = { "subgroup_column": "task_data/variant_type", 
"control_subgroup_types": ["original"], @@ -3491,10 +3447,7 @@ class FixedGroupAbsvalNormHedgesGParaphraseStringContainment(StringContainment): "group_by_field": "task_data/group_id", "ci_samples_from_groups_scores": True, } - aggregating = { - "aggregating_function_name": "absval_norm_hedges_g_paraphrase", - "aggregating_function": MetricWithConfidenceInterval.average_item_scores, - } + aggregating_function_name = "absval_norm_hedges_g_paraphrase" control_comparison = { "subgroup_column": "task_data/variant_type", "control_subgroup_types": ["original"], @@ -3556,10 +3509,7 @@ class BinaryAccuracy(InstanceMetric): prediction_type = "Union[float,int]" single_reference_per_prediction = True - aggregating = { - "aggregating_function_name": "mean", - "aggregating_function": MetricWithConfidenceInterval.average_item_scores, - } + aggregating_function_name = "mean" def _validate_reference(self, reference): super()._validate_reference(reference) diff --git a/tests/library/test_metrics.py b/tests/library/test_metrics.py index 69b976472..5815ac176 100644 --- a/tests/library/test_metrics.py +++ b/tests/library/test_metrics.py @@ -200,7 +200,6 @@ def test_accuracy_max_min_aggregation(self): outputs = apply_metric( metric=metric, predictions=predictions, references=references ) - expected_global_result = { "accuracy": 1 if number == 0 else 0, "score": 1 if number == 0 else 0, @@ -214,7 +213,7 @@ def test_accuracy_max_min_aggregation(self): for key, value in global_result.items() if key in expected_global_result } - self.assertDictEqual(global_result, expected_global_result) + self.assertDictEqual(expected_global_result, global_result) instance_targets = [ {"accuracy": 0.0, "score": 0.0, "score_name": "accuracy"}, @@ -222,7 +221,7 @@ def test_accuracy_max_min_aggregation(self): {"accuracy": 1.0, "score": 1.0, "score_name": "accuracy"}, ] for output, target in zip(outputs, instance_targets): - self.assertDictEqual(output["score"]["instance"], target) + self.assertDictEqual(target, output["score"]["instance"]) def test_f1_micro(self): metric = F1Micro() @@ -1082,7 +1081,7 @@ def test_grouped_instance_metric_errors(self): """Test certain value and assertion error raises for grouped instance metrics (with group_mean reduction).""" class NoAggFuncReduction(Accuracy): - aggregating = {"aggregating_function_name": "unknown"} + aggregating_function = "" with self.assertRaises(AssertionError): # should raise error because no aggregation_function will be defined, since only mean and group_mean are implemented @@ -1095,7 +1094,7 @@ class NoAggFuncReduction(Accuracy): ) class NoAggFunc(Accuracy): - aggregating = 9 + aggregating_function = 9 with self.assertRaises(AssertionError): # should raise error because no "agg_func" field in group_mean @@ -1108,10 +1107,7 @@ class NoAggFunc(Accuracy): ) class NoCallableAggFunc(Accuracy): - aggregating = { - "aggregating_function_name": "no_callable", - "aggregating_function": 9, - } + aggregating_function = "not_callable" with self.assertRaises(AssertionError): # should raise error because second field of agg_func should be callable From eb7f439496800f5c2ddf493d692b853c81ccd6a2 Mon Sep 17 00:00:00 2001 From: dafnapension Date: Sun, 12 May 2024 11:03:31 +0300 Subject: [PATCH 12/12] advance to shape score aggregations into operators Signed-off-by: dafnapension --- src/unitxt/metrics.py | 61 ++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 60 insertions(+), 1 deletion(-) diff --git a/src/unitxt/metrics.py b/src/unitxt/metrics.py index 6a6563ff9..1548e03b6 100644 --- 
a/src/unitxt/metrics.py +++ b/src/unitxt/metrics.py @@ -26,7 +26,7 @@ StreamingOperator, StreamInstanceOperator, ) -from .operators import CopyFields +from .operators import CopyFields, FilterByCondition from .random_utils import get_seed from .settings_utils import get_settings from .stream import MultiStream, Stream @@ -1119,6 +1119,65 @@ def compute(self, references: List[Any], prediction: Any, task_data: Dict) -> di pass +class Aggregator(SingleStreamOperator): + """Given a stream of individually scored instances, and a score_name, generate the stream-global score for that score_name. + + For a given score_name, each instance is assumed to have a value in its instance["score"]["instance"][score_name]. + This operator computes the global score from all these instance-scores, and writes this computed global score in the + instance["score"]["global"] section of each instance in the stream. + """ + + def process( + self, + stream: Stream, + stream_name: Optional[str] = None, + score_names: Optional[List[str]] = None, + ) -> Generator: + if score_names is None: + score_names = ["score"] + global_score = {} + instances = [] + for instance in stream: + if "score" not in instance: + instance["score"] = {"global": global_score, "instance": {}} + else: + global_score = instance["score"]["global"] + instances.append(instance) + + for score_name in self.score_names: + gs = self.aggregate_instance_scores_to_a_global_score(instances, score_name) + global_score.update( + {score_name: gs, score_name + "_agg_name": self.aggregator_name} + ) + # all instances link to same global_score object, and hence all instances now have an updated global score + yield from instances + + def aggregate_instance_scores_to_a_global_score( + self, instances: List[Dict[str, Any]], score_name: str + ) -> float: + from .metrics import MetricWithConfidenceInterval + + return MetricWithConfidenceInterval.average_item_scores(instances, score_name) + + +class FilterAggregator(Aggregator): + """Filter the instances by a given filter, and aggregate over the remaining instances.""" + + filter_by_condition: FilterByCondition = None + + def process( + self, + stream: Stream, + stream_name: Optional[str] = None, + score_names: Optional[List[str]] = None, + ) -> Generator: + if self.filter_by_condition is None: + instances = stream + else: + instances = self.filter_by_condition(stream) + yield from super().process(stream=instances, score_names=score_names) + + class Accuracy(InstanceMetric): main_score = "accuracy" ci_scores = ["accuracy"]
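For illustration, a minimal standalone sketch of the aggregation these new operators
perform (the instance layout is assumed from the code above; FilterByCondition usage
is only hinted at, since its exact arguments are not shown in this patch):

    import numpy as np

    # instances as they look after instance scoring
    instances = [
        {"score": {"global": {}, "instance": {"accuracy": 1.0}}},
        {"score": {"global": {}, "instance": {"accuracy": 0.0}}},
        {"score": {"global": {}, "instance": {"accuracy": 1.0}}},
    ]

    # the value Aggregator would write into the shared "global" section for "accuracy"
    global_accuracy = np.nanmean(
        [inst["score"]["instance"]["accuracy"] for inst in instances]
    )  # == 2/3

    # FilterAggregator differs only in first dropping the instances rejected by its
    # filter_by_condition, and then aggregating the remaining ones the same way.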