WIP: runs, but with video frame freezing and artifacts. #1
WIP: runs, but with video frame freezing and artifacts. #1
Conversation
…st coderabbit for a 2nd opinion
📝 WalkthroughWalkthroughThe PR adds comprehensive audio support to the MOQ source plugin, introducing audio track detection, separate frame queues for audio and video, a dedicated playback thread with frame synchronization primitives, and audio decoder lifecycle management integrated throughout the catalog and session workflow. Changes
Sequence DiagramsequenceDiagram
participant MOQ as MOQ Receiver
participant VQ as Video Queue
participant AQ as Audio Queue
participant PT as Playback Thread
participant VD as Video Decoder
participant AD as Audio Decoder
participant OBS as OBS Output
MOQ->>MOQ: on_catalog: detect audio/video tracks
MOQ->>AD: init_audio_decoder()
MOQ->>PT: start_playback()
rect rgb(200, 220, 255)
Note over MOQ,OBS: Frame Reception & Queuing
end
MOQ->>VQ: on_video_frame: queue frame
MOQ->>AQ: on_audio_frame: queue frame
rect rgb(220, 240, 220)
Note over PT,OBS: Playback Thread Processing
end
loop Playback Active
PT->>PT: wait on semaphore
PT->>VQ: frame_queue_pop (video)
alt Video Frame Available
PT->>VD: decode_queued_video_frame()
VD->>OBS: output video
end
PT->>AQ: frame_queue_pop (audio)
alt Audio Frame Available
PT->>AD: decode audio
AD->>OBS: output audio
end
end
MOQ->>PT: stop_playback()
PT->>AD: destroy_audio_decoder_locked()
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Poem
Pre-merge checks and finishing touches❌ Failed checks (1 warning, 1 inconclusive)
✅ Passed checks (1 passed)
✨ Finishing touches
🧪 Generate unit tests (beta)
Comment |
There was a problem hiding this comment.
Actionable comments posted: 5
🧹 Nitpick comments (2)
src/moq-source.cpp (2)
1353-1576: Consider extracting shared video decoding logic to reduce duplication.
moq_source_decode_queued_video_frameduplicates most ofmoq_source_decode_frame(packet creation, decoder send/receive, scaler reinitialization, scaling, output). This makes maintenance harder and increases risk of divergence.Consider extracting the core decoding logic into a shared helper that both functions can call.
1356-1357: Reduce log verbosity in hot paths.
LOG_INFOstatements at lines 1356, 1570, 1575, and 1758 execute for every video frame (potentially 30-60+ times per second). This floods logs and may impact performance. Consider changing these toLOG_DEBUG.Also applies to: 1570-1571, 1575-1575, 1758-1758
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
src/moq-source.cpp
🔇 Additional comments (8)
src/moq-source.cpp (8)
58-73: LGTM!Audio codec detection follows the same pattern as existing video codecs and uses correct FFmpeg codec IDs.
78-101: LGTM!Speaker layout and sample format conversion helpers are well-structured with safe fallbacks for unknown formats.
127-139: LGTM!Frame queue structures are well-designed for buffering incoming frames with appropriate synchronization primitives.
250-268: LGTM!Frame queue and playback state initialization is thorough and correctly initializes all synchronization primitives.
1002-1074: LGTM!Audio decoder initialization follows the same robust pattern as video decoder initialization, with proper error handling and mutex-protected state swapping.
628-680: LGTM!Audio frame callback follows the same pattern as video frame callback with proper shutdown checks and frame queuing.
1776-1776: LGTM!Correctly adds
OBS_SOURCE_AUDIOflag to advertise audio output capability.
316-332: LGTM!Queue cleanup correctly drains all frames and destroys synchronization primitives in the proper order.
| moq_source_destroy_decoder_locked(ctx); | ||
| moq_source_destroy_audio_decoder_locked(ctx); | ||
|
|
||
| // Stop playback thread | ||
| moq_source_stop_playback(ctx); | ||
|
|
There was a problem hiding this comment.
Potential deadlock: pthread_join called while ctx->mutex is held indirectly.
moq_source_disconnect_locked requires the caller to hold ctx->mutex (as noted in the comment at line 838). It calls moq_source_stop_playback, which calls pthread_join. If the playback thread is inside moq_source_decode_queued_video_frame (which acquires ctx->mutex at line 1364), the following deadlock occurs:
- Main thread: holds
ctx->mutex, blocks onpthread_join - Playback thread: blocks trying to acquire
ctx->mutex
🔎 Proposed fix: release mutex before joining playback thread
Refactor moq_source_stop_playback to signal the thread to stop, release mutex, then join:
// NOTE: Caller must hold ctx->mutex when calling this function
static void moq_source_disconnect_locked(struct moq_source *ctx)
{
// ... track cleanup ...
moq_source_destroy_decoder_locked(ctx);
moq_source_destroy_audio_decoder_locked(ctx);
- // Stop playback thread
- moq_source_stop_playback(ctx);
+ // Signal playback thread to stop (don't join yet - mutex still held)
+ if (ctx->playback_thread_valid) {
+ ctx->playback_active = false;
+ os_sem_post(ctx->playback_sem);
+ }
ctx->got_keyframe = false;
ctx->frames_waiting_for_keyframe = 0;
ctx->consecutive_decode_errors = 0;
}Then join and cleanup after releasing the mutex in the calling code (e.g., in moq_source_destroy after pthread_mutex_unlock).
Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In src/moq-source.cpp around lines 871 to 876, calling moq_source_stop_playback
(which performs pthread_join) while ctx->mutex is held can deadlock if the
playback thread is blocked trying to acquire ctx->mutex; change the flow so
moq_source_stop_playback only signals the playback thread to stop and does not
call pthread_join while holding the mutex, and move the pthread_join and any
thread cleanup to after the caller releases ctx->mutex (e.g., in
moq_source_destroy): specifically, refactor moq_source_stop_playback to set a
stop flag/condition and signal the thread, remove the join from that function,
then after unlocking ctx->mutex in the caller, call a new function (or the
existing stop variant) that performs pthread_join and final cleanup.
| // Helper function to pop frame from queue (with timeout) | ||
| static bool frame_queue_pop(struct frame_queue *queue, struct queued_frame *frame, uint32_t timeout_ms) | ||
| { | ||
| uint64_t start_time = os_gettime_ns() / 1000000; // milliseconds | ||
|
|
||
| while (os_gettime_ns() / 1000000 - start_time < timeout_ms) { | ||
| if (os_sem_wait(queue->sem) == 0) { | ||
| pthread_mutex_lock(&queue->mutex); | ||
| if (!queue->frames.empty()) { | ||
| *frame = queue->frames.front(); | ||
| queue->frames.pop_front(); | ||
| pthread_mutex_unlock(&queue->mutex); | ||
| return true; | ||
| } | ||
| pthread_mutex_unlock(&queue->mutex); | ||
| } | ||
| os_sleep_ms(1); // Small sleep to avoid busy waiting | ||
| } | ||
|
|
||
| return false; | ||
| } |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Search for os_sem_timedwait and os_sem_wait in OBS headers
rg -n "os_sem_timedwait|os_sem_wait" --type=c --type=cppRepository: davegullo/obs-moq
Length of output: 122
🏁 Script executed:
#!/bin/bash
# Also search for semaphore function definitions in header files
fd -e h | xargs grep -l "os_sem" | head -20Repository: davegullo/obs-moq
Length of output: 43
🏁 Script executed:
#!/bin/bash
# Look for the frame_queue_pop function to confirm it matches the review
rg -A 25 "static bool frame_queue_pop" src/moq-source.cppRepository: davegullo/obs-moq
Length of output: 1654
🏁 Script executed:
#!/bin/bash
# Search for all os_sem functions to see what's available
rg "os_sem_\w+" -o | sort | uniqRepository: davegullo/obs-moq
Length of output: 215
🏁 Script executed:
#!/bin/bash
# Look for OBS SDK header files or includes related to semaphores
rg -n "os_sem" --type=c --type=cpp | head -30Repository: davegullo/obs-moq
Length of output: 685
🏁 Script executed:
#!/bin/bash
# Check if OBS SDK is included and where from
rg -n "include.*util/threading" --type=c --type=cpp | head -10Repository: davegullo/obs-moq
Length of output: 109
🏁 Script executed:
#!/bin/bash
# Look for semaphore related code or patterns
rg -n "sem_wait|sem_timedwait" --type=c --type=cppRepository: davegullo/obs-moq
Length of output: 122
🌐 Web query:
OBS Studio SDK os_sem_timedwait semaphore
💡 Result:
Short answer: libobs does not provide os_sem_timedwait.
Details/alternatives:
- The official libobs threading API documents os_sem_t (init/destroy/post/wait) but there is no os_sem_timedwait function; only os_event_timedwait (milliseconds) is provided for timed waits. [1]
- Workarounds: use os_event_timedwait (if an event suits your need), or implement a timed semaphore wait yourself (e.g. try os_sem_wait/os_sem_try in a loop with a sleep and timeout, or use the platform's native sem_timedwait/pthread primitives where available). [1][2]
Sources:
- libobs Threading API (Semaphore & Event functions). [1]
- Archived libobs threading doc/source listing (shows semaphore functions but no timedwait). [2]
🌐 Web query:
OBS Studio SDK threading util.h os_sem functions
💡 Result:
The OBS libobs util threading API provides semaphore functions in util/threading.h:
- int os_sem_init(os_sem_t **sem, int value) — create a semaphore with initial count (returns 0 on success). [1]
- void os_sem_destroy(os_sem_t *sem) — destroy a semaphore. [1]
- int os_sem_post(os_sem_t *sem) — increment (signal) the semaphore (returns 0 on success). [1]
- int os_sem_wait(os_sem_t *sem) — decrement (wait) the semaphore; blocks until available (returns 0 on success). [1]
You can find these in the libobs util threading reference; the general util API docs are here as well. [1][2]
Sources:
[1] OBS Studio — Threading (util/threading.h) reference.
[2] OBS Studio — Platform/Utility API Reference (libobs/util).
🏁 Script executed:
#!/bin/bash
# Check the OBS SDK repository for threading.h
# Search for what semaphore functions are actually available
rg "os_sem_" --type=c --type=cpp | grep -E "(define|typedef|^.*\()" | head -20Repository: davegullo/obs-moq
Length of output: 559
Blocking semaphore wait defeats the timeout mechanism.
os_sem_wait blocks indefinitely until signaled. The timeout-based while loop cannot interrupt this blocking call, making the timeout_ms parameter ineffective. This causes the playback thread to hang indefinitely when no frames arrive.
Additionally, a race condition exists: if the semaphore is signaled but the queue is empty (due to direct deque manipulation elsewhere), the semaphore count becomes mismatched with the queue size.
Note: os_sem_timedwait is not available in the OBS SDK. Use os_event_timedwait (which OBS provides for timed waits) or implement timed semaphore behavior using os_sem_wait with a polling loop and timeout tracking.
🤖 Prompt for AI Agents
In src/moq-source.cpp around lines 1587 to 1607, replace the current blocking
os_sem_wait-based loop (which defeats timeout_ms and allows semaphore/queue
count races) with a timed-wait approach and proper locking: use OBS's
os_event_timedwait (or implement a timed polling of os_sem_trywait/os_sem_wait
with timeout tracking) so the call returns when timeout expires, and always
check/modify queue->frames under queue->mutex; only consume the semaphore when
you successfully pop a frame (or otherwise ensure semaphore count stays in
sync), and preserve the outer timeout loop to return false on overall timeout.
| static void moq_source_start_playback(struct moq_source *ctx) | ||
| { | ||
| if (ctx->playback_thread_valid) | ||
| return; | ||
|
|
||
| ctx->playback_active = true; | ||
| ctx->start_timestamp_us = 0; | ||
| ctx->base_system_ts = 0; |
There was a problem hiding this comment.
start_timestamp_us is never set from actual frame timestamps — likely root cause of video freezing.
start_timestamp_us is initialized to 0 but never updated from the first frame's actual timestamp. When used at lines 1569 and 1725 to calculate relative timestamps (qframe->timestamp_us - ctx->start_timestamp_us), this produces incorrect values if source timestamps aren't already zero-based.
If incoming frame timestamps are absolute values (e.g., wall clock time), the calculated timestamps will be enormous, causing OBS to misinterpret frame timing and resulting in the reported freezing and artifacts.
🔎 Proposed fix to initialize start_timestamp_us from first frame
In the playback thread, after waiting for audio frames and before setting base_system_ts:
// Wait for some audio frames to arrive before starting
while (ctx->playback_active && ctx->audio_queue.frames.empty()) {
os_sleep_ms(10);
}
if (!ctx->playback_active)
return NULL;
+ // Initialize start timestamp from first frame
+ pthread_mutex_lock(&ctx->audio_queue.mutex);
+ if (!ctx->audio_queue.frames.empty()) {
+ ctx->start_timestamp_us = ctx->audio_queue.frames.front().timestamp_us;
+ }
+ pthread_mutex_unlock(&ctx->audio_queue.mutex);
+
// Initialize timing - start timestamp is set, now set the playback start time
ctx->base_system_ts = (int64_t)os_gettime_ns() / 1000;Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In src/moq-source.cpp around lines 1626-1633, start_timestamp_us is initialized
to 0 but never set from the first incoming frame, causing huge relative
timestamps and playback freezes; update the playback thread so that after you
wait for the first audio/video frame and obtain its actual timestamp (the first
qframe timestamp used for playback) you assign ctx->start_timestamp_us =
first_frame_timestamp_us (only once, e.g. guarded by ctx->start_timestamp_us ==
0) before computing or assigning ctx->base_system_ts and proceeding with
playback, ensuring all subsequent timestamp calculations use the zero-relative
start_timestamp_us.
| // Wait for some audio frames to arrive before starting | ||
| while (ctx->playback_active && ctx->audio_queue.frames.empty()) { | ||
| os_sleep_ms(10); | ||
| } |
There was a problem hiding this comment.
Data race: checking frames.empty() without holding mutex.
The playback thread checks ctx->audio_queue.frames.empty() without holding audio_queue.mutex, while on_audio_frame may be modifying the queue concurrently. This is undefined behavior for std::deque.
🔎 Proposed fix
// Wait for some audio frames to arrive before starting
- while (ctx->playback_active && ctx->audio_queue.frames.empty()) {
+ while (ctx->playback_active) {
+ pthread_mutex_lock(&ctx->audio_queue.mutex);
+ bool empty = ctx->audio_queue.frames.empty();
+ pthread_mutex_unlock(&ctx->audio_queue.mutex);
+ if (!empty)
+ break;
os_sleep_ms(10);
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| // Wait for some audio frames to arrive before starting | |
| while (ctx->playback_active && ctx->audio_queue.frames.empty()) { | |
| os_sleep_ms(10); | |
| } | |
| // Wait for some audio frames to arrive before starting | |
| while (ctx->playback_active) { | |
| pthread_mutex_lock(&ctx->audio_queue.mutex); | |
| bool empty = ctx->audio_queue.frames.empty(); | |
| pthread_mutex_unlock(&ctx->audio_queue.mutex); | |
| if (!empty) | |
| break; | |
| os_sleep_ms(10); | |
| } |
🤖 Prompt for AI Agents
In src/moq-source.cpp around lines 1679-1682, the playback thread reads
ctx->audio_queue.frames.empty() without holding audio_queue.mutex which causes a
data race; fix by acquiring audio_queue.mutex before checking frames and
replacing the busy-wait with a proper wait on the audio queue condition variable
(or a loop that locks the mutex, checks frames.empty() and waits on
audio_queue.condvar while playback_active and frames.empty()), re-checking
playback_active inside the locked scope and only proceeding when frames are
available; ensure the mutex is released while sleeping/waiting via the condition
variable to avoid blocking producers.
| // Process any available video frames (simpler approach) | ||
| pthread_mutex_lock(&ctx->video_queue.mutex); | ||
| while (!ctx->video_queue.frames.empty()) { | ||
| video_frame = ctx->video_queue.frames.front(); | ||
| ctx->video_queue.frames.pop_front(); | ||
| // Note: we don't post to semaphore here since we're consuming | ||
| pthread_mutex_unlock(&ctx->video_queue.mutex); | ||
|
|
||
| LOG_INFO("Processing video frame at timestamp %llu", video_frame.timestamp_us); | ||
| moq_source_decode_queued_video_frame(ctx, &video_frame); | ||
| bfree(video_frame.data); | ||
|
|
||
| pthread_mutex_lock(&ctx->video_queue.mutex); | ||
| } | ||
| pthread_mutex_unlock(&ctx->video_queue.mutex); |
There was a problem hiding this comment.
Video queue semaphore never consumed, causing count mismatch.
Video frames are popped directly from the deque without calling os_sem_wait on video_queue.sem, while frame_queue_push increments the semaphore for each pushed frame. This causes the semaphore count to grow unbounded and become desynchronized from the actual queue size.
🔎 Proposed fix to consume semaphore when popping video frames
// Process any available video frames (simpler approach)
pthread_mutex_lock(&ctx->video_queue.mutex);
while (!ctx->video_queue.frames.empty()) {
+ // Consume the semaphore to keep it synchronized
+ os_sem_wait(ctx->video_queue.sem);
+
video_frame = ctx->video_queue.frames.front();
ctx->video_queue.frames.pop_front();
- // Note: we don't post to semaphore here since we're consuming
pthread_mutex_unlock(&ctx->video_queue.mutex);
LOG_INFO("Processing video frame at timestamp %llu", video_frame.timestamp_us);
moq_source_decode_queued_video_frame(ctx, &video_frame);
bfree(video_frame.data);
pthread_mutex_lock(&ctx->video_queue.mutex);
}
pthread_mutex_unlock(&ctx->video_queue.mutex);🤖 Prompt for AI Agents
In src/moq-source.cpp around lines 1750-1764 the video queue consumer pops
frames without decrementing video_queue.sem, desynchronizing the semaphore from
actual queue size; fix by consuming the semaphore for each frame: before popping
a frame call os_sem_wait(&ctx->video_queue.sem), then lock the mutex, re-check
that frames is not empty (handle spurious wakeups/races), pop_front the frame,
unlock and proceed to process/ free it; this ensures the semaphore count stays
in sync with pushes.
pushing to test coderabbit for a 2nd opinion.
See attached for sample of video artifacts:

Summary by CodeRabbit
✏️ Tip: You can customize this high-level summary in your review settings.