From fc0da9669ac9a28d8ffc89be9be6196cb6e7f35f Mon Sep 17 00:00:00 2001
From: Ralf
Date: Wed, 5 Feb 2025 12:27:08 +0200
Subject: [PATCH] [FSTORE-1640] Delta delta streamer repeatable reads (#1416)

---
 utils/python/hsfs_utils.py | 32 ++++++++++++++++++++------------
 1 file changed, 20 insertions(+), 12 deletions(-)

diff --git a/utils/python/hsfs_utils.py b/utils/python/hsfs_utils.py
index f61435cfb5..015bb788e1 100644
--- a/utils/python/hsfs_utils.py
+++ b/utils/python/hsfs_utils.py
@@ -269,15 +269,13 @@ def offline_fg_materialization(
     read_options = engine.get_instance()._get_kafka_config(entity.feature_store_id, {})
 
-    # get offsets
+    # get starting offsets
     offset_location = entity.prepare_spark_location() + "/kafka_offsets"
     try:
         if initial_check_point_string:
-            offset_string = json.dumps(
-                _build_starting_offsets(initial_check_point_string)
-            )
+            starting_offset_string = json.dumps(_build_offsets(initial_check_point_string))
         else:
-            offset_string = spark.read.json(offset_location).toJSON().first()
+            starting_offset_string = spark.read.json(offset_location).toJSON().first()
     except Exception as e:
         print(f"An unexpected error occurred: {e}")
         # if all else fails read from the beggining
@@ -286,15 +284,26 @@ def offline_fg_materialization(
             offline_write_options={},
             high=False,
         )
-        offset_string = json.dumps(_build_starting_offsets(initial_check_point_string))
-    print(f"startingOffsets: {offset_string}")
+        starting_offset_string = json.dumps(_build_offsets(initial_check_point_string))
+    print(f"startingOffsets: {starting_offset_string}")
+
+    # get ending offsets
+    ending_offset_string = kafka_engine.kafka_get_offsets(
+        topic_name=entity._online_topic_name,
+        feature_store_id=entity.feature_store_id,
+        offline_write_options={},
+        high=True,
+    )
+    ending_offset_string = json.dumps(_build_offsets(ending_offset_string))
+    print(f"endingOffsets: {ending_offset_string}")
 
     # read kafka topic
     df = (
         spark.read.format("kafka")
         .options(**read_options)
         .option("subscribe", entity._online_topic_name)
-        .option("startingOffsets", offset_string)
+        .option("startingOffsets", starting_offset_string)
+        .option("endingOffsets", ending_offset_string)
         .option("includeHeaders", "true")
         .option("failOnDataLoss", "false")
         .load()
     )
@@ -326,8 +335,8 @@ def offline_fg_materialization(
     entity.insert(deserialized_df, storage="offline")
 
     # update offsets
-    df_offsets = (df if limit > filtered_df.count() else filtered_df).groupBy("partition").agg(max("offset").alias("offset")).collect()
-    offset_dict = json.loads(offset_string)
+    df_offsets = (df if limit > filtered_df.count() else filtered_df).groupBy('partition').agg(max('offset').alias('offset')).collect()
+    offset_dict = json.loads(starting_offset_string)
     for offset_row in df_offsets:
         offset_dict[entity._online_topic_name][f"{offset_row.partition}"] = (
             offset_row.offset + 1
         )
@@ -350,8 +359,7 @@ def update_table_schema_fg(spark: SparkSession, job_conf: Dict[Any, Any]) -> Non
     entity.stream = False
     engine.get_instance().update_table_schema(entity)
 
-
-def _build_starting_offsets(initial_check_point_string: str):
+def _build_offsets(initial_check_point_string: str):
    if not initial_check_point_string:
        return ""
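
Both the caller-supplied checkpoint string and the strings returned by kafka_engine.kafka_get_offsets are now funneled through the renamed _build_offsets helper before json.dumps turns them into the offset JSON handed to Spark. The sketch below is a hypothetical illustration of what such a helper plausibly returns; it assumes a checkpoint string shaped like "topic,0:123,1:456", which is an illustrative guess at the format, not something the patch confirms.

# Hypothetical stand-in for a helper like _build_offsets; the
# "topic,partition:offset,partition:offset" input format is an assumption.
def build_offsets_sketch(check_point_string: str) -> dict:
    if not check_point_string:
        return {}
    topic, _, partition_offsets = check_point_string.partition(",")
    offsets = {}
    for pair in partition_offsets.split(","):
        partition, _, offset = pair.partition(":")
        offsets[partition] = int(offset)
    # json.dumps of this value yields the {"<topic>": {"<partition>": <offset>}}
    # string that Spark accepts for startingOffsets and endingOffsets.
    return {topic: offsets}


print(build_offsets_sketch("fg_example_topic,0:42,1:17"))
# {'fg_example_topic': {'0': 42, '1': 17}}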
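
The substance of the change is that the Kafka batch read is now bounded on both ends: the job resolves the topic's latest offsets up front (kafka_get_offsets with high=True) and passes them as endingOffsets alongside startingOffsets, so re-running the materialization with the same two JSON strings reads exactly the same slice of the topic, which is what makes the Delta streamer reads repeatable. A minimal self-contained sketch of such a bounded read follows; it assumes a Spark session with the spark-sql-kafka package on the classpath, and the broker address, topic name, and offset values are illustrative.

# Minimal sketch of a Kafka batch read pinned between explicit offsets.
# Requires the spark-sql-kafka package; broker, topic, and offsets are
# illustrative only.
import json

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("bounded_kafka_read_sketch").getOrCreate()

topic = "fg_example_topic"  # hypothetical topic name

# Spark's Kafka source takes per-partition offsets as a JSON string:
# {"<topic>": {"<partition>": <offset>, ...}}  (-2 = earliest, -1 = latest)
starting_offsets = json.dumps({topic: {"0": 0, "1": 0}})
ending_offsets = json.dumps({topic: {"0": 1500, "1": 1500}})

df = (
    spark.read.format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")  # illustrative broker
    .option("subscribe", topic)
    .option("startingOffsets", starting_offsets)
    .option("endingOffsets", ending_offsets)  # upper bound fixed before reading
    .option("includeHeaders", "true")
    .option("failOnDataLoss", "false")
    .load()
)

# With both bounds pinned, re-running this block scans the same records.
df.selectExpr("partition", "offset", "CAST(value AS STRING)").show(5)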
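
The update-offsets hunk keeps the bookkeeping consistent with that bounded window: after the batch is inserted into the offline store, each partition's saved offset is advanced to max(offset) + 1, so the next run's startingOffsets begins one record past the last one consumed. The standalone sketch below illustrates that pattern with an in-memory stand-in for the consumed batch; the topic name, sample offsets, and output path are assumptions, not the exact mechanism used by hsfs_utils.py.

# Standalone sketch of the offset bookkeeping step. The topic name, the
# in-memory stand-in for the consumed Kafka batch, and the output path
# are illustrative assumptions.
import json

from pyspark.sql import SparkSession
from pyspark.sql.functions import max as max_

spark = SparkSession.builder.appName("offset_bookkeeping_sketch").getOrCreate()

topic = "fg_example_topic"  # hypothetical topic name
starting_offsets = json.dumps({topic: {"0": 0, "1": 0}})

# Stand-in for the records consumed from Kafka in this run (partition, offset).
consumed_df = spark.createDataFrame([(0, 41), (0, 42), (1, 17)], ["partition", "offset"])

# Advance each partition one past the highest offset read, so the next run
# starts exactly where this one stopped.
offset_dict = json.loads(starting_offsets)
max_offsets = consumed_df.groupBy("partition").agg(max_("offset").alias("offset")).collect()
for row in max_offsets:
    offset_dict[topic][str(row.partition)] = row.offset + 1

# Persist as a one-row JSON file so a later run can recover the same string
# with spark.read.json(offset_location).toJSON().first(); the path is illustrative.
offset_location = "/tmp/fg_example/kafka_offsets"
offsets_rdd = spark.sparkContext.parallelize([json.dumps(offset_dict)])
spark.read.json(offsets_rdd).coalesce(1).write.mode("overwrite").json(offset_location)

print(json.dumps(offset_dict))  # {"fg_example_topic": {"0": 43, "1": 18}}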