From 04c82e05d3c7458dcd6d3233ba4a5dc2c11fdccf Mon Sep 17 00:00:00 2001 From: nasso Date: Tue, 27 Aug 2024 15:52:05 +0200 Subject: [PATCH 1/5] link: fix post init link --- src/link.c | 1 - 1 file changed, 1 deletion(-) diff --git a/src/link.c b/src/link.c index c073572..4114384 100644 --- a/src/link.c +++ b/src/link.c @@ -399,7 +399,6 @@ int sp_generic_link(TXMainContext *ctx, SPEventType src_post_init = sp_eventlist_has_dispatched(src_events, SP_EVENT_ON_INIT); - src_post_init = 0; if (src_post_init) flags |= SP_EVENT_ON_COMMIT; else From b566eb073e4a4d9f400999e7c839f70dad092f35 Mon Sep 17 00:00:00 2001 From: nasso Date: Tue, 27 Aug 2024 15:52:05 +0200 Subject: [PATCH 2/5] fifo: poking - Added a new FIFO flag: `XXX_FIFO_PULL_POKE` - Added `sp_xxx_fifo_poke` to send a "poke" to a FIFO, forcing any pull operation with the `XXX_FIFO_PULL_POKE` flag to return with `EAGAIN` - Added `sp_xxx_fifo_peek_flags` - Added more logging --- src/fifo_frame.c | 11 +- src/fifo_packet.c | 13 +- src/fifo_template.c | 209 +++++++++++++++++++++------ src/include/libtxproto/fifo_frame.h | 19 ++- src/include/libtxproto/fifo_packet.h | 19 ++- 5 files changed, 191 insertions(+), 80 deletions(-) diff --git a/src/fifo_frame.c b/src/fifo_frame.c index 6a18f28..b2993f9 100644 --- a/src/fifo_frame.c +++ b/src/fifo_frame.c @@ -3,19 +3,10 @@ #define FRENAME(x) FRAME_FIFO_ ## x #define RENAME(x) sp_frame_ ##x #define PRIV_RENAME(x) frame_ ##x -#define FNAME enum SPFrameFIFOFlags +#define FNAME SPFrameFIFOFlags #define SNAME SPFrameFIFO #define FREE_FN av_frame_free #define CLONE_FN(x) ((x) ? av_frame_clone((x)) : NULL) #define TYPE AVFrame #include "fifo_template.c" - -#undef TYPE -#undef CLONE_FN -#undef FREE_FN -#undef SNAME -#undef FNAME -#undef PRIV_RENAME -#undef RENAME -#undef FRENAME diff --git a/src/fifo_packet.c b/src/fifo_packet.c index 74821e4..96a0366 100644 --- a/src/fifo_packet.c +++ b/src/fifo_packet.c @@ -2,20 +2,11 @@ #define FRENAME(x) PACKET_FIFO_ ## x #define RENAME(x) sp_packet_ ##x -#define PRIV_RENAME(x) packet ##x -#define FNAME enum SPPacketFIFOFlags +#define PRIV_RENAME(x) packet_ ##x +#define FNAME SPPacketFIFOFlags #define SNAME SPPacketFIFO #define FREE_FN av_packet_free #define CLONE_FN(x) ((x) ? av_packet_clone((x)) : NULL) #define TYPE AVPacket #include "fifo_template.c" - -#undef TYPE -#undef CLONE_FN -#undef FREE_FN -#undef SNAME -#undef FNAME -#undef PRIV_RENAME -#undef RENAME -#undef FRENAME diff --git a/src/fifo_template.c b/src/fifo_template.c index c50cee6..787b33b 100644 --- a/src/fifo_template.c +++ b/src/fifo_template.c @@ -16,6 +16,10 @@ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA */ +#include + +#include +#include #include typedef struct SNAME { @@ -24,6 +28,7 @@ typedef struct SNAME { int max_queued; FNAME block_flags; unsigned int queued_alloc_size; + bool poked; pthread_mutex_t lock; pthread_cond_t cond_in; pthread_cond_t cond_out; @@ -97,11 +102,28 @@ AVBufferRef *RENAME(fifo_create)(void *opaque, int max_queued, FNAME block_flags int RENAME(fifo_mirror)(AVBufferRef *dst, AVBufferRef *src) { + if (!dst || !src) + return AVERROR(EINVAL); + SNAME *dst_ctx = (SNAME *)dst->data; SNAME *src_ctx = (SNAME *)src->data; - if (!dst || !src) - return AVERROR(EINVAL); + void *src_class = av_buffer_get_opaque(src); + const char *src_class_name = sp_class_get_name(src_class); + + if (sp_log_get_ctx_lvl(src_class_name) >= SP_LOG_VERBOSE) { + void *dst_class = av_buffer_get_opaque(dst); + const char *dst_class_name = sp_class_get_name(dst_class); + const char *dst_class_type = sp_class_type_string(dst_class); + const char *src_class_type = sp_class_type_string(src_class); + + sp_log(src_class, SP_LOG_VERBOSE, + "Mirroring output FIFO from \"%s\" (%s) to \"%s\" (%s)\n", + src_class_name ? src_class_name : "unknown", + src_class_type ? src_class_type : "unknown", + dst_class_name ? dst_class_name : "unknown", + dst_class_type ? dst_class_type : "unknown"); + } sp_bufferlist_append(dst_ctx->sources, src); sp_bufferlist_append(src_ctx->dests, dst); @@ -111,6 +133,26 @@ int RENAME(fifo_mirror)(AVBufferRef *dst, AVBufferRef *src) int RENAME(fifo_unmirror)(AVBufferRef *dst, AVBufferRef *src) { + if (!dst || !src) + return AVERROR(EINVAL); + + void *src_class = av_buffer_get_opaque(src); + const char *src_class_name = sp_class_get_name(src_class); + + if (sp_log_get_ctx_lvl(src_class_name) >= SP_LOG_VERBOSE) { + void *dst_class = av_buffer_get_opaque(dst); + const char *dst_class_name = sp_class_get_name(dst_class); + const char *dst_class_type = sp_class_type_string(dst_class); + const char *src_class_type = sp_class_type_string(src_class); + + sp_log(src_class, SP_LOG_VERBOSE, + "Unmirroring output FIFO from \"%s\" (%s) to \"%s\" (%s)\n", + src_class_name ? src_class_name : "unknown", + src_class_type ? src_class_type : "unknown", + dst_class_name ? dst_class_name : "unknown", + dst_class_type ? dst_class_type : "unknown"); + } + SNAME *dst_ctx = (SNAME *)dst->data; SNAME *src_ctx = (SNAME *)src->data; @@ -127,25 +169,68 @@ int RENAME(fifo_unmirror)(AVBufferRef *dst, AVBufferRef *src) return 0; } -int RENAME(fifo_unmirror_all)(AVBufferRef *dst) +int RENAME(fifo_unmirror_all)(AVBufferRef *ref) { - if (!dst) + if (!ref) return 0; - SNAME *dst_ctx = (SNAME *)dst->data; + void *ref_class = av_buffer_get_opaque(ref); + const char *ref_class_name = sp_class_get_name(ref_class); + enum SPLogLevel log_lvl = sp_log_get_ctx_lvl(ref_class_name); + + if (log_lvl >= SP_LOG_VERBOSE) { + const char *ref_class_type = sp_class_type_string(ref_class); + + sp_log(ref_class, SP_LOG_VERBOSE, "Unmirroring all from \"%s\" (%s)...\n", + ref_class_name ? ref_class_name : "unknown", + ref_class_type ? ref_class_type : "unknown"); + } - pthread_mutex_lock(&dst_ctx->lock); + SNAME *ref_ctx = (SNAME *)ref->data; + + pthread_mutex_lock(&ref_ctx->lock); AVBufferRef *src_ref = NULL; - while ((src_ref = sp_bufferlist_pop(dst_ctx->sources, sp_bufferlist_find_fn_first, NULL))) { + while ((src_ref = sp_bufferlist_pop(ref_ctx->sources, sp_bufferlist_find_fn_first, NULL))) { SNAME *src_ctx = (SNAME *)src_ref->data; AVBufferRef *own_ref = sp_bufferlist_pop(src_ctx->dests, find_ref_by_data, - dst_ctx); + ref_ctx); + if (log_lvl >= SP_LOG_VERBOSE) { + void *src_class = av_buffer_get_opaque(src_ref); + const char *src_class_name = sp_class_get_name(src_class); + const char *src_class_type = sp_class_type_string(src_class); + + sp_log(ref_class, SP_LOG_VERBOSE, " ...from source \"%s\" (%s)\n", + src_class_name ? src_class_name : "unknown", + src_class_type ? src_class_type : "unknown"); + } av_buffer_unref(&own_ref); av_buffer_unref(&src_ref); } - pthread_mutex_unlock(&dst_ctx->lock); + AVBufferRef *dst_ref = NULL; + while ((dst_ref = sp_bufferlist_pop(ref_ctx->dests, sp_bufferlist_find_fn_first, NULL))) { + SNAME *dst_ctx = (SNAME *)dst_ref->data; + AVBufferRef *own_ref = sp_bufferlist_pop(dst_ctx->sources, find_ref_by_data, + ref_ctx); + if (log_lvl >= SP_LOG_VERBOSE) { + void *dst_class = av_buffer_get_opaque(dst_ref); + const char *dst_class_name = sp_class_get_name(dst_class); + const char *dst_class_type = sp_class_type_string(dst_class); + + sp_log(ref_class, SP_LOG_VERBOSE, " ...from dest \"%s\" (%s)\n", + dst_class_name ? dst_class_name : "unknown", + dst_class_type ? dst_class_type : "unknown"); + } + + /* unblock anyone pulling this dest */ + pthread_cond_signal(&dst_ctx->cond_in); + + av_buffer_unref(&own_ref); + av_buffer_unref(&dst_ref); + } + + pthread_mutex_unlock(&ref_ctx->lock); return 0; } @@ -290,7 +375,24 @@ int RENAME(fifo_push)(AVBufferRef *dst, TYPE *in) return err; } -int RENAME(fifo_pop_flags)(AVBufferRef *src, TYPE **dst, FNAME flags) +int RENAME(fifo_poke)(AVBufferRef *ref) +{ + SNAME *ctx = (SNAME *)ref->data; + void *ref_class = av_buffer_get_opaque(ref); + const char *ref_class_name = sp_class_get_name(ref_class); + const char *ref_class_type = sp_class_type_string(ref_class); + sp_log(ref_class, SP_LOG_VERBOSE, "Poking FIFO \"%s\" (%s)...\n", + ref_class_name ? ref_class_name : "unknown", + ref_class_type ? ref_class_type : "unknown"); + pthread_mutex_lock(&ctx->lock); + ctx->poked = true; + pthread_mutex_unlock(&ctx->lock); + pthread_cond_signal(&ctx->cond_in); + return 0; +} + +static int PRIV_RENAME(fifo_pull_flags_template)(AVBufferRef *src, TYPE **dst, + FNAME flags, int pop) { int ret = 0; @@ -303,24 +405,43 @@ int RENAME(fifo_pop_flags)(AVBufferRef *src, TYPE **dst, FNAME flags) SNAME *ctx = (SNAME *)src->data; pthread_mutex_lock(&ctx->lock); - if (!ctx->num_queued) { - if ((flags & FRENAME(PULL_NO_BLOCK)) || - !(ctx->block_flags & FRENAME(BLOCK_NO_INPUT))) { + int pull_poke = flags & FRENAME(PULL_POKE); + int pull_no_block = flags & FRENAME(PULL_NO_BLOCK); + + while (!ctx->num_queued) { + /* this one might change while we wait for `cond_in` */ + int block_no_input = ctx->block_flags & FRENAME(BLOCK_NO_INPUT); + + if (!block_no_input || pull_no_block) { ret = AVERROR(EAGAIN); goto unlock; } - pthread_cond_wait(&ctx->cond_in, &ctx->lock); + if (!ctx->poked) { + pthread_cond_wait(&ctx->cond_in, &ctx->lock); + } + + /* if the `PULL_POKE` flag is set, return on poke */ + if (pull_poke && ctx->poked) { + ctx->poked = false; + ret = AVERROR(EAGAIN); + goto unlock; + } + ctx->poked = false; } - out = ctx->queued[0]; - ctx->num_queued--; - assert(ctx->num_queued >= 0); + if (pop) { + out = ctx->queued[0]; + ctx->num_queued--; + assert(ctx->num_queued >= 0); - memmove(&ctx->queued[0], &ctx->queued[1], ctx->num_queued*sizeof(TYPE *)); + memmove(&ctx->queued[0], &ctx->queued[1], ctx->num_queued*sizeof(TYPE *)); - if (ctx->max_queued > 0) - pthread_cond_signal(&ctx->cond_out); + if (ctx->max_queued > 0) + pthread_cond_signal(&ctx->cond_out); + } else { + out = CLONE_FN(ctx->queued[0]); + } unlock: pthread_mutex_unlock(&ctx->lock); @@ -330,33 +451,35 @@ int RENAME(fifo_pop_flags)(AVBufferRef *src, TYPE **dst, FNAME flags) return ret; } -TYPE *RENAME(fifo_pop)(AVBufferRef *src) +int RENAME(fifo_pop_flags)(AVBufferRef *src, TYPE **dst, FNAME flags) { - TYPE *ret; - RENAME(fifo_pop_flags)(src, &ret, 0x0); - return ret; + return PRIV_RENAME(fifo_pull_flags_template)(src, dst, flags, 1); } -TYPE *RENAME(fifo_peek)(AVBufferRef *src) +TYPE *RENAME(fifo_pop)(AVBufferRef *src) { - if (!src) - return NULL; - - TYPE *out = NULL; - SNAME *ctx = (SNAME *)src->data; - pthread_mutex_lock(&ctx->lock); - - if (!ctx->num_queued) { - if (!(ctx->block_flags & FRENAME(BLOCK_NO_INPUT))) - goto unlock; - - pthread_cond_wait(&ctx->cond_in, &ctx->lock); - } - - out = CLONE_FN(ctx->queued[0]); + TYPE *val = NULL; + PRIV_RENAME(fifo_pull_flags_template)(src, &val, 0x0, 1); + return val; +} -unlock: - pthread_mutex_unlock(&ctx->lock); +int RENAME(fifo_peek_flags)(AVBufferRef *src, TYPE **dst, FNAME flags) +{ + return PRIV_RENAME(fifo_pull_flags_template)(src, dst, flags, 0); +} - return out; +TYPE *RENAME(fifo_peek)(AVBufferRef *src) +{ + TYPE *val = NULL; + PRIV_RENAME(fifo_pull_flags_template)(src, &val, 0x0, 0); + return val; } + +#undef TYPE +#undef CLONE_FN +#undef FREE_FN +#undef SNAME +#undef FNAME +#undef PRIV_RENAME +#undef RENAME +#undef FRENAME diff --git a/src/include/libtxproto/fifo_frame.h b/src/include/libtxproto/fifo_frame.h index 856d23f..c21363a 100644 --- a/src/include/libtxproto/fifo_frame.h +++ b/src/include/libtxproto/fifo_frame.h @@ -21,17 +21,18 @@ #include #include -enum SPFrameFIFOFlags { - FRAME_FIFO_BLOCK_MAX_OUTPUT = (1 << 0), - FRAME_FIFO_BLOCK_NO_INPUT = (1 << 1), - FRAME_FIFO_PULL_NO_BLOCK = (1 << 2), -}; - #define FRENAME(x) FRAME_FIFO_ ## x #define RENAME(x) sp_frame_ ##x -#define FNAME enum SPFrameFIFOFlags +#define FNAME SPFrameFIFOFlags #define TYPE AVFrame +typedef enum FNAME { + FRENAME(BLOCK_MAX_OUTPUT) = (1 << 0), + FRENAME(BLOCK_NO_INPUT) = (1 << 1), + FRENAME(PULL_NO_BLOCK) = (1 << 2), + FRENAME(PULL_POKE) = (1 << 3), +} FNAME; + /* Create */ AVBufferRef *RENAME(fifo_create)(void *opaque, int max_queued, FNAME block_flags); /* -1 = INF, 0 = none */ AVBufferRef *RENAME(fifo_ref)(AVBufferRef *src, int max_queued, FNAME block_flags); @@ -52,10 +53,12 @@ int RENAME(fifo_unmirror)(AVBufferRef *dst, AVBufferRef *src); int RENAME(fifo_unmirror_all)(AVBufferRef *dst); /* I/O */ +int RENAME(fifo_poke)(AVBufferRef *dst); int RENAME(fifo_push)(AVBufferRef *dst, TYPE *in); TYPE *RENAME(fifo_pop)(AVBufferRef *src); -int RENAME(fifo_pop_flags)(AVBufferRef *src, TYPE **ret, FNAME flags); +int RENAME(fifo_pop_flags)(AVBufferRef *src, TYPE **dst, FNAME flags); TYPE *RENAME(fifo_peek)(AVBufferRef *src); +int RENAME(fifo_peek_flags)(AVBufferRef *src, TYPE **dst, FNAME flags); #undef TYPE #undef FNAME diff --git a/src/include/libtxproto/fifo_packet.h b/src/include/libtxproto/fifo_packet.h index ecc7fb9..9f4a9ab 100644 --- a/src/include/libtxproto/fifo_packet.h +++ b/src/include/libtxproto/fifo_packet.h @@ -21,17 +21,18 @@ #include #include -enum SPPacketFIFOFlags { - PACKET_FIFO_BLOCK_MAX_OUTPUT = (1 << 0), - PACKET_FIFO_BLOCK_NO_INPUT = (1 << 1), - PACKET_FIFO_PULL_NO_BLOCK = (1 << 2), -}; - #define FRENAME(x) PACKET_FIFO_ ## x #define RENAME(x) sp_packet_ ##x -#define FNAME enum SPPacketFIFOFlags +#define FNAME SPPacketFIFOFlags #define TYPE AVPacket +typedef enum FNAME { + FRENAME(BLOCK_MAX_OUTPUT) = (1 << 0), + FRENAME(BLOCK_NO_INPUT) = (1 << 1), + FRENAME(PULL_NO_BLOCK) = (1 << 2), + FRENAME(PULL_POKE) = (1 << 3), +} FNAME; + /* Create */ AVBufferRef *RENAME(fifo_create)(void *opaque, int max_queued, FNAME block_flags); /* -1 = INF, 0 = none */ AVBufferRef *RENAME(fifo_ref)(AVBufferRef *src, int max_queued, FNAME block_flags); @@ -52,10 +53,12 @@ int RENAME(fifo_unmirror)(AVBufferRef *dst, AVBufferRef *src); int RENAME(fifo_unmirror_all)(AVBufferRef *dst); /* I/O */ +int RENAME(fifo_poke)(AVBufferRef *dst); int RENAME(fifo_push)(AVBufferRef *dst, TYPE *in); TYPE *RENAME(fifo_pop)(AVBufferRef *src); -int RENAME(fifo_pop_flags)(AVBufferRef *src, TYPE **ret, FNAME flags); +int RENAME(fifo_pop_flags)(AVBufferRef *src, TYPE **dst, FNAME flags); TYPE *RENAME(fifo_peek)(AVBufferRef *src); +int RENAME(fifo_peek_flags)(AVBufferRef *src, TYPE **dst, FNAME flags); #undef TYPE #undef FNAME From b50867ac2104bc6666e063dc1635601e57c7ba50 Mon Sep 17 00:00:00 2001 From: nasso Date: Tue, 27 Aug 2024 15:52:05 +0200 Subject: [PATCH 3/5] encoder: fix deadlock in when input changes The encoder would be stuck pulling a frame when its input is disconnected, preventing it from ever linking with a new input. --- src/encode.c | 68 ++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 47 insertions(+), 21 deletions(-) diff --git a/src/encode.c b/src/encode.c index 30e7727..48eaf9d 100644 --- a/src/encode.c +++ b/src/encode.c @@ -20,13 +20,14 @@ #include #include #include +#include #include #include +#include #include "encoding_utils.h" #include "os_compat.h" -#include #include "utils.h" #include "ctrl_template.h" @@ -216,10 +217,10 @@ static int init_hwcontext(EncodingContext *ctx, AVFrame *conf) ctx->avctx->thread_type = 0; ctx->avctx->hw_frames_ctx = av_buffer_ref(ctx->enc_frames_ref); - if (!ctx->avctx->hw_frames_ctx) { - av_buffer_unref(&ctx->enc_frames_ref); - err = AVERROR(ENOMEM); - } + if (!ctx->avctx->hw_frames_ctx) { + av_buffer_unref(&ctx->enc_frames_ref); + err = AVERROR(ENOMEM); + } end: /* Hardware frames make their own ref */ @@ -257,10 +258,10 @@ static int configure_encoder(EncodingContext *ctx, AVFrame *conf) } err = avcodec_open2(ctx->avctx, ctx->codec, NULL); - if (err < 0) { - sp_log(ctx, SP_LOG_ERROR, "Cannot open encoder: %s!\n", av_err2str(err)); - return err; - } + if (err < 0) { + sp_log(ctx, SP_LOG_ERROR, "Cannot open encoder: %s!\n", av_err2str(err)); + return err; + } return 0; } @@ -268,15 +269,26 @@ static int configure_encoder(EncodingContext *ctx, AVFrame *conf) static int context_full_config(EncodingContext *ctx) { int err; + AVFrame *conf; + + do { + err = sp_eventlist_dispatch(ctx, ctx->events, SP_EVENT_ON_CONFIG, NULL); + if (err < 0) + return err; + + av_buffer_unref(&ctx->mode_negotiate_event); + + sp_log(ctx, SP_LOG_VERBOSE, "Getting a frame to configure...\n"); - err = sp_eventlist_dispatch(ctx, ctx->events, SP_EVENT_ON_CONFIG, NULL); + /* blocks until we get a frame or a poke */ + err = sp_frame_fifo_peek_flags(ctx->src_frames, &conf, + FRAME_FIFO_BLOCK_NO_INPUT | + FRAME_FIFO_PULL_POKE); + /* loop if we get poked to dispatch on:config */ + } while (err == AVERROR(EAGAIN)); if (err < 0) return err; - av_buffer_unref(&ctx->mode_negotiate_event); - - sp_log(ctx, SP_LOG_VERBOSE, "Getting a frame to configure...\n"); - AVFrame *conf = sp_frame_fifo_peek(ctx->src_frames); if (!conf) { sp_log(ctx, SP_LOG_ERROR, "No input frame to configure with!\n"); return AVERROR(EINVAL); @@ -592,7 +604,7 @@ static void *encoding_thread(void *arg) #if 0 if (!sp_eventlist_has_dispatched(ctx->events, SP_EVENT_ON_CONFIG)) { - int ret = context_full_config(ctx); + ret = context_full_config(ctx); if (ret < 0) return ret; } @@ -600,13 +612,14 @@ static void *encoding_thread(void *arg) sp_log(ctx, SP_LOG_VERBOSE, "Encoder initialized!\n"); - sp_eventlist_dispatch(ctx, ctx->events, SP_EVENT_ON_CONFIG | SP_EVENT_ON_INIT, NULL); + sp_eventlist_dispatch(ctx, ctx->events, SP_EVENT_ON_INIT, NULL); do { - pthread_mutex_lock(&ctx->lock); - AVFrame *frame = NULL; + pthread_mutex_lock(&ctx->lock); + sp_eventlist_dispatch(ctx, ctx->events, SP_EVENT_ON_CONFIG, NULL); + if (ctx->reconfigure_frame && !ctx->waiting_eof) { frame = ctx->reconfigure_frame; ctx->reconfigure_frame = NULL; @@ -618,7 +631,21 @@ static void *encoding_thread(void *arg) if (ctx->avctx->flags & AV_CODEC_FLAG_GLOBAL_HEADER) ctx->attach_sidedata = 1; } else if (!flush) { - frame = sp_frame_fifo_pop(ctx->src_frames); + sp_log(ctx, SP_LOG_TRACE, "Pulling frame...\n"); + ret = sp_frame_fifo_pop_flags(ctx->src_frames, &frame, + FRAME_FIFO_PULL_POKE); + if (ret == AVERROR(EAGAIN)) { + sp_log(ctx, SP_LOG_VERBOSE, "No frame yet, trying again...\n"); + pthread_mutex_unlock(&ctx->lock); + continue; + } + + if (frame) + sp_log(ctx, SP_LOG_TRACE, "Got frame, pts = %f\n", + av_q2d(ctx->avctx->time_base) * frame->pts); + else + sp_log(ctx, SP_LOG_VERBOSE, "Received NULL frame, flushing...\n"); + flush = !frame; } @@ -718,8 +745,6 @@ static void *encoding_thread(void *arg) sp_packet_fifo_push(ctx->dst_packets, out_pkt); - sp_eventlist_dispatch(ctx, ctx->events, SP_EVENT_ON_CONFIG | SP_EVENT_ON_INIT, NULL); - av_packet_free(&out_pkt); } @@ -804,6 +829,7 @@ static int encoder_ioctx_ctrl_cb(AVBufferRef *event_ref, void *callback_ctx, int sp_encoder_ctrl(AVBufferRef *ctx_ref, SPEventType ctrl, void *arg) { EncodingContext *ctx = (EncodingContext *)ctx_ref->data; + sp_frame_fifo_poke(ctx->src_frames); return sp_ctrl_template(ctx, ctx->events, 0x0, encoder_ioctx_ctrl_cb, ctrl, arg); } From f22f3a1099c4e94737fb1c6d84e1168d7627d399 Mon Sep 17 00:00:00 2001 From: nasso Date: Tue, 27 Aug 2024 15:52:05 +0200 Subject: [PATCH 4/5] filter: add `send_eos` init option Controls whether filters send EOS when destroyed. --- src/filter.c | 7 ++++++- src/include/libtxproto/filter.h | 2 +- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/src/filter.c b/src/filter.c index 1456f16..fd7ef8a 100644 --- a/src/filter.c +++ b/src/filter.c @@ -698,7 +698,7 @@ static void *filtering_thread(void *data) av_frame_free(&filt_frame); - { + if (ctx->send_eos) { int tmp = err; sp_eventlist_dispatch(ctx, ctx->events, SP_EVENT_ON_EOS, &tmp); if (tmp != 0) { @@ -814,6 +814,10 @@ static int filter_ioctx_ctrl_cb(AVBufferRef *event_ref, void *callback_ctx, sp_frame_fifo_set_max_queued(ctx->in_pads[i]->fifo, len); } } + if ((tmp_val = dict_get(event->opts, "send_eos"))) { + /* anything other than "false" or 0 is truthy */ + ctx->send_eos = strcmp(tmp_val, "false") && strtol(tmp_val, NULL, 10) != 0; + } pthread_mutex_unlock(&ctx->lock); } else if (event->ctrl & SP_EVENT_CTRL_COMMAND) { char result[4096]; @@ -930,6 +934,7 @@ AVBufferRef *sp_filter_alloc(void) pthread_mutex_init(&ctx->lock, NULL); ctx->events = sp_bufferlist_new(); + ctx->send_eos = 1; return ctx_ref; } diff --git a/src/include/libtxproto/filter.h b/src/include/libtxproto/filter.h index 2855564..bc21510 100644 --- a/src/include/libtxproto/filter.h +++ b/src/include/libtxproto/filter.h @@ -57,7 +57,7 @@ typedef struct FilterContext { char **out_pad_names; int dump_graph; - int fifo_size; + int send_eos; /* Derived from input device reference */ enum AVHWDeviceType device_type; From df438cb0be825daff236659d2e92b66302acd59f Mon Sep 17 00:00:00 2001 From: nasso Date: Tue, 27 Aug 2024 15:52:05 +0200 Subject: [PATCH 5/5] examples: add `replace_node.c` example Shows how one can hot-swap a filtergraph without killing the encoder. --- DOCS/examples/replace_node.c | 223 +++++++++++++++++++++++++++++++++++ 1 file changed, 223 insertions(+) create mode 100644 DOCS/examples/replace_node.c diff --git a/DOCS/examples/replace_node.c b/DOCS/examples/replace_node.c new file mode 100644 index 0000000..578d421 --- /dev/null +++ b/DOCS/examples/replace_node.c @@ -0,0 +1,223 @@ +#include +#include +#include +#include + +#include +#include +#include + +#include +#include + +/** + * @file example of how one can dynamically replace a filtergraph node + * @example replace_node.c + * + * This example shows how a filtergraph node can be replaced dynamically on a + * running stream. It is recommended to run this example with a real-time input + * like an MPEGTS or RTP stream (see example usage below). + * + * The graph initially starts as: + * + * ┌───────┐ ┌────┐ ┌─────┐ ┌───────┐ ┌─────┐ + * │demuxer│──►│h264│──►│hflip│──►│libx264│──►│muxer│ + * └───────┘ └────┘ └─────┘ └───────┘ └─────┘ + * + * When the user presses enter, `hflip` is destroyed and a new filtergraph + * `vflip` is created: + * + * ┌ ─ ─ ┐ + * ┌ ► hflip ─ ┐ + * ┌───────┐ ┌────┐ └ ─ ─ ┘ ┌───────┐ ┌─────┐ + * │demuxer│──►│h264│─┤ ├─►│libx264│──►│muxer│ + * └───────┘ └────┘ │ ┌─────┐ │ └───────┘ └─────┘ + * └─►│vflip│──┘ + * └─────┘ + * + * The user can press enter again to repeat the process indefinitely, + * alternating between an `hflip` and a `vflip` filter. Note that the filters + * are purposefully destroyed and recreated each time and aren't being reused. + * + * Building + * -------- + * + * gcc -Wall -g replace_node.c $(pkg-config --cflags --libs txproto libavutil) + * + * Usage + * ----- + * + * ./a.out + * + * or + * + * ./a.out + * + * Example + * ------- + * + * Start the example: + * + * ./a.out udp://127.0.0.1:9000 h264 libx264 udp://127.0.0.1:9001 + * + * Then, in another terminal, start the player: + * + * ffplay udp://127.0.0.1:9001 + * + * Finally, in yet another terminal, start the source: + * + * ffmpeg -re -f lavfi -i testsrc=r=30:s=hd720 -c:v libx264 -g 60 -f mpegts udp://127.1:9000 + * + */ + +struct Args { + const char *in_url; + const char *in_fmt; + const char *decoder; + const char *encoder; + const char *out_fmt; + const char *out_url; +}; + +static void print_usage(FILE *f, const char *arg0) +{ + fprintf( + f, + "Usage:\n" + " %1$s \n" + " %1$s \n", + arg0 + ); +} + +// helper macros for error handling +#define TRY(e) \ + do { \ + int err = e; \ + if (err < 0) return err; \ + } while(0) +#define EXPECT(e) \ + do { \ + int err = e; \ + assert(err >= 0); \ + } while(0) + +static AVDictionary *make_filter_init_opts() +{ + AVDictionary *init_opts = NULL; + + /* by default, filters send an EOS signal to their outputs when they get + * destroyed. we don't want that, as that would stop the encoder! */ + EXPECT(av_dict_set(&init_opts, "send_eos", "false", 0)); + + return init_opts; +} + +int main(int argc, char *argv[]) +{ + struct Args args; + + if (argc == 1) { + print_usage(stdout, argv[0]); + return 0; + } else if (argc == 5) { + args.in_url = argv[1]; + args.in_fmt = NULL; + args.decoder = argv[2]; + args.encoder = argv[3]; + args.out_fmt = NULL; + args.out_url = argv[4]; + } else if (argc == 7) { + args.in_url = argv[1]; + args.in_fmt = argv[2]; + args.decoder = argv[3]; + args.encoder = argv[4]; + args.out_fmt = argv[5]; + args.out_url = argv[6]; + } else { + fprintf(stderr, "Expected 4 or 6 arguments, got %d\n", argc - 1); + print_usage(stderr, argv[0]); + return 1; + } + + TXMainContext *ctx = tx_new(); + + EXPECT(tx_init(ctx)); + EXPECT(tx_epoch_set(ctx, 0)); + + printf("Creating nodes...\n"); + AVBufferRef *demuxer = tx_demuxer_create( + ctx, + NULL, // Name + args.in_url, // in_url + args.in_fmt, // in_format + NULL, // start_options + NULL // init_opts + ); + AVBufferRef *decoder = tx_decoder_create( + ctx, + args.decoder, // dec_name + NULL // init_opts + ); + AVBufferRef *filter = tx_filtergraph_create( + ctx, + "hflip", + AV_HWDEVICE_TYPE_NONE, + make_filter_init_opts() + ); + AVBufferRef *encoder = tx_encoder_create( + ctx, + args.encoder, + NULL, // name + NULL, // options + NULL // init_opts + ); + AVBufferRef *muxer = tx_muxer_create( + ctx, + args.out_url, + args.out_fmt, // out_format + NULL, // options + NULL // init_opts + ); + + printf("Initial setup...\n"); + EXPECT(tx_link(ctx, demuxer, decoder, 0)); + EXPECT(tx_link(ctx, decoder, filter, 0)); + EXPECT(tx_link(ctx, filter, encoder, 0)); + EXPECT(tx_link(ctx, encoder, muxer, 0)); + EXPECT(tx_commit(ctx)); + + int hflip = 1; + while (1) { + printf("Press enter to change filter...\n"); + getchar(); + + hflip = !hflip; + + if (hflip) { + printf("Replacing vflip with hflip...\n"); + } else { + printf("Replacing hflip with vflip...\n"); + } + + // destroy previous filter + EXPECT(tx_destroy(ctx, &filter)); + + // create the new one + filter = tx_filtergraph_create( + ctx, + hflip ? "hflip" : "vflip", + AV_HWDEVICE_TYPE_NONE, + make_filter_init_opts() + ); + + EXPECT(tx_link(ctx, decoder, filter, 0)); + EXPECT(tx_link(ctx, filter, encoder, 0)); + EXPECT(tx_commit(ctx)); + } + + printf("Freeing...\n"); + tx_free(ctx); + + return 0; +}