Skip to content

Commit 112a280

Browse files
committed
Add bulk restart mechanism for RQ worker registration loss
- Introduced a new method `_handle_registration_loss` to manage RQ worker registration loss, replicating the behavior of the previous bash script. - Implemented a cooldown period to prevent frequent restarts during network issues. - Added logging for bulk restart actions and their outcomes to enhance monitoring and debugging capabilities. - Created a `_restart_all_rq_workers` method to facilitate the bulk restart of RQ workers, ensuring they re-register with Redis upon startup.
1 parent 7e05de9 commit 112a280

File tree

1 file changed

+83
-0
lines changed

1 file changed

+83
-0
lines changed

backends/advanced/src/advanced_omi_backend/workers/orchestrator/health_monitor.py

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ def __init__(
4242
self.running = False
4343
self.monitor_task: Optional[asyncio.Task] = None
4444
self.start_time = time.time()
45+
self.last_registration_recovery: Optional[float] = None
46+
self.registration_recovery_cooldown = 60 # seconds
4547

4648
async def start(self):
4749
"""Start the health monitoring loop"""
@@ -112,6 +114,10 @@ async def _check_health(self):
112114
# Check RQ worker registration count
113115
rq_health = self._check_rq_worker_registration()
114116

117+
# If RQ workers lost registration, trigger bulk restart (matches old bash script behavior)
118+
if not rq_health:
119+
self._handle_registration_loss()
120+
115121
# Restart failed workers
116122
self._restart_failed_workers()
117123

@@ -201,6 +207,83 @@ def _restart_failed_workers(self):
201207
else:
202208
logger.error(f"{worker.name}: Restart failed")
203209

210+
def _handle_registration_loss(self):
    """
    React to a detected loss of RQ worker registration.

    Mirrors the self-healing behavior of the old start-workers.sh script:
    skip if the recovery cooldown is still active, otherwise bulk-restart
    every RQ worker and record when this recovery attempt happened. The
    cooldown keeps us from restarting over and over during transient
    Redis or network outages.
    """
    now = time.time()

    # Guard: still inside the cooldown window from the last recovery?
    last = self.last_registration_recovery
    if last is not None and (now - last) < self.registration_recovery_cooldown:
        wait_left = self.registration_recovery_cooldown - (now - last)
        logger.debug(
            f"Registration recovery cooldown active - "
            f"waiting {wait_left:.0f}s before next recovery attempt"
        )
        return

    logger.warning(
        "⚠️ RQ worker registration loss detected - initiating bulk restart "
        "(replicating old start-workers.sh behavior)"
    )

    # Attempt the bulk restart and report the aggregate outcome.
    restart_ok = self._restart_all_rq_workers()
    if restart_ok:
        logger.info("✅ Bulk restart completed - workers should re-register soon")
    else:
        logger.error("❌ Bulk restart encountered errors - check individual worker logs")

    # Start the cooldown from the moment this recovery pass began.
    self.last_registration_recovery = now
def _restart_all_rq_workers(self) -> bool:
    """
    Bulk-restart every RQ worker managed by the process manager.

    Matches the old bash script's recovery mechanism: each RQ worker is
    restarted in turn, and workers re-register themselves with Redis as
    part of their startup.

    Returns:
        True if all RQ workers restarted successfully, False otherwise
        (including the case where no RQ workers exist at all).
    """
    # Select only the RQ-type workers from everything the manager runs.
    rq_workers = [
        w
        for w in self.process_manager.get_all_workers()
        if w.definition.worker_type == WorkerType.RQ_WORKER
    ]

    if not rq_workers:
        logger.warning("No RQ workers found to restart")
        return False

    logger.info(f"Restarting {len(rq_workers)} RQ workers...")

    # Collect the names of any workers whose restart failed.
    failed = []
    for worker in rq_workers:
        logger.info(f" ↻ Restarting {worker.name}...")
        if self.process_manager.restart_worker(worker.name):
            logger.info(f" ✓ {worker.name} restarted successfully")
        else:
            logger.error(f" ✗ {worker.name} restart failed")
            failed.append(worker.name)

    return not failed
204287
def get_health_status(self) -> dict:
205288
"""
206289
Get current health status summary.

0 commit comments

Comments
 (0)