diff --git a/cmd/kshark/main.go b/cmd/kshark/main.go index f3fdcdd..a2128e2 100644 --- a/cmd/kshark/main.go +++ b/cmd/kshark/main.go @@ -22,8 +22,8 @@ import ( "crypto/sha256" "crypto/tls" "crypto/x509" - "encoding/json" "encoding/hex" + "encoding/json" "errors" "flag" "fmt" @@ -320,16 +320,16 @@ type CheckStats struct { } type AnalysisMeta struct { - PromptFile string `json:"prompt_file,omitempty"` - PromptSHA256 string `json:"prompt_sha256,omitempty"` - PromptContent string `json:"prompt_content,omitempty"` - ResponseFile string `json:"response_file,omitempty"` - ResponseSHA256 string `json:"response_sha256,omitempty"` + PromptFile string `json:"prompt_file,omitempty"` + PromptSHA256 string `json:"prompt_sha256,omitempty"` + PromptContent string `json:"prompt_content,omitempty"` + ResponseFile string `json:"response_file,omitempty"` + ResponseSHA256 string `json:"response_sha256,omitempty"` ResponseContent string `json:"response_content,omitempty"` - ResponseStatus string `json:"response_status,omitempty"` - ResponseError string `json:"response_error,omitempty"` - Provider string `json:"provider,omitempty"` - Model string `json:"model,omitempty"` + ResponseStatus string `json:"response_status,omitempty"` + ResponseError string `json:"response_error,omitempty"` + Provider string `json:"provider,omitempty"` + Model string `json:"model,omitempty"` } type RunMeta struct { @@ -774,13 +774,21 @@ func probeProduceConsume(ctx context.Context, r *Report, p map[string]string, bo logf("step kafka.leaders topic=%s err=unavailable", topic) } baseBalancer := selectBalancer(balancer) + // Create a shared Transport so all produce connections reuse one metadata + // refresh / broker-discovery path (needed for Confluent Cloud). + // NOTE: kafka.Transport negotiates TLS and SASL itself, so pass those through + // from the dialer. Do NOT set Transport.Dial to dialer.DialContext: + // kafka.Dialer returns a *kafka.Conn whose Read/Write are fetch/produce + // operations, not a raw net.Conn, which breaks the Transport's wire protocol. + transport := &kafka.Transport{ + SASL: dialer.SASLMechanism, + TLS: dialer.TLS, + DialTimeout: produceTimeout, + } w := &kafka.Writer{ Addr: 
kafka.TCP(strings.Split(bootstrap, ",")...), Topic: topic, Balancer: &loggingBalancer{base: baseBalancer, topic: topic, leaders: leaders}, RequiredAcks: kafka.RequireOne, Async: false, - Transport: &kafka.Transport{SASL: dialer.SASLMechanism, TLS: dialer.TLS, DialTimeout: produceTimeout}, + Transport: transport, } defer w.Close() @@ -793,15 +801,42 @@ func probeProduceConsume(ctx context.Context, r *Report, p map[string]string, bo } writeStart := time.Now() - writeCtx, writeCancel := context.WithTimeout(ctx, produceTimeout) - defer writeCancel() - if err := w.WriteMessages(writeCtx, msg); err != nil { - logf("step kafka.produce topic=%s dur=%s err=%v", topic, time.Since(writeStart).Truncate(time.Millisecond), err) - addRow(r, Row{"kafka", topic, L7, FAIL, policyHint("Produce", err), hint(err)}) + // Retry logic for metadata refresh errors (common with Confluent Cloud) + var writeErr error + maxRetries := 3 + for attempt := 0; attempt < maxRetries; attempt++ { + if attempt > 0 { + backoff := time.Duration(attempt) * 500 * time.Millisecond + logf("step kafka.produce.retry topic=%s attempt=%d backoff=%s", topic, attempt, backoff) + time.Sleep(backoff) + } + + writeCtx, writeCancel := context.WithTimeout(ctx, produceTimeout) + writeErr = w.WriteMessages(writeCtx, msg) + writeCancel() + + if writeErr == nil { + logf("step kafka.produce topic=%s dur=%s err=nil", topic, time.Since(writeStart).Truncate(time.Millisecond)) + addRow(r, Row{"kafka", topic, L7, OK, "Produce OK", ""}) + break + } + + // Check if error is retryable (metadata-related) + if errors.Is(writeErr, kafka.NotLeaderForPartition) || errors.Is(writeErr, kafka.LeaderNotAvailable) { + logf("step kafka.produce topic=%s dur=%s err=%v (retrying)", topic, time.Since(writeStart).Truncate(time.Millisecond), writeErr) + continue + } + + // Non-retryable error, break immediately + logf("step kafka.produce topic=%s dur=%s err=%v (not retryable)", topic, time.Since(writeStart).Truncate(time.Millisecond), writeErr) + 
break + } + + if writeErr != nil { + logf("step kafka.produce topic=%s dur=%s err=%v", topic, time.Since(writeStart).Truncate(time.Millisecond), writeErr) + addRow(r, Row{"kafka", topic, L7, FAIL, policyHint("Produce", writeErr), hint(writeErr)}) return } - logf("step kafka.produce topic=%s dur=%s err=nil", topic, time.Since(writeStart).Truncate(time.Millisecond)) - addRow(r, Row{"kafka", topic, L7, OK, "Produce OK", ""}) if group == "" { group = fmt.Sprintf("kwire-%d", time.Now().UnixNano()) @@ -1507,25 +1542,25 @@ func main() { report.Run = &RunMeta{ Args: os.Args, Params: map[string]ParamMeta{ - "props": paramMeta(*propsPath, "", providedFlags["props"]), - "topic": paramMeta(*topic, "", providedFlags["topic"]), - "group": paramMeta(*group, "", providedFlags["group"]), - "json": paramMeta(*jsonOut, "", providedFlags["json"]), - "analyze": paramMeta(strconv.FormatBool(*analyze), "false", providedFlags["analyze"]), - "no-ai": paramMeta(strconv.FormatBool(*noAI), "false", providedFlags["no-ai"]), - "provider": paramMeta(*provider, "", providedFlags["provider"]), - "timeout": paramMeta(timeout.String(), "1m0s", providedFlags["timeout"]), - "kafka-timeout": paramMeta(kafkaTimeout.String(), "10s", providedFlags["kafka-timeout"]), - "op-timeout": paramMeta(opTimeout.String(), "10s", providedFlags["op-timeout"]), - "produce-timeout": paramMeta(produceTimeoutVal.String(), "inherit op-timeout", providedFlags["produce-timeout"]), - "consume-timeout": paramMeta(consumeTimeoutVal.String(), "inherit op-timeout", providedFlags["consume-timeout"]), - "start-offset": paramMeta(*startOffset, "earliest", providedFlags["start-offset"]), - "balancer": paramMeta(*balancer, "least", providedFlags["balancer"]), - "preset": paramMeta(*preset, "", providedFlags["preset"]), - "diag": paramMeta(strconv.FormatBool(*diag), "true", providedFlags["diag"]), - "log": paramMeta(*logPath, "", providedFlags["log"]), - "y": paramMeta(strconv.FormatBool(*yes), "false", providedFlags["y"]), - 
"version": paramMeta(strconv.FormatBool(*showVersion), "false", providedFlags["version"]), + "props": paramMeta(*propsPath, "", providedFlags["props"]), + "topic": paramMeta(*topic, "", providedFlags["topic"]), + "group": paramMeta(*group, "", providedFlags["group"]), + "json": paramMeta(*jsonOut, "", providedFlags["json"]), + "analyze": paramMeta(strconv.FormatBool(*analyze), "false", providedFlags["analyze"]), + "no-ai": paramMeta(strconv.FormatBool(*noAI), "false", providedFlags["no-ai"]), + "provider": paramMeta(*provider, "", providedFlags["provider"]), + "timeout": paramMeta(timeout.String(), "1m0s", providedFlags["timeout"]), + "kafka-timeout": paramMeta(kafkaTimeout.String(), "10s", providedFlags["kafka-timeout"]), + "op-timeout": paramMeta(opTimeout.String(), "10s", providedFlags["op-timeout"]), + "produce-timeout": paramMeta(produceTimeoutVal.String(), "inherit op-timeout", providedFlags["produce-timeout"]), + "consume-timeout": paramMeta(consumeTimeoutVal.String(), "inherit op-timeout", providedFlags["consume-timeout"]), + "start-offset": paramMeta(*startOffset, "earliest", providedFlags["start-offset"]), + "balancer": paramMeta(*balancer, "least", providedFlags["balancer"]), + "preset": paramMeta(*preset, "", providedFlags["preset"]), + "diag": paramMeta(strconv.FormatBool(*diag), "true", providedFlags["diag"]), + "log": paramMeta(*logPath, "", providedFlags["log"]), + "y": paramMeta(strconv.FormatBool(*yes), "false", providedFlags["y"]), + "version": paramMeta(strconv.FormatBool(*showVersion), "false", providedFlags["version"]), }, }