diff --git a/cmd/examples/encode/context.go b/cmd/examples/encode/context.go new file mode 100644 index 0000000..97c8708 --- /dev/null +++ b/cmd/examples/encode/context.go @@ -0,0 +1,33 @@ +package main + +import ( + "context" + "os" + "os/signal" +) + +/////////////////////////////////////////////////////////////////////////////// +// PUBLIC METHODS + +// ContextForSignal returns a context object which is cancelled when a signal +// is received. It returns nil if no signal parameter is provided +func ContextForSignal(signals ...os.Signal) context.Context { + if len(signals) == 0 { + return nil + } + + ch := make(chan os.Signal, 1) + ctx, cancel := context.WithCancel(context.Background()) + + // Send message on channel when signal received + signal.Notify(ch, signals...) + + // When any signal received, call cancel + go func() { + <-ch + cancel() + }() + + // Return success + return ctx +} diff --git a/cmd/examples/encode/main.go b/cmd/examples/encode/main.go index 306f215..6b3c5d6 100644 --- a/cmd/examples/encode/main.go +++ b/cmd/examples/encode/main.go @@ -1,10 +1,13 @@ package main import ( + "context" + "errors" "fmt" "io" "log" "os" + "syscall" // Packages ffmpeg "github.com/mutablelogic/go-media/pkg/ffmpeg" @@ -13,9 +16,14 @@ import ( // This example encodes an audio an video stream to a file func main() { + // Check we have a filename + if len(os.Args) != 2 { + log.Fatal("Usage: encode filename") + } + // Create a new file with an audio and video stream file, err := ffmpeg.Create(os.Args[1], - ffmpeg.OptStream(1, ffmpeg.VideoPar("yuv420p", "1280x720", 30)), + ffmpeg.OptStream(1, ffmpeg.VideoPar("yuv420p", "1280x720", 25, ffmpeg.NewMetadata("crf", 2))), ffmpeg.OptStream(2, ffmpeg.AudioPar("fltp", "mono", 22050)), ) if err != nil { @@ -39,10 +47,13 @@ func main() { } defer audio.Close() + // Bail out when we receive a signal + ctx := ContextForSignal(os.Interrupt, syscall.SIGQUIT) + // Write 90 seconds, passing video and audio frames to the encoder // and 
returning io.EOF when the duration is reached duration := float64(90) - err = file.Encode(func(stream int) (*ffmpeg.Frame, error) { + err = file.Encode(ctx, func(stream int) (*ffmpeg.Frame, error) { var frame *ffmpeg.Frame switch stream { case 1: @@ -51,12 +62,12 @@ func main() { frame = audio.Frame() } if frame != nil && frame.Ts() < duration { - fmt.Print(".") + fmt.Println(stream, frame.Ts()) return frame, nil } return nil, io.EOF }, nil) - if err != nil { + if err != nil && !errors.Is(err, context.Canceled) { log.Fatal(err) } fmt.Print("\n") diff --git a/pkg/ffmpeg/encoder.go b/pkg/ffmpeg/encoder.go index c16c116..d6a0c26 100644 --- a/pkg/ffmpeg/encoder.go +++ b/pkg/ffmpeg/encoder.go @@ -72,7 +72,7 @@ func NewEncoder(ctx *ff.AVFormatContext, stream int, par *Par) (*Encoder, error) } // Create the stream - if streamctx := ff.AVFormat_new_stream(ctx, nil); streamctx == nil { + if streamctx := ff.AVFormat_new_stream(ctx, codec); streamctx == nil { ff.AVCodec_free_context(encoder.ctx) return nil, ErrInternalAppError.With("could not allocate stream") } else { @@ -86,20 +86,40 @@ func NewEncoder(ctx *ff.AVFormatContext, stream int, par *Par) (*Encoder, error) encoder.ctx.SetFlags(encoder.ctx.Flags() | ff.AV_CODEC_FLAG_GLOBAL_HEADER) } + // Get the options + opts := par.newOpts() + if opts == nil { + ff.AVCodec_free_context(encoder.ctx) + return nil, ErrInternalAppError.With("could not allocate options dictionary") + } + defer ff.AVUtil_dict_free(opts) + // Open it - if err := ff.AVCodec_open(encoder.ctx, codec, nil); err != nil { + if err := ff.AVCodec_open(encoder.ctx, codec, opts); err != nil { ff.AVCodec_free_context(encoder.ctx) return nil, ErrInternalAppError.Withf("codec_open: %v", err) } + // If there are any non-consumed options, then error + var result error + for _, key := range ff.AVUtil_dict_keys(opts) { + result = errors.Join(result, ErrBadParameter.Withf("Stream %d: invalid codec option %q", stream, key)) + } + if result != nil { + 
ff.AVCodec_free_context(encoder.ctx) + return nil, result + } + // Copy parameters to stream if err := ff.AVCodec_parameters_from_context(encoder.stream.CodecPar(), encoder.ctx); err != nil { ff.AVCodec_free_context(encoder.ctx) return nil, err - } else { - encoder.stream.SetTimeBase(par.timebase) } + // Hint what timebase we want to encode at. This will change when writing the + // headers for the encoding process + encoder.stream.SetTimeBase(par.timebase) + // Create a packet packet := ff.AVCodec_packet_alloc() if packet == nil { @@ -167,7 +187,7 @@ func (e *Encoder) Encode(frame *Frame, fn EncoderPacketFn) error { // Return the codec parameters func (e *Encoder) Par() *Par { par := new(Par) - par.timebase = e.stream.TimeBase() + par.timebase = e.ctx.TimeBase() if err := ff.AVCodec_parameters_from_context(&par.AVCodecParameters, e.ctx); err != nil { return nil } else { @@ -180,9 +200,9 @@ func (e *Encoder) nextPts(frame *Frame) int64 { next_pts := int64(0) switch e.ctx.Codec().Type() { case ff.AVMEDIA_TYPE_AUDIO: - next_pts = ff.AVUtil_rational_rescale_q(int64(frame.NumSamples()), ff.AVUtil_rational(1, frame.SampleRate()), e.stream.TimeBase()) + next_pts = ff.AVUtil_rational_rescale_q(int64(frame.NumSamples()), frame.TimeBase(), e.stream.TimeBase()) case ff.AVMEDIA_TYPE_VIDEO: - next_pts = ff.AVUtil_rational_rescale_q(1, ff.AVUtil_rational_invert(e.ctx.Framerate()), e.stream.TimeBase()) + next_pts = ff.AVUtil_rational_rescale_q(1, frame.TimeBase(), e.stream.TimeBase()) default: // Dunno what to do with subtitle and data streams yet fmt.Println("TODO: next_pts for subtitle and data streams") diff --git a/pkg/ffmpeg/par.go b/pkg/ffmpeg/par.go index 9a0e5d6..1812ac3 100644 --- a/pkg/ffmpeg/par.go +++ b/pkg/ffmpeg/par.go @@ -18,20 +18,25 @@ import ( type Par struct { ff.AVCodecParameters + opts []media.Metadata timebase ff.AVRational } type jsonPar struct { - Par ff.AVCodecParameters `json:"parameters"` - Timebase ff.AVRational `json:"timebase"` + 
ff.AVCodecParameters + Timebase ff.AVRational `json:"timebase"` + Opts []media.Metadata `json:"options"` } /////////////////////////////////////////////////////////////////////////////// // LIFECYCLE -func NewAudioPar(samplefmt string, channellayout string, samplerate int) (*Par, error) { +// Create new audio parameters with sample format, channel layout and sample rate +// plus any additional options which are used for creating a stream +func NewAudioPar(samplefmt string, channellayout string, samplerate int, opts ...media.Metadata) (*Par, error) { par := new(Par) par.SetCodecType(ff.AVMEDIA_TYPE_AUDIO) + par.opts = opts // Sample Format if samplefmt_ := ff.AVUtil_get_sample_fmt(samplefmt); samplefmt_ == ff.AV_SAMPLE_FMT_NONE { @@ -59,9 +64,12 @@ func NewAudioPar(samplefmt string, channellayout string, samplerate int) (*Par, return par, nil } -func NewVideoPar(pixfmt string, size string, framerate float64) (*Par, error) { +// Create new video parameters with pixel format, frame size, framerate +// plus any additional options which are used for creating a stream +func NewVideoPar(pixfmt string, size string, framerate float64, opts ...media.Metadata) (*Par, error) { par := new(Par) par.SetCodecType(ff.AVMEDIA_TYPE_VIDEO) + par.opts = opts // Pixel Format if pixfmt_ := ff.AVUtil_get_pix_fmt(pixfmt); pixfmt_ == ff.AV_PIX_FMT_NONE { @@ -88,37 +96,22 @@ func NewVideoPar(pixfmt string, size string, framerate float64) (*Par, error) { // Set default sample aspect ratio par.SetSampleAspectRatio(ff.AVUtil_rational(1, 1)) - // TODO: Set profile, codec and bitrate and any other parameters - - /* TODO - c->gop_size = 12; // emit one intra frame every twelve frames at most - c->pix_fmt = STREAM_PIX_FMT; - if (c->codec_id == AV_CODEC_ID_MPEG2VIDEO) { - // just for testing, we also add B-frames - c->max_b_frames = 2; - } - if (c->codec_id == AV_CODEC_ID_MPEG1VIDEO) { - // Needed to avoid using macroblocks in which some coeffs overflow. 
- // This does not happen with normal video, it just happens here as - // the motion of the chroma plane does not match the luma plane. - c->mb_decision = 2; - } - */ - // Return success return par, nil } -func AudioPar(samplefmt string, channellayout string, samplerate int) *Par { - if par, err := NewAudioPar(samplefmt, channellayout, samplerate); err != nil { +// Create audio parameters. If there is an error, then this function will panic +func AudioPar(samplefmt string, channellayout string, samplerate int, opts ...media.Metadata) *Par { + if par, err := NewAudioPar(samplefmt, channellayout, samplerate, opts...); err != nil { panic(err) } else { return par } } -func VideoPar(pixfmt string, size string, framerate float64) *Par { - if par, err := NewVideoPar(pixfmt, size, framerate); err != nil { +// Create video parameters. If there is an error, then this function will panic +func VideoPar(pixfmt string, size string, framerate float64, opts ...media.Metadata) *Par { + if par, err := NewVideoPar(pixfmt, size, framerate, opts...); err != nil { panic(err) } else { return par @@ -130,8 +123,9 @@ func VideoPar(pixfmt string, size string, framerate float64) *Par { func (ctx *Par) MarshalJSON() ([]byte, error) { return json.Marshal(jsonPar{ - Par: ctx.AVCodecParameters, - Timebase: ctx.timebase, + AVCodecParameters: ctx.AVCodecParameters, + Timebase: ctx.timebase, + Opts: ctx.opts, }) } @@ -196,6 +190,19 @@ func (ctx *Par) CopyToCodecContext(codec *ff.AVCodecContext) error { /////////////////////////////////////////////////////////////////////////////// // PRIVATE METHODS +// Return options as a dictionary, which needs to be freed after use +// by the caller method +func (ctx *Par) newOpts() *ff.AVDictionary { + dict := ff.AVUtil_dict_alloc() + for _, opt := range ctx.opts { + if err := ff.AVUtil_dict_set(dict, opt.Key(), opt.Value(), ff.AV_DICT_APPEND); err != nil { + ff.AVUtil_dict_free(dict) + return nil + } + } + return dict +} + func (ctx *Par) copyAudioCodec(codec 
*ff.AVCodecContext) error { codec.SetSampleFormat(ctx.SampleFormat()) codec.SetSampleRate(ctx.Samplerate()) diff --git a/pkg/ffmpeg/writer.go b/pkg/ffmpeg/writer.go index 60c8fa4..c59b785 100644 --- a/pkg/ffmpeg/writer.go +++ b/pkg/ffmpeg/writer.go @@ -1,6 +1,7 @@ package ffmpeg import ( + "context" "encoding/json" "errors" "fmt" @@ -240,7 +241,7 @@ func (w *Writer) Stream(stream int) *Encoder { // the frame. If the callback function returns io.EOF then the encoding for // that encoder is stopped after flushing. If the second callback is nil, // then packets are written to the output. -func (w *Writer) Encode(in EncoderFrameFn, out EncoderPacketFn) error { +func (w *Writer) Encode(ctx context.Context, in EncoderFrameFn, out EncoderPacketFn) error { if in == nil { return ErrBadParameter.With("nil in or out") } @@ -265,53 +266,35 @@ func (w *Writer) Encode(in EncoderFrameFn, out EncoderPacketFn) error { encoder.next_pts = 0 } - // Continue until all encoders have returned io.EOF and have been flushed + // Continue until all encoders have returned io.EOF (or context cancelled) + // and have been flushed for { // No more encoding to do if len(encoders) == 0 { break } - // Find encoder with the lowest timestamp, based on next_pts and timebase - next_stream := -1 - var next_encoder *Encoder - for stream, encoder := range encoders { - if next_encoder == nil || compareNextPts(encoder, next_encoder) < 0 { - next_encoder = encoder - next_stream = stream + // Mark as EOF if context is done + select { + case <-ctx.Done(): + // Mark all encoders as EOF to flush them + for _, encoder := range encoders { + encoder.eof = true } - } - - var frame *Frame - var err error - - // Receive a frame if not EOF - if !next_encoder.eof { - frame, err = in(next_stream) - if errors.Is(err, io.EOF) { - next_encoder.eof = true - } else if err != nil { - return fmt.Errorf("stream %v: %w", next_stream, err) + // Perform the encode + if err := encode(in, out, encoders); err != nil { + return err + } 
+ default: + // Perform the encode + if err := encode(in, out, encoders); err != nil { + return err } } - - // Send a frame for encoding - if err := next_encoder.Encode(frame, out); err != nil { - return fmt.Errorf("stream %v: %w", next_stream, err) - } - - // If eof then delete the encoder - if next_encoder.eof { - delete(encoders, next_stream) - continue - } - - // Calculate the next PTS - next_encoder.next_pts = next_encoder.next_pts + next_encoder.nextPts(frame) } - // Return success - return nil + // Return error from context + return ctx.Err() } // Write a packet to the output. If you intercept the packets in the @@ -325,6 +308,49 @@ func compareNextPts(a, b *Encoder) int { return ff.AVUtil_compare_ts(a.next_pts, a.stream.TimeBase(), b.next_pts, b.stream.TimeBase()) } +//////////////////////////////////////////////////////////////////////////////// +// PRIVATE METHODS - Encoding + +// Find encoder with the lowest timestamp, based on next_pts and timebase +// and send to the EncoderFrameFn +func encode(in EncoderFrameFn, out EncoderPacketFn, encoders map[int]*Encoder) error { + next_stream := -1 + var next_encoder *Encoder + for stream, encoder := range encoders { + if next_encoder == nil || compareNextPts(encoder, next_encoder) < 0 { + next_encoder = encoder + next_stream = stream + } + } + + // Receive a frame if not EOF + var frame *Frame + var err error + if !next_encoder.eof { + frame, err = in(next_stream) + if errors.Is(err, io.EOF) { + next_encoder.eof = true + } else if err != nil { + return fmt.Errorf("stream %v: %w", next_stream, err) + } + } + + // Send a frame for encoding + if err := next_encoder.Encode(frame, out); err != nil { + return fmt.Errorf("stream %v: %w", next_stream, err) + } + + // If eof then delete the encoder + if next_encoder.eof { + delete(encoders, next_stream) + return nil + } + + // Calculate the next PTS + next_encoder.next_pts = next_encoder.next_pts + next_encoder.nextPts(frame) + return nil +} + 
//////////////////////////////////////////////////////////////////////////////// // PRIVATE METHODS - Writer diff --git a/pkg/ffmpeg/writer_test.go b/pkg/ffmpeg/writer_test.go index e132f0f..d6e4b3e 100644 --- a/pkg/ffmpeg/writer_test.go +++ b/pkg/ffmpeg/writer_test.go @@ -1,6 +1,7 @@ package ffmpeg_test import ( + "context" "io" "os" "testing" @@ -41,7 +42,7 @@ func Test_writer_001(t *testing.T) { // Write 1 min of frames duration := float64(60) - assert.NoError(writer.Encode(func(stream int) (*ffmpeg.Frame, error) { + assert.NoError(writer.Encode(context.Background(), func(stream int) (*ffmpeg.Frame, error) { frame := audio.Frame() if frame.Ts() >= duration { return nil, io.EOF @@ -87,7 +88,7 @@ func Test_writer_002(t *testing.T) { // Write 15 mins of frames duration := float64(15 * 60) - assert.NoError(writer.Encode(func(stream int) (*ffmpeg.Frame, error) { + assert.NoError(writer.Encode(context.Background(), func(stream int) (*ffmpeg.Frame, error) { frame := audio.Frame() if frame.Ts() >= duration { return nil, io.EOF @@ -133,7 +134,7 @@ func Test_writer_003(t *testing.T) { // Write 1 min of frames duration := float64(60) - assert.NoError(writer.Encode(func(stream int) (*ffmpeg.Frame, error) { + assert.NoError(writer.Encode(context.Background(), func(stream int) (*ffmpeg.Frame, error) { frame := video.Frame() if frame.Ts() >= duration { return nil, io.EOF @@ -191,7 +192,7 @@ func Test_writer_004(t *testing.T) { // Write 10 secs of frames duration := float64(10) - assert.NoError(writer.Encode(func(stream int) (*ffmpeg.Frame, error) { + assert.NoError(writer.Encode(context.Background(), func(stream int) (*ffmpeg.Frame, error) { var frame *ffmpeg.Frame switch stream { case 1: