[FSTORE-1640] Delta delta streamer repeatable reads (#1416)
bubriks authored Feb 5, 2025
1 parent 7554590 commit fc0da96
Showing 1 changed file with 20 additions and 12 deletions.
32 changes: 20 additions & 12 deletions utils/python/hsfs_utils.py
@@ -269,15 +269,13 @@ def offline_fg_materialization(
 
     read_options = engine.get_instance()._get_kafka_config(entity.feature_store_id, {})
 
-    # get offsets
+    # get starting offsets
     offset_location = entity.prepare_spark_location() + "/kafka_offsets"
     try:
         if initial_check_point_string:
-            offset_string = json.dumps(
-                _build_starting_offsets(initial_check_point_string)
-            )
+            starting_offset_string = json.dumps(_build_offsets(initial_check_point_string))
         else:
-            offset_string = spark.read.json(offset_location).toJSON().first()
+            starting_offset_string = spark.read.json(offset_location).toJSON().first()
     except Exception as e:
         print(f"An unexpected error occurred: {e}")
         # if all else fails read from the beginning
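For context, a minimal sketch of the JSON shape that Spark's Kafka source accepts for `startingOffsets`: a topic name mapped to per-partition offsets. This is what the resolved `starting_offset_string` holds; the topic name and offset values below are made up for illustration.

```python
# Illustrative only: Spark's per-partition offsets JSON for "startingOffsets"
# (topic -> partition -> first offset to read). Values are hypothetical.
import json

starting_offsets = {"my_topic": {"0": 23, "1": 45}}
starting_offset_string = json.dumps(starting_offsets)
print(starting_offset_string)  # {"my_topic": {"0": 23, "1": 45}}
```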
@@ -286,15 +284,26 @@ def offline_fg_materialization(
             offline_write_options={},
             high=False,
         )
-        offset_string = json.dumps(_build_starting_offsets(initial_check_point_string))
-    print(f"startingOffsets: {offset_string}")
+        starting_offset_string = json.dumps(_build_offsets(initial_check_point_string))
+    print(f"startingOffsets: {starting_offset_string}")
+
+    # get ending offsets
+    ending_offset_string = kafka_engine.kafka_get_offsets(
+        topic_name=entity._online_topic_name,
+        feature_store_id=entity.feature_store_id,
+        offline_write_options={},
+        high=True,
+    )
+    ending_offset_string = json.dumps(_build_offsets(ending_offset_string))
+    print(f"endingOffsets: {ending_offset_string}")
 
     # read kafka topic
     df = (
         spark.read.format("kafka")
         .options(**read_options)
         .option("subscribe", entity._online_topic_name)
-        .option("startingOffsets", offset_string)
+        .option("startingOffsets", starting_offset_string)
+        .option("endingOffsets", ending_offset_string)
         .option("includeHeaders", "true")
         .option("failOnDataLoss", "false")
         .load()
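Pinning both ends is what makes the read repeatable: in a Spark batch query, `endingOffsets` defaults to "latest", so a read bounded only by `startingOffsets` consumes up to whatever offset the topic happens to have at run time, and two runs of the "same" job can see different data. A standalone sketch of the bounded-read pattern (broker address and topic name are assumptions, not from this commit):

```python
# Standalone sketch of a bounded Spark batch read from Kafka. With both ends
# pinned, re-running the read returns exactly the same records even while
# producers keep appending to the topic. Broker/topic names are assumptions.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("bounded-kafka-read").getOrCreate()

df = (
    spark.read.format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")
    .option("subscribe", "my_topic")
    .option("startingOffsets", '{"my_topic": {"0": 100}}')  # inclusive
    .option("endingOffsets", '{"my_topic": {"0": 200}}')    # exclusive
    .load()
)
```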
@@ -326,8 +335,8 @@ def offline_fg_materialization(
     entity.insert(deserialized_df, storage="offline")
 
     # update offsets
-    df_offsets = (df if limit > filtered_df.count() else filtered_df).groupBy("partition").agg(max("offset").alias("offset")).collect()
-    offset_dict = json.loads(offset_string)
+    df_offsets = (df if limit > filtered_df.count() else filtered_df).groupBy('partition').agg(max('offset').alias('offset')).collect()
+    offset_dict = json.loads(starting_offset_string)
     for offset_row in df_offsets:
         offset_dict[entity._online_topic_name][f"{offset_row.partition}"] = (
             offset_row.offset + 1
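The `+ 1` is what stitches consecutive runs together: since `startingOffsets` is inclusive, the next run must begin one past the highest offset just consumed in each partition. A tiny illustration with made-up numbers:

```python
# Illustrative offset-advance rule: the next run starts one past the max
# offset consumed per partition (values are hypothetical).
consumed_max = {"0": 199, "1": 149}  # max("offset") per partition from the batch
next_starting = {partition: offset + 1 for partition, offset in consumed_max.items()}
assert next_starting == {"0": 200, "1": 150}
```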
@@ -350,8 +359,7 @@ def update_table_schema_fg(spark: SparkSession, job_conf: Dict[Any, Any]) -> None:
     entity.stream = False
     engine.get_instance().update_table_schema(entity)
 
-
-def _build_starting_offsets(initial_check_point_string: str):
+def _build_offsets(initial_check_point_string: str):
     if not initial_check_point_string:
         return ""
 
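The body of `_build_offsets` is cut off by the diff. As a hedge, one plausible sketch of the parsing it performs, assuming the checkpoint string has the form `topic,partition:offset,partition:offset` (the format and the helper name below are assumptions, not confirmed by this diff):

```python
# Hypothetical sketch of the elided parsing, assuming a checkpoint string like
# "my_topic,0:23,1:45". The real _build_offsets body is not shown in the diff.
def build_offsets_sketch(check_point_string: str) -> dict:
    topic, _, pairs = check_point_string.partition(",")
    offsets = {}
    for pair in pairs.split(","):
        partition, _, offset = pair.partition(":")
        offsets[partition] = int(offset)
    return {topic: offsets}

print(build_offsets_sketch("my_topic,0:23,1:45"))  # {'my_topic': {'0': 23, '1': 45}}
```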
