Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Length on Send Functions, Expose KeepAlive, Finalize, etc #334

Merged
merged 1 commit into from
Sep 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Deepgram.Microphone/Microphone.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ namespace Deepgram.Microphone;
/// </summary>
public class Microphone
{
private Action<byte[]> _push_callback;
private Action<byte[], int> _push_callback;

private int _rate;
private uint _chunk;
Expand All @@ -27,7 +27,7 @@ public class Microphone
/// Constructor for Microphone
/// </summary>
public Microphone(
Action<byte[]> push_callback,
Action<byte[], int> push_callback,
int rate = Defaults.RATE,
uint chunkSize = Defaults.CHUNK_SIZE,
int channels = Defaults.CHANNELS,
Expand Down Expand Up @@ -120,7 +120,7 @@ private StreamCallbackResult _callback(nint input, nint output, uint frameCount,
}

// Push the data to the callback
_push_callback(buf);
_push_callback(buf, buf.Length);

return StreamCallbackResult.Continue;
}
Expand Down
15 changes: 15 additions & 0 deletions Deepgram/Clients/Interfaces/v1/Constants.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// Copyright 2024 Deepgram .NET SDK contributors. All Rights Reserved.
// Use of this source code is governed by a MIT license that can be found in the LICENSE file.
// SPDX-License-Identifier: MIT

namespace Deepgram.Clients.Interfaces.v1;

/// <summary>
/// Headers of interest in the return values from the Deepgram Speak API.
/// </summary>
public static class Constants
{
// WS buffer size
public const int UseArrayLengthForSend = -1;
}

25 changes: 20 additions & 5 deletions Deepgram/Clients/Interfaces/v1/IListenWebSocketClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,32 +71,47 @@ public Task Connect(LiveSchema options, CancellationTokenSource? cancelToken = n
#endregion

#region Send Functions
/// <summary>
/// Sends a KeepAlive message to Deepgram
/// </summary>
public void SendKeepAlive();

/// <summary>
/// Sends a Finalize message to Deepgram
/// </summary>
public void SendFinalize();

/// <summary>
/// Sends a Close message to Deepgram
/// </summary>
public void SendClose(bool nullByte = false);

/// <summary>
/// Sends a binary message over the WebSocket connection.
/// </summary>
/// <param name="data">The data to be sent over the WebSocket.</param>
public void Send(byte[] data);
public void Send(byte[] data, int length = Constants.UseArrayLengthForSend);

/// <summary>
/// This method sends a binary message over the WebSocket connection.
/// </summary>
/// <param name="data"></param>
public void SendBinary(byte[] data);
public void SendBinary(byte[] data, int length = Constants.UseArrayLengthForSend);

/// <summary>
/// This method sends a text message over the WebSocket connection.
/// </summary>
public void SendMessage(byte[] data);
public void SendMessage(byte[] data, int length = Constants.UseArrayLengthForSend);

/// <summary>
/// This method sends a binary message over the WebSocket connection immediately without queueing.
/// </summary>
public void SendBinaryImmediately(byte[] data);
public void SendBinaryImmediately(byte[] data, int length = Constants.UseArrayLengthForSend);

/// <summary>
/// This method sends a text message over the WebSocket connection immediately without queueing.
/// </summary>
public void SendMessageImmediately(byte[] data);
public void SendMessageImmediately(byte[] data, int length = Constants.UseArrayLengthForSend);
#endregion

#region Helpers
Expand Down
6 changes: 3 additions & 3 deletions Deepgram/Clients/Interfaces/v1/ISpeakWebSocketClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public Task Connect(SpeakSchema options, CancellationTokenSource? cancelToken =
/// Sends a binary message over the WebSocket connection.
/// </summary>
/// <param name="data">The data to be sent over the WebSocket.</param>
public void Send(byte[] data);
public void Send(byte[] data, int length = Constants.UseArrayLengthForSend);

///// <summary>
///// This method sends a binary message over the WebSocket connection.
Expand All @@ -120,7 +120,7 @@ public Task Connect(SpeakSchema options, CancellationTokenSource? cancelToken =
/// <summary>
/// This method sends a text message over the WebSocket connection.
/// </summary>
public void SendMessage(byte[] data);
public void SendMessage(byte[] data, int length = Constants.UseArrayLengthForSend);

///// <summary>
///// This method sends a binary message over the WebSocket connection immediately without queueing.
Expand All @@ -130,7 +130,7 @@ public Task Connect(SpeakSchema options, CancellationTokenSource? cancelToken =
/// <summary>
/// This method sends a text message over the WebSocket connection immediately without queueing.
/// </summary>
public void SendMessageImmediately(byte[] data);
public void SendMessageImmediately(byte[] data, int length = Constants.UseArrayLengthForSend);
#endregion

#region Helpers
Expand Down
47 changes: 38 additions & 9 deletions Deepgram/Clients/Listen/v1/WebSocket/Client.cs
Original file line number Diff line number Diff line change
Expand Up @@ -314,47 +314,76 @@
SendMessageImmediately(data);
}

/// <summary>
/// Sends a Close message to Deepgram
/// </summary>
public void SendClose(bool nullByte = false)
{
Log.Debug("SendFinalize", "Sending Close Message Immediately...");
if (nullByte && _clientWebSocket != null)
{
// send a close to Deepgram
lock (_mutexSend)
{
_clientWebSocket.SendAsync(new ArraySegment<byte>([0]), WebSocketMessageType.Binary, true, _cancellationTokenSource.Token)
.ConfigureAwait(false);
}
return;
}

byte[] data = Encoding.ASCII.GetBytes("{\"type\": \"CloseStream\"}");
SendMessageImmediately(data);
}

/// <summary>
/// Sends a binary message over the WebSocket connection.
/// </summary>
/// <param name="data">The data to be sent over the WebSocket.</param>
public void Send(byte[] data) => SendBinary(data);
public void Send(byte[] data, int length = Constants.UseArrayLengthForSend) => SendBinary(data, length);

/// <summary>
/// This method sends a binary message over the WebSocket connection.
/// </summary>
/// <param name="data"></param>
public void SendBinary(byte[] data) =>
EnqueueSendMessage(new WebSocketMessage(data, WebSocketMessageType.Binary));
public void SendBinary(byte[] data, int length = Constants.UseArrayLengthForSend) =>
EnqueueSendMessage(new WebSocketMessage(data, WebSocketMessageType.Binary, length));

/// <summary>
/// This method sends a text message over the WebSocket connection.
/// </summary>
public void SendMessage(byte[] data) =>
EnqueueSendMessage(new WebSocketMessage(data, WebSocketMessageType.Text));
public void SendMessage(byte[] data, int length = Constants.UseArrayLengthForSend) =>
EnqueueSendMessage(new WebSocketMessage(data, WebSocketMessageType.Text, length));

/// <summary>
/// This method sends a binary message over the WebSocket connection immediately without queueing.
/// </summary>
public void SendBinaryImmediately(byte[] data)
public void SendBinaryImmediately(byte[] data, int length = Constants.UseArrayLengthForSend)
{
lock (_mutexSend)
{
Log.Verbose("SendBinaryImmediately", "Sending binary message immediately.."); // TODO: dump this message
_clientWebSocket.SendAsync(new ArraySegment<byte>(data), WebSocketMessageType.Binary, true, _cancellationTokenSource.Token)
if (length == Constants.UseArrayLengthForSend)
{
length = data.Length;
}
_clientWebSocket.SendAsync(new ArraySegment<byte>(data, 0, length), WebSocketMessageType.Binary, true, _cancellationTokenSource.Token)
.ConfigureAwait(false);
}
}

/// <summary>
/// This method sends a text message over the WebSocket connection immediately without queueing.
/// </summary>
public void SendMessageImmediately(byte[] data)
public void SendMessageImmediately(byte[] data, int length = Constants.UseArrayLengthForSend)
{
lock (_mutexSend)
{
Log.Verbose("SendBinaryImmediately", "Sending binary message immediately.."); // TODO: dump this message
_clientWebSocket.SendAsync(new ArraySegment<byte>(data), WebSocketMessageType.Text, true, _cancellationTokenSource.Token)
if (length == Constants.UseArrayLengthForSend)
{
length = data.Length;
}
_clientWebSocket.SendAsync(new ArraySegment<byte>(data, 0, length), WebSocketMessageType.Text, true, _cancellationTokenSource.Token)
.ConfigureAwait(false);
}
}
Expand Down Expand Up @@ -967,7 +996,7 @@

if (_deepgramClientOptions.AutoFlushReplyDelta > 0)
{
if ((bool)resultResponse.IsFinal)

Check warning on line 999 in Deepgram/Clients/Listen/v1/WebSocket/Client.cs

View workflow job for this annotation

GitHub Actions / build

Nullable value type may be null.

Check warning on line 999 in Deepgram/Clients/Listen/v1/WebSocket/Client.cs

View workflow job for this annotation

GitHub Actions / test (6.0.x)

Nullable value type may be null.

Check warning on line 999 in Deepgram/Clients/Listen/v1/WebSocket/Client.cs

View workflow job for this annotation

GitHub Actions / test (7.0.x)

Nullable value type may be null.

Check warning on line 999 in Deepgram/Clients/Listen/v1/WebSocket/Client.cs

View workflow job for this annotation

GitHub Actions / test (8.0.x)

Nullable value type may be null.
{
var now = DateTime.Now;
Log.Debug("InspectMessage", $"AutoFlush IsFinal received. Time: {now}");
Expand Down
2 changes: 2 additions & 0 deletions Deepgram/Clients/Listen/v1/WebSocket/Constants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@ public static class Constants
{
// WS buffer size
public const int BufferSize = 1024 * 16;
public const int UseArrayLengthForSend = -1;

// Default timeout for connect/disconnect
public const int DefaultConnectTimeout = 5000;
public const int DefaultDisconnectTimeout = 5000;

// Default flush period
public const int DefaultFlushPeriodInMs = 500;
}

27 changes: 24 additions & 3 deletions Deepgram/Clients/Listen/v1/WebSocket/WebSocketMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,30 @@

namespace Deepgram.Clients.Listen.v1.WebSocket;

internal readonly struct WebSocketMessage(byte[] message, WebSocketMessageType type)
internal readonly struct WebSocketMessage
{
public ArraySegment<byte> Message { get; } = new ArraySegment<byte>(message);
public WebSocketMessage(byte[] message, WebSocketMessageType type)
: this(message, type, Constants.UseArrayLengthForSend)
{
}

public WebSocketMessageType MessageType { get; } = type;
public WebSocketMessage(byte[] message, WebSocketMessageType type, int length)
{
if (length != Constants.UseArrayLengthForSend || length < Constants.UseArrayLengthForSend)
{
Message = new ArraySegment<byte>(message, 0, length);
}
else
{
Message = new ArraySegment<byte>(message, 0, message.Length);
}
MessageType = type;
Length = length;
}

public int Length { get; }

public ArraySegment<byte> Message { get; }

public WebSocketMessageType MessageType { get; }
}
14 changes: 9 additions & 5 deletions Deepgram/Clients/Speak/v1/WebSocket/Client.cs
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ public void Close()
/// Sends a binary message over the WebSocket connection.
/// </summary>
/// <param name="data">The data to be sent over the WebSocket.</param>
public void Send(byte[] data) => SendMessage(data);
public void Send(byte[] data, int length = Constants.UseArrayLengthForSend) => SendMessage(data, length);

///// <summary>
///// This method sends a binary message over the WebSocket connection.
Expand All @@ -361,7 +361,7 @@ public void Close()
/// <summary>
/// This method sends a text message over the WebSocket connection.
/// </summary>
public void SendMessage(byte[] data)
public void SendMessage(byte[] data, int length = Constants.UseArrayLengthForSend)
{
// auto flush
if (_deepgramClientOptions.InspectSpeakMessage())
Expand All @@ -384,7 +384,7 @@ public void SendMessage(byte[] data)
}

// send message
EnqueueSendMessage(new WebSocketMessage(data, WebSocketMessageType.Text));
EnqueueSendMessage(new WebSocketMessage(data, WebSocketMessageType.Text, length));
}
///// <summary>
///// This method sends a binary message over the WebSocket connection immediately without queueing.
Expand All @@ -403,7 +403,7 @@ public void SendMessage(byte[] data)
/// <summary>
/// This method sends a text message over the WebSocket connection immediately without queueing.
/// </summary>
public void SendMessageImmediately(byte[] data)
public void SendMessageImmediately(byte[] data, int length = Constants.UseArrayLengthForSend)
{
// auto flush
if (_deepgramClientOptions.InspectSpeakMessage())
Expand All @@ -428,7 +428,11 @@ public void SendMessageImmediately(byte[] data)
lock (_mutexSend)
{
Log.Verbose("SendBinaryImmediately", "Sending text message immediately.."); // TODO: dump this message
_clientWebSocket.SendAsync(new ArraySegment<byte>(data), WebSocketMessageType.Text, true, _cancellationTokenSource.Token)
if (length == Constants.UseArrayLengthForSend)
{
length = data.Length;
}
_clientWebSocket.SendAsync(new ArraySegment<byte>(data, 0, length), WebSocketMessageType.Text, true, _cancellationTokenSource.Token)
.ConfigureAwait(false);
}
}
Expand Down
1 change: 1 addition & 0 deletions Deepgram/Clients/Speak/v1/WebSocket/Constants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ public static class Constants
{
// WS buffer size
public const int BufferSize = 1024 * 16;
public const int UseArrayLengthForSend = -1;

// Default timeout for connect/disconnect
public const int DefaultConnectTimeout = 5000;
Expand Down
27 changes: 24 additions & 3 deletions Deepgram/Clients/Speak/v1/WebSocket/WebSocketMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,30 @@

namespace Deepgram.Clients.Speak.v1.WebSocket;

internal readonly struct WebSocketMessage(byte[] message, WebSocketMessageType type)
internal readonly struct WebSocketMessage
{
public ArraySegment<byte> Message { get; } = new ArraySegment<byte>(message);
public WebSocketMessage(byte[] message, WebSocketMessageType type)
: this(message, type, Constants.UseArrayLengthForSend)
{
}

public WebSocketMessageType MessageType { get; } = type;
public WebSocketMessage(byte[] message, WebSocketMessageType type, int length)
{
if (length != Constants.UseArrayLengthForSend || length < Constants.UseArrayLengthForSend)
{
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the purpose of the || condition? If length is < -1, then it is a negative number, what does it mean to pass a negative number to ArraySegment? Perhaps you meant the following instead?

if (length != Constants.UseArrayLengthForSend && length <= message.Length) // ensure provided length is valid

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you pass a negative number (which is invalid), it just takes the length of the buffer. We could also throw an exception.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But a negative length will go into the top half of the if and will be passed to ArraySegment.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems the following would be best:
if (length != Constants.UseArrayLengthForSend && length <= message.Length && length > 0) // if length is provided ensure it is valid

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see your point. will make an update for more bumper rails in a follow up PR. In the meantime, I hope that someone would not put in a negative length.

Message = new ArraySegment<byte>(message, 0, length);
}
else
{
Message = new ArraySegment<byte>(message, 0, message.Length);
}
MessageType = type;
Length = length;
}

public int Length { get; }

public ArraySegment<byte> Message { get; }

public WebSocketMessageType MessageType { get; }
}
Loading