Skip to content

Commit

Permalink
Fix committer lag of one record when NoSqlTarget is used (#548)
Browse files Browse the repository at this point in the history
`NoSqlTarget` must not keep a reference to the last event it processed, or else the GC can't collect it, and its offset won't be committed.

[ML-4421](https://iguazio.atlassian.net/browse/ML-4421)
  • Loading branch information
gtopper authored Dec 15, 2024
1 parent 77e57d8 commit a00e997
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 0 deletions.
7 changes: 7 additions & 0 deletions storey/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,7 @@ async def _persist_worker(self):
for done_job in self._pending_by_key[job.key].in_flight:
if done_job.callback:
await done_job.callback(done_job.extra_data, completed)
del done_job # Allow job to be garbage collected
self._pending_by_key[job.key].in_flight = []

# If we got more pending events for the same key process them
Expand All @@ -444,8 +445,14 @@ async def _persist_worker(self):
jobs_at_tail = self_sent_jobs.get(tail_position, [])
jobs_at_tail.append((job, asyncio.get_running_loop().create_task(future_task)))
self_sent_jobs[tail_position] = jobs_at_tail
# Allow these jobs to be garbage collected
del jobs_at_tail
else:
del self._pending_by_key[job.key]

# Allow job to be garbage collected
del job
task = None
except BaseException as ex:
if task and task is not _termination_obj:
if task[0].extra_data and task[0].extra_data._awaitable_result:
Expand Down
43 changes: 43 additions & 0 deletions tests/test_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,49 @@ def test_async_offset_commit_before_termination():
asyncio.run(async_offset_commit_before_termination())


async def async_offset_commit_before_termination_with_nosqltarget():
    """Check that offsets are fully committed, before termination, in a flow ending in a NoSqlTarget.

    Emits ``records_per_shard`` events to each of ``shard_count`` shards (offsets start at 1),
    waits one second longer than ``max_wait_before_commit``, and asserts that the committer
    recorded the highest offset for every shard. The flow is terminated afterwards whether or
    not the assertion holds.
    """
    commit_wait_secs = 1
    shard_count = 10
    records_per_shard = 10

    committer = Committer()
    committer_context = CommitterContext(committer)

    steps = [
        AsyncEmitSource(context=committer_context, explicit_ack=True, max_wait_before_commit=commit_wait_secs),
        Map(lambda x: x + 1),
        Filter(lambda x: x < 3),
        FlatMap(lambda x: [x, x * 10]),
        NoSqlTarget(Table("/", NoopDriver(), flush_interval_secs=None)),
    ]
    flow_controller = build_flow(steps).run()

    # Offsets run from 1 through records_per_shard; each offset is emitted to every shard in turn.
    for current_offset in range(1, records_per_shard + 1):
        for shard_index in range(shard_count):
            record = Event(shard_index, "abc")
            record.shard_id = shard_index
            record.offset = current_offset
            await flow_controller.emit(record)

    # Release this function's own reference to the final event, so that the test itself
    # is not what keeps it alive (presumably the commit depends on the event being collected).
    del record

    # Wait past the commit interval so that pending offsets have a chance to be committed.
    await asyncio.sleep(commit_wait_secs + 1)

    try:
        committed = copy.copy(committer.offsets)
        expected = {("/", shard_index): records_per_shard for shard_index in range(shard_count)}
        assert committed == expected
    finally:
        # Always shut the flow down, even when the assertion above fails.
        await flow_controller.terminate()
        await flow_controller.await_termination()


# ML-4421
def test_async_offset_commit_before_termination_with_nosqltarget():
    """Synchronous pytest entry point: run the NoSqlTarget offset-commit scenario on a new event loop."""
    scenario = async_offset_commit_before_termination_with_nosqltarget()
    asyncio.run(scenario)


def test_offset_not_committed_prematurely():
platform = Committer()
context = CommitterContext(platform)
Expand Down

0 comments on commit a00e997

Please sign in to comment.