diff --git a/go.mod b/go.mod index 983383d4e..8eabe7f05 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/container-storage-interface/spec v1.9.0 github.com/csi-addons/spec v0.2.1-0.20230606140122-d20966d2e444 github.com/go-logr/logr v1.4.1 - github.com/kubernetes-csi/csi-lib-utils v0.17.0 + github.com/kubernetes-csi/csi-lib-utils v0.18.0 github.com/onsi/ginkgo v1.16.5 github.com/onsi/gomega v1.33.1 github.com/robfig/cron/v3 v3.0.1 diff --git a/go.sum b/go.sum index 5f17fd0eb..9dfa61724 100644 --- a/go.sum +++ b/go.sum @@ -94,8 +94,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= -github.com/kubernetes-csi/csi-lib-utils v0.17.0 h1:xEpJ3WYgMyyYF6fvcKHh4cDRtknuTkBS9rG8bYoLTCU= -github.com/kubernetes-csi/csi-lib-utils v0.17.0/go.mod h1:2Ba5/aQgUjbpqyC2uCcFwMF3rnPVs5jhZXm8jAzcT9Q= +github.com/kubernetes-csi/csi-lib-utils v0.18.0 h1:Tpt1qLIbmpz5ux1hllut/dEWww2VRxdvSSOF4gGwhnA= +github.com/kubernetes-csi/csi-lib-utils v0.18.0/go.mod h1:FEQIcHcZmXZKWKTg18dJbXCHvgtCjnH7/uM0trmZyhU= github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 h1:jWpvCLoY8Z/e3VKvlsiIGKtc+UG6U5vzxaoagmhXfyg= diff --git a/sidecar/internal/client/client.go b/sidecar/internal/client/client.go index d74553d27..69a1b5ec8 100644 --- a/sidecar/internal/client/client.go +++ b/sidecar/internal/client/client.go @@ -51,14 +51,14 @@ type clientImpl struct { } // Connect to the GRPC client -func (c *clientImpl) connect(address string) (*grpc.ClientConn, error) { - return connection.Connect(address, metrics.NewCSIMetricsManager(""), connection.OnConnectionLoss(connection.ExitOnConnectionLoss())) +func (c *clientImpl) connect(ctx context.Context, address string) (*grpc.ClientConn, error) { + return connection.Connect(ctx, address, metrics.NewCSIMetricsManager(""), connection.OnConnectionLoss(connection.ExitOnConnectionLoss())) } // New creates and returns the GRPC client -func New(address string, timeout time.Duration) (Client, error) { +func New(ctx context.Context, address string, timeout time.Duration) (Client, error) { c := &clientImpl{} - cc, err := c.connect(address) + cc, err := c.connect(ctx, address) if err != nil { return nil, err } diff --git a/sidecar/main.go b/sidecar/main.go index 620442dbf..260f44840 100644 --- a/sidecar/main.go +++ b/sidecar/main.go @@ -75,7 +75,7 @@ func main() { klog.Fatalf("Failed to validate controller endpoint: %v", err) } - csiClient, err := client.New(*csiAddonsAddress, *timeout) + csiClient, err := client.New(context.Background(), *csiAddonsAddress, *timeout) if err != nil { klog.Fatalf("Failed to connect to %q : %v", *csiAddonsAddress, err) } diff --git a/tools/go.mod b/tools/go.mod index a665bdb2c..96528e394 100644 --- a/tools/go.mod +++ b/tools/go.mod @@ -3,7 +3,7 @@ module github.com/csi-addons/kubernetes-csi-addons/tools go 1.22.2 require ( - github.com/operator-framework/operator-sdk v1.34.1 + github.com/operator-framework/operator-sdk v1.34.2 google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.3.0 google.golang.org/protobuf v1.34.1 sigs.k8s.io/controller-runtime/tools/setup-envtest v0.0.0-20240102165319-7f316f1309b1 diff --git a/tools/go.sum b/tools/go.sum index f500e6ff8..cc6b19917 100644 --- a/tools/go.sum +++ b/tools/go.sum @@ -441,8 +441,8 @@ github.com/operator-framework/operator-manifest-tools v0.2.3-0.20230525225330-52 github.com/operator-framework/operator-manifest-tools v0.2.3-0.20230525225330-523bad646f89/go.mod h1:PT1D+dEbD9TCoo62TkRP4BHYXA+h+zC0i3Ql7WZW9os= github.com/operator-framework/operator-registry v1.35.0 h1:BvytqLwhgb0QiAkEODEKXq3vc2vWiHQq0IlofvFA+OI= github.com/operator-framework/operator-registry v1.35.0/go.mod h1:foC+NO1V9JuDIOk3pjjlrPE0KVkq09m8oDVRz/a/nFA= -github.com/operator-framework/operator-sdk v1.34.1 h1:grKzW8v+LfQYX0eqd7NDvZeKE5ZTIfWbaeJ6YsGC/Wo= -github.com/operator-framework/operator-sdk v1.34.1/go.mod h1:2zrCdmaGoh0lMz0n4g9Qk8djD5+9yRVPU82lYIHWga0= +github.com/operator-framework/operator-sdk v1.34.2 h1:chRaTC8CNxo6Q63f+mBMjP5XTUxhnaftESUkxZHiYhg= +github.com/operator-framework/operator-sdk v1.34.2/go.mod h1:2zrCdmaGoh0lMz0n4g9Qk8djD5+9yRVPU82lYIHWga0= github.com/otiai10/copy v1.14.0 h1:dCI/t1iTdYGtkvCuBG2BgR6KZa83PTclw4U5n2wAllU= github.com/otiai10/copy v1.14.0/go.mod h1:ECfuL02W+/FkTWZWgQqXPWZgW9oeKCSQ5qVfSc4qc4w= github.com/otiai10/mint v1.5.1 h1:XaPLeE+9vGbuyEHem1JNk3bYc7KKqyI/na0/mLd/Kks= diff --git a/tools/vendor/modules.txt b/tools/vendor/modules.txt index bc9ecd7b6..ee8e1c850 100644 --- a/tools/vendor/modules.txt +++ b/tools/vendor/modules.txt @@ -732,7 +732,7 @@ github.com/operator-framework/operator-registry/pkg/prettyunmarshaler github.com/operator-framework/operator-registry/pkg/registry github.com/operator-framework/operator-registry/pkg/sqlite github.com/operator-framework/operator-registry/pkg/sqlite/migrations -# github.com/operator-framework/operator-sdk v1.34.1 +# github.com/operator-framework/operator-sdk v1.34.2 ## explicit; go 1.21 github.com/operator-framework/operator-sdk/cmd/operator-sdk github.com/operator-framework/operator-sdk/internal/annotations/metrics diff --git a/vendor/github.com/kubernetes-csi/csi-lib-utils/connection/connection.go b/vendor/github.com/kubernetes-csi/csi-lib-utils/connection/connection.go index ee0388934..3efc871b5 100644 --- a/vendor/github.com/kubernetes-csi/csi-lib-utils/connection/connection.go +++ b/vendor/github.com/kubernetes-csi/csi-lib-utils/connection/connection.go @@ -29,6 +29,8 @@ import ( "github.com/kubernetes-csi/csi-lib-utils/protosanitizer" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "google.golang.org/grpc" + "google.golang.org/grpc/backoff" + "google.golang.org/grpc/credentials/insecure" "k8s.io/klog/v2" ) @@ -73,21 +75,21 @@ func SetMaxGRPCLogLength(characterCount int) { // // For other connections, the default behavior from gRPC is used and // loss of connection is not detected reliably. -func Connect(address string, metricsManager metrics.CSIMetricsManager, options ...Option) (*grpc.ClientConn, error) { +func Connect(ctx context.Context, address string, metricsManager metrics.CSIMetricsManager, options ...Option) (*grpc.ClientConn, error) { // Prepend default options options = append([]Option{WithTimeout(time.Second * 30)}, options...) if metricsManager != nil { options = append([]Option{WithMetrics(metricsManager)}, options...) } - return connect(address, options) + return connect(ctx, address, options) } // ConnectWithoutMetrics behaves exactly like Connect except no metrics are recorded. // This function is deprecated, prefer using Connect with `nil` as the metricsManager. -func ConnectWithoutMetrics(address string, options ...Option) (*grpc.ClientConn, error) { +func ConnectWithoutMetrics(ctx context.Context, address string, options ...Option) (*grpc.ClientConn, error) { // Prepend default options options = append([]Option{WithTimeout(time.Second * 30)}, options...) - return connect(address, options) + return connect(ctx, address, options) } // Option is the type of all optional parameters for Connect. @@ -97,7 +99,7 @@ type Option func(o *options) // connection got lost. If that callback returns true, the connection // is reestablished. Otherwise the connection is left as it is and // all future gRPC calls using it will fail with status.Unavailable. -func OnConnectionLoss(reconnect func() bool) Option { +func OnConnectionLoss(reconnect func(context.Context) bool) Option { return func(o *options) { o.reconnect = reconnect } @@ -105,19 +107,25 @@ func OnConnectionLoss(reconnect func() bool) Option { // ExitOnConnectionLoss returns callback for OnConnectionLoss() that writes // an error to /dev/termination-log and exits. -func ExitOnConnectionLoss() func() bool { - return func() bool { +func ExitOnConnectionLoss() func(context.Context) bool { + return func(ctx context.Context) bool { terminationMsg := "Lost connection to CSI driver, exiting" if err := os.WriteFile(terminationLogPath, []byte(terminationMsg), 0644); err != nil { - klog.Errorf("%s: %s", terminationLogPath, err) + klog.FromContext(ctx).Error(err, "Failed to write a message to the termination logfile", "terminationLogPath", terminationLogPath) } - klog.Exit(terminationMsg) + klog.FromContext(ctx).Error(nil, terminationMsg) + klog.FlushAndExit(klog.ExitFlushTimeout, 1) // Not reached. return false } } // WithTimeout adds a configurable timeout on the gRPC calls. +// Note that this timeout also prevents all attempts to reconnect +// because it uses context.WithTimeout internally. +// +// For more details, see https://github.com/grpc/grpc-go/issues/133 +// and https://github.com/kubernetes-csi/csi-lib-utils/pull/149#discussion_r1574707477 func WithTimeout(timeout time.Duration) Option { return func(o *options) { o.timeout = timeout @@ -139,7 +147,7 @@ func WithOtelTracing() Option { } type options struct { - reconnect func() bool + reconnect func(context.Context) bool timeout time.Duration metricsManager metrics.CSIMetricsManager enableOtelTracing bool @@ -147,22 +155,28 @@ type options struct { // connect is the internal implementation of Connect. It has more options to enable testing. func connect( + ctx context.Context, address string, connectOptions []Option) (*grpc.ClientConn, error) { + logger := klog.FromContext(ctx) var o options for _, option := range connectOptions { option(&o) } + bc := backoff.DefaultConfig + bc.MaxDelay = time.Second dialOptions := []grpc.DialOption{ - grpc.WithInsecure(), // Don't use TLS, it's usually local Unix domain socket in a container. - grpc.WithBackoffMaxDelay(time.Second), // Retry every second after failure. + grpc.WithTransportCredentials(insecure.NewCredentials()), // Don't use TLS, it's usually local Unix domain socket in a container. + grpc.WithConnectParams(grpc.ConnectParams{Backoff: bc}), // Retry every second after failure. grpc.WithBlock(), // Block until connection succeeds. grpc.WithIdleTimeout(time.Duration(0)), // Never close connection because of inactivity. } if o.timeout > 0 { - dialOptions = append(dialOptions, grpc.WithTimeout(o.timeout)) + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, o.timeout) + defer cancel() } interceptors := []grpc.UnaryClientInterceptor{LogGRPC} @@ -186,20 +200,25 @@ func connect( lostConnection := false reconnect := true - dialOptions = append(dialOptions, grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) { + dialOptions = append(dialOptions, grpc.WithContextDialer(func(ctx context.Context, addr string) (net.Conn, error) { + logger := klog.FromContext(ctx) if haveConnected && !lostConnection { // We have detected a loss of connection for the first time. Decide what to do... // Record this once. TODO (?): log at regular time intervals. - klog.Errorf("Lost connection to %s.", address) + logger.Error(nil, "Lost connection", "address", address) // Inform caller and let it decide? Default is to reconnect. if o.reconnect != nil { - reconnect = o.reconnect() + reconnect = o.reconnect(ctx) } lostConnection = true } if !reconnect { return nil, errors.New("connection lost, reconnecting disabled") } + var timeout time.Duration + if deadline, ok := ctx.Deadline(); ok { + timeout = time.Until(deadline) + } conn, err := net.DialTimeout("unix", address[len(unixPrefix):], timeout) if err == nil { // Connection reestablished. @@ -212,14 +231,14 @@ func connect( return nil, errors.New("OnConnectionLoss callback only supported for unix:// addresses") } - klog.V(5).Infof("Connecting to %s", address) + logger.V(5).Info("Connecting", "address", address) // Connect in background. var conn *grpc.ClientConn var err error ready := make(chan bool) go func() { - conn, err = grpc.Dial(address, dialOptions...) + conn, err = grpc.DialContext(ctx, address, dialOptions...) close(ready) }() @@ -231,7 +250,7 @@ func connect( for { select { case <-ticker.C: - klog.Warningf("Still connecting to %s", address) + logger.Info("Still connecting", "address", address) case <-ready: return conn, err @@ -241,15 +260,14 @@ func connect( // LogGRPC is gPRC unary interceptor for logging of CSI messages at level 5. It removes any secrets from the message. func LogGRPC(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { - klog.V(5).Infof("GRPC call: %s", method) - klog.V(5).Infof("GRPC request: %s", protosanitizer.StripSecrets(req)) + logger := klog.FromContext(ctx) + logger.V(5).Info("GRPC call", "method", method, "request", protosanitizer.StripSecrets(req)) err := invoker(ctx, method, req, reply, cc, opts...) cappedStr := protosanitizer.StripSecrets(reply).String() if maxLogChar > 0 && len(cappedStr) > maxLogChar { cappedStr = cappedStr[:maxLogChar] + fmt.Sprintf(" [response body too large, log capped to %d chars]", maxLogChar) } - klog.V(5).Infof("GRPC response: %s", cappedStr) - klog.V(5).Infof("GRPC error: %v", err) + logger.V(5).Info("GRPC response", "response", cappedStr, "err", err) return err } @@ -286,14 +304,14 @@ func (cmm ExtendedCSIMetricsManager) RecordMetricsClientInterceptor( if additionalInfo != nil { additionalInfoVal, ok := additionalInfo.(AdditionalInfo) if !ok { - klog.Errorf("Failed to record migrated status, cannot convert additional info %v", additionalInfo) + klog.FromContext(ctx).Error(nil, "Failed to record migrated status, cannot convert additional info", "additionalInfo", additionalInfo) return err } migrated = additionalInfoVal.Migrated } cmmv, metricsErr := cmm.WithLabelValues(map[string]string{metrics.LabelMigrated: migrated}) if metricsErr != nil { - klog.Errorf("Failed to record migrated status, error: %v", metricsErr) + klog.FromContext(ctx).Error(metricsErr, "Failed to record migrated status") } else { cmmBase = cmmv } diff --git a/vendor/github.com/kubernetes-csi/csi-lib-utils/leaderelection/leader_election.go b/vendor/github.com/kubernetes-csi/csi-lib-utils/leaderelection/leader_election.go index 8f5b1b99a..ea2b4f13b 100644 --- a/vendor/github.com/kubernetes-csi/csi-lib-utils/leaderelection/leader_election.go +++ b/vendor/github.com/kubernetes-csi/csi-lib-utils/leaderelection/leader_election.go @@ -25,7 +25,7 @@ import ( "strings" "time" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" corev1 "k8s.io/client-go/kubernetes/typed/core/v1" @@ -153,9 +153,15 @@ func (l *leaderElection) Run() error { l.namespace = inClusterNamespace() } - broadcaster := record.NewBroadcaster() + ctx := l.ctx + if ctx == nil { + ctx = context.Background() + } + logger := klog.FromContext(ctx) + + broadcaster := record.NewBroadcaster(record.WithContext(ctx)) broadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: l.clientset.CoreV1().Events(l.namespace)}) - eventRecorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: fmt.Sprintf("%s/%s", l.lockName, string(l.identity))}) + eventRecorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: fmt.Sprintf("%s/%s", l.lockName, string(l.identity))}).WithLogger(logger) rlConfig := resourcelock.ResourceLockConfig{ Identity: sanitizeName(l.identity), @@ -174,23 +180,21 @@ func (l *leaderElection) Run() error { RetryPeriod: l.retryPeriod, Callbacks: leaderelection.LeaderCallbacks{ OnStartedLeading: func(ctx context.Context) { - klog.V(2).Info("became leader, starting") + logger := klog.FromContext(ctx) + logger.V(2).Info("became leader, starting") l.runFunc(ctx) }, OnStoppedLeading: func() { - klog.Fatal("stopped leading") + logger.Error(nil, "Stopped leading") + klog.FlushAndExit(klog.ExitFlushTimeout, 1) }, OnNewLeader: func(identity string) { - klog.V(3).Infof("new leader detected, current leader: %s", identity) + logger.V(3).Info("New leader detected", "leader", identity) }, }, WatchDog: l.healthCheck, } - ctx := l.ctx - if ctx == nil { - ctx = context.Background() - } leaderelection.RunOrDie(ctx, leaderConfig) return nil // should never reach here } diff --git a/vendor/modules.txt b/vendor/modules.txt index fab7044d6..9be74bf4f 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -102,8 +102,8 @@ github.com/josharian/intern # github.com/json-iterator/go v1.1.12 ## explicit; go 1.12 github.com/json-iterator/go -# github.com/kubernetes-csi/csi-lib-utils v0.17.0 -## explicit; go 1.21 +# github.com/kubernetes-csi/csi-lib-utils v0.18.0 +## explicit; go 1.22.0 github.com/kubernetes-csi/csi-lib-utils/accessmodes github.com/kubernetes-csi/csi-lib-utils/connection github.com/kubernetes-csi/csi-lib-utils/leaderelection