Skip to content

Commit

Permalink
Add thousand islands based source
Browse files Browse the repository at this point in the history
  • Loading branch information
dmorn committed Oct 30, 2023
1 parent 69fcddd commit 900cbdc
Show file tree
Hide file tree
Showing 9 changed files with 44 additions and 137 deletions.
44 changes: 6 additions & 38 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,17 +41,14 @@ function.
Check the tests, but in practice this is the flow for decoding audio from a
multi-track file from file to file in a lazy fashion.

If initialized directly from a file, the demuxer
supports all protocols supported by libav itself (such as RTMP, UDP, HLS, local files, ...).
If a custom reader is provided, such the the MailboxReader, users should take
care of fetching and sending the bytes to the demuxer by themselves (very useful when
sending data from browser microphone through liveview and to the demuxer, for example).
Supports all protocols supported by libav itself (such as RTMP, UDP, HLS, local
files, unix sockets, TCP, UDP, ...).

```elixir
demuxer = Demuxer.new_from_file(input_path)
{:ok, demuxer} = Demuxer.new_from_file(input_path)

# Detect available stream and select one (or more)
{streams, demuxer} = Demuxer.streams(demuxer)
streams = Demuxer.read_streams(demuxer)
audio_stream = Enum.find(streams, fn stream -> stream.codec_type == :audio end)

# Initialize the decoder. The sample rate, channel and audio format will match
Expand All @@ -67,37 +64,8 @@ demuxer
|> Enum.into(output)
```

An experimental feature: the demuxer can be initialized with a custom reader,
which might send data from mailbox, a file and other custom patterns.

This is an example reading from a file.
```elixir
demuxer =
Demuxer.new_in_memory(%{
opaque: File.open!(input_path, [:raw, :read]),
read: fn input, size ->
resp = IO.binread(input, size)
{resp, input}
end,
close: fn input -> File.close(input) end
})

```

This one uses the MailboxReader: you can send messages with the data to
it and will act as a source to the Demuxer.
```elixir
{:ok, pid} = AVx.Demuxer.MailboxReader.start_link()

demuxer = AVx.Demuxer.new_in_memory(%{
opaque: pid,
read: &AVx.Demuxer.MailboxReader.read/2,
close: &AVx.Demuxer.MailboxReader.close/1
})

# In another process
:ok = AVx.Demuxer.MailboxReader.add_data(pid, <<>>)
```
I'm currently working on a ThousandIsland Handler that can be used
to create a Demuxer source suitable for streaming setups.

And that's it. Compared to using the `ffmpeg` executable directly, here you have access
to every single packet, which you can re-route, manipulate and process at will.
Expand Down
2 changes: 1 addition & 1 deletion c_src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ endif

all: $(LIB_SO)

$(LIB_SO): libav.c ioq.c demuxer.c decoder.c
$(LIB_SO): libav.c demuxer.c decoder.c
@ mkdir -p $(PRIV_DIR)
$(CC) $(CFLAGS) $(LDFLAGS) -o $@ $^ $(LIBS)

Expand Down
55 changes: 0 additions & 55 deletions c_src/ioq.c

This file was deleted.

28 changes: 0 additions & 28 deletions c_src/ioq.h

This file was deleted.

15 changes: 6 additions & 9 deletions c_src/libav.c
Original file line number Diff line number Diff line change
@@ -1,7 +1,3 @@
#include "libavcodec/codec_par.h"
#include "libavformat/avformat.h"
#include "libavutil/log.h"
#include "libavutil/samplefmt.h"
#include <decoder.h>
#include <demuxer.h>
#include <erl_nif.h>
Expand Down Expand Up @@ -450,17 +446,18 @@ static ErlNifFunc nif_funcs[] = {

// {erl_function_name, erl_function_arity, c_function}
// Demuxer
{"demuxer_alloc_from_file", 1, enif_demuxer_alloc_from_file},
{"demuxer_alloc_from_file", 1, enif_demuxer_alloc_from_file,
ERL_NIF_DIRTY_JOB_IO_BOUND},
{"demuxer_streams", 1, enif_demuxer_streams},
{"demuxer_read_packet", 1, enif_demuxer_read_packet},
// Decoder
// // Decoder
{"decoder_alloc", 1, enif_decoder_alloc},
{"decoder_stream_format", 1, enif_decoder_stream_format},
{"decoder_add_data", 2, enif_decoder_add_data},
// General
// // General
{"packet_stream_index", 1, enif_packet_stream_index},
// TODO
// Maybe unpack_* would be better function naming.
// // TODO
// // Maybe unpack_* would be better function naming.
{"packet_unpack", 1, enif_packet_unpack},
{"audio_frame_unpack", 1, enif_audio_frame_unpack},
};
Expand Down
4 changes: 0 additions & 4 deletions lib/avx/nif.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,6 @@ defmodule AVx.NIF do
raise "NIF demuxer_alloc_from_file/1 not implemented"
end

def demuxer_read_header(_ctx) do
raise "NIF demuxer_read_header/1 not implemented"
end

def demuxer_streams(_ctx) do
raise "NIF demuxer_streams/1 not implemented"
end
Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ defmodule AVx.MixProject do
{:nimble_options, "~> 1.0.0"},
{:elixir_make, "~> 0.6", runtime: false},
{:jason, "~> 1.4.1"},
{:thousand_island, "~> 1.1", only: :test}
{:thousand_island, "~> 1.0", only: :test}
]
end

Expand Down
20 changes: 19 additions & 1 deletion test/avx/demuxer_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,25 @@ defmodule AVx.DemuxerTest do

describe "demuxer" do
test "from file" do
{:ok, demuxer} = Demuxer.new_from_file(@input)
assert_demuxer(@input)
end

test "from tcp socket" do
pid =
start_link_supervised!(
{ThousandIsland,
[port: 0, handler_module: Support.TiHandler, handler_options: %{path: @input}]}
)

{:ok, {_, port}} = ThousandIsland.listener_info(pid)
addr = "tcp://127.0.0.1:#{port}"

assert_demuxer(addr)
ThousandIsland.stop(pid)
end

defp assert_demuxer(input_path) do
{:ok, demuxer} = Demuxer.new_from_file(input_path)
streams = Demuxer.read_streams(demuxer)

stream = Enum.find(streams, fn stream -> stream.codec_type == :audio end)
Expand Down
11 changes: 11 additions & 0 deletions test/support.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,14 @@ defmodule Support do
|> Enum.filter(fn %{"type" => type} -> type == "frame" end)
end
end

defmodule Support.TiHandler do
use ThousandIsland.Handler

@impl ThousandIsland.Handler
def handle_connection(socket, state) do
data = File.read!(state.path)
ThousandIsland.Socket.send(socket, data)
{:close, state}
end
end

0 comments on commit 900cbdc

Please sign in to comment.