From 343e7f9f4bcd450d5f6a7132d5d1f7f22139f397 Mon Sep 17 00:00:00 2001 From: Nico Duldhardt Date: Tue, 24 Feb 2026 22:23:49 +0100 Subject: [PATCH 1/2] perf: parallelize S3 segment and index uploads in flushLocked Upload .kfs and .index files concurrently using errgroup to eliminate one sequential S3 round-trip from the produce path. Side effects (cache population, segment list update) are deferred until both uploads succeed. Closes #109 --- pkg/storage/log.go | 35 ++++++++++++++++++++--------------- 1 file changed, 20 insertions(+), 15 deletions(-) diff --git a/pkg/storage/log.go b/pkg/storage/log.go index 7479fb6..bc966c7 100644 --- a/pkg/storage/log.go +++ b/pkg/storage/log.go @@ -27,6 +27,7 @@ import ( "time" "github.com/KafScale/platform/pkg/cache" + "golang.org/x/sync/errgroup" ) // PartitionLogConfig configures per-partition log behavior. @@ -242,21 +243,25 @@ func (l *PartitionLog) flushLocked(ctx context.Context) (*SegmentArtifact, error segmentKey := l.segmentKey(artifact.BaseOffset) indexKey := l.indexKey(artifact.BaseOffset) - start := time.Now() - uploadErr := l.s3.UploadSegment(ctx, segmentKey, artifact.SegmentBytes) - if l.onS3Op != nil { - l.onS3Op("upload_segment", time.Since(start), uploadErr) - } - if uploadErr != nil { - return nil, uploadErr - } - start = time.Now() - uploadErr = l.s3.UploadIndex(ctx, indexKey, artifact.IndexBytes) - if l.onS3Op != nil { - l.onS3Op("upload_index", time.Since(start), uploadErr) - } - if uploadErr != nil { - return nil, uploadErr + g, gctx := errgroup.WithContext(ctx) + g.Go(func() error { + start := time.Now() + err := l.s3.UploadSegment(gctx, segmentKey, artifact.SegmentBytes) + if l.onS3Op != nil { + l.onS3Op("upload_segment", time.Since(start), err) + } + return err + }) + g.Go(func() error { + start := time.Now() + err := l.s3.UploadIndex(gctx, indexKey, artifact.IndexBytes) + if l.onS3Op != nil { + l.onS3Op("upload_index", time.Since(start), err) + } + return err + }) + if err := g.Wait(); err != nil { + return nil, err } if l.cache != nil && l.cfg.CacheEnabled { l.cache.SetSegment(l.cacheTopicKey(), l.partition, artifact.BaseOffset, artifact.SegmentBytes) From 4ae09e558a1df8f0a1b11cb44076044f75bf650e Mon Sep 17 00:00:00 2001 From: Nico Duldhardt Date: Wed, 25 Feb 2026 08:28:27 +0100 Subject: [PATCH 2/2] fix: use atomic counter in test to avoid data race with parallel uploads --- pkg/storage/log_test.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/pkg/storage/log_test.go b/pkg/storage/log_test.go index b06ff2c..28bf7ad 100644 --- a/pkg/storage/log_test.go +++ b/pkg/storage/log_test.go @@ -18,6 +18,7 @@ package storage import ( "context" "encoding/binary" + "sync/atomic" "testing" "time" @@ -143,7 +144,7 @@ func TestPartitionLogReadUsesIndexRange(t *testing.T) { func TestPartitionLogReportsS3Uploads(t *testing.T) { s3 := NewMemoryS3Client() c := cache.NewSegmentCache(1024) - var uploads int + var uploads atomic.Int64 log := NewPartitionLog("default", "orders", 0, 0, s3, c, PartitionLogConfig{ Buffer: WriteBufferConfig{ MaxBytes: 1, @@ -153,7 +154,7 @@ func TestPartitionLogReportsS3Uploads(t *testing.T) { IndexIntervalMessages: 1, }, }, nil, func(op string, d time.Duration, err error) { - uploads++ + uploads.Add(1) }) batchData := make([]byte, 70) @@ -165,8 +166,8 @@ func TestPartitionLogReportsS3Uploads(t *testing.T) { if err := log.Flush(context.Background()); err != nil { t.Fatalf("Flush: %v", err) } - if uploads < 2 { - t.Fatalf("expected upload callback for segment + index, got %d", uploads) + if uploads.Load() < 2 { + t.Fatalf("expected upload callback for segment + index, got %d", uploads.Load()) } }