Implementing ZooKeeper persistent watcher into discovery service.#728
Conversation
📝 Walkthrough

Switches DiscoveryService from periodic ticks to ZooKeeper-driven updates: adds BasePath and LogObsolete; installs persistent recursive watches on ready; forwards ZooKeeper watch events into the main loop; maintains thread-safe `_advertised_raw` and applies it to `_advertised_cache`; deprecates `get_advertised_instances`.
Sequence diagram:

```mermaid
sequenceDiagram
    participant ZK as ZooKeeper
    participant DS as DiscoveryService
    participant Cache as _advertised_cache
    ZK->>DS: CONNECTED / Ready (_on_zk_ready)
    DS->>DS: Install recursive watch on BasePath
    DS->>DS: Schedule async _rescan_advertised_instances
    DS->>ZK: Read children & node data (_iter_zk_items)
    ZK-->>DS: Nodes & JSON payloads
    DS->>DS: Update _advertised_raw
    DS->>DS: _apply_advertised_raw (validate/normalize)
    DS->>Cache: Replace _advertised_cache under lock
    ZK->>DS: Watch event (CREATED/CHANGED/DELETED) on watch thread
    DS->>DS: _on_change_zookeeper_thread -> schedule_threadsafe
    DS->>DS: async _on_change updates _advertised_raw -> _apply_advertised_raw
    DS->>Cache: Sync cache with latest state
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
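The diagram's hand-off from ZooKeeper's watch thread into the asyncio main loop (the `schedule_threadsafe` step) can be sketched with plain asyncio. The queue and the event payload below are illustrative stand-ins, not ASAB's actual API:

```python
import asyncio
import threading

def watch_thread(loop, queue):
    """Runs on the ZooKeeper watch thread; must not touch the loop directly."""
    # call_soon_threadsafe (or run_coroutine_threadsafe) is the only safe way
    # to hand work from a foreign thread to a running asyncio loop.
    loop.call_soon_threadsafe(queue.put_nowait, ("CHANGED", "/asab/run/svc-1"))

async def main():
    loop = asyncio.get_running_loop()
    queue = asyncio.Queue()
    t = threading.Thread(target=watch_thread, args=(loop, queue))
    t.start()
    # The main loop consumes events that the watch thread enqueued.
    event_type, path = await queue.get()
    t.join()
    return event_type, path

result = asyncio.run(main())
```

ASAB's `TaskService.schedule_threadsafe` plays the role that `call_soon_threadsafe` plays in this sketch.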
🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed (1 warning)
Actionable comments posted: 4
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
asab/api/discovery.py (1)
Lines 46-58: ⚠️ Potential issue | 🟡 Minor — Silence the unused PubSub argument.

Ruff flags `msg` as unused in `_on_tick600`. Rename it to `_msg` (or `_`) to keep the signature but avoid lint noise.

🛠️ Proposed fix

```diff
-	def _on_tick600(self, msg):
+	def _on_tick600(self, _msg):
```
🤖 Fix all issues with AI agents
In `@asab/api/discovery.py`:
- Line 86: Rename the unused local variable stat to _stat in the unpacking
assignments from self.ZooKeeperContainer.ZooKeeper.Client.get(...) to satisfy
lint; specifically change occurrences like "data, stat =
self.ZooKeeperContainer.ZooKeeper.Client.get(...)" to "data, _stat =
self.ZooKeeperContainer.ZooKeeper.Client.get(...)" for both places (the one at
the top-level discovery logic and the later occurrence around the same
Client.get call).
- Around line 236-293: In _apply_advertised_raw, the loop over web entries
extracts ip and port via ip = i[0]; port = i[1] but only catches KeyError, so
malformed entries can raise IndexError or TypeError and crash the method; update
the error handling around the ip/port extraction in the for i in web block to
either validate i is a sequence with at least two elements before indexing or
catch IndexError and TypeError (e.g., except (IndexError, TypeError, KeyError):)
and log the unexpected format using the existing L.error message, then continue;
this change targets the ip/port extraction lines and the except clause in
_apply_advertised_raw.
- Around line 72-77: The handler _on_change_threadsafe should guard against
WatchedEvent with path=None and use the KazooState enum; change the initial
check to compare event.state to KazooState.CONNECTED and return early if not
matched, and before slicing event.path using BasePath ensure event.path is not
None (or fall back to an empty string or skip calling _on_change), then call
App.TaskService.schedule_threadsafe(self._on_change(event.path[len(self.BasePath)
+ 1:], event.type)) only when event.path is present to avoid the TypeError.
- Around line 60-69: _on_zk_ready currently registers a persistent watch every
time a CONNECTED transition occurs, causing duplicate events; add a one-time
flag (e.g., self._zk_watch_registered) on the instance to guard the
ZooKeeper.Client.add_watch call so the watch is only registered once, set the
flag immediately after successful registration, and ensure you still call
self.App.TaskService.schedule(self._rescan_advertised_instances()) as before;
also rename the unused msg parameter (e.g., to _msg or remove it) to reflect it
is unused and avoid lint warnings; locate these changes in the _on_zk_ready
method and reference ZooKeeperContainer, BasePath, _on_change_threadsafe, and
_rescan_advertised_instances when applying the fix.
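The tolerant ip/port extraction described in the first bullet might look like this minimal sketch; the entry shapes, function name, and logger are illustrative assumptions, not the actual discovery.py code:

```python
import logging

L = logging.getLogger(__name__)

def extract_endpoints(web):
    """Yield (ip, port) pairs, skipping malformed entries instead of crashing."""
    for i in web:
        try:
            ip, port = i[0], i[1]
        except (IndexError, TypeError, KeyError):
            # Malformed entry (too short, not a sequence, etc.): log and continue.
            L.error("Unexpected format of the web entry: %r", i)
            continue
        yield ip, port
```

For example, `list(extract_endpoints([["0.0.0.0", 8080], ["bad"], None]))` yields only the well-formed pair.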
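The `path`-is-`None` guard from the third bullet can be sketched as follows, with a namedtuple standing in for kazoo's `WatchedEvent`; the helper name is hypothetical:

```python
from collections import namedtuple

# Stand-in for kazoo.protocol.states.WatchedEvent
WatchedEvent = namedtuple("WatchedEvent", ["type", "state", "path"])

def relative_path(event, base_path):
    """Return the node path relative to base_path, or None when the event carries no path."""
    if event.path is None:
        # Some events (e.g. pure connection-state changes) have no path;
        # slicing None would raise TypeError, so skip them.
        return None
    return event.path[len(base_path) + 1:]
```

The caller would only schedule `_on_change` when `relative_path` returns a value.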
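The register-once guard from the last bullet can be sketched like this; `WatchRegistrar` and `FakeClient` are illustrative stand-ins, not ASAB or kazoo classes:

```python
class WatchRegistrar:
    """Sketch of the one-time watch registration guard."""
    def __init__(self, client, base_path):
        self.Client = client
        self.BasePath = base_path
        self._zk_watch_registered = False

    def on_ready(self):
        # Called on every CONNECTED transition; register the watch only once.
        if not self._zk_watch_registered:
            self.Client.add_watch(self.BasePath)
            self._zk_watch_registered = True

class FakeClient:
    """Counts add_watch calls so the guard can be demonstrated."""
    def __init__(self):
        self.calls = 0
    def add_watch(self, path):
        self.calls += 1
```

Repeated `on_ready()` calls (reconnects) then register the watch exactly once.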
🧹 Nitpick comments (1)

asab/api/discovery.py (1)

Lines 221-231: Avoid swallowing unexpected errors in rescan. The broad `except Exception` can mask programming errors and leave stale cache state. Consider re-raising unexpected exceptions after logging, or narrowing the handler.

🛠️ Proposed fix (re-raise after logging)

```diff
-	except Exception:
-		L.exception("Error when scanning advertised instances")
-		return
+	except Exception:
+		L.exception("Error when scanning advertised instances")
+		raise
```
Pull request overview
This PR migrates ASAB's Discovery service from traditional ZooKeeper watches to persistent watches (introduced in ZooKeeper 3.6). This reduces load on ZooKeeper by establishing a single persistent watch instead of repeatedly setting watches on each read operation.
Changes:
- Switched from per-operation watches to a single persistent recursive watch on the base path
- Optimized rescan frequency from 5 minutes to 10 minutes due to improved real-time change detection
- Deprecated the `get_advertised_instances()` method in favor of `discover()`
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| setup.py | Updated kazoo dependency to use vendored version with persistent watch support |
| asab/api/discovery.py | Implemented persistent watch mechanism, refactored cache update logic, and deprecated old method |
setup.py
Outdated
```diff
 		'aiohttp>=3.8.3,<4',
 		'fastjsonschema>=2.16.2,<3',
-		'kazoo>=2.9.0,<3',
+		'git+https://github.com/TeskaLabs/kazoo.git',
```
Installing directly from a git repository URL without specifying a commit hash, tag, or branch makes builds non-reproducible and potentially unstable. Consider pinning to a specific commit SHA or tag (e.g., 'git+https://github.com/TeskaLabs/kazoo.git@') to ensure consistent builds across environments.
Suggested change:

```diff
-	'git+https://github.com/TeskaLabs/kazoo.git',
+	'git+https://github.com/TeskaLabs/kazoo.git@master',
```
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@asab/api/discovery.py`:
- Around line 55-57: The handler _on_tick600 currently defines an unused
parameter named msg; rename it to _msg to indicate it is intentionally unused by
updating the function signature def _on_tick600(self, msg): → def
_on_tick600(self, _msg): in asab.api.discovery so linters understand the
argument is unused; leave the body (self.App.TaskService.schedule(...))
unchanged and update any internal references if the parameter is later used.
In `@setup.py`:
- Line 88: Replace the unpinned VCS requirement
'git+https://github.com/TeskaLabs/kazoo.git' with a PEP 508 direct reference
pinned to a specific tag or commit (for example: 'kazoo @
git+https://github.com/TeskaLabs/kazoo.git@<TAG_OR_COMMIT>#egg=kazoo'); update
the dependency string in setup.py accordingly so the installer pulls the fixed
commit/tag instead of the floating branch.
🧹 Nitpick comments (1)

asab/api/discovery.py (1)

Lines 223-233: Don't swallow task cancellation in rescan. Catching `Exception` here can mask `asyncio.CancelledError`. Re-raise cancellations explicitly.

🔧 Suggested change

```diff
+	except asyncio.CancelledError:
+		raise
 	except Exception:
 		L.exception("Error when scanning advertised instances")
 		return
```
Pull request overview
Copilot reviewed 2 out of 2 changed files in this pull request and generated 1 comment.
Actionable comments posted: 2
♻️ Duplicate comments (3)
asab/api/discovery.py (2)
Lines 60-71: ⚠️ Potential issue | 🟠 Major — Persistent watch registered on every reconnect leads to duplicate events.

The `_on_zk_ready` handler fires on every `CONNECTED` state transition (including reconnects). Each invocation registers another persistent watch via `add_watch()`. Without a guard, this accumulates duplicate watchers that deliver duplicate events. Add a one-time flag to prevent re-registration:

Proposed fix

```diff
 def __init__(self, app, zkc, service_name="asab.DiscoveryService") -> None:
 	...
 	self._cache_lock = asyncio.Lock()
 	self._ready_event = asyncio.Event()
+	self._zk_watch_installed = False
 	self.App.PubSub.subscribe("Application.tick/600!", self._on_tick600)
 	...

 def _on_zk_ready(self, msg, zkcontainer):
 	if zkcontainer != self.ZooKeeperContainer:
 		return
 	self.App.TaskService.schedule(self._rescan_advertised_instances())
-	zkcontainer.ZooKeeper.Client.add_watch(
-		self.BasePath,
-		self._on_change_zookeeper_thread,
-		kazoo.protocol.states.AddWatchMode.PERSISTENT_RECURSIVE
-	)
+	if not self._zk_watch_installed:
+		zkcontainer.ZooKeeper.Client.add_watch(
+			self.BasePath,
+			self._on_change_zookeeper_thread,
+			kazoo.protocol.states.AddWatchMode.PERSISTENT_RECURSIVE
+		)
+		self._zk_watch_installed = True
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@asab/api/discovery.py` around lines 60 - 71, The _on_zk_ready handler currently calls zkcontainer.ZooKeeper.Client.add_watch every time it sees the ZooKeeperContainer (including reconnects), causing duplicate persistent watches; add a one-time guard flag (e.g., self._zk_watch_installed) on the class so that in _on_zk_ready you only call add_watch and schedule the rescan when the flag is false, then set the flag to True after successful registration; reference the symbols _on_zk_ready, ZooKeeperContainer, self.App.TaskService.schedule(self._rescan_advertised_instances()), self.BasePath, self._on_change_zookeeper_thread and add_watch to locate and change the code.
Lines 74-82: ⚠️ Potential issue | 🟠 Major — Compare `event.state` against the `KazooState` enum, not a string literal.

Line 75 compares `event.state` to the string `'CONNECTED'`, but Kazoo's `WatchedEvent.state` is a `KazooState` enum. This comparison may silently fail depending on how the enum's `__eq__` is implemented. Use the proper enum for robustness:

```diff
+import kazoo.protocol.states
+
 def _on_change_zookeeper_thread(self, event):
-	if event.state != 'CONNECTED':
+	if event.state != kazoo.protocol.states.KazooState.CONNECTED:
 		return
```

Note: `kazoo.protocol.states` is already imported for `AddWatchMode`, so just use the full path or add an alias.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@asab/api/discovery.py` around lines 74 - 82, the check in _on_change_zookeeper_thread currently compares event.state to the string 'CONNECTED'; change it to compare against the KazooState enum (e.g. KazooState.CONNECTED) to ensure a reliable enum comparison. Update the code in the _on_change_zookeeper_thread function to use kazoo.protocol.states.KazooState.CONNECTED (or add an import alias for KazooState) instead of the string literal, leaving the rest of the method (including the event.path checks and the call to self.App.TaskService.schedule_threadsafe(self._on_change(...))) unchanged.

setup.py (1)
Line 88: ⚠️ Potential issue | 🟠 Major — Pin the Git dependency to a specific commit or tag for reproducible builds.

The unpinned VCS requirement can silently pull breaking changes and makes builds non-deterministic. While the PR notes this is temporary until the upstream Kazoo PR is merged, pinning to a specific commit ensures stability:

```diff
-	'kazoo @ git+https://github.com/TeskaLabs/kazoo.git',
+	'kazoo @ git+https://github.com/TeskaLabs/kazoo.git@<commit-sha>',
```

```shell
#!/bin/bash
# Get the latest commit SHA from the TeskaLabs/kazoo fork to suggest pinning
curl -s https://api.github.com/repos/TeskaLabs/kazoo/commits/master | jq -r '.sha'
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@setup.py` at line 88, Replace the unpinned VCS dependency string 'kazoo @ git+https://github.com/TeskaLabs/kazoo.git' with a pinned reference to a specific commit SHA or tag so installs are reproducible; update the dependency entry in setup.py to include the commit or tag suffix (e.g., append @<commit-or-tag>) and keep the package name identifier so pip can resolve it, choosing the latest stable commit from the TeskaLabs/kazoo fork until the upstream PR is merged.
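For illustration, a pinned PEP 508 direct reference in `install_requires` might look like the sketch below; `<commit-sha>` is a placeholder to be replaced with a real commit or tag, and this is not the actual setup.py:

```python
# Sketch of setup.py's install_requires with the kazoo fork pinned.
# '<commit-sha>' is a placeholder, not a real TeskaLabs/kazoo commit.
install_requires = [
    'aiohttp>=3.8.3,<4',
    'fastjsonschema>=2.16.2,<3',
    'kazoo @ git+https://github.com/TeskaLabs/kazoo.git@<commit-sha>',
]
```

Keeping the `kazoo @` prefix lets pip resolve the package name from the direct reference.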
🧹 Nitpick comments (1)
asab/api/discovery.py (1)
Lines 234-238: Consider narrowing the exception catch. The broad `except Exception` at line 236 (flagged by Ruff BLE001) catches all exceptions including programming errors. While the `CancelledError` re-raise is correct, consider catching more specific exceptions (e.g., `kazoo.exceptions.KazooException`, `json.JSONDecodeError`) to avoid masking unexpected bugs. That said, for a background rescan task, defensive broad catching with logging may be acceptable to prevent service crashes.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@asab/api/discovery.py` around lines 234 - 238, Replace the broad except Exception in the background scan that currently logs "Error when scanning advertised instances" with narrow exception handlers: catch kazoo.exceptions.KazooException and json.JSONDecodeError (and optionally ValueError/KeyError if parsing dicts) as specific exceptions and log them via L.exception including the exception info; keep the asyncio.CancelledError re-raise as-is; if you still want a defensive fallback, add a final generic except Exception as e that logs "Unexpected error when scanning advertised instances" with exc_info but does not swallow critical errors silently. Use the unique symbols L.exception and the scan coroutine (the block that logs "Error when scanning advertised instances") to locate and update the handlers.
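The handler shape described above can be sketched with a stand-in exception type; in the real code the narrow clause would catch `kazoo.exceptions.KazooException` and `json.JSONDecodeError` instead:

```python
import asyncio

class KazooException(Exception):
    """Stand-in for kazoo.exceptions.KazooException."""

async def rescan(scan):
    try:
        return await scan()
    except asyncio.CancelledError:
        raise                           # never swallow task cancellation
    except (KazooException, ValueError):
        return None                     # expected failures: log and keep the old cache
    # any other exception propagates, surfacing programming errors

async def main():
    async def failing_scan():
        raise KazooException("connection lost")
    return await rescan(failing_scan)

result = asyncio.run(main())
```

An unexpected exception type (say, `AttributeError`) would propagate out of `rescan` rather than being silenced.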
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@asab/api/discovery.py`:
- Around line 240-249: Race window: callers (_rescan_advertised_instances and
_on_change) release _cache_lock then call _apply_advertised_raw which
re-acquires the lock, allowing concurrent mutation of _advertised_raw; fix by
having _apply_advertised_raw operate on a snapshot passed in by callers. Change
_apply_advertised_raw to accept an advertised snapshot argument (e.g.
advertised_snapshot) and have callers (_rescan_advertised_instances and
_on_change) create a shallow/deep copy of _advertised_raw while holding
_cache_lock and pass that copy to _apply_advertised_raw; remove or avoid
re-acquiring _cache_lock inside _apply_advertised_raw so it processes the stable
snapshot without race.
- Around line 85-110: The _on_change coroutine currently calls the blocking
kazoo Client.get() directly (in _on_change) which will block the asyncio loop;
wrap the blocking call in ProactorService.execute (same pattern used in
_iter_zk_items) so ZooKeeper.Client.get(self.BasePath + '/' + item) runs in the
threadpool and returns its result to the async function, preserving the existing
exception handling for SessionExpiredError, ConnectionLoss and NoNodeError; also
replace string literal event_type checks in _on_change with
kazoo.protocol.states.EventType (e.g., EventType.CREATED, EventType.CHANGED,
EventType.DELETED) to make the comparisons consistent.
---
Duplicate comments:
In `@asab/api/discovery.py`:
- Around line 60-71: The _on_zk_ready handler currently calls
zkcontainer.ZooKeeper.Client.add_watch every time it sees the ZooKeeperContainer
(including reconnects), causing duplicate persistent watches; add a one-time
guard flag (e.g., self._zk_watch_installed) on the class so that in _on_zk_ready
you only call add_watch and schedule the rescan when the flag is false, then set
the flag to True after successful registration; reference the symbols
_on_zk_ready, ZooKeeperContainer,
self.App.TaskService.schedule(self._rescan_advertised_instances()),
self.BasePath, self._on_change_zookeeper_thread and add_watch to locate and
change the code.
- Around line 74-82: The check in _on_change_zookeeper_thread currently compares
event.state to the string 'CONNECTED'; change it to compare against the
KazooState enum (e.g. KazooState.CONNECTED) to ensure a reliable enum
comparison. Update the code in the _on_change_zookeeper_thread function to use
kazoo.protocol.states.KazooState.CONNECTED (or add an import alias for
KazooState) instead of the string literal, leaving the rest of the method
(including the event.path checks and the call to
self.App.TaskService.schedule_threadsafe(self._on_change(...))) unchanged.
In `@setup.py`:
- Line 88: Replace the unpinned VCS dependency string 'kazoo @
git+https://github.com/TeskaLabs/kazoo.git' with a pinned reference to a
specific commit SHA or tag so installs are reproducible; update the dependency
entry in setup.py to include the commit or tag suffix (e.g., append
@<commit-or-tag>) and keep the package name identifier so pip can resolve it,
choosing the latest stable commit from the TeskaLabs/kazoo fork until the
upstream PR is merged.
---
Nitpick comments:
In `@asab/api/discovery.py`:
- Around line 234-238: Replace the broad except Exception in the background scan
that currently logs "Error when scanning advertised instances" with narrow
exception handlers: catch kazoo.exceptions.KazooException and
json.JSONDecodeError (and optionally ValueError/KeyError if parsing dicts) as
specific exceptions and log them via L.exception including the exception info;
keep the asyncio.CancelledError re-raise as-is; if you still want a defensive
fallback, add a final generic except Exception as e that logs "Unexpected error
when scanning advertised instances" with exc_info but does not swallow critical
errors silently. Use the unique symbols L.exception and the scan coroutine (the
block that logs "Error when scanning advertised instances") to locate and update
the handlers.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: c9c9e630-0ae5-4375-a162-50f92c06a98c
📒 Files selected for processing (2)
asab/api/discovery.py, setup.py
asab/api/discovery.py (diff excerpt, new `_on_change` handler):

```python
async def _on_change(self, item, event_type):
	async with self._cache_lock:

		if event_type == 'CREATED' or event_type == 'CHANGED':
			# The item is new or changed - read the data and update the cache
			try:
				data, _stat = self.ZooKeeperContainer.ZooKeeper.Client.get(self.BasePath + '/' + item)
				self._advertised_raw[item] = json.loads(data)
			except (kazoo.exceptions.SessionExpiredError, kazoo.exceptions.ConnectionLoss):
				L.warning("Connection to ZooKeeper lost. Discovery Service could not fetch up-to-date state of the cluster services.")
				return
			except kazoo.exceptions.NoNodeError:
				return

		elif event_type == 'DELETED':
			# The item is deleted - remove it from the cache
			prev = self._advertised_raw.pop(item, None)
			if prev is None:
				return

		else:
			L.warning("Unexpected event type: {}".format(event_type))
			return

	# Apply the changes to the cache
	await self._apply_advertised_raw()
```

The hunk also touches `_on_zk_ready`:

```python
def _on_zk_ready(self, msg, zkc):
	if zkc == self.ZooKeeperContainer:
		self.App.TaskService.schedule(self._rescan_advertised_instances())
```
Blocking ZooKeeper call on the async event loop.
Line 91 calls self.ZooKeeperContainer.ZooKeeper.Client.get() directly inside an async method. Kazoo's client methods are synchronous and blocking—this will block the entire asyncio event loop until the ZooKeeper response arrives, degrading throughput and responsiveness.
Use ProactorService.execute() to run the blocking call in a thread pool, consistent with how _iter_zk_items handles it:
Proposed fix

```diff
 async def _on_change(self, item, event_type):
 	async with self._cache_lock:
 		if event_type == 'CREATED' or event_type == 'CHANGED':
 			# The item is new or changed - read the data and update the cache
 			try:
-				data, _stat = self.ZooKeeperContainer.ZooKeeper.Client.get(self.BasePath + '/' + item)
-				self._advertised_raw[item] = json.loads(data)
+				def fetch_item():
+					return self.ZooKeeperContainer.ZooKeeper.Client.get(self.BasePath + '/' + item)
+				data, _stat = await self.ProactorService.execute(fetch_item)
+				self._advertised_raw[item] = json.loads(data)
 			except (kazoo.exceptions.SessionExpiredError, kazoo.exceptions.ConnectionLoss):
```

Additionally, the event_type comparisons at lines 88, 99, 105 use string literals. Consider using `kazoo.protocol.states.EventType` enum values for consistency (e.g., `EventType.CREATED`).
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@asab/api/discovery.py` around lines 85 - 110, The _on_change coroutine
currently calls the blocking kazoo Client.get() directly (in _on_change) which
will block the asyncio loop; wrap the blocking call in ProactorService.execute
(same pattern used in _iter_zk_items) so ZooKeeper.Client.get(self.BasePath +
'/' + item) runs in the threadpool and returns its result to the async function,
preserving the existing exception handling for SessionExpiredError,
ConnectionLoss and NoNodeError; also replace string literal event_type checks in
_on_change with kazoo.protocol.states.EventType (e.g., EventType.CREATED,
EventType.CHANGED, EventType.DELETED) to make the comparisons consistent.
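In plain asyncio terms, the thread-pool offload the comment asks for is `run_in_executor`; `blocking_get` below is a stand-in for kazoo's synchronous `Client.get`, and ASAB's `ProactorService.execute` plays the analogous role:

```python
import asyncio
import json

def blocking_get(path):
    """Stand-in for the synchronous, blocking kazoo Client.get(path)."""
    return b'{"host": "10.0.0.1"}', None   # (data, stat)

async def fetch_item(path):
    loop = asyncio.get_running_loop()
    # Run the blocking read in the default thread pool so the
    # event loop stays responsive while ZooKeeper answers.
    data, _stat = await loop.run_in_executor(None, blocking_get, path)
    return json.loads(data)

result = asyncio.run(fetch_item("/asab/run/svc-1"))
```

The coroutine awaits the executor future instead of blocking the loop, which is exactly the pattern `_iter_zk_items` already uses.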
asab/api/discovery.py (diff excerpt, `_rescan_advertised_instances` and `_apply_advertised_raw`):

```python
	advertised_raw = {}

	try:
		for item, item_data in await self._iter_zk_items():
			advertised_raw[item] = item_data
	...
	await self._apply_advertised_raw()

async def _apply_advertised_raw(self):
	advertised = {
		"instance_id": {},
		"service_id": {},
	}
	async with self._cache_lock:
		...
```
Race window between releasing lock and _apply_advertised_raw re-acquiring it.
In _rescan_advertised_instances, the _cache_lock is released at the end of the async with block (after line 238), then _apply_advertised_raw is called at line 240 which re-acquires the lock at line 249. Similarly, _on_change releases the lock after modifying _advertised_raw (line 107) before calling _apply_advertised_raw (line 110).
This creates a window where another coroutine can modify _advertised_raw between unlock and relock, potentially causing inconsistent state.
Consider either:
- Keeping the `_apply_advertised_raw` call inside the lock, or
- Making `_apply_advertised_raw` work on a snapshot of `_advertised_raw`
Option 1: Call _apply_advertised_raw inside the lock
```diff
 		async with self._cache_lock:
 			try:
 				prev_keys = set(self._advertised_raw.keys())
 				for item, item_data in await self._iter_zk_items():
 					self._advertised_raw[item] = item_data
 					prev_keys.discard(item)
 				for item in prev_keys:
 					self._advertised_raw.pop(item, None)
 			except asyncio.CancelledError:
 				raise
 			except Exception:
 				L.exception("Error when scanning advertised instances")
 				return
+			await self._apply_advertised_raw()
-
-		await self._apply_advertised_raw()
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@asab/api/discovery.py` around lines 240 - 249, Race window: callers
(_rescan_advertised_instances and _on_change) release _cache_lock then call
_apply_advertised_raw which re-acquires the lock, allowing concurrent mutation
of _advertised_raw; fix by having _apply_advertised_raw operate on a snapshot
passed in by callers. Change _apply_advertised_raw to accept an advertised
snapshot argument (e.g. advertised_snapshot) and have callers
(_rescan_advertised_instances and _on_change) create a shallow/deep copy of
_advertised_raw while holding _cache_lock and pass that copy to
_apply_advertised_raw; remove or avoid re-acquiring _cache_lock inside
_apply_advertised_raw so it processes the stable snapshot without race.
This PR introduces a significant change in how ASAB reacts to changes in Apache ZooKeeper.
It leverages "persistent watches", introduced in ZooKeeper 3.6 and currently being added to Kazoo.
This PR brings the change specifically into the Discovery service; the result is a much lighter (and more logical) load on Apache ZooKeeper.
There are other places in ASAB that can be switched to this approach.
IMPORTANT: We need to use the vendored version of Kazoo from https://github.com/TeskaLabs/kazoo until python-zk/kazoo#715 is merged.