diff --git a/src/unitxt/metrics.py b/src/unitxt/metrics.py index c6f457fbda..157756d7e5 100644 --- a/src/unitxt/metrics.py +++ b/src/unitxt/metrics.py @@ -7,7 +7,7 @@ from copy import deepcopy from dataclasses import field from statistics import mean -from typing import Any, Dict, Generator, List, Optional, Tuple +from typing import Any, Dict, Generator, List, Optional, Tuple, Union import evaluate import numpy @@ -623,17 +623,6 @@ def compute( pass -def scores_dict_from_instances_dict( - instances_dict: Dict[str, List[Dict[str, Any]]], score_name: str -): - to_ret = {} - for key, instances in instances_dict.items(): - to_ret[key] = [ - instance["score"]["instance"][score_name] for instance in instances - ] - return to_ret - - class InstanceMetric(SingleStreamOperator, MetricWithConfidenceInterval): """Class for metrics for which a global score can be calculated by aggregating the instance scores (possibly with additional instance inputs). @@ -760,52 +749,74 @@ def accuracy_diff(subgroup_scores_dict, expected_subgroup_types=['original', 'pa self.subgroup_column in instance["task_data"] for instance in instances ), f"each instance task_data dict must have a key {self.subgroup_column}" - # flake8: noqa: C901 - def process(self, stream: Stream, stream_name: Optional[str] = None) -> Generator: - instances, global_score = self.compute_instance_scores(stream) + def scores_dict_from_instances_dict( + self, instances_dict: Dict[str, List[Dict[str, Any]]], score_name: str + ): + to_ret = {} + for key, instances in instances_dict.items(): + to_ret[key] = [ + instance["score"]["instance"][score_name] for instance in instances + ] + return to_ret + + # accept either one list of instances, or a split list, in case of subfields + def instance_aggregator_employing_callable( + self, + scores_aggregator: Optional[callable], + ) -> callable: + def the_instance_aggregator( + instances: Optional[ + Union[Dict[str, List[Dict[str, Any]]], List[Dict[str, Any]]] + ], + score_name: Optional[str], + ) -> float: + if isinstance(instances, dict): + # split by subfield: prepare the dictionary of lists of instances to the shape + # that the callable (the lambdas in the individual metrics) are expecting it: + return scores_aggregator( + self.scores_dict_from_instances_dict(instances, score_name) + ) + + # just aggregate along the list of instances, by score_name + return scores_aggregator( + [instance["score"]["instance"][score_name] for instance in instances] + ) + + return the_instance_aggregator + + def prepare(self): + # in this PR, we take the input args as they are, in order to run on all the currently defined individual instancemetrics, + # but we change the args right here to the shape we suggest they should have: - # not clear to me why all types of aggregations (of which name and actual callable are delivered via "agg_func" - # in "reduction_map") are only allowed for groups and not over the whole list of instances. - # I am trying to unify this here assert ( len(self.reduction_map) == 1 ), f"@@@@@ @@@@@ @@@@@@@@@@@@@@@ offending is: {type(self)}" - reduction_type, reduction_params = next(iter(self.reduction_map.items())) - assert ( - reduction_type in ["max", "mean", "group_mean"] - ), f"Reduction {reduction_type} is not supported, please specify a valid reduction method in reduction_map {self.reduction_map}." - if self.subgroup_column is not None: - # this we check here, not necessarily within grouped. 
We allow subgroups also for the whole stream of instances - assert all( - self.subgroup_column in instance["task_data"] for instance in instances - ), f"each instance task_data dict must have a key {self.subgroup_column}" - # and assert that there is an aggregate_function_name and aggregate_function. Currently, these arrive - # within reduction_params, and only for the case of grouped_mean. Need to take them out - - if reduction_type == "group_mean" or self.subgroup_column is not None: - self._validate_group_mean_reduction(instances=instances) + reduction_type, reduction_params = next(iter(self.reduction_map.items())) + if reduction_type not in ["max", "mean", "group_mean"]: + raise ValueError( + f"Reduction {reduction_type} is not supported, please specify a valid reduction method in reduction_map {self.reduction_map}." + ) - reduction_fields = ( # we get reduction fields out of grouping - [self.main_score] - if "score_fields" not in reduction_params - else list(set(reduction_params["score_fields"])) - ) + super().prepare() - if reduction_type != "group_mean": + def score_groups_globally( + self, instances: List[Dict[str, Any]], score_names: List[str] + ) -> dict: + if self.grouping is None: grouped_instances = {"all": instances} - else: # for future: make grouping a separate arg, not to be inferred from the name of the aggregation + else: # grouping is already pretending to be a separate arg, not to be inferred from reduction_type grouped_instances = defaultdict(list) - group_by = "task_data/group_id" for instance in instances: try: - group_name = dict_get(instance, group_by) + group_name = dict_get(instance, self.grouping["by field"]) except Exception as e: raise ValueError( - f"Reduction type is group_mean, however instance {instance} does not contain subfield 'task_data/group_id'" + f"Reduction type is group_mean, grouping is to be empoloyed, however instance {instance} does not contain subfield '{self.grouping['by field']}'" ) from e grouped_instances[group_name].append(instance) - # instances are now grouped by task_data/group_id, if reduction_type == 'group_mean', else - all instance make one group named 'all' + # instances are now grouped by task_data/group_id (generally: by self.grouping["by field"]), + # if reduction_type == 'group_mean' (generally: if self.grouping is not None), else - all instance make one group named 'all' # continue to calculate the global score for each group (!) first: # If reduction_type == 'group_mean', apply the aggregation specified by reduction_params, which, in turn, @@ -813,148 +824,206 @@ def process(self, stream: Stream, stream_name: Optional[str] = None) -> Generato # are already coded in the aggregation function specified by the reduction_params). # If reduction_type != 'group_mean', aggregate with either self.average_item_scores or self.max_item_scores, # as indicated by reduction_type (== either 'mean' or 'max') - - aggregation_function = None - if reduction_type == "mean": - aggregation_function = self.average_item_scores - elif reduction_type == "max": - aggregation_function = self.max_item_scores - else: # reduction_type == 'group_mean' and reduction_param specifies the aggregation function to employ (over - # scores, not over instances, but we will see to it). 
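# --- Illustrative sketch (reviewer note, not part of the patch) ---
# How the new instance_aggregator_employing_callable is meant to behave: it wraps a
# scores-level aggregator so it can accept either a plain list of instances or a dict
# of sub-grouped instances. The names below (scores_by_name, make_aggregator) are
# local stand-ins for the methods in the diff, not unitxt APIs.
from statistics import mean
from typing import Any, Dict, List, Union


def scores_by_name(instances: List[Dict[str, Any]], score_name: str) -> List[float]:
    # pull one named instance score out of each instance
    return [inst["score"]["instance"][score_name] for inst in instances]


def make_aggregator(scores_aggregator):
    # scores_aggregator works on scores: a list, or a dict of lists keyed by subgroup
    def aggregate(instances: Union[Dict[str, List[dict]], List[dict]], score_name: str) -> float:
        if isinstance(instances, dict):
            # subgroup_column case: hand the aggregator a dict of score lists
            return scores_aggregator(
                {key: scores_by_name(group, score_name) for key, group in instances.items()}
            )
        return scores_aggregator(scores_by_name(instances, score_name))

    return aggregate


# usage: aggregating a flat stream with a plain mean
flat = [{"score": {"instance": {"accuracy": s}}} for s in (1.0, 0.0, 1.0)]
print(make_aggregator(mean)(flat, "accuracy"))  # 0.666...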
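# --- Illustrative sketch (reviewer note, not part of the patch) ---
# Grouping instances by a nested path such as "task_data/group_id", the way
# score_groups_globally does when self.grouping is set. nested_get is a local
# stand-in for unitxt's dict_get.
from collections import defaultdict
from typing import Any, Dict, List


def nested_get(d: Dict[str, Any], path: str) -> Any:
    for key in path.split("/"):
        d = d[key]
    return d


def group_instances(instances: List[dict], by_field: str) -> Dict[str, List[dict]]:
    grouped = defaultdict(list)
    for instance in instances:
        try:
            group_name = nested_get(instance, by_field)
        except KeyError as e:
            raise ValueError(f"instance {instance} has no subfield '{by_field}'") from e
        grouped[group_name].append(instance)
    return dict(grouped)


stream = [
    {"task_data": {"group_id": "g1"}, "score": {"instance": {"accuracy": 1.0}}},
    {"task_data": {"group_id": "g2"}, "score": {"instance": {"accuracy": 0.0}}},
    {"task_data": {"group_id": "g1"}, "score": {"instance": {"accuracy": 0.0}}},
]
print({k: len(v) for k, v in group_instances(stream, "task_data/group_id").items()})
# {'g1': 2, 'g2': 1}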
- aggregation_function = reduction_params[1] - # currently, sub_group is only applicable for reduction_type == 'group_mean', but in future: make it general - # if self.subgroup_column is not None, generate a dict, associated with each group, where the instances - # are grouped to lists by the subgroup_column of their instance. - if self.subgroup_column is not None: - # in the current code, this is an indication of grouping. Should be separated - for group_name, group in grouped_instances.items(): - sub_grouped_instances = defaultdict(list) - sub_group_by = "task_data/" + self.subgroup_column - for instance in group: - try: - sub_group_name = dict_get(instance, sub_group_by) - except Exception as e: - raise ValueError( - f"subgroup_column is {self.subgroup_column}, however instance {instance} does not contain subfield '{sub_group_by}'" - ) from e - sub_grouped_instances[sub_group_name].append(instance) - grouped_instances[ - group_name - ] = sub_grouped_instances # replaced the list by dict of split lists, per sub_group value - - # if applicable ( or reduction_type == 'group_mean', and hence reduction_params indicates an aggregation to apply) - # -- compute score by the sub_groups, per their aggregation function (lambda..) - # otherwise + # To this end, first see if the aggregation result depends on subgroup_column, and prepare this for the aggregation + # to work. We allow this subgroups also when grouping is not done first + if self.subgroup_column is not None: + # in the current code, this is an indication of grouping. Should be separated + # here it can be employed to the whole stream, that now sits in grouped_instances["all"] + for group_name, group in grouped_instances.items(): + sub_grouped_instances = defaultdict(list) + sub_group_by = "task_data/" + self.subgroup_column + for instance in group: + try: + sub_group_name = dict_get(instance, sub_group_by) + except Exception as e: + raise ValueError( + f"subgroup_column is {self.subgroup_column}, however instance {instance} does not contain subfield '{sub_group_by}'" + ) from e + sub_grouped_instances[sub_group_name].append(instance) + grouped_instances[group_name] = sub_grouped_instances + # replaced the list by dict of split lists, per sub_group value + + # build the global score for each group, (potentially the only group called 'all') groups_global_scores = {} - for group_name in grouped_instances.keys(): + for group_name, group in grouped_instances.items(): groups_global_scores[group_name] = {} - for score_name in reduction_fields: - scores_dict = scores_dict_from_instances_dict( - grouped_instances[group_name], score_name - ) - groups_global_scores[group_name][score_name] = reduction_params[ - "agg_func" - ][1](scores_dict) - # for each score_name in reduction_fields, each group now has a score, computed through its subgroups, the score sits in - # the group's global_score (only of the group), named score_name (as the name of the score in the ["score"]["instance"] - # section of the instances - - # we now turn to compute the global score of the whole stream, by averaging over groups, if there were any - # (if reduction_type == 'group_mean'), or make the global_score of the sole group (called 'all') - the global_score - # of the whole stream. 
In the former case, we also prefix the score_name by - # "group_" + str(reduction_params["agg_func"][0]) + "_"+ - # and further prefix the above by "fixed_" in case that CI is done over the group-scores (as just computed) - # and not over the whole input stream - - # , prefixed - # as is done in the current code. - # we now turn to deal with ci, and accordingly, prefix (or not) the names of these global scores by "fixed_" - - aggregation_function_name = reduction_type - ## aggregation_func: over scores, not instances - if reduction_type != "group_mean": - aggregation_func = nan_mean if reduction_type == "mean" else nan_max - else: - aggregation_function_name = reduction_params["agg_func"][0] - aggregation_func = reduction_params["agg_func"][1] + # if this group is sub-split, need to invoke the special lambda + for score_name in score_names: + groups_global_scores[group_name][score_name] = self.aggregating[ + "aggregation_function" + ](instances=group, score_name=score_name) + + # for each score_name in reduction_fields, each group now has a score, computed through its subgroups, if applicable. + # the score sits in the group's own global_score (only of the group), named score_name (as the name of the score in + # the ["score"]["instance"] section of the instances + return groups_global_scores + + def average_groups_global_scores( + self, instances: List[Dict[str, Any]], score_name: str + ) -> float: + groups_global_scores = self.score_groups_globally( + instances=instances, score_names=[score_name] + ) + return nan_mean( + [ + groups_global_scores[group_name][score_name] + for group_name in groups_global_scores + ] + ) - # now see if (further) to split by subfield. This sub_group should also be independent of the grouping - # the following is just for ruff - assert aggregation_func != aggregation_function_name + # flake8: noqa: C901 + # flake8: noqa: C408 + # flake8: noqa: C416 + def process(self, stream: Stream, stream_name: Optional[str] = None) -> Generator: + instances, global_score = self.compute_instance_scores(stream) - # for field_name in reduction_fields: - # print() + if self.subgroup_column is not None: + # this we check here, not necessarily within grouped. We allow subgroups also for the whole stream of instances + assert all( + self.subgroup_column in instance["task_data"] for instance in instances + ), f"each instance task_data dict must have a key {self.subgroup_column}" + # and assert that there is an aggregate_function_name and aggregate_function. Currently, these arrive + # within reduction_params, and only for the case of grouped_mean. 
Need to take them out - for reduction_type, reduction_params in self.reduction_map.items(): - assert ( - reduction_type in self.implemented_reductions - ), f"Reduction {reduction_type} is not implemented, use one of {self.implemented_reductions}" - - field_name_full_prefix = "" - # used for passing to the bootstrapping, depends on whether the groups are fixed or not - aggregation_function = None - if reduction_type == "mean": - aggregation_function = self.average_item_scores - reduction_fields = list(set(reduction_params)) - # no group reduction, so resample instances individually - scores_to_resample = instances - elif reduction_type == "max": - aggregation_function = self.max_item_scores - reduction_fields = list(set(reduction_params)) - # no group reduction, so resample instances individually - scores_to_resample = instances - elif reduction_type == "group_mean": - aggregation_function = self.average_item_scores - self._validate_group_mean_reduction(instances=instances) - reduction_fields = ( - [self.main_score] - if "score_fields" not in reduction_params - else list(set(reduction_params["score_fields"])) - ) - aggregation_function_name = str(reduction_params["agg_func"][0]) - field_name_full_prefix = "group_" + aggregation_function_name + "_" - do_resample_as_group = reduction_params["agg_func"][2] - if do_resample_as_group: - # append fixed_ to name because resamples the groups as fixed units - field_name_full_prefix = "fixed_" + field_name_full_prefix - ( - scores_to_resample, - aggregation_function, - ) = self._set_up_group_mean_aggregation( - instances, reduction_params, reduction_fields - ) - else: - raise ValueError( - f"Reduction {reduction_type} is not supported, please specify a valid reduction method in reduction_map {self.reduction_map}." - ) + reduction_type, reduction_params = next(iter(self.reduction_map.items())) + if reduction_type == "group_mean" or self.subgroup_column is not None: + self._validate_group_mean_reduction(instances=instances) + + # moved to here from prepare, to pass the validation, but todo: + # a validation without the instances + self.grouping = ( + None + if reduction_type != "group_mean" + else { + "by field": "task_data/group_id", + "ci_samples_from_groups_scores": reduction_type == "group_mean" + and reduction_params["agg_func"][2], + } + ) + # we suggest to not have this only for group_mean, but also allow for max and mean. default value of arg == None + + # aggregating over instances. also moved from prepare to come after the _validate + # but todo: validate partly without the instances, and return to prepare() + # we should allow all types of aggregations (of which currently, name and actual callable are delivered via "agg_func" + # in "reduction_map") not only for groups, but also over the whole stream of instances. 
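# --- Illustrative sketch (reviewer note, not part of the patch) ---
# Splitting one group's instances by a subgroup column (e.g. a "variant" field with
# values "original" / "paraphrase") and feeding the resulting dict of score lists to a
# subgroup-aware aggregator, the shape the group_mean "agg_func" lambdas expect.
# "variant" and this accuracy_diff are toy stand-ins.
from collections import defaultdict
from statistics import mean
from typing import Dict, List


def split_by_subgroup(group: List[dict], subgroup_column: str) -> Dict[str, List[dict]]:
    sub_grouped = defaultdict(list)
    for instance in group:
        sub_grouped[instance["task_data"][subgroup_column]].append(instance)
    return dict(sub_grouped)


def accuracy_diff(subgroup_scores: Dict[str, List[float]]) -> float:
    # toy aggregator over a dict of score lists keyed by subgroup value
    return mean(subgroup_scores["paraphrase"]) - mean(subgroup_scores["original"])


group = [
    {"task_data": {"variant": "original"}, "score": {"instance": {"accuracy": 1.0}}},
    {"task_data": {"variant": "paraphrase"}, "score": {"instance": {"accuracy": 0.0}}},
]
subs = split_by_subgroup(group, "variant")
scores = {k: [i["score"]["instance"]["accuracy"] for i in v] for k, v in subs.items()}
print(accuracy_diff(scores))  # -1.0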
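# --- Illustrative sketch (reviewer note, not part of the patch) ---
# Computing a per-group global score for every score_name and then averaging the group
# scores, mirroring score_groups_globally + average_groups_global_scores.
# numpy.nanmean stands in for unitxt's nan_mean.
from typing import Callable, Dict, List

import numpy as np


def score_groups(
    grouped: Dict[str, List[dict]],
    score_names: List[str],
    aggregate: Callable[[List[dict], str], float],
) -> Dict[str, Dict[str, float]]:
    return {
        group_name: {name: aggregate(group, name) for name in score_names}
        for group_name, group in grouped.items()
    }


def average_group_scores(group_scores: Dict[str, Dict[str, float]], score_name: str) -> float:
    return float(np.nanmean([scores[score_name] for scores in group_scores.values()]))


def mean_of(instances: List[dict], name: str) -> float:
    return float(np.nanmean([i["score"]["instance"][name] for i in instances]))


grouped = {
    "g1": [{"score": {"instance": {"accuracy": 1.0}}}, {"score": {"instance": {"accuracy": 0.0}}}],
    "g2": [{"score": {"instance": {"accuracy": 1.0}}}],
}
per_group = score_groups(grouped, ["accuracy"], mean_of)
print(per_group)                                    # {'g1': {'accuracy': 0.5}, 'g2': {'accuracy': 1.0}}
print(average_group_scores(per_group, "accuracy"))  # 0.75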
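# --- Illustrative sketch (reviewer note, not part of the patch) ---
# How an existing reduction_map could be translated into the two new-style arguments
# suggested in this diff: a `grouping` dict and an `aggregating` dict. The key names
# ("by field", "ci_samples_from_groups_scores", "aggregating_function_name") follow
# the ones introduced here; the translation itself is a sketch, not the final API.
from typing import Any, Dict, Optional, Tuple


def translate_reduction_map(reduction_map: Dict[str, Any]) -> Tuple[Optional[dict], dict]:
    reduction_type, params = next(iter(reduction_map.items()))
    grouping = None
    if reduction_type == "group_mean":
        grouping = {
            "by field": "task_data/group_id",
            "ci_samples_from_groups_scores": bool(params["agg_func"][2]),
        }
    aggregating = {
        "aggregating_function_name": reduction_type
        if reduction_type in ("mean", "max")
        else str(params["agg_func"][0]),
    }
    return grouping, aggregating


print(translate_reduction_map({"mean": ["accuracy"]}))
print(translate_reduction_map(
    {"group_mean": {"agg_func": ["accuracy_diff", lambda scores_dict: 0.0, True],
                    "score_fields": ["accuracy"]}}
))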
+ self.aggregating = { + "aggregating_function_name": reduction_type + if reduction_type in ["mean", "max"] + else str(reduction_params["agg_func"][0]), + "aggregation_function": self.average_item_scores + if reduction_type == "mean" + else self.max_item_scores + if reduction_type == "max" + else self.instance_aggregator_employing_callable( + scores_aggregator=reduction_params["agg_func"][1] + ), + } + + reduction_fields = ( # we get reduction fields out of grouping + reduction_params + if isinstance(reduction_params, list) + else list(set(reduction_params["score_fields"])) + if isinstance(reduction_params, dict) and "score_fields" in reduction_params + else [self.main_score] + ) + + # build the global score for each group, (potentially the only group called 'all') + groups_global_scores = self.score_groups_globally( + instances=instances, score_names=reduction_fields + ) + # for each score_name in reduction_fields, each group now has a score, computed through its subgroups, if applicable. + # the score sits in the group's own global_score (only of the group), named score_name (as the name of the score in + # the ["score"]["instance"] section of the instances + + # we turn now to merge the groups' global score into the final global score, looking into the naming + # that is now applied, for compatibility. Then we will turn to CI. + field_name_full_prefix = "" # imitating the original code + if self.grouping is not None: # this is how the code goes, but we thing + # the function name should show in the global score name also when + # grouping is not done, and the aggregation is simply 'mean' or 'max' + field_name_full_prefix = ( + "group_" + self.aggregating["aggregating_function_name"] + "_" + ) + if self.grouping and self.grouping["ci_samples_from_groups_scores"]: + field_name_full_prefix = "fixed_" + field_name_full_prefix + + # calculate global scores for each reduction field, from the groups' global score + if self.grouping is None: + # no prefix for score name, and there is only one group here + global_score.update(groups_global_scores["all"]) + if self.main_score in reduction_fields: + global_score["score"] = global_score[self.main_score] + global_score["score_name"] = self.main_score - # calculate global scores for each reduction field + else: + # simply average the groups' scores, and prefix score_name for field_name in reduction_fields: field_name_full = field_name_full_prefix + field_name - # if group resampling (3rd element of agg_func parameter) is True, then - # 1. scores_to_resample are the group scores, and - # 2. aggregation_function is to take the raw mean - # if no group resampling (3rd element of agg_func parameter) is False, then - # 1. scores_to_resample are the original instance scores, and - # 2. 
aggregation_function is to apply the group aggregation from the instance scores - # either way, the application of aggregation_function to scores_to_resample yields the global score - global_score[field_name_full] = aggregation_function( - scores_to_resample, field_name + global_score.update( + { + field_name_full: nan_mean( + [ + groups_global_scores[group_name][field_name] + for group_name in groups_global_scores.keys() + ] + ) + } ) if field_name == self.main_score: global_score["score"] = global_score[field_name_full] global_score["score_name"] = field_name_full - # need to specify which fields should have CIs calculated for them through ci_scores - # (will not automatically calculate CIs for fields in reduction map) - if self.ci_scores is not None: + # finally: the CI: + # if no grouping, or grouping["ci_samples_from_groups_scores"] is false: + # ci as usual, over the whole input stream, with aggregation function that + # was used above for the whole stream or the individual groups + # need to specify which fields should have CIs calculated for them through ci_scores + # (will not automatically calculate CIs for fields in reduction map) + if self.ci_scores is not None: + if ( + self.grouping is None + or not self.grouping["ci_samples_from_groups_scores"] + ): confidence_interval = self.score_based_confidence_interval( - instances=scores_to_resample, + instances=instances, score_names=list(set(self.ci_scores)), - ci_score_prefix=field_name_full_prefix, - aggregation_func=aggregation_function, + ci_score_prefix="" + if self.grouping is None + else field_name_full_prefix, # when no grouping, no score_name_prefix + aggregation_func=self.aggregating["aggregation_function"] + if self.grouping is None + else self.average_groups_global_scores, ) - global_score.update(confidence_interval) + else: + # dress the individual groups's score like instance scores: for each group generate + # a dict having just the "score" field, and in it -- just the "instance" section, + # and in that section: all the score_names that were evaluated for that group. + # then sample from them, aggregating by simple average: + to_sample_from = [ + { + "score": { + "instance": { + score_name: score + for score_name, score in groups_global_scores[ + group_name + ].items() + } + } + } + for group_name in groups_global_scores.keys() + ] + confidence_interval = self.score_based_confidence_interval( + instances=to_sample_from, + score_names=list(set(self.ci_scores)), + ci_score_prefix=field_name_full_prefix, # with grouping, use the same prefix as for the score_name + aggregation_func=self.average_item_scores, + ) + + global_score.update(confidence_interval) + + # finally, update all the instances with the global score now all computed: + for instance in instances: + instance["score"]["global"] = global_score yield from instances @@ -2193,8 +2262,6 @@ def _model_using_extrnal_api(self): def prepare(self): """Initialization method for the metric. 
Initializes the CorrectnessEvaluator with the OpenAI model.""" - super().prepare() - self.model_name_normalized = self.model_name.replace(".", "_").replace("-", "_") self.main_score: str = ( f"correctness_llama_index_by_{self.model_name_normalized}_judge" @@ -2202,6 +2269,8 @@ def prepare(self): self.reduction_map: Dict[str, List[str]] = {"mean": [self.main_score]} + super().prepare() + from llama_index.core.evaluation import CorrectnessEvaluator if self.model_name in self.openai_models: @@ -2952,6 +3021,8 @@ def performance_drop_rate( np.concatenate( [subgroup_scores_dict[subgroup_name] for subgroup_name in name_list] ) + if any(subgroup_name in subgroup_scores_dict for subgroup_name in name_list) + else [] for name_list in [control_subgroup_types, comparison_subgroup_types] ] if any(len(scores) == 0 for scores in group_scores_list): @@ -3064,6 +3135,8 @@ def normalized_cohens_h( np.concatenate( [subgroup_scores_dict[subgroup_name] for subgroup_name in name_list] ) + if any(subgroup_name in subgroup_scores_dict for subgroup_name in name_list) + else [] for name_list in [control_subgroup_types, comparison_subgroup_types] ] @@ -3114,6 +3187,8 @@ def normalized_hedges_g( np.concatenate( [subgroup_scores_dict[subgroup_name] for subgroup_name in name_list] ) + if any(subgroup_name in subgroup_scores_dict for subgroup_name in name_list) + else [] for name_list in [control_subgroup_types, comparison_subgroup_types] ] @@ -3178,9 +3253,16 @@ def mean_subgroup_score( ) # combine all desired subgroup scores - score_list = np.concatenate( - [subgroup_scores_dict[subgroup_name] for subgroup_name in subgroup_types] - ) + if any(subgroup_name in subgroup_scores_dict for subgroup_name in subgroup_types): + score_list = np.concatenate( + [ + subgroup_scores_dict[subgroup_name] + for subgroup_name in subgroup_types + if subgroup_name in subgroup_scores_dict + ] + ) + else: + score_list = [] if len(score_list) == 0: # no scores to use return np.nan diff --git a/tests/library/test_metrics.py b/tests/library/test_metrics.py index c2dcb75090..0adb92740f 100644 --- a/tests/library/test_metrics.py +++ b/tests/library/test_metrics.py @@ -1240,7 +1240,7 @@ def _test_grouped_instance_confidence_interval( score_value, expected_global_result[score_name], places=5, - msg=f"{group_score_name} score mismatch for {metric.__class__.__name__}, got {expected_global_result[score_name]} but expected {score_value}", + msg=f"{score_name} score mismatch for {metric.__class__.__name__}, expected {expected_global_result[score_name]} but got {score_value}", ) else: # An output score that is not expected
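# --- Illustrative sketch (reviewer note, not part of the patch) ---
# The "fixed_group_..." CI path above dresses each group's global score up as a
# pseudo-instance and resamples whole groups. percentile_bootstrap below is a toy
# stand-in for score_based_confidence_interval.
import random
from statistics import mean
from typing import Dict, List, Tuple


def groups_as_pseudo_instances(group_scores: Dict[str, Dict[str, float]]) -> List[dict]:
    # same shape as to_sample_from in the patch: only a "score"/"instance" section
    return [{"score": {"instance": dict(scores)}} for scores in group_scores.values()]


def percentile_bootstrap(values: List[float], n_resamples: int = 1000, seed: int = 0) -> Tuple[float, float]:
    rng = random.Random(seed)
    stats = sorted(mean(rng.choices(values, k=len(values))) for _ in range(n_resamples))
    return stats[int(0.025 * n_resamples)], stats[int(0.975 * n_resamples)]


group_scores = {"g1": {"accuracy": 0.5}, "g2": {"accuracy": 1.0}, "g3": {"accuracy": 0.0}}
pseudo = groups_as_pseudo_instances(group_scores)
values = [p["score"]["instance"]["accuracy"] for p in pseudo]
print(percentile_bootstrap(values))  # wide interval -- only three groups to resample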
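# --- Illustrative sketch (reviewer note, not part of the patch) ---
# The guard added above to performance_drop_rate / normalized_cohens_h /
# normalized_hedges_g / mean_subgroup_score: when the requested subgroup types are
# absent from the scores dict, fall back to an empty list so the caller can return
# NaN instead of raising a KeyError. This version also filters out individually
# missing names, as the mean_subgroup_score change does.
from typing import Dict, List

import numpy as np


def collect_subgroup_scores(
    subgroup_scores_dict: Dict[str, List[float]], subgroup_types: List[str]
) -> List[float]:
    if not any(name in subgroup_scores_dict for name in subgroup_types):
        return []
    return list(
        np.concatenate(
            [subgroup_scores_dict[name] for name in subgroup_types if name in subgroup_scores_dict]
        )
    )


scores = {"original": [1.0, 0.0]}
print(collect_subgroup_scores(scores, ["original"]))    # [1.0, 0.0]
print(collect_subgroup_scores(scores, ["paraphrase"]))  # []  -> caller returns np.nan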