Skip to content

Commit 5cffe17

Browse files
committed
oops
1 parent 4eb1ca9 commit 5cffe17

File tree

1 file changed

+245
-0
lines changed

1 file changed

+245
-0
lines changed
Lines changed: 245 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,245 @@
1+
#!/usr/bin/env python3
2+
"""
3+
Worker Orchestrator
4+
5+
Main entrypoint for Chronicle worker orchestration system.
6+
Replaces start-workers.sh bash script with Python-based orchestration.
7+
8+
Usage:
9+
python worker_orchestrator.py
10+
# Or via Docker: docker compose up workers
11+
12+
Environment Variables:
13+
REDIS_URL Redis connection URL (default: redis://localhost:6379/0)
14+
WORKER_CHECK_INTERVAL Health check interval in seconds (default: 10)
15+
MIN_RQ_WORKERS Minimum expected RQ workers (default: 6)
16+
WORKER_STARTUP_GRACE_PERIOD Grace period before health checks (default: 30)
17+
WORKER_SHUTDOWN_TIMEOUT Max wait for graceful shutdown (default: 30)
18+
LOG_LEVEL Logging level (default: INFO)
19+
"""
20+
21+
import asyncio
22+
import logging
23+
import os
24+
import signal
25+
import socket
26+
import sys
27+
from typing import Optional
28+
29+
from redis import Redis
30+
from rq import Worker
31+
32+
# Import orchestrator components
33+
from src.advanced_omi_backend.workers.orchestrator import (
34+
OrchestratorConfig,
35+
ProcessManager,
36+
HealthMonitor,
37+
build_worker_definitions,
38+
)
39+
40+
# Configure logging
41+
# Logging configuration: level is taken from the environment and records are
# written to stdout so container runtimes (Docker/Kubernetes) collect them.
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO")
logging.basicConfig(
    stream=sys.stdout,
    level=LOG_LEVEL,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)

# Module-level logger, named after this module per the stdlib convention.
logger = logging.getLogger(__name__)
49+
50+
51+
class WorkerOrchestrator:
    """
    Main orchestrator that coordinates all components.

    Handles:
    - Startup sequence (Redis cleanup, worker startup)
    - Signal handling (SIGTERM, SIGINT)
    - Health monitoring
    - Graceful shutdown
    """

    def __init__(self):
        # All of these are populated during startup(); they stay None so that
        # shutdown() can run safely even if startup fails partway through.
        self.config: Optional[OrchestratorConfig] = None
        self.redis: Optional[Redis] = None
        self.process_manager: Optional[ProcessManager] = None
        self.health_monitor: Optional[HealthMonitor] = None
        # Set by the signal handler; run() blocks on it until shutdown.
        self.shutdown_event = asyncio.Event()

    async def startup(self) -> None:
        """
        Startup sequence.

        1. Load configuration
        2. Connect to Redis
        3. Clean up stale worker registrations
        4. Build worker definitions and start all workers
        5. Setup signal handlers
        6. Start health monitor

        Raises:
            Exception: if the Redis connection cannot be established.
            RuntimeError: if any worker fails to start.
        """
        logger.info("🚀 Starting Chronicle Worker Orchestrator...")

        self._load_config()
        self._connect_redis()

        # Remove leftover registrations from a previous run on this host so
        # the health monitor doesn't count dead workers as alive.
        logger.info("🧹 Cleaning up stale worker registrations from Redis...")
        cleaned_count = self._cleanup_stale_workers()
        if cleaned_count > 0:
            logger.info(f"Cleaned up {cleaned_count} stale workers")
        else:
            logger.info("No stale workers to clean")

        self._start_workers()
        self._install_signal_handlers()
        await self._start_health_monitor()

        logger.info("⏳ Workers running - health monitor will auto-restart failed workers")

    def _load_config(self) -> None:
        """Load configuration from the environment and log effective values."""
        logger.info("Loading configuration...")
        self.config = OrchestratorConfig()
        logger.info(f"Redis URL: {self.config.redis_url}")
        logger.info(f"Check interval: {self.config.check_interval}s")
        logger.info(f"Min RQ workers: {self.config.min_rq_workers}")
        logger.info(f"Startup grace period: {self.config.startup_grace_period}s")

    def _connect_redis(self) -> None:
        """Connect to Redis and verify the connection with a ping.

        Raises:
            Exception: re-raised from the failed ping so startup aborts early.
        """
        logger.info("Connecting to Redis...")
        self.redis = Redis.from_url(self.config.redis_url)
        try:
            self.redis.ping()
            logger.info("✅ Redis connection successful")
        except Exception as e:
            logger.error(f"❌ Failed to connect to Redis: {e}")
            raise

    def _start_workers(self) -> None:
        """Build worker definitions and launch every enabled worker process.

        Raises:
            RuntimeError: if the process manager reports a startup failure.
        """
        logger.info("Building worker definitions...")
        worker_definitions = build_worker_definitions()
        logger.info(f"Total enabled workers: {len(worker_definitions)}")

        logger.info("Starting all workers...")
        self.process_manager = ProcessManager(worker_definitions)
        if not self.process_manager.start_all():
            logger.error("❌ Some workers failed to start")
            raise RuntimeError("Worker startup failed")

        logger.info("✅ All workers started:")
        for worker in self.process_manager.get_all_workers():
            logger.info(
                f" - {worker.name}: PID {worker.pid} "
                f"(queues: {', '.join(worker.definition.queues) if worker.definition.queues else 'stream consumer'})"
            )

    def _install_signal_handlers(self) -> None:
        """Register SIGTERM/SIGINT handlers on the running event loop.

        FIX: the previous version scheduled an async handler via
        asyncio.create_task() inside the signal callback without keeping a
        reference to the task; per the asyncio docs, such tasks may be
        garbage-collected before they run, silently dropping the shutdown
        request. Since the handler only sets an event, a plain synchronous
        callback is sufficient and race-free.

        NOTE: loop.add_signal_handler is only available on Unix event loops,
        which matches the Docker deployment this module documents.
        """
        loop = asyncio.get_running_loop()
        for sig in (signal.SIGTERM, signal.SIGINT):
            loop.add_signal_handler(sig, self._signal_handler, sig)

        logger.info("✅ Signal handlers configured (SIGTERM, SIGINT)")

    async def _start_health_monitor(self) -> None:
        """Create and start the health monitor over the running workers."""
        logger.info("Starting health monitor...")
        self.health_monitor = HealthMonitor(
            self.process_manager, self.config, self.redis
        )
        await self.health_monitor.start()
        logger.info("✅ Health monitor started")

    def _cleanup_stale_workers(self) -> int:
        """
        Clean up stale worker registrations from Redis.

        This replicates the bash script's logic:
        - Only clean up workers from THIS hostname (pod-aware)
        - Use RQ's register_death() to properly clean up

        Returns:
            Number of workers cleaned up (0 on any error — cleanup is
            best-effort and must not abort startup).
        """
        try:
            hostname = socket.gethostname()
            workers = Worker.all(connection=self.redis)
            cleaned = 0

            for worker in workers:
                if worker.hostname == hostname:
                    worker.register_death()
                    cleaned += 1

            return cleaned

        except Exception as e:
            # Deliberate best-effort: log and continue rather than crash.
            logger.warning(f"Failed to clean up stale workers: {e}")
            return 0

    def _signal_handler(self, sig: signal.Signals) -> None:
        """Handle shutdown signals by requesting a graceful shutdown.

        Synchronous on purpose — it is invoked directly by the event loop's
        signal machinery (see _install_signal_handlers).
        """
        logger.info(f"Received signal: {sig.name}")
        self.shutdown_event.set()

    async def shutdown(self) -> None:
        """
        Graceful shutdown sequence.

        1. Stop health monitor
        2. Stop all workers
        3. Close Redis connection

        Each step is guarded so shutdown is safe to call after a partially
        failed startup (attributes may still be None).
        """
        logger.info("🛑 Initiating graceful shutdown...")

        # 1. Stop health monitor
        if self.health_monitor:
            await self.health_monitor.stop()

        # 2. Stop all workers (process_manager implies config was loaded)
        if self.process_manager:
            logger.info("Stopping all workers...")
            self.process_manager.stop_all(timeout=self.config.shutdown_timeout)

        # 3. Close Redis connection
        if self.redis:
            logger.info("Closing Redis connection...")
            self.redis.close()

        logger.info("✅ All workers stopped gracefully")

    async def run(self) -> None:
        """Main run loop - start up, then wait for the shutdown signal.

        Raises:
            Exception: re-raised after logging so the process exits non-zero.
        """
        try:
            await self.startup()
            await self.shutdown_event.wait()

        except Exception as e:
            logger.error(f"❌ Orchestrator error: {e}", exc_info=True)
            raise
        finally:
            # Always perform shutdown, even after a startup failure.
            await self.shutdown()
221+
222+
223+
async def main():
    """Main entrypoint: run the orchestrator and exit with a status code.

    Exit codes: 0 on clean shutdown or user interrupt, 1 on any fatal error.
    """
    orchestrator = WorkerOrchestrator()

    try:
        await orchestrator.run()
    except KeyboardInterrupt:
        logger.info("Interrupted by user")
        sys.exit(0)
    except Exception as e:
        logger.error(f"Fatal error: {e}", exc_info=True)
        sys.exit(1)
    else:
        # Clean run: SystemExit is not an Exception subclass, so raising it
        # here (as the original did inside the try) is handled identically.
        sys.exit(0)
238+
239+
240+
if __name__ == "__main__":
    # NOTE(review): setting PYTHONUNBUFFERED here cannot retroactively
    # unbuffer this process's already-open streams (the interpreter reads it
    # at startup); presumably it is inherited by the worker subprocesses
    # spawned later — confirm against ProcessManager.
    os.environ["PYTHONUNBUFFERED"] = "1"

    # Hand control to the asyncio-based orchestrator.
    asyncio.run(main())

0 commit comments

Comments
 (0)