Skip to content

Commit

Permalink
I hate this fix
Browse files Browse the repository at this point in the history
  • Loading branch information
lil5 committed Dec 17, 2024
1 parent b1a60f8 commit 6d146e0
Showing 1 changed file with 17 additions and 1 deletion.
18 changes: 17 additions & 1 deletion grpc/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type TimedPayloadResponse struct {
Error error
}
type TimedPayload struct {
buf *timedbuf.TimedBuf[TimedPayload]
c chan TimedPayloadResponse
Transfers []types.Transfer
}
Expand Down Expand Up @@ -54,6 +55,8 @@ func (a *App) Close() {
a.TB.Close()
}

const TB_MAX_BATCH_SIZE = 8190

func NewApp() *App {
tigerbeetle_go, err := tigerbeetle_go.NewClient(types.Uint128{uint8(Config.TbClusterID)}, Config.TbAddresses)
if err != nil {
Expand All @@ -66,13 +69,25 @@ func NewApp() *App {
if Config.IsBuffered {
tbufs = make([]*timedbuf.TimedBuf[TimedPayload], Config.BufferCluster)

// The maximum batch size is set in the TigerBeetle server. The default is 8190.
lenMaxBuf := float64(Config.BufferSize)
flushFunc := func(payloads []TimedPayload) {
transfers := []types.Transfer{}
lenPayloads := float64(len(payloads))
for _, payload := range payloads {
totalTransferSize := 0
isOverflowMaxTransferSizeIndex := -1
for i, payload := range payloads {
totalTransferSize += len(payload.Transfers)
if totalTransferSize > TB_MAX_BATCH_SIZE {
isOverflowMaxTransferSizeIndex = i
break
}
transfers = append(transfers, payload.Transfers...)
}
if isOverflowMaxTransferSizeIndex != -1 {
// If the total transfer size exceeds the maximum batch size, the overflow with be sent back to the buffer
payloads[isOverflowMaxTransferSizeIndex].buf.Put(payloads[isOverflowMaxTransferSizeIndex:]...)
}
metrics.TotalBufferCount.Inc()
metrics.TotalBufferContents.Add(lenPayloads)
metrics.TotalBufferMax.Add(lenMaxBuf)
Expand Down Expand Up @@ -217,6 +232,7 @@ func (s *App) CreateTransfers(ctx context.Context, in *proto.CreateTransfersRequ
c := make(chan TimedPayloadResponse)
buf.Put(TimedPayload{
c: c,
buf: buf,
Transfers: transfers,
})
res := <-c
Expand Down

0 comments on commit 6d146e0

Please sign in to comment.