Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 16 additions & 3 deletions cmd/broker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/KafScale/platform/pkg/metadata"
"github.com/KafScale/platform/pkg/protocol"
"github.com/KafScale/platform/pkg/storage"
"golang.org/x/sync/semaphore"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand All @@ -54,8 +55,9 @@ const (
defaultMinioRegion = "us-east-1"
defaultMinioEndpoint = "http://127.0.0.1:9000"
defaultMinioAccessKey = "minioadmin"
defaultMinioSecretKey = "minioadmin"
brokerVersion = "dev"
defaultMinioSecretKey = "minioadmin"
defaultS3Concurrency = 64
brokerVersion = "dev"
)

type handler struct {
Expand Down Expand Up @@ -91,6 +93,7 @@ type handler struct {
authMetrics *authMetrics
authLogMu sync.Mutex
authLogLast map[string]time.Time
s3sem *semaphore.Weighted
}

type etcdAvailability interface {
Expand Down Expand Up @@ -1971,7 +1974,7 @@ func (h *handler) getPartitionLog(ctx context.Context, topic string, partition i
if err := h.store.UpdateOffsets(cbCtx, topic, partition, artifact.LastOffset); err != nil {
h.logger.Error("update offsets failed", "error", err, "topic", topic, "partition", partition)
}
}, h.recordS3Op)
}, h.recordS3Op, h.s3sem)
lastOffset, err := plog.RestoreFromS3(ctx)
if err != nil {
h.logger.Error("restore partition log from S3 failed", "topic", topic, "partition", partition, "error", err)
Expand Down Expand Up @@ -2007,6 +2010,12 @@ func newHandler(store metadata.Store, s3Client storage.S3Client, brokerInfo prot
segmentBytes := parseEnvInt("KAFSCALE_SEGMENT_BYTES", 4<<20)
flushInterval := time.Duration(parseEnvInt("KAFSCALE_FLUSH_INTERVAL_MS", 500)) * time.Millisecond
flushOnAck := parseEnvBool("KAFSCALE_PRODUCE_SYNC_FLUSH", true)
// 0 or negative disables the S3 concurrency limit (no semaphore, default HTTP pool).
s3Concurrency := parseEnvInt("KAFSCALE_S3_CONCURRENCY", defaultS3Concurrency)
var s3sem *semaphore.Weighted
if s3Concurrency > 0 {
s3sem = semaphore.NewWeighted(int64(s3Concurrency))
}
produceLatencyBuckets := []float64{1, 2, 5, 10, 25, 50, 100, 250, 500, 1000, 2000, 5000}
consumerLagBuckets := []float64{1, 10, 100, 1000, 5000, 10000, 50000, 100000, 500000, 1000000}
if autoPartitions < 1 {
Expand Down Expand Up @@ -2055,6 +2064,7 @@ func newHandler(store metadata.Store, s3Client storage.S3Client, brokerInfo prot
authorizer: authorizer,
authMetrics: newAuthMetrics(),
authLogLast: make(map[string]time.Time),
s3sem: s3sem,
}
}

Expand Down Expand Up @@ -2184,6 +2194,7 @@ func buildS3ConfigsFromEnv() (storage.S3Config, storage.S3Config, bool, bool, bo
secretKey = defaultMinioSecretKey
}
credsProvided := accessKey != "" && secretKey != ""
s3Concurrency := parseEnvInt("KAFSCALE_S3_CONCURRENCY", defaultS3Concurrency)
writeCfg := storage.S3Config{
Bucket: writeBucket,
Region: writeRegion,
Expand All @@ -2193,6 +2204,7 @@ func buildS3ConfigsFromEnv() (storage.S3Config, storage.S3Config, bool, bool, bo
SecretAccessKey: secretKey,
SessionToken: sessionToken,
KMSKeyARN: kmsARN,
MaxConnections: s3Concurrency,
}

readBucket := os.Getenv("KAFSCALE_S3_READ_BUCKET")
Expand All @@ -2217,6 +2229,7 @@ func buildS3ConfigsFromEnv() (storage.S3Config, storage.S3Config, bool, bool, bo
SecretAccessKey: secretKey,
SessionToken: sessionToken,
KMSKeyARN: kmsARN,
MaxConnections: s3Concurrency,
}
return writeCfg, readCfg, false, usingDefaultMinio, credsProvided, useReadReplica
}
Expand Down
1 change: 1 addition & 0 deletions docs/operations.md
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,7 @@ Recommended operator alerting (when using Prometheus Operator):
- `KAFSCALE_S3_PATH_STYLE` – Force path-style addressing (`true/false`).
- `KAFSCALE_S3_KMS_ARN` – KMS key ARN for SSE-KMS.
- `KAFSCALE_S3_ACCESS_KEY`, `KAFSCALE_S3_SECRET_KEY`, `KAFSCALE_S3_SESSION_TOKEN` – S3 credentials.
- `KAFSCALE_S3_CONCURRENCY` – Broker-wide cap on concurrent S3 operations (default `64`; `0` or a negative value disables the limit). Lower this for slower S3-compatible backends.

Read replica example (multi-region reads):

Expand Down
Loading