Skip to content

Commit

Permalink
Multithread refactor (#13)
Browse files Browse the repository at this point in the history
* disconnect support

* Fix name

* To handle better connection and disconnection from room

* change to avoid using of Coroutines, using async methods instead

* async OnAudioRead

* Cleaning logs

* Cancel tokens

* Cancel token

* remove Task.Run, since usually SendRequest is called from other task

* getting rid of use YieldInstruction

* adding audio filter in main thread

* passing CancellationToken instead of CancellationTokenSource

* added check for cancellation at beginning

* Cleaning handles

* fix await

* remove cruft

* Cancellation token cleanup

* Factory async method

* basic handling of remaining events

* more async

* wip

* Compile pass

* formatting

* fix pending check

* revert async changes

* prep audio filter for threads

* loggin

* logs to debug publishing track

* minor cleanup

* added some methods to handle localtrack and SetSubscribed, Clenaing code

* Cleaning up

* fixes for yield instructions

* macro for verbose logs

* Audio data and pointer fix

Still says samplerate and num of channels are incorrect

* remove possibility of null

* todo ref buffering

* minor cleanup

* cruft

* todo object pooling

* remove redundant Task usages on separated threads

---------

Co-authored-by: cdga777 <cdga777@gmail.com>
Co-authored-by: nickkhalow <nickhalow@gmail.com>
  • Loading branch information
3 people authored and cloudwebrtc committed Mar 30, 2024
1 parent a61571a commit 2282fc6
Show file tree
Hide file tree
Showing 13 changed files with 338 additions and 161 deletions.
7 changes: 7 additions & 0 deletions README.md.meta

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

142 changes: 100 additions & 42 deletions Runtime/Scripts/AudioSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using UnityEngine;
using LiveKit.Proto;
using LiveKit.Internal;
using System.Threading;

namespace LiveKit
{
Expand All @@ -15,6 +16,12 @@ public class RtcAudioSource

// Used on the AudioThread
private AudioFrame _frame;
private Thread _readAudioThread;
private object _lock = new object();
private float[] _data;
private int _channels;
private int _sampleRate;
private volatile bool _pending = false;

public RtcAudioSource(AudioSource source)
{
Expand All @@ -34,62 +41,113 @@ public RtcAudioSource(AudioSource source)
UpdateSource(source);
}

private void UpdateSource(AudioSource source)
public void Start()
{
_audioSource = source;
_audioFilter = source.gameObject.AddComponent<AudioFilter>();
//_audioFilter.hideFlags = HideFlags.HideInInspector;
_audioFilter.AudioRead += OnAudioRead;
source.Play();
Stop();
_readAudioThread = new Thread(Update);
_readAudioThread.Start();
}


private void OnAudioRead(float[] data, int channels, int sampleRate)
public void Stop()
{
var samplesPerChannel = data.Length / channels;
if (_frame == null || _frame.NumChannels != channels
|| _frame.SampleRate != sampleRate
|| _frame.SamplesPerChannel != samplesPerChannel)
if (_readAudioThread != null)
{
_frame = new AudioFrame(sampleRate, channels, samplesPerChannel);
}

static short FloatToS16(float v) {
v *= 32768f;
v = Math.Min(v, 32767f);
v = Math.Max(v, -32768f);
return (short)(v + Math.Sign(v) * 0.5f);
_readAudioThread.Abort();
_readAudioThread = null;
}
}

unsafe {
var frameData = new Span<short>(_frame.Data.ToPointer(), _frame.Length / sizeof(short));
for (int i = 0; i < data.Length; i++)
private void Update()
{
while (true)
{
Thread.Sleep(Constants.TASK_DELAY);
if (_pending)
{
frameData[i] = FloatToS16(data[i]);
ReadAudio();
}
}
}

// Don't play the audio locally
Array.Clear(data, 0, data.Length);

var audioFrameBufferInfo = new AudioFrameBufferInfo();

audioFrameBufferInfo.DataPtr = (ulong)_frame.Data;
audioFrameBufferInfo.NumChannels = _frame.NumChannels;
audioFrameBufferInfo.SampleRate = _frame.SampleRate;
audioFrameBufferInfo.SamplesPerChannel = _frame.SamplesPerChannel;

var pushFrame = new CaptureAudioFrameRequest();
pushFrame.SourceHandle = Handle.Id;
pushFrame.Buffer = audioFrameBufferInfo;

var request = new FfiRequest();
request.CaptureAudioFrame = pushFrame;
private void ReadAudio()
{
_pending = false;
lock (_lock)
{
var samplesPerChannel = _data.Length / _channels;
if (_frame == null
|| _frame.NumChannels != _channels
|| _frame.SampleRate != _sampleRate
|| _frame.SamplesPerChannel != samplesPerChannel)
{
_frame = new AudioFrame(_sampleRate, _channels, samplesPerChannel);
}
try
{

FfiClient.SendRequest(request);
static short FloatToS16(float v)
{
v *= 32768f;
v = Math.Min(v, 32767f);
v = Math.Max(v, -32768f);
return (short)(v + Math.Sign(v) * 0.5f);
}

unsafe
{
var frameData = new Span<short>(_frame.Data.ToPointer(), _frame.Length / sizeof(short));
for (int i = 0; i < _data.Length; i++)
{
frameData[i] = FloatToS16(_data[i]);
}

var pushFrame = new CaptureAudioFrameRequest();
pushFrame.SourceHandle = (ulong)Handle.Id;
pushFrame.Buffer = new AudioFrameBufferInfo()
{
DataPtr = (ulong)_frame.Data,
NumChannels = _frame.NumChannels,
SampleRate = _frame.SampleRate,
SamplesPerChannel = _frame.SamplesPerChannel
};
var request = new FfiRequest();
request.CaptureAudioFrame = pushFrame;

FfiClient.SendRequest(request);
}

Utils.Debug($"Pushed audio frame with {_data.Length} sample rate "
+ _frame.SampleRate
+ " num channels "
+ _frame.NumChannels
+ " and samplers per channel "
+ _frame.SamplesPerChannel);
}
catch (Exception e)
{
Utils.Error("Audio Framedata error: " + e.Message);
}
}
}

private void UpdateSource(AudioSource source)
{
_audioSource = source;
_audioFilter = source.gameObject.AddComponent<AudioFilter>();
//_audioFilter.hideFlags = HideFlags.HideInInspector;
_audioFilter.AudioRead += OnAudioRead;
source.Play();
}

//Debug.Log($"Pushed audio frame with {data.Length} samples");
private void OnAudioRead(float[] data, int channels, int sampleRate)
{
lock (_lock)
{
_data = data;
_channels = channels;
_sampleRate = sampleRate;
_pending = true;
}
}
}
}
2 changes: 1 addition & 1 deletion Runtime/Scripts/AudioStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public class AudioStream
internal readonly FfiOwnedHandle Handle;
private AudioSource _audioSource;
private AudioFilter _audioFilter;
private RingBuffer? _buffer;
private RingBuffer _buffer;
private short[] _tempBuffer;
private uint _numChannels;
private uint _sampleRate;
Expand Down
10 changes: 10 additions & 0 deletions Runtime/Scripts/Internal/Constants.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
using System.Diagnostics;
using UnityEngine;

namespace LiveKit.Internal
{
internal static class Constants
{
internal const int TASK_DELAY = 5;
}
}
11 changes: 11 additions & 0 deletions Runtime/Scripts/Internal/Constants.cs.meta

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 20 additions & 7 deletions Runtime/Scripts/Internal/FFIClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ namespace LiveKit.Internal
// Callbacks
internal delegate void PublishTrackDelegate(PublishTrackCallback e);
internal delegate void ConnectReceivedDelegate(ConnectCallback e);
internal delegate void DisconnectReceivedDelegate(DisconnectCallback e);

// Events
internal delegate void RoomEventReceivedDelegate(RoomEvent e);
Expand All @@ -36,6 +37,7 @@ internal sealed class FfiClient

public event PublishTrackDelegate PublishTrackReceived;
public event ConnectReceivedDelegate ConnectReceived;
public event DisconnectReceivedDelegate DisconnectReceived;
public event RoomEventReceivedDelegate RoomEventReceived;
public event TrackEventReceivedDelegate TrackEventReceived;
//public event ParticipantEventReceivedDelegate ParticipantEventReceived;
Expand Down Expand Up @@ -72,7 +74,7 @@ static void Init()
}
#endif

static void Quit()
private static void Quit()
{
#if UNITY_EDITOR
AssemblyReloadEvents.beforeAssemblyReload -= OnBeforeAssemblyReload;
Expand Down Expand Up @@ -103,15 +105,23 @@ static void Dispose()
internal static FfiResponse SendRequest(FfiRequest request)
{
var data = request.ToByteArray();
FfiResponse response;
unsafe
{
var handle = NativeMethods.FfiNewRequest(data, data.Length, out byte* dataPtr, out int dataLen);
response = FfiResponse.Parser.ParseFrom(new Span<byte>(dataPtr, dataLen));
handle.Dispose();
try
{
var handle = NativeMethods.FfiNewRequest(data, data.Length, out byte* dataPtr, out int dataLen);
var response = FfiResponse.Parser.ParseFrom(new Span<byte>(dataPtr, dataLen));
handle.Dispose();
return response;
}
catch (Exception e)
{
// Since we are in a thread I want to make sure we catch and log
Utils.Error(e);
// But we aren't actually handling this exception so we should re-throw here
throw e;
}
}

return response;
}


Expand All @@ -131,6 +141,9 @@ static unsafe void FFICallback(IntPtr data, int size)
case FfiEvent.MessageOneofCase.Connect:
Instance.ConnectReceived?.Invoke(response.Connect);
break;
case FfiEvent.MessageOneofCase.Disconnect:
Instance.DisconnectReceived?.Invoke(response.Disconnect);
break;
case FfiEvent.MessageOneofCase.PublishTrack:
Instance.PublishTrackReceived?.Invoke(response.PublishTrack);
break;
Expand Down
Loading

0 comments on commit 2282fc6

Please sign in to comment.