Skip to content

Commit a4681c5

Browse files
committed
Refactor job status handling to align with RQ standards
- Updated job status checks across various modules to use "started" and "finished" instead of "processing" and "completed" for consistency with RQ's naming conventions.
- Adjusted related logging and response messages to reflect the new status terminology.
- Simplified Docker Compose project name handling in test scripts to avoid conflicts and improve clarity in test environment setup.
1 parent fb8225d commit a4681c5

Some content is hidden

Large commits have some content hidden by default. Use the search box below for content that may be hidden.

45 files changed

+428
-403
lines changed

backends/advanced/docker-compose-test.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22
# Isolated test environment for integration tests
33
# Uses different ports to avoid conflicts with development environment
44

5+
name: backend-test
6+
57
services:
68
chronicle-backend-test:
79
build:

backends/advanced/run-test.sh

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -219,17 +219,13 @@ if [ -d "./data/test_audio_chunks/" ] || [ -d "./data/test_data/" ] || [ -d "./d
219219
docker run --rm -v "$(pwd)/data:/data" alpine sh -c 'rm -rf /data/test_*' 2>/dev/null || true
220220
fi
221221

222-
# Use unique project name to avoid conflicts with development environment
223-
export COMPOSE_PROJECT_NAME="advanced-backend-test"
222+
# Note: Project name 'backend-test' is set in docker-compose-test.yml
223+
# No need to export COMPOSE_PROJECT_NAME - it's handled by the compose file
224224

225225
# Stop any existing test containers
226226
print_info "Stopping existing test containers..."
227-
# Try cleanup with current project name
228227
docker compose -f docker-compose-test.yml down -v || true
229228

230-
# Also try cleanup with default project name (in case containers were started without COMPOSE_PROJECT_NAME)
231-
COMPOSE_PROJECT_NAME=advanced docker compose -f docker-compose-test.yml down -v 2>/dev/null || true
232-
233229
# Run integration tests
234230
print_info "Running integration tests..."
235231
print_info "Using fresh mode (CACHED_MODE=False) for clean testing"
@@ -268,8 +264,6 @@ else
268264
if [ "${CLEANUP_CONTAINERS:-true}" != "false" ]; then
269265
print_info "Cleaning up test containers after failure..."
270266
docker compose -f docker-compose-test.yml down -v || true
271-
# Also cleanup with default project name
272-
COMPOSE_PROJECT_NAME=advanced docker compose -f docker-compose-test.yml down -v 2>/dev/null || true
273267
docker system prune -f || true
274268
else
275269
print_warning "Skipping cleanup (CLEANUP_CONTAINERS=false) - containers left running for debugging"
@@ -282,8 +276,6 @@ fi
282276
if [ "${CLEANUP_CONTAINERS:-true}" != "false" ]; then
283277
print_info "Cleaning up test containers..."
284278
docker compose -f docker-compose-test.yml down -v || true
285-
# Also cleanup with default project name
286-
COMPOSE_PROJECT_NAME=advanced docker compose -f docker-compose-test.yml down -v 2>/dev/null || true
287279
docker system prune -f || true
288280
else
289281
print_warning "Skipping cleanup (CLEANUP_CONTAINERS=false) - containers left running"

backends/advanced/src/advanced_omi_backend/controllers/audio_controller.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,7 @@ async def upload_and_process_audio_files(
203203

204204
processed_files.append({
205205
"filename": file.filename,
206-
"status": "processing",
206+
"status": "started", # RQ standard: job has been enqueued
207207
"conversation_id": conversation_id,
208208
"transcript_job_id": transcription_job.id,
209209
"speaker_job_id": job_ids['speaker_recognition'],
@@ -233,7 +233,7 @@ async def upload_and_process_audio_files(
233233
"error": str(e),
234234
})
235235

236-
successful_files = [f for f in processed_files if f.get("status") == "processing"]
236+
successful_files = [f for f in processed_files if f.get("status") == "started"]
237237
failed_files = [f for f in processed_files if f.get("status") == "error"]
238238

239239
return {
@@ -242,7 +242,7 @@ async def upload_and_process_audio_files(
242242
"files": processed_files,
243243
"summary": {
244244
"total": len(files),
245-
"processing": len(successful_files),
245+
"started": len(successful_files), # RQ standard
246246
"failed": len(failed_files),
247247
},
248248
}

backends/advanced/src/advanced_omi_backend/controllers/queue_controller.py

Lines changed: 61 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
import redis
1919
from rq import Queue, Worker
20-
from rq.job import Job
20+
from rq.job import Job, JobStatus
2121
from rq.registry import ScheduledJobRegistry, DeferredJobRegistry
2222

2323
from advanced_omi_backend.models.job import JobPriority
@@ -30,6 +30,52 @@
3030
REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/0")
3131
redis_conn = redis.from_url(REDIS_URL)
3232

33+
34+
def get_job_status_from_rq(job: Job) -> str:
35+
"""
36+
Get job status using RQ's native method.
37+
38+
Uses job.get_status() which is the Redis Queue standard approach.
39+
Returns RQ's standard status names.
40+
41+
Returns one of: queued, started, finished, failed, deferred, scheduled, canceled, stopped
42+
43+
Raises:
44+
RuntimeError: If job status is unexpected (should never happen with RQ's method)
45+
"""
46+
rq_status = job.get_status()
47+
48+
# RQ returns status as JobStatus enum or string
49+
# Convert to string if it's an enum
50+
if isinstance(rq_status, JobStatus):
51+
status_str = rq_status.value
52+
else:
53+
status_str = str(rq_status)
54+
55+
# Validate it's a known RQ status
56+
valid_statuses = {
57+
JobStatus.QUEUED.value,
58+
JobStatus.STARTED.value,
59+
JobStatus.FINISHED.value,
60+
JobStatus.FAILED.value,
61+
JobStatus.DEFERRED.value,
62+
JobStatus.SCHEDULED.value,
63+
JobStatus.CANCELED.value,
64+
JobStatus.STOPPED.value,
65+
}
66+
67+
if status_str not in valid_statuses:
68+
logger.error(
69+
f"Job {job.id} has unexpected RQ status: {status_str}. "
70+
f"This indicates RQ library added a new status we don't know about."
71+
)
72+
raise RuntimeError(
73+
f"Job {job.id} has unknown RQ status: {status_str}. "
74+
f"Please update get_job_status_from_rq() to handle this new status."
75+
)
76+
77+
return status_str
78+
3379
# Queue name constants
3480
TRANSCRIPTION_QUEUE = "transcription"
3581
MEMORY_QUEUE = "memory"
@@ -61,34 +107,34 @@ def get_queue(queue_name: str = DEFAULT_QUEUE) -> Queue:
61107

62108

63109
def get_job_stats() -> Dict[str, Any]:
64-
"""Get statistics about jobs in all queues matching frontend expectations."""
110+
"""Get statistics about jobs in all queues using RQ standard status names."""
65111
total_jobs = 0
66112
queued_jobs = 0
67-
processing_jobs = 0
68-
completed_jobs = 0
113+
started_jobs = 0 # RQ standard: "started" not "processing"
114+
finished_jobs = 0 # RQ standard: "finished" not "completed"
69115
failed_jobs = 0
70-
cancelled_jobs = 0
116+
canceled_jobs = 0 # RQ standard: "canceled" not "cancelled"
71117
deferred_jobs = 0 # Jobs waiting for dependencies (depends_on)
72118

73119
for queue_name in QUEUE_NAMES:
74120
queue = get_queue(queue_name)
75121

76122
queued_jobs += len(queue)
77-
processing_jobs += len(queue.started_job_registry)
78-
completed_jobs += len(queue.finished_job_registry)
123+
started_jobs += len(queue.started_job_registry)
124+
finished_jobs += len(queue.finished_job_registry)
79125
failed_jobs += len(queue.failed_job_registry)
80-
cancelled_jobs += len(queue.canceled_job_registry)
126+
canceled_jobs += len(queue.canceled_job_registry)
81127
deferred_jobs += len(queue.deferred_job_registry)
82128

83-
total_jobs = queued_jobs + processing_jobs + completed_jobs + failed_jobs + cancelled_jobs + deferred_jobs
129+
total_jobs = queued_jobs + started_jobs + finished_jobs + failed_jobs + canceled_jobs + deferred_jobs
84130

85131
return {
86132
"total_jobs": total_jobs,
87133
"queued_jobs": queued_jobs,
88-
"processing_jobs": processing_jobs,
89-
"completed_jobs": completed_jobs,
134+
"started_jobs": started_jobs,
135+
"finished_jobs": finished_jobs,
90136
"failed_jobs": failed_jobs,
91-
"cancelled_jobs": cancelled_jobs,
137+
"canceled_jobs": canceled_jobs,
92138
"deferred_jobs": deferred_jobs,
93139
"timestamp": datetime.utcnow().isoformat()
94140
}
@@ -124,11 +170,11 @@ def get_jobs(
124170
for qname in queues_to_check:
125171
queue = get_queue(qname)
126172

127-
# Collect jobs from all registries
173+
# Collect jobs from all registries (using RQ standard status names)
128174
registries = [
129175
(queue.job_ids, "queued"),
130-
(queue.started_job_registry.get_job_ids(), "processing"),
131-
(queue.finished_job_registry.get_job_ids(), "completed"),
176+
(queue.started_job_registry.get_job_ids(), "started"), # RQ standard, not "processing"
177+
(queue.finished_job_registry.get_job_ids(), "finished"), # RQ standard, not "completed"
132178
(queue.failed_job_registry.get_job_ids(), "failed"),
133179
(queue.deferred_job_registry.get_job_ids(), "deferred"), # Jobs waiting for dependencies
134180
]

backends/advanced/src/advanced_omi_backend/controllers/session_controller.py

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -57,11 +57,11 @@ async def mark_session_complete(
5757
"""
5858
session_key = f"audio:session:{session_id}"
5959
await redis_client.hset(session_key, mapping={
60-
"status": "complete",
60+
"status": "finished",
6161
"completed_at": str(time.time()),
6262
"completion_reason": reason
6363
})
64-
logger.info(f"✅ Session {session_id[:12]} marked complete: {reason}")
64+
logger.info(f"✅ Session {session_id[:12]} marked finished: {reason}")
6565

6666

6767
async def get_session_info(redis_client, session_id: str) -> Optional[Dict]:
@@ -231,15 +231,15 @@ async def get_streaming_status(request):
231231
# Check if all jobs are complete (including failed jobs)
232232
all_jobs_done = all_jobs_complete_for_session(session_id)
233233

234-
# Session is completed if:
235-
# 1. Redis status says complete/finalized AND all jobs done, OR
236-
# 2. All jobs are done (even if status isn't complete yet)
237-
# This ensures sessions with failed jobs move to completed
238-
if status in ["complete", "completed", "finalized"] or all_jobs_done:
234+
# Session is finished if:
235+
# 1. Redis status says finished AND all jobs done, OR
236+
# 2. All jobs are done (even if status isn't finished yet)
237+
# This ensures sessions with failed jobs move to finished
238+
if status == "finished" or all_jobs_done:
239239
if all_jobs_done:
240-
# All jobs complete - this is truly a completed session
241-
# Update Redis status if it wasn't already marked complete
242-
if status not in ["complete", "completed", "finalized"]:
240+
# All jobs finished - this is truly a finished session
241+
# Update Redis status if it wasn't already marked finished
242+
if status != "finished":
243243
await mark_session_complete(redis_client, session_id, "all_jobs_complete")
244244

245245
# Get additional session data for completed sessions
@@ -251,7 +251,7 @@ async def get_streaming_status(request):
251251
"client_id": session_obj.get("client_id", ""),
252252
"conversation_id": session_data.get(b"conversation_id", b"").decode() if session_data and b"conversation_id" in session_data else None,
253253
"has_conversation": bool(session_data and session_data.get(b"conversation_id", b"")),
254-
"action": session_data.get(b"action", b"complete").decode() if session_data and b"action" in session_data else "complete",
254+
"action": session_data.get(b"action", b"finished").decode() if session_data and b"action" in session_data else "finished",
255255
"reason": session_data.get(b"reason", b"").decode() if session_data and b"reason" in session_data else "",
256256
"completed_at": session_obj.get("last_chunk_at", 0),
257257
"audio_file": session_data.get(b"audio_file", b"").decode() if session_data and b"audio_file" in session_data else "",
@@ -450,26 +450,26 @@ async def get_streaming_status(request):
450450
rq_stats = {
451451
"transcription_queue": {
452452
"queued": transcription_queue.count,
453-
"processing": len(transcription_queue.started_job_registry),
454-
"completed": len(transcription_queue.finished_job_registry),
453+
"started": len(transcription_queue.started_job_registry),
454+
"finished": len(transcription_queue.finished_job_registry),
455455
"failed": len(transcription_queue.failed_job_registry),
456-
"cancelled": len(transcription_queue.canceled_job_registry),
456+
"canceled": len(transcription_queue.canceled_job_registry),
457457
"deferred": len(transcription_queue.deferred_job_registry)
458458
},
459459
"memory_queue": {
460460
"queued": memory_queue.count,
461-
"processing": len(memory_queue.started_job_registry),
462-
"completed": len(memory_queue.finished_job_registry),
461+
"started": len(memory_queue.started_job_registry),
462+
"finished": len(memory_queue.finished_job_registry),
463463
"failed": len(memory_queue.failed_job_registry),
464-
"cancelled": len(memory_queue.canceled_job_registry),
464+
"canceled": len(memory_queue.canceled_job_registry),
465465
"deferred": len(memory_queue.deferred_job_registry)
466466
},
467467
"default_queue": {
468468
"queued": default_queue.count,
469-
"processing": len(default_queue.started_job_registry),
470-
"completed": len(default_queue.finished_job_registry),
469+
"started": len(default_queue.started_job_registry),
470+
"finished": len(default_queue.finished_job_registry),
471471
"failed": len(default_queue.failed_job_registry),
472-
"cancelled": len(default_queue.canceled_job_registry),
472+
"canceled": len(default_queue.canceled_job_registry),
473473
"deferred": len(default_queue.deferred_job_registry)
474474
}
475475
}

backends/advanced/src/advanced_omi_backend/routers/modules/obsidian_routes.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -176,14 +176,12 @@ async def get_status(job_id: str, current_user: User = Depends(current_active_us
176176
status = job.get_status()
177177
if status == "started":
178178
status = "running"
179-
if status == "canceled":
180-
status = "cancelled"
181-
179+
182180
# Get metadata
183181
meta = job.meta or {}
184-
182+
185183
# If meta has status, prefer it (for granular updates)
186-
if "status" in meta and meta["status"] in ("running", "completed", "failed", "cancelled"):
184+
if "status" in meta and meta["status"] in ("running", "finished", "failed", "canceled"):
187185
status = meta["status"]
188186

189187
total = meta.get("total_files", 0)

0 commit comments

Comments
 (0)