diff --git a/timex_lca/timeline_builder.py b/timex_lca/timeline_builder.py index 4be40af..a8e7ba7 100644 --- a/timex_lca/timeline_builder.py +++ b/timex_lca/timeline_builder.py @@ -15,7 +15,6 @@ convert_date_string_to_datetime, ) - class TimelineBuilder: """ This class is responsible for building a timeline of processes based on the temporal relationship of the priority-first graph traversal. @@ -97,25 +96,14 @@ def __init__( ) self.edge_timeline = self.edge_extractor.build_edge_timeline() - def check_database_names(self) -> None: - """ - Check that the strings of the databases exist in the databases of the brightway project. - """ - for db in self.database_date_dict_static_only.keys(): - assert ( - db in bd.databases - ), f"{db} is not in your brightway project databases." - return - - # TODO: rethink structure of build_timeline(): is it good to have all these nested functions? def build_timeline(self) -> pd.DataFrame: """ - Create a dataframe with grouped, time-explicit edges and for each grouped edge interpolate to the database with the closest time of representativeness. + Create a dataframe with grouped, time-explicit edges and, for each grouped edge, interpolate to the database with the closest time of representativeness. Edges from same producer to same consumer that occur at different times within the same time window (temporal_grouping) are grouped together. Possible temporal groupings are "year", "month", "day" and "hour". - The column "interpolation weights" assigns the ratio [0-1] of the edge's amount to be taken from the database with the closest time of representativeness. + For edges between forground and background system, the column "interpolation weights" assigns the ratio [0-1] of the edge's amount to be taken from the database with the closest time of representativeness. If a process is in the foreground system only, the interpolation weight is set to None. 
Available interpolation types are: - "linear": linear interpolation between the two closest databases, based on temporal distance. - "closest": closest database is assigned 1 @@ -131,220 +119,6 @@ def build_timeline(self) -> pd.DataFrame: A timeline with grouped, time-explicit edges and interpolation weights to background databases. """ - def extract_edge_data(edge: Edge) -> dict: - """ - Stores the attributes of an Edge instance in a dictionary. - - :param edge: Edge instance - :return: Dictionary with attributes of the edge - """ - try: - consumer_date = edge.abs_td_consumer.date - consumer_date = np.array( - [consumer_date for i in range(len(edge.td_producer))] - ).T.flatten() - except AttributeError: - consumer_date = None - - return { - "producer": edge.producer, - "consumer": edge.consumer, - "leaf": edge.leaf, - "consumer_date": consumer_date, - "producer_date": edge.abs_td_producer.date, - "amount": edge.abs_td_producer.amount, - } - - def get_consumer_name(id: int) -> str: - """ - Returns the name of consumer node. - If consuming node is the functional unit, returns -1. - - :param id: Id of node - :return: string of node's name or -1 - """ - try: - return bd.get_node(id=id)["name"] - except: - return "-1" # functional unit - - def add_column_interpolation_weights_to_timeline( - tl_df: pd.DataFrame, - interpolation_type: str = "linear", - ) -> pd.DataFrame: - """ - Add a column to a timeline with the weights for an interpolation between the two nearest dates, from the list of dates of the available databases. - - :param tl_df: Timeline as a dataframe. - :param interpolation_type: Type of interpolation between the nearest lower and higher dates. Available options: "linear"and "nearest". - - :return: Timeline as a dataframe with a column 'interpolation_weights' added, this column looks like {database_name: weight, database_name: weight}. 
- - """ - if not self.database_date_dict_static_only: - tl_df["interpolation_weights"] = None - warnings.warn( - "No time-explicit databases are provided. Mapping to time-explicit databases is not possible.", - category=Warning, - ) - return tl_df - - dates_list = [ - date - for date in self.database_date_dict_static_only.values() - if type(date) == datetime - ] - if "date_producer" not in list(tl_df.columns): - raise ValueError("The timeline does not contain dates.") - - # create reversed dict {date: database} with only static "background" db's - self.reversed_database_date_dict = { - v: k - for k, v in self.database_date_dict_static_only.items() - if type(v) == datetime - } - - if self.interpolation_type == "nearest": - tl_df["interpolation_weights"] = tl_df["date_producer"].apply( - lambda x: find_closest_date(x, dates_list) - ) - - if self.interpolation_type == "linear": - tl_df["interpolation_weights"] = tl_df["date_producer"].apply( - lambda x: get_weights_for_interpolation_between_nearest_years( - x, dates_list, self.interpolation_type - ) - ) - - else: - raise ValueError( - f"Sorry, but {self.interpolation_type} interpolation is not available yet." - ) - - tl_df["interpolation_weights"] = tl_df.apply( - add_interpolation_weights_at_intersection_to_background, axis=1 - ) # add the weights to the timeline for processes at intersection - - return tl_df - - def add_interpolation_weights_at_intersection_to_background(row) -> Union[dict, None]: - """ - returns the interpolation weights to background databases only for those exchanges, where the producing process - actually comes from a background database (temporal markets). - - Only these processes are receiving inputs from the background databases. 
- All other process in the timeline are not directly linked to the background, so the interpolation weight info is not needed and set to None - - Parameters - ---------- - row : pd.Series - Row of the timeline dataframe - Returns - ------- - dict - Dictionary with the name of databases and interpolation weights. - """ - - if ( - row["producer"] - in self.node_id_collection_dict[ - "first_level_background_node_ids_static" - ] - ): - return { - self.reversed_database_date_dict[x]: v - for x, v in row["interpolation_weights"].items() - } - else: - return None - - def find_closest_date(target: datetime, dates: KeysView[datetime]) -> dict: - """ - Find the closest date to the target in the dates list. - - :param target: Target datetime.datetime object. - :param dates: List of datetime.datetime objects. - :return: Dictionary with the key as the closest datetime.datetime object from the list and a value of 1. - - """ - - # If the list is empty, return None - if not dates: - return None - - # Sort the dates - dates = sorted(dates) - # Use min function with a key based on the absolute difference between the target and each date - closest = min(dates, key=lambda date: abs(target - date)) - - return {closest: 1} - - def get_weights_for_interpolation_between_nearest_years( - reference_date: datetime, - dates_list: KeysView[datetime], - interpolation_type: str = "linear", - ) -> dict: - """ - Find the nearest dates (before and after) a given date in a list of dates and calculate the interpolation weights. - - :param reference_date: Target date. - :param dates_list: KeysView[datetime], which is a list of the temporal representativeness of the available databases. - :param interpolation_type: Type of interpolation between the nearest lower and higher dates. For now, - only "linear" is available. - - :return: Dictionary with temporal coverage of the available databases to use as keys and the weights for interpolation as values. 
- - """ - dates_list = sorted(dates_list) - - diff_dates_list = [reference_date - x for x in dates_list] - - if timedelta(0) in diff_dates_list: # date of process == date of database - exact_match = dates_list[diff_dates_list.index(timedelta(0))] - return {exact_match: 1} - - closest_lower = None - closest_higher = None - - # select the closest lower and higher dates of the database in regards to the date of process - for date in dates_list: - if date < reference_date: - if ( - closest_lower is None - or reference_date - date < reference_date - closest_lower - ): - closest_lower = date - elif date > reference_date: - if ( - closest_higher is None - or date - reference_date < closest_higher - reference_date - ): - closest_higher = date - - if closest_lower is None: - warnings.warn( - f"Reference date {reference_date} is lower than all provided dates. Data will be taken from the closest higher year.", - category=Warning, - ) - return {closest_higher: 1} - - if closest_higher is None: - warnings.warn( - f"Reference date {reference_date} is higher than all provided dates. Data will be taken from the closest lower year.", - category=Warning, - ) - return {closest_lower: 1} - - if self.interpolation_type == "linear": - weight = int((reference_date - closest_lower).total_seconds()) / int( - (closest_higher - closest_lower).total_seconds() - ) - else: - raise ValueError( - f"Sorry, but {interpolation_type} interpolation is not available yet." 
- ) - return {closest_lower: 1 - weight, closest_higher: weight} - # check if database names match with databases in BW project self.check_database_names() @@ -356,7 +130,7 @@ def get_weights_for_interpolation_between_nearest_years( ) # Extract edge data into a list of dictionaries - edges_data = [extract_edge_data(edge) for edge in self.edge_timeline] + edges_data = [self.extract_edge_data(edge) for edge in self.edge_timeline] # Convert list of dictionaries to dataframe edges_df = pd.DataFrame(edges_data) @@ -371,7 +145,7 @@ def get_weights_for_interpolation_between_nearest_years( edges_df["consumer"] == -1, "producer_date" ] - # extract grouping time of consumer and producer: processes occuring at different times withing in teh same time window of grouping get the same grouping time + # extract grouping time of consumer and producer: processes occuring at different times withing in the same time window of grouping get the same grouping time edges_df["consumer_grouping_time"] = edges_df["consumer_date"].apply( lambda x: extract_date_as_string(self.temporal_grouping, x) ) @@ -440,7 +214,7 @@ def get_weights_for_interpolation_between_nearest_years( ) # Add interpolation weights to background databases to the dataframe - grouped_edges = add_column_interpolation_weights_to_timeline( + grouped_edges = self.add_column_interpolation_weights_to_timeline( grouped_edges, interpolation_type=self.interpolation_type, ) @@ -449,7 +223,7 @@ def get_weights_for_interpolation_between_nearest_years( grouped_edges["producer_name"] = grouped_edges.producer.apply( lambda x: bd.get_node(id=x)["name"] ) - grouped_edges["consumer_name"] = grouped_edges.consumer.apply(get_consumer_name) + grouped_edges["consumer_name"] = grouped_edges.consumer.apply(self.get_consumer_name) # Reorder columns grouped_edges = grouped_edges[ @@ -470,3 +244,266 @@ def get_weights_for_interpolation_between_nearest_years( ] return grouped_edges + + ################################################### + # 
# underlying functions called by build_timeline() #
###################################################
# NOTE: these are TimelineBuilder methods; each one receives the instance as `self`.


def check_database_names(self) -> None:
    """
    Check that the strings of the databases exist in the databases of the brightway project.

    Raises
    ------
    AssertionError
        If a key of `self.database_date_dict_static_only` is not in `bd.databases`.
    """
    for db in self.database_date_dict_static_only:
        if db not in bd.databases:
            # Raised explicitly instead of via `assert`, so the check is not
            # stripped when Python runs with -O.
            raise AssertionError(f"{db} is not in your brightway project databases.")


def extract_edge_data(self, edge: "Edge") -> dict:
    """
    Stores the attributes of an Edge instance in a dictionary.

    Parameters
    ----------
    edge: Edge
        Edge instance

    Returns
    -------
    dict
        Dictionary with the keys "producer", "consumer", "leaf", "consumer_date",
        "producer_date" and "amount". "consumer_date" is None if the consumer date
        cannot be read from the edge (AttributeError).
    """
    try:
        consumer_date = edge.abs_td_consumer.date
        # Repeat the consumer date(s) once per entry of td_producer, then flatten.
        consumer_date = np.array(
            [consumer_date for _ in range(len(edge.td_producer))]
        ).T.flatten()
    except AttributeError:
        consumer_date = None

    return {
        "producer": edge.producer,
        "consumer": edge.consumer,
        "leaf": edge.leaf,
        "consumer_date": consumer_date,
        "producer_date": edge.abs_td_producer.date,
        "amount": edge.abs_td_producer.amount,
    }


def add_column_interpolation_weights_to_timeline(
    self,
    tl_df: pd.DataFrame,
    interpolation_type: str = "linear",
) -> pd.DataFrame:
    """
    Add a column to a timeline with the weights for an interpolation between the two nearest dates, from the list of dates of the available databases.

    Parameters
    ----------
    tl_df: pd.DataFrame
        Timeline as a dataframe. It is modified in place and returned.
    interpolation_type: str, optional
        Type of interpolation between the nearest lower and higher dates. Available options: "linear" and "nearest", defaulting to "linear".
        NOTE: the value actually used is `self.interpolation_type`; this argument is currently not read.

    Returns
    -------
    pd.DataFrame
        Timeline as a dataframe with a column 'interpolation_weights' added, this column looks like {database_name: weight, database_name: weight}.
        Rows whose producer is not a first-level static background node get None.

    Raises
    ------
    ValueError
        If the timeline has no "date_producer" column or if `self.interpolation_type` is neither "linear" nor "nearest".
    """
    if not self.database_date_dict_static_only:
        tl_df["interpolation_weights"] = None
        warnings.warn(
            "No time-explicit databases are provided. Mapping to time-explicit databases is not possible.",
            category=Warning,
        )
        return tl_df

    if "date_producer" not in tl_df.columns:
        raise ValueError("The timeline does not contain dates.")

    # only entries whose value is a datetime are usable as interpolation targets
    dates_list = [
        date
        for date in self.database_date_dict_static_only.values()
        if isinstance(date, datetime)
    ]

    # create reversed dict {date: database} with only static "background" db's
    self.reversed_database_date_dict = {
        date: db
        for db, date in self.database_date_dict_static_only.items()
        if isinstance(date, datetime)
    }

    # if/elif/else chain: "nearest" must not fall through to the error branch
    if self.interpolation_type == "nearest":
        tl_df["interpolation_weights"] = tl_df["date_producer"].apply(
            lambda x: self.find_closest_date(x, dates_list)
        )
    elif self.interpolation_type == "linear":
        tl_df["interpolation_weights"] = tl_df["date_producer"].apply(
            lambda x: self.get_weights_for_interpolation_between_nearest_years(
                x, dates_list, self.interpolation_type
            )
        )
    else:
        raise ValueError(
            f"Sorry, but {self.interpolation_type} interpolation is not available yet."
        )

    # keep the weights only for processes at the intersection to the background
    # and translate the date keys into database names
    tl_df["interpolation_weights"] = tl_df.apply(
        self.add_interpolation_weights_at_intersection_to_background, axis=1
    )

    return tl_df


def find_closest_date(
    self, target: datetime, dates: KeysView[datetime]
) -> Union[dict, None]:
    """
    Find the closest date to the target in the dates list.

    Parameters
    ----------
    target : datetime.datetime
        Target datetime object.
    dates : KeysView[datetime]
        List of datetime.datetime objects.

    Returns
    -------
    dict or None
        Dictionary with the key as the closest datetime.datetime object from the list and a value of 1.
        None if `dates` is empty.
    """
    if not dates:
        return None

    # Sorting first makes the earlier date win when two dates are equally close.
    closest = min(sorted(dates), key=lambda date: abs(target - date))

    return {closest: 1}


def get_weights_for_interpolation_between_nearest_years(
    self,
    reference_date: datetime,
    dates_list: KeysView[datetime],
    interpolation_type: str = "linear",
) -> dict:
    """
    Find the nearest dates (lower and higher) for a given date from a list of dates and calculate the interpolation weights based on temporal proximity.

    Parameters
    ----------
    reference_date : datetime
        Target date.
    dates_list : KeysView[datetime]
        List of datetime objects representing the temporal representativeness of the available databases.
    interpolation_type : str, optional
        Type of interpolation between the nearest lower and higher dates. For now, only "linear" is available.
        NOTE: the check is made against `self.interpolation_type`; this argument is only used in the error message.

    Returns
    -------
    dict
        Dictionary with datetimes of the available closest databases as keys and the weights for interpolation as values.
        A single entry with weight 1 is returned for an exact match or when the reference date lies outside the range of the provided dates.

    Raises
    ------
    ValueError
        If the reference date lies between two dates and `self.interpolation_type` is not "linear".
    """
    dates_list = sorted(dates_list)

    # date of process == date of database
    for date in dates_list:
        if reference_date - date == timedelta(0):
            return {date: 1}

    # closest lower and higher dates of the databases in regards to the date of process
    closest_lower = max(
        (date for date in dates_list if date < reference_date), default=None
    )
    closest_higher = min(
        (date for date in dates_list if date > reference_date), default=None
    )

    if closest_lower is None:
        warnings.warn(
            f"Reference date {reference_date} is lower than all provided dates. Data will be taken from the closest higher year.",
            category=Warning,
        )
        return {closest_higher: 1}

    if closest_higher is None:
        warnings.warn(
            f"Reference date {reference_date} is higher than all provided dates. Data will be taken from the closest lower year.",
            category=Warning,
        )
        return {closest_lower: 1}

    if self.interpolation_type != "linear":
        raise ValueError(
            f"Sorry, but {interpolation_type} interpolation is not available yet."
        )

    # share of the interval [closest_lower, closest_higher] that lies before the reference date
    weight = int((reference_date - closest_lower).total_seconds()) / int(
        (closest_higher - closest_lower).total_seconds()
    )
    return {closest_lower: 1 - weight, closest_higher: weight}


def add_interpolation_weights_at_intersection_to_background(
    self, row
) -> Union[dict, None]:
    """
    Returns the interpolation weights to background databases only for those exchanges, where the producing process
    actually comes from a background database (temporal markets).

    Only these processes are receiving inputs from the background databases.
    All other process in the timeline are not directly linked to the background, so the interpolation weight info is not needed and set to None.

    Parameters
    ----------
    row : pd.Series
        Row of the timeline dataframe

    Returns
    -------
    dict or None
        Dictionary with the name of databases and interpolation weights, or None if the
        producer is not in `self.node_id_collection_dict["first_level_background_node_ids_static"]`.
    """
    background_ids = self.node_id_collection_dict[
        "first_level_background_node_ids_static"
    ]
    if row["producer"] not in background_ids:
        return None

    # translate the date keys into the names of the corresponding databases
    return {
        self.reversed_database_date_dict[date]: weight
        for date, weight in row["interpolation_weights"].items()
    }


def get_consumer_name(self, id: int) -> str:
    """
    Returns the name of consumer node.
    If the node cannot be retrieved (the consuming node is the functional unit), returns "-1".

    Parameters
    ----------
    id : int
        Id of node.

    Returns
    -------
    str
        Name of the node or the string "-1"
    """
    try:
        return bd.get_node(id=id)["name"]
    except Exception:  # any lookup failure is treated as "functional unit"
        return "-1"