Skip to content

Commit

Permalink
fix: deadlock in encoder when replacing filter
Browse files Browse the repository at this point in the history
- Added a `replace_node.c` example showing how one can hot-swap a filtergraph
- Added a `send_eos` init option to control whether filters send EOS
  when destroyed
- Added a new FIFO flag: `XXX_FIFO_PULL_POKE`
- Added `sp_xxx_fifo_poke` to send a "poke" to a FIFO, forcing any pull
  operation with the `XXX_FIFO_PULL_POKE` flag to return with `EAGAIN`
- Fix deadlock in `encode.c` where the encoder would be stuck pulling a
  frame when its input is disconnected, preventing it from ever linking
  with a new input.
- Added `sp_xxx_fifo_peek_flags`
- Added some logging here and there...
  • Loading branch information
nasso committed Sep 3, 2024
1 parent ab8a677 commit 7edaa45
Show file tree
Hide file tree
Showing 10 changed files with 452 additions and 103 deletions.
223 changes: 223 additions & 0 deletions DOCS/examples/replace_node.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
#include <stdio.h>
#include <assert.h>
#include <limits.h>
#include <pthread.h>

#include <libavutil/buffer.h>
#include <libavutil/dict.h>
#include <libavutil/time.h>

#include <libtxproto/events.h>
#include <libtxproto/txproto.h>

/**
* @file example of how one can dynamically replace a filtergraph node
* @example replace_node.c
*
* This example shows how a filtergraph node can be replaced dynamically on a
* running stream. It is recommended to run this example with a real-time input
* like an MPEGTS or RTP stream (see example usage below).
*
* The graph initially starts as:
*
* ┌───────┐ ┌────┐ ┌─────┐ ┌───────┐ ┌─────┐
* │demuxer│──►│h264│──►│hflip│──►│libx264│──►│muxer│
* └───────┘ └────┘ └─────┘ └───────┘ └─────┘
*
* When the user presses enter, `hflip` is destroyed and a new filtergraph
* `vflip` is created:
*
* ┌ ─ ─ ┐
* ┌ ► hflip ─ ┐
* ┌───────┐ ┌────┐ └ ─ ─ ┘ ┌───────┐ ┌─────┐
* │demuxer│──►│h264│─┤ ├─►│libx264│──►│muxer│
* └───────┘ └────┘ │ ┌─────┐ │ └───────┘ └─────┘
* └─►│vflip│──┘
* └─────┘
*
* The user can press enter again to repeat the process indefinitely,
* alternating between an `hflip` and a `vflip` filter. Note that the filters
* are purposefully destroyed and recreated each time and aren't being reused.
*
* Building
* --------
*
* gcc -Wall -g replace_node.c $(pkg-config --cflags --libs txproto libavutil)
*
* Usage
* -----
*
* ./a.out <in-url> <decoder> <encoder> <out-url>
*
* or
*
* ./a.out <in-url> <in-fmt> <decoder> <encoder> <out-fmt> <out-url>
*
* Example
* -------
*
* Start the example:
*
* ./a.out udp://127.0.0.1:9000 h264 libx264 udp://127.0.0.1:9001
*
* Then, in another terminal, start the player:
*
* ffplay udp://127.0.0.1:9001
*
* Finally, in yet another terminal, start the source:
*
* ffmpeg -re -f lavfi -i testsrc=r=30:s=hd720 -c:v libx264 -g 60 -f mpegts udp://127.1:9000
*
*/

struct Args {
const char *in_url;
const char *in_fmt;
const char *decoder;
const char *encoder;
const char *out_fmt;
const char *out_url;
};

static void print_usage(FILE *f, const char *arg0)
{
fprintf(
f,
"Usage:\n"
" %1$s <in-url> <decoder> <encoder> <out-url>\n"
" %1$s <in-url> <in-fmt> <decoder> <encoder> <out-fmt> <out-url>\n",
arg0
);
}

// helper macros for error handling
#define TRY(e) \
do { \
int err = e; \
if (err < 0) return err; \
} while(0)
#define EXPECT(e) \
do { \
int err = e; \
assert(err >= 0); \
} while(0)

static AVDictionary *make_filter_init_opts()
{
AVDictionary *init_opts = NULL;

/* by default, filters send an EOS signal to their outputs when they get
* destroyed. we don't want that, as that would stop the encoder! */
EXPECT(av_dict_set(&init_opts, "send_eos", "false", 0));

return init_opts;
}

int main(int argc, char *argv[])
{
struct Args args;

if (argc == 1) {
print_usage(stdout, argv[0]);
return 0;
} else if (argc == 5) {
args.in_url = argv[1];
args.in_fmt = NULL;
args.decoder = argv[2];
args.encoder = argv[3];
args.out_fmt = NULL;
args.out_url = argv[4];
} else if (argc == 7) {
args.in_url = argv[1];
args.in_fmt = argv[2];
args.decoder = argv[3];
args.encoder = argv[4];
args.out_fmt = argv[5];
args.out_url = argv[6];
} else {
fprintf(stderr, "Expected 4 or 6 arguments, got %d\n", argc - 1);
print_usage(stderr, argv[0]);
return 1;
}

TXMainContext *ctx = tx_new();

EXPECT(tx_init(ctx));
EXPECT(tx_epoch_set(ctx, 0));

printf("Creating nodes...\n");
AVBufferRef *demuxer = tx_demuxer_create(
ctx,
NULL, // Name
args.in_url, // in_url
args.in_fmt, // in_format
NULL, // start_options
NULL // init_opts
);
AVBufferRef *decoder = tx_decoder_create(
ctx,
args.decoder, // dec_name
NULL // init_opts
);
AVBufferRef *filter = tx_filtergraph_create(
ctx,
"hflip",
AV_HWDEVICE_TYPE_NONE,
make_filter_init_opts()
);
AVBufferRef *encoder = tx_encoder_create(
ctx,
args.encoder,
NULL, // name
NULL, // options
NULL // init_opts
);
AVBufferRef *muxer = tx_muxer_create(
ctx,
args.out_url,
args.out_fmt, // out_format
NULL, // options
NULL // init_opts
);

printf("Initial setup...\n");
EXPECT(tx_link(ctx, demuxer, decoder, 0));
EXPECT(tx_link(ctx, decoder, filter, 0));
EXPECT(tx_link(ctx, filter, encoder, 0));
EXPECT(tx_link(ctx, encoder, muxer, 0));
EXPECT(tx_commit(ctx));

int hflip = 1;
while (1) {
printf("Press enter to change filter...\n");
getchar();

hflip = !hflip;

if (hflip) {
printf("Replacing vflip with hflip...\n");
} else {
printf("Replacing hflip with vflip...\n");
}

// destroy previous filter
EXPECT(tx_destroy(ctx, &filter));

// create the new one
filter = tx_filtergraph_create(
ctx,
hflip ? "hflip" : "vflip",
AV_HWDEVICE_TYPE_NONE,
make_filter_init_opts()
);

EXPECT(tx_link(ctx, decoder, filter, 0));
EXPECT(tx_link(ctx, filter, encoder, 0));
EXPECT(tx_commit(ctx));
}

printf("Freeing...\n");
tx_free(ctx);

return 0;
}
68 changes: 47 additions & 21 deletions src/encode.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,14 @@
#include <libavutil/opt.h>
#include <libavutil/pixdesc.h>
#include <libavutil/cpu.h>
#include <libavutil/mem.h>

#include <libtxproto/encode.h>
#include <libtxproto/log.h>
#include <libtxproto/utils.h>

#include "encoding_utils.h"
#include "os_compat.h"
#include <libtxproto/utils.h>
#include "utils.h"
#include "ctrl_template.h"

Expand Down Expand Up @@ -216,10 +217,10 @@ static int init_hwcontext(EncodingContext *ctx, AVFrame *conf)
ctx->avctx->thread_type = 0;

ctx->avctx->hw_frames_ctx = av_buffer_ref(ctx->enc_frames_ref);
if (!ctx->avctx->hw_frames_ctx) {
av_buffer_unref(&ctx->enc_frames_ref);
err = AVERROR(ENOMEM);
}
if (!ctx->avctx->hw_frames_ctx) {
av_buffer_unref(&ctx->enc_frames_ref);
err = AVERROR(ENOMEM);
}

end:
/* Hardware frames make their own ref */
Expand Down Expand Up @@ -257,26 +258,37 @@ static int configure_encoder(EncodingContext *ctx, AVFrame *conf)
}

err = avcodec_open2(ctx->avctx, ctx->codec, NULL);
if (err < 0) {
sp_log(ctx, SP_LOG_ERROR, "Cannot open encoder: %s!\n", av_err2str(err));
return err;
}
if (err < 0) {
sp_log(ctx, SP_LOG_ERROR, "Cannot open encoder: %s!\n", av_err2str(err));
return err;
}

return 0;
}

static int context_full_config(EncodingContext *ctx)
{
int err;
AVFrame *conf;

do {
err = sp_eventlist_dispatch(ctx, ctx->events, SP_EVENT_ON_CONFIG, NULL);
if (err < 0)
return err;

av_buffer_unref(&ctx->mode_negotiate_event);

sp_log(ctx, SP_LOG_VERBOSE, "Getting a frame to configure...\n");

err = sp_eventlist_dispatch(ctx, ctx->events, SP_EVENT_ON_CONFIG, NULL);
/* blocks until we get a frame or a poke */
err = sp_frame_fifo_peek_flags(ctx->src_frames, &conf,
FRAME_FIFO_BLOCK_NO_INPUT |
FRAME_FIFO_PULL_POKE);
/* loop if we get poked to dispatch on:config */
} while (err == AVERROR(EAGAIN));
if (err < 0)
return err;

av_buffer_unref(&ctx->mode_negotiate_event);

sp_log(ctx, SP_LOG_VERBOSE, "Getting a frame to configure...\n");
AVFrame *conf = sp_frame_fifo_peek(ctx->src_frames);
if (!conf) {
sp_log(ctx, SP_LOG_ERROR, "No input frame to configure with!\n");
return AVERROR(EINVAL);
Expand Down Expand Up @@ -592,21 +604,22 @@ static void *encoding_thread(void *arg)

#if 0
if (!sp_eventlist_has_dispatched(ctx->events, SP_EVENT_ON_CONFIG)) {
int ret = context_full_config(ctx);
ret = context_full_config(ctx);
if (ret < 0)
return ret;
}
#endif

sp_log(ctx, SP_LOG_VERBOSE, "Encoder initialized!\n");

sp_eventlist_dispatch(ctx, ctx->events, SP_EVENT_ON_CONFIG | SP_EVENT_ON_INIT, NULL);
sp_eventlist_dispatch(ctx, ctx->events, SP_EVENT_ON_INIT, NULL);

do {
pthread_mutex_lock(&ctx->lock);

AVFrame *frame = NULL;

pthread_mutex_lock(&ctx->lock);
sp_eventlist_dispatch(ctx, ctx->events, SP_EVENT_ON_CONFIG, NULL);

if (ctx->reconfigure_frame && !ctx->waiting_eof) {
frame = ctx->reconfigure_frame;
ctx->reconfigure_frame = NULL;
Expand All @@ -618,7 +631,21 @@ static void *encoding_thread(void *arg)
if (ctx->avctx->flags & AV_CODEC_FLAG_GLOBAL_HEADER)
ctx->attach_sidedata = 1;
} else if (!flush) {
frame = sp_frame_fifo_pop(ctx->src_frames);
sp_log(ctx, SP_LOG_TRACE, "Pulling frame...\n");
ret = sp_frame_fifo_pop_flags(ctx->src_frames, &frame,
FRAME_FIFO_PULL_POKE);
if (ret == AVERROR(EAGAIN)) {
sp_log(ctx, SP_LOG_VERBOSE, "No frame yet, trying again...\n");
pthread_mutex_unlock(&ctx->lock);
continue;
}

if (frame)
sp_log(ctx, SP_LOG_TRACE, "Got frame, pts = %f\n",
av_q2d(ctx->avctx->time_base) * frame->pts);
else
sp_log(ctx, SP_LOG_VERBOSE, "Received NULL frame, flushing...\n");

flush = !frame;
}

Expand Down Expand Up @@ -718,8 +745,6 @@ static void *encoding_thread(void *arg)

sp_packet_fifo_push(ctx->dst_packets, out_pkt);

sp_eventlist_dispatch(ctx, ctx->events, SP_EVENT_ON_CONFIG | SP_EVENT_ON_INIT, NULL);

av_packet_free(&out_pkt);
}

Expand Down Expand Up @@ -804,6 +829,7 @@ static int encoder_ioctx_ctrl_cb(AVBufferRef *event_ref, void *callback_ctx,
int sp_encoder_ctrl(AVBufferRef *ctx_ref, SPEventType ctrl, void *arg)
{
EncodingContext *ctx = (EncodingContext *)ctx_ref->data;
sp_frame_fifo_poke(ctx->src_frames);
return sp_ctrl_template(ctx, ctx->events, 0x0,
encoder_ioctx_ctrl_cb, ctrl, arg);
}
Expand Down
11 changes: 1 addition & 10 deletions src/fifo_frame.c
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,10 @@
#define FRENAME(x) FRAME_FIFO_ ## x
#define RENAME(x) sp_frame_ ##x
#define PRIV_RENAME(x) frame_ ##x
#define FNAME enum SPFrameFIFOFlags
#define FNAME SPFrameFIFOFlags
#define SNAME SPFrameFIFO
#define FREE_FN av_frame_free
#define CLONE_FN(x) ((x) ? av_frame_clone((x)) : NULL)
#define TYPE AVFrame

#include "fifo_template.c"

#undef TYPE
#undef CLONE_FN
#undef FREE_FN
#undef SNAME
#undef FNAME
#undef PRIV_RENAME
#undef RENAME
#undef FRENAME
Loading

0 comments on commit 7edaa45

Please sign in to comment.