Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 72 additions & 37 deletions cmd/kshark/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ import (
"crypto/sha256"
"crypto/tls"
"crypto/x509"
"encoding/json"
"encoding/hex"
"encoding/json"
"errors"
"flag"
"fmt"
Expand Down Expand Up @@ -320,16 +320,16 @@ type CheckStats struct {
}

// AnalysisMeta groups the prompt, response, provider and model metadata
// fields that are serialized under the JSON keys given in the struct tags.
// Every field is a string tagged omitempty, so empty values are left out of
// the JSON encoding.
//
// NOTE(review): this is a diff view, so each realigned field is listed twice
// (the pre-change line followed by the post-change line). ResponseContent
// appears once because its line did not change.
type AnalysisMeta struct {
// Prompt and response fields as they appear on the pre-change side.
PromptFile string `json:"prompt_file,omitempty"`
PromptSHA256 string `json:"prompt_sha256,omitempty"`
PromptContent string `json:"prompt_content,omitempty"`
ResponseFile string `json:"response_file,omitempty"`
ResponseSHA256 string `json:"response_sha256,omitempty"`
// The same five fields as they appear on the post-change side.
PromptFile string `json:"prompt_file,omitempty"`
PromptSHA256 string `json:"prompt_sha256,omitempty"`
PromptContent string `json:"prompt_content,omitempty"`
ResponseFile string `json:"response_file,omitempty"`
ResponseSHA256 string `json:"response_sha256,omitempty"`
// Unchanged line shared by both sides of the diff.
ResponseContent string `json:"response_content,omitempty"`
// Status, error, provider and model fields on the pre-change side.
ResponseStatus string `json:"response_status,omitempty"`
ResponseError string `json:"response_error,omitempty"`
Provider string `json:"provider,omitempty"`
Model string `json:"model,omitempty"`
// The same four fields on the post-change side.
ResponseStatus string `json:"response_status,omitempty"`
ResponseError string `json:"response_error,omitempty"`
Provider string `json:"provider,omitempty"`
Model string `json:"model,omitempty"`
}

type RunMeta struct {
Expand Down Expand Up @@ -774,13 +774,21 @@ func probeProduceConsume(ctx context.Context, r *Report, p map[string]string, bo
logf("step kafka.leaders topic=%s err=unavailable", topic)
}
baseBalancer := selectBalancer(balancer)
// Create a shared Transport that routes every connection through the dialer.
// This is intended to give proper metadata refresh and broker discovery on Confluent Cloud.
transport := &kafka.Transport{
Dial: func(ctx context.Context, network, address string) (net.Conn, error) {
return dialer.DialContext(ctx, network, address)
},
DialTimeout: produceTimeout,
}
w := &kafka.Writer{
Addr: kafka.TCP(strings.Split(bootstrap, ",")...),
Topic: topic,
Balancer: &loggingBalancer{base: baseBalancer, topic: topic, leaders: leaders},
RequiredAcks: kafka.RequireOne,
Async: false,
Transport: &kafka.Transport{SASL: dialer.SASLMechanism, TLS: dialer.TLS, DialTimeout: produceTimeout},
Transport: transport,
}
defer w.Close()

Expand All @@ -793,15 +801,42 @@ func probeProduceConsume(ctx context.Context, r *Report, p map[string]string, bo
}

writeStart := time.Now()
writeCtx, writeCancel := context.WithTimeout(ctx, produceTimeout)
defer writeCancel()
if err := w.WriteMessages(writeCtx, msg); err != nil {
logf("step kafka.produce topic=%s dur=%s err=%v", topic, time.Since(writeStart).Truncate(time.Millisecond), err)
addRow(r, Row{"kafka", topic, L7, FAIL, policyHint("Produce", err), hint(err)})
// Retry logic for metadata refresh errors (common with Confluent Cloud)
var writeErr error
maxRetries := 3
for attempt := 0; attempt < maxRetries; attempt++ {
if attempt > 0 {
backoff := time.Duration(attempt) * 500 * time.Millisecond
logf("step kafka.produce.retry topic=%s attempt=%d backoff=%s", topic, attempt, backoff)
time.Sleep(backoff)
}

writeCtx, writeCancel := context.WithTimeout(ctx, produceTimeout)
writeErr = w.WriteMessages(writeCtx, msg)
writeCancel()

if writeErr == nil {
logf("step kafka.produce topic=%s dur=%s err=nil", topic, time.Since(writeStart).Truncate(time.Millisecond))
addRow(r, Row{"kafka", topic, L7, OK, "Produce OK", ""})
break
}

// Check if error is retryable (metadata-related)
if errors.Is(writeErr, kafka.NotLeaderForPartition) || errors.Is(writeErr, kafka.LeaderNotAvailable) {
logf("step kafka.produce topic=%s dur=%s err=%v (retrying)", topic, time.Since(writeStart).Truncate(time.Millisecond), writeErr)
continue
}

// Non-retryable error, break immediately
logf("step kafka.produce topic=%s dur=%s err=%v (not retryable)", topic, time.Since(writeStart).Truncate(time.Millisecond), writeErr)
break
}

if writeErr != nil {
logf("step kafka.produce topic=%s dur=%s err=%v", topic, time.Since(writeStart).Truncate(time.Millisecond), writeErr)
addRow(r, Row{"kafka", topic, L7, FAIL, policyHint("Produce", writeErr), hint(writeErr)})
return
}
logf("step kafka.produce topic=%s dur=%s err=nil", topic, time.Since(writeStart).Truncate(time.Millisecond))
addRow(r, Row{"kafka", topic, L7, OK, "Produce OK", ""})

if group == "" {
group = fmt.Sprintf("kwire-%d", time.Now().UnixNano())
Expand Down Expand Up @@ -1507,25 +1542,25 @@ func main() {
report.Run = &RunMeta{
Args: os.Args,
Params: map[string]ParamMeta{
"props": paramMeta(*propsPath, "", providedFlags["props"]),
"topic": paramMeta(*topic, "", providedFlags["topic"]),
"group": paramMeta(*group, "", providedFlags["group"]),
"json": paramMeta(*jsonOut, "", providedFlags["json"]),
"analyze": paramMeta(strconv.FormatBool(*analyze), "false", providedFlags["analyze"]),
"no-ai": paramMeta(strconv.FormatBool(*noAI), "false", providedFlags["no-ai"]),
"provider": paramMeta(*provider, "", providedFlags["provider"]),
"timeout": paramMeta(timeout.String(), "1m0s", providedFlags["timeout"]),
"kafka-timeout": paramMeta(kafkaTimeout.String(), "10s", providedFlags["kafka-timeout"]),
"op-timeout": paramMeta(opTimeout.String(), "10s", providedFlags["op-timeout"]),
"produce-timeout": paramMeta(produceTimeoutVal.String(), "inherit op-timeout", providedFlags["produce-timeout"]),
"consume-timeout": paramMeta(consumeTimeoutVal.String(), "inherit op-timeout", providedFlags["consume-timeout"]),
"start-offset": paramMeta(*startOffset, "earliest", providedFlags["start-offset"]),
"balancer": paramMeta(*balancer, "least", providedFlags["balancer"]),
"preset": paramMeta(*preset, "", providedFlags["preset"]),
"diag": paramMeta(strconv.FormatBool(*diag), "true", providedFlags["diag"]),
"log": paramMeta(*logPath, "", providedFlags["log"]),
"y": paramMeta(strconv.FormatBool(*yes), "false", providedFlags["y"]),
"version": paramMeta(strconv.FormatBool(*showVersion), "false", providedFlags["version"]),
"props": paramMeta(*propsPath, "", providedFlags["props"]),
"topic": paramMeta(*topic, "", providedFlags["topic"]),
"group": paramMeta(*group, "", providedFlags["group"]),
"json": paramMeta(*jsonOut, "", providedFlags["json"]),
"analyze": paramMeta(strconv.FormatBool(*analyze), "false", providedFlags["analyze"]),
"no-ai": paramMeta(strconv.FormatBool(*noAI), "false", providedFlags["no-ai"]),
"provider": paramMeta(*provider, "", providedFlags["provider"]),
"timeout": paramMeta(timeout.String(), "1m0s", providedFlags["timeout"]),
"kafka-timeout": paramMeta(kafkaTimeout.String(), "10s", providedFlags["kafka-timeout"]),
"op-timeout": paramMeta(opTimeout.String(), "10s", providedFlags["op-timeout"]),
"produce-timeout": paramMeta(produceTimeoutVal.String(), "inherit op-timeout", providedFlags["produce-timeout"]),
"consume-timeout": paramMeta(consumeTimeoutVal.String(), "inherit op-timeout", providedFlags["consume-timeout"]),
"start-offset": paramMeta(*startOffset, "earliest", providedFlags["start-offset"]),
"balancer": paramMeta(*balancer, "least", providedFlags["balancer"]),
"preset": paramMeta(*preset, "", providedFlags["preset"]),
"diag": paramMeta(strconv.FormatBool(*diag), "true", providedFlags["diag"]),
"log": paramMeta(*logPath, "", providedFlags["log"]),
"y": paramMeta(strconv.FormatBool(*yes), "false", providedFlags["y"]),
"version": paramMeta(strconv.FormatBool(*showVersion), "false", providedFlags["version"]),
},
}

Expand Down