Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[C#] Reuse buffers in table iterations #2151

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 28 additions & 22 deletions crates/bindings-csharp/Runtime/Internal/FFI.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,33 @@ internal static partial class FFI
#endif
;

/// <summary>
/// Throws the exception that corresponds to a non-<c>OK</c> status code.
/// </summary>
/// <param name="status">Status code to inspect.</param>
/// <remarks>
/// Returns normally when <paramref name="status"/> is <see cref="Errno.OK"/>.
/// Any code without a dedicated exception type is reported as an
/// <c>UnknownException</c> carrying the original status value.
/// </remarks>
public static void Check(this Errno status)
{
    if (status != Errno.OK)
    {
        // Pick the exception type for this error code; the fallback arm
        // keeps the raw status so unrecognised codes are still reported.
        System.Exception failure = status switch
        {
            Errno.NOT_IN_TRANSACTION => new NotInTransactionException(),
            Errno.BSATN_DECODE_ERROR => new BsatnDecodeException(),
            Errno.NO_SUCH_TABLE => new NoSuchTableException(),
            Errno.NO_SUCH_INDEX => new NoSuchIndexException(),
            Errno.NO_SUCH_ITER => new NoSuchIterException(),
            Errno.NO_SUCH_CONSOLE_TIMER => new NoSuchLogStopwatch(),
            Errno.NO_SUCH_BYTES => new NoSuchBytesException(),
            Errno.NO_SPACE => new NoSpaceException(),
            Errno.BUFFER_TOO_SMALL => new BufferTooSmallException(),
            Errno.UNIQUE_ALREADY_EXISTS => new UniqueConstraintViolationException(),
            Errno.SCHEDULE_AT_DELAY_TOO_LONG => new ScheduleAtDelayTooLongException(),
            Errno.INDEX_NOT_UNIQUE => new IndexNotUniqueException(),
            Errno.NO_SUCH_ROW => new NoSuchRowException(),
            _ => new UnknownException(status),
        };
        throw failure;
    }
}

[NativeMarshalling(typeof(Marshaller))]
public struct CheckedStatus
internal struct CheckedStatus
{
// This custom marshaller takes care of checking the status code
// returned from the host and throwing an exception if it's not 0.
Expand All @@ -69,27 +94,8 @@ internal static class Marshaller
{
public static CheckedStatus ConvertToManaged(Errno status)
{
if (status == 0)
{
return default;
}
throw status switch
{
Errno.NOT_IN_TRANSACTION => new NotInTransactionException(),
Errno.BSATN_DECODE_ERROR => new BsatnDecodeException(),
Errno.NO_SUCH_TABLE => new NoSuchTableException(),
Errno.NO_SUCH_INDEX => new NoSuchIndexException(),
Errno.NO_SUCH_ITER => new NoSuchIterException(),
Errno.NO_SUCH_CONSOLE_TIMER => new NoSuchLogStopwatch(),
Errno.NO_SUCH_BYTES => new NoSuchBytesException(),
Errno.NO_SPACE => new NoSpaceException(),
Errno.BUFFER_TOO_SMALL => new BufferTooSmallException(),
Errno.UNIQUE_ALREADY_EXISTS => new UniqueConstraintViolationException(),
Errno.SCHEDULE_AT_DELAY_TOO_LONG => new ScheduleAtDelayTooLongException(),
Errno.INDEX_NOT_UNIQUE => new IndexNotUniqueException(),
Errno.NO_SUCH_ROW => new NoSuchRowException(),
_ => new UnknownException(status),
};
status.Check();
return default;
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/bindings-csharp/Runtime/Internal/IIndex.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ out ReadOnlySpan<byte> rend
}

protected IEnumerable<Row> DoFilter<Bounds>(Bounds bounds)
where Bounds : IBTreeIndexBounds => new RawTableIter<Bounds>(indexId, bounds).Parse();
where Bounds : IBTreeIndexBounds => new RawTableIter<Bounds>(indexId, bounds);

protected uint DoDelete<Bounds>(Bounds bounds)
where Bounds : IBTreeIndexBounds
Expand Down
100 changes: 38 additions & 62 deletions crates/bindings-csharp/Runtime/Internal/ITable.cs
Original file line number Diff line number Diff line change
@@ -1,95 +1,71 @@
namespace SpacetimeDB.Internal;

using System.Buffers;
using System.Collections;
using SpacetimeDB.BSATN;

internal abstract class RawTableIterBase<T>
internal abstract class RawTableIterBase<T> : IEnumerable<T>
where T : IStructuralReadWrite, new()
{
public sealed class Enumerator(FFI.RowIter handle) : IDisposable
{
byte[] buffer = new byte[0x20_000];
public byte[] Current { get; private set; } = [];
protected abstract void IterStart(out FFI.RowIter handle);

public bool MoveNext()
public IEnumerator<T> GetEnumerator()
{
IterStart(out var handle);
// Initial buffer size to match Rust one (see `DEFAULT_BUFFER_CAPACITY` in `bindings/src/lib.rs`).
// Use pool to reduce GC pressure between iterations.
var buffer = ArrayPool<byte>.Shared.Rent(0x10_000);
try
{
if (handle == FFI.RowIter.INVALID)
{
return false;
}

uint buffer_len;
while (true)
while (handle != FFI.RowIter.INVALID)
{
buffer_len = (uint)buffer.Length;
var buffer_len = (uint)buffer.Length;
var ret = FFI.row_iter_bsatn_advance(handle, buffer, ref buffer_len);
if (ret == Errno.EXHAUSTED)
{
handle = FFI.RowIter.INVALID;
}
// On success, the only way `buffer_len == 0` is for the iterator to be exhausted.
// This happens when the host iterator was empty from the start.
System.Diagnostics.Debug.Assert(!(ret == Errno.OK && buffer_len == 0));
switch (ret)
{
// Iterator advanced and may also be `EXHAUSTED`.
// When `OK`, we'll need to advance the iterator in the next call to `MoveNext`.
// In both cases, copy over the row data to `Current` from the scratch `buffer`.
case Errno.EXHAUSTED
or Errno.OK:
Current = new byte[buffer_len];
Array.Copy(buffer, 0, Current, 0, buffer_len);
return buffer_len != 0;
// Couldn't find the iterator, error!
case Errno.NO_SUCH_ITER:
throw new NoSuchIterException();
// Iterator is exhausted.
// Treat in the same way as OK, just tell the next iteration to stop.
case Errno.EXHAUSTED:
handle = FFI.RowIter.INVALID;
goto case Errno.OK;
// We got a chunk of rows, parse all of them before moving to the next chunk.
case Errno.OK:
{
using var stream = new MemoryStream(buffer, 0, (int)buffer_len);
using var reader = new BinaryReader(stream);
while (stream.Position < stream.Length)
{
yield return IStructuralReadWrite.Read<T>(reader);
}
break;
}
// The scratch `buffer` is too small to fit a row / chunk.
// Grow `buffer` and try again.
// The `buffer_len` will have been updated with the necessary size.
case Errno.BUFFER_TOO_SMALL:
buffer = new byte[buffer_len];
continue;
ArrayPool<byte>.Shared.Return(buffer);
buffer = ArrayPool<byte>.Shared.Rent((int)buffer_len);
break;
default:
throw new UnknownException(ret);
ret.Check();
break;
}
}
}

public void Dispose()
finally
{
if (handle != FFI.RowIter.INVALID)
{
FFI.row_iter_bsatn_close(handle);
handle = FFI.RowIter.INVALID;
}
}

public void Reset()
{
throw new NotImplementedException();
ArrayPool<byte>.Shared.Return(buffer);
}
}

protected abstract void IterStart(out FFI.RowIter handle);

// Note: using the GetEnumerator() duck-typing protocol instead of IEnumerable to avoid extra boxing.
public Enumerator GetEnumerator()
{
IterStart(out var handle);
return new(handle);
}

public IEnumerable<T> Parse()
{
foreach (var chunk in this)
{
using var stream = new MemoryStream(chunk);
using var reader = new BinaryReader(stream);
while (stream.Position < stream.Length)
{
yield return IStructuralReadWrite.Read<T>(reader);
}
}
}
IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
}

public interface ITableView<View, T>
Expand Down Expand Up @@ -136,7 +112,7 @@ protected static ulong DoCount()
return count;
}

protected static IEnumerable<T> DoIter() => new RawTableIter(tableId).Parse();
protected static IEnumerable<T> DoIter() => new RawTableIter(tableId);

protected static T DoInsert(T row)
{
Expand Down
22 changes: 12 additions & 10 deletions crates/bindings-csharp/Runtime/Internal/Module.cs
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,12 @@ public static void RegisterTable<T, View>()
moduleDef.RegisterTable(View.MakeTableDesc(typeRegistrar));
}

private static byte[] Consume(this BytesSource source)
private static MemoryStream Consume(this BytesSource source, ref byte[] buffer)
{
if (source == BytesSource.INVALID)
{
return [];
return new();
}
var buffer = new byte[0x20_000];
var written = 0U;
while (true)
{
Expand All @@ -114,8 +113,7 @@ private static byte[] Consume(this BytesSource source)
{
// Host side source exhausted, we're done.
case Errno.EXHAUSTED:
Array.Resize(ref buffer, (int)written);
return buffer;
return new(buffer, 0, (int)written);
// Wrote the entire spare capacity.
// Need to reserve more space in the buffer.
case Errno.OK when written == buffer.Length:
Expand All @@ -126,11 +124,8 @@ private static byte[] Consume(this BytesSource source)
// The host will likely not trigger this branch (current host doesn't),
// but a module should be prepared for it.
case Errno.OK:
ret.Check();
break;
case Errno.NO_SUCH_BYTES:
throw new NoSuchBytesException();
default:
throw new UnknownException(ret);
}
}
}
Expand Down Expand Up @@ -164,6 +159,13 @@ public static void __describe_module__(BytesSink description)
}
}

// Note: `__call_reducer__` can't be invoked in parallel because we don't support multithreading in Wasm,
// nor is it supposed to be invoked recursively.
//
// This means we can reuse the same argument buffer for all `__call_reducer__` invocations -
// unlike in e.g. iterators, where multiple iterators can easily exist at the same time.
private static byte[] reducerArgsBuffer = new byte[0x10_000];

public static Errno __call_reducer__(
uint id,
ulong sender_0,
Expand All @@ -190,7 +192,7 @@ BytesSink error

var ctx = newContext!(senderIdentity, senderAddress, random, time);

using var stream = new MemoryStream(args.Consume());
using var stream = args.Consume(ref reducerArgsBuffer);
using var reader = new BinaryReader(stream);
reducers[(int)id].Invoke(reader, ctx);
if (stream.Position != stream.Length)
Expand Down