diff --git a/pkg/storage/log.go b/pkg/storage/log.go index 7479fb6..bc966c7 100644 --- a/pkg/storage/log.go +++ b/pkg/storage/log.go @@ -27,6 +27,7 @@ import ( "time" "github.com/KafScale/platform/pkg/cache" + "golang.org/x/sync/errgroup" ) // PartitionLogConfig configures per-partition log behavior. @@ -242,21 +243,25 @@ func (l *PartitionLog) flushLocked(ctx context.Context) (*SegmentArtifact, error segmentKey := l.segmentKey(artifact.BaseOffset) indexKey := l.indexKey(artifact.BaseOffset) - start := time.Now() - uploadErr := l.s3.UploadSegment(ctx, segmentKey, artifact.SegmentBytes) - if l.onS3Op != nil { - l.onS3Op("upload_segment", time.Since(start), uploadErr) - } - if uploadErr != nil { - return nil, uploadErr - } - start = time.Now() - uploadErr = l.s3.UploadIndex(ctx, indexKey, artifact.IndexBytes) - if l.onS3Op != nil { - l.onS3Op("upload_index", time.Since(start), uploadErr) - } - if uploadErr != nil { - return nil, uploadErr + g, gctx := errgroup.WithContext(ctx) + g.Go(func() error { + start := time.Now() + err := l.s3.UploadSegment(gctx, segmentKey, artifact.SegmentBytes) + if l.onS3Op != nil { + l.onS3Op("upload_segment", time.Since(start), err) + } + return err + }) + g.Go(func() error { + start := time.Now() + err := l.s3.UploadIndex(gctx, indexKey, artifact.IndexBytes) + if l.onS3Op != nil { + l.onS3Op("upload_index", time.Since(start), err) + } + return err + }) + if err := g.Wait(); err != nil { + return nil, err } if l.cache != nil && l.cfg.CacheEnabled { l.cache.SetSegment(l.cacheTopicKey(), l.partition, artifact.BaseOffset, artifact.SegmentBytes) diff --git a/pkg/storage/log_test.go b/pkg/storage/log_test.go index b06ff2c..28bf7ad 100644 --- a/pkg/storage/log_test.go +++ b/pkg/storage/log_test.go @@ -18,6 +18,7 @@ package storage import ( "context" "encoding/binary" + "sync/atomic" "testing" "time" @@ -143,7 +144,7 @@ func TestPartitionLogReadUsesIndexRange(t *testing.T) { func TestPartitionLogReportsS3Uploads(t *testing.T) { s3 := NewMemoryS3Client() c := cache.NewSegmentCache(1024) - var uploads int + var uploads atomic.Int64 log := NewPartitionLog("default", "orders", 0, 0, s3, c, PartitionLogConfig{ Buffer: WriteBufferConfig{ MaxBytes: 1, @@ -153,7 +154,7 @@ func TestPartitionLogReportsS3Uploads(t *testing.T) { IndexIntervalMessages: 1, }, }, nil, func(op string, d time.Duration, err error) { - uploads++ + uploads.Add(1) }) batchData := make([]byte, 70) @@ -165,8 +166,8 @@ func TestPartitionLogReportsS3Uploads(t *testing.T) { if err := log.Flush(context.Background()); err != nil { t.Fatalf("Flush: %v", err) } - if uploads < 2 { - t.Fatalf("expected upload callback for segment + index, got %d", uploads) + if uploads.Load() < 2 { + t.Fatalf("expected upload callback for segment + index, got %d", uploads.Load()) } }