
Commit f7b27fa

[FSTORE-1640] Delta delta streamer repeatable reads (#432)
1 parent 8bcef22 commit f7b27fa

1 file changed: +20 -12 lines

utils/python/hsfs_utils.py

Lines changed: 20 additions & 12 deletions
@@ -269,15 +269,13 @@ def offline_fg_materialization(
 
     read_options = engine.get_instance()._get_kafka_config(entity.feature_store_id, {})
 
-    # get offsets
+    # get starting offsets
     offset_location = entity.prepare_spark_location() + "/kafka_offsets"
     try:
         if initial_check_point_string:
-            offset_string = json.dumps(
-                _build_starting_offsets(initial_check_point_string)
-            )
+            starting_offset_string = json.dumps(_build_offsets(initial_check_point_string))
         else:
-            offset_string = spark.read.json(offset_location).toJSON().first()
+            starting_offset_string = spark.read.json(offset_location).toJSON().first()
     except Exception as e:
         print(f"An unexpected error occurred: {e}")
         # if all else fails read from the beggining
@@ -286,15 +284,26 @@ def offline_fg_materialization(
             offline_write_options={},
             high=False,
         )
-        offset_string = json.dumps(_build_starting_offsets(initial_check_point_string))
-    print(f"startingOffsets: {offset_string}")
+        starting_offset_string = json.dumps(_build_offsets(initial_check_point_string))
+    print(f"startingOffsets: {starting_offset_string}")
+
+    # get ending offsets
+    ending_offset_string = kafka_engine.kafka_get_offsets(
+        topic_name=entity._online_topic_name,
+        feature_store_id=entity.feature_store_id,
+        offline_write_options={},
+        high=True,
+    )
+    ending_offset_string = json.dumps(_build_offsets(ending_offset_string))
+    print(f"endingOffsets: {ending_offset_string}")
 
     # read kafka topic
     df = (
         spark.read.format("kafka")
         .options(**read_options)
         .option("subscribe", entity._online_topic_name)
-        .option("startingOffsets", offset_string)
+        .option("startingOffsets", starting_offset_string)
+        .option("endingOffsets", ending_offset_string)
         .option("includeHeaders", "true")
         .option("failOnDataLoss", "false")
         .load()
@@ -326,8 +335,8 @@ def offline_fg_materialization(
     entity.insert(deserialized_df, storage="offline")
 
     # update offsets
-    df_offsets = (df if limit > filtered_df.count() else filtered_df).groupBy("partition").agg(max("offset").alias("offset")).collect()
-    offset_dict = json.loads(offset_string)
+    df_offsets = (df if limit > filtered_df.count() else filtered_df).groupBy('partition').agg(max('offset').alias('offset')).collect()
+    offset_dict = json.loads(starting_offset_string)
     for offset_row in df_offsets:
         offset_dict[entity._online_topic_name][f"{offset_row.partition}"] = (
             offset_row.offset + 1
@@ -350,8 +359,7 @@ def update_table_schema_fg(spark: SparkSession, job_conf: Dict[Any, Any]) -> Non
     entity.stream = False
     engine.get_instance().update_table_schema(entity)
 
-
-def _build_starting_offsets(initial_check_point_string: str):
+def _build_offsets(initial_check_point_string: str):
     if not initial_check_point_string:
         return ""
 
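The diff gives the materialization job repeatable reads: rather than an open-ended batch read that starts at the saved offsets and runs to whatever happens to be the latest offset at execution time, the job now snapshots the high-water marks up front and passes them to Spark as endingOffsets, so a retry reads exactly the same slice of the topic. The following minimal sketch (not the job itself) shows the kind of bounded Spark batch read this relies on; the topic name, broker address, and offset values are illustrative assumptions, and running it requires the spark-sql-kafka package on the classpath.

import json
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("bounded-kafka-read-sketch").getOrCreate()

topic = "fg_example_topic"  # hypothetical topic name
# Per-partition offsets in the JSON shape Spark's Kafka source expects.
starting_offsets = json.dumps({topic: {"0": 100, "1": 250}})  # read from here (inclusive)
ending_offsets = json.dumps({topic: {"0": 500, "1": 700}})    # stop here (exclusive)

df = (
    spark.read.format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")  # hypothetical broker
    .option("subscribe", topic)
    .option("startingOffsets", starting_offsets)
    .option("endingOffsets", ending_offsets)
    .option("includeHeaders", "true")
    .option("failOnDataLoss", "false")
    .load()
)

# Because both ends of the range are pinned, this count is stable across re-runs.
print(df.count())

In that JSON form, -2 stands for the earliest offset, and for batch reads -1 (latest) is only valid as an ending offset. Since the starting offset is inclusive and the ending offset exclusive, the job's bookkeeping stores max consumed offset + 1 as the next run's starting point.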

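The "update offsets" hunk keeps the bookkeeping that feeds the next run's starting offsets. Here is a small sketch of that logic in plain Python, assuming the consumed rows carry the usual Kafka partition and offset columns; the topic name and numbers are made up, and how the result is persisted (the kafka_offsets location in the job) is only hinted at in a comment.

import json

topic = "fg_example_topic"  # hypothetical topic name
starting_offset_string = json.dumps({topic: {"0": 100, "1": 250}})

# Stand-in for .groupBy("partition").agg(max("offset")).collect(): the highest
# offset actually consumed in each partition during this run.
consumed = [{"partition": 0, "offset": 499}, {"partition": 1, "offset": 699}]

offset_dict = json.loads(starting_offset_string)
for row in consumed:
    # Store max consumed offset + 1 so the next run starts just past what was read.
    offset_dict[topic][f"{row['partition']}"] = row["offset"] + 1

next_starting_offsets = json.dumps(offset_dict)
print(next_starting_offsets)
# In the job this JSON would be written back to the kafka_offsets location for the next run.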