From 6d146e0d9f728edf787473dce463aa750d9f7549 Mon Sep 17 00:00:00 2001 From: "Lucian I. Last" Date: Tue, 17 Dec 2024 12:00:29 +0100 Subject: [PATCH] I hate this fix --- grpc/routes.go | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/grpc/routes.go b/grpc/routes.go index 0c52762..eff1e99 100644 --- a/grpc/routes.go +++ b/grpc/routes.go @@ -25,6 +25,7 @@ type TimedPayloadResponse struct { Error error } type TimedPayload struct { + buf *timedbuf.TimedBuf[TimedPayload] c chan TimedPayloadResponse Transfers []types.Transfer } @@ -54,6 +55,8 @@ func (a *App) Close() { a.TB.Close() } +const TB_MAX_BATCH_SIZE = 8190 + func NewApp() *App { tigerbeetle_go, err := tigerbeetle_go.NewClient(types.Uint128{uint8(Config.TbClusterID)}, Config.TbAddresses) if err != nil { @@ -66,13 +69,25 @@ func NewApp() *App { if Config.IsBuffered { tbufs = make([]*timedbuf.TimedBuf[TimedPayload], Config.BufferCluster) + // The maximum batch size is set in the TigerBeetle server. The default is 8190. lenMaxBuf := float64(Config.BufferSize) flushFunc := func(payloads []TimedPayload) { transfers := []types.Transfer{} lenPayloads := float64(len(payloads)) - for _, payload := range payloads { + totalTransferSize := 0 + isOverflowMaxTransferSizeIndex := -1 + for i, payload := range payloads { + totalTransferSize += len(payload.Transfers) + if totalTransferSize > TB_MAX_BATCH_SIZE { + isOverflowMaxTransferSizeIndex = i + break + } transfers = append(transfers, payload.Transfers...) } + if isOverflowMaxTransferSizeIndex != -1 { + // If the total transfer size exceeds the maximum batch size, the overflow with be sent back to the buffer + payloads[isOverflowMaxTransferSizeIndex].buf.Put(payloads[isOverflowMaxTransferSizeIndex:]...) + } metrics.TotalBufferCount.Inc() metrics.TotalBufferContents.Add(lenPayloads) metrics.TotalBufferMax.Add(lenMaxBuf) @@ -217,6 +232,7 @@ func (s *App) CreateTransfers(ctx context.Context, in *proto.CreateTransfersRequ c := make(chan TimedPayloadResponse) buf.Put(TimedPayload{ c: c, + buf: buf, Transfers: transfers, }) res := <-c