Skip to content

Commit 10f64d2

Browse files
committed
refactor: use common sidecar flags functionality
1 parent 3a057e7 commit 10f64d2

File tree

1 file changed

+25
-80
lines changed

1 file changed

+25
-80
lines changed

cmd/csi-attacher/main.go

Lines changed: 25 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,6 @@ import (
3131
utilfeature "k8s.io/apiserver/pkg/util/feature"
3232
"k8s.io/client-go/informers"
3333
"k8s.io/client-go/kubernetes"
34-
"k8s.io/client-go/rest"
35-
"k8s.io/client-go/tools/clientcmd"
3634
"k8s.io/client-go/util/workqueue"
3735
utilflag "k8s.io/component-base/cli/flag"
3836
"k8s.io/component-base/featuregate"
@@ -48,6 +46,7 @@ import (
4846
"github.com/container-storage-interface/spec/lib/go/csi"
4947
"github.com/kubernetes-csi/csi-lib-utils/connection"
5048
"github.com/kubernetes-csi/csi-lib-utils/leaderelection"
49+
libconfig "github.com/kubernetes-csi/csi-lib-utils/config"
5150
"github.com/kubernetes-csi/csi-lib-utils/metrics"
5251
"github.com/kubernetes-csi/csi-lib-utils/rpc"
5352
"github.com/kubernetes-csi/csi-lib-utils/standardflags"
@@ -65,34 +64,15 @@ const (
6564

6665
// Command line flags
6766
var (
68-
kubeconfig = flag.String("kubeconfig", "", "Absolute path to the kubeconfig file. Required only when running out of cluster.")
6967
resync = flag.Duration("resync", 10*time.Minute, "Resync interval of the controller.")
70-
csiAddress = flag.String("csi-address", "/run/csi/socket", "Address of the CSI driver socket.")
71-
showVersion = flag.Bool("version", false, "Show version.")
7268
timeout = flag.Duration("timeout", 15*time.Second, "Timeout for waiting for attaching or detaching the volume.")
7369
workerThreads = flag.Uint("worker-threads", 10, "Number of attacher worker threads")
7470
maxEntries = flag.Int("max-entries", 0, "Max entries per each page in volume lister call, 0 means no limit.")
7571

76-
retryIntervalStart = flag.Duration("retry-interval-start", time.Second, "Initial retry interval of failed create volume or deletion. It doubles with each failure, up to retry-interval-max.")
77-
retryIntervalMax = flag.Duration("retry-interval-max", 5*time.Minute, "Maximum retry interval of failed create volume or deletion.")
78-
79-
enableLeaderElection = flag.Bool("leader-election", false, "Enable leader election.")
80-
leaderElectionNamespace = flag.String("leader-election-namespace", "", "Namespace where the leader election resource lives. Defaults to the pod namespace if not set.")
81-
leaderElectionLeaseDuration = flag.Duration("leader-election-lease-duration", 15*time.Second, "Duration, in seconds, that non-leader candidates will wait to force acquire leadership. Defaults to 15 seconds.")
82-
leaderElectionRenewDeadline = flag.Duration("leader-election-renew-deadline", 10*time.Second, "Duration, in seconds, that the acting leader will retry refreshing leadership before giving up. Defaults to 10 seconds.")
83-
leaderElectionRetryPeriod = flag.Duration("leader-election-retry-period", 5*time.Second, "Duration, in seconds, the LeaderElector clients should wait between tries of actions. Defaults to 5 seconds.")
84-
8572
defaultFSType = flag.String("default-fstype", "", "The default filesystem type of the volume to publish. Defaults to empty string")
8673

8774
reconcileSync = flag.Duration("reconcile-sync", 1*time.Minute, "Resync interval of the VolumeAttachment reconciler.")
8875

89-
metricsAddress = flag.String("metrics-address", "", "(deprecated) The TCP network address where the prometheus metrics endpoint will listen (example: `:8080`). The default is empty string, which means metrics endpoint is disabled. Only one of `--metrics-address` and `--http-endpoint` can be set.")
90-
httpEndpoint = flag.String("http-endpoint", "", "The TCP network address where the HTTP server for diagnostics, including metrics and leader election health check, will listen (example: `:8080`). The default is empty string, which means the server is disabled. Only one of `--metrics-address` and `--http-endpoint` can be set.")
91-
metricsPath = flag.String("metrics-path", "/metrics", "The HTTP path where prometheus metrics will be exposed. Default is `/metrics`.")
92-
93-
kubeAPIQPS = flag.Float64("kube-api-qps", 5, "QPS to use while communicating with the kubernetes apiserver. Defaults to 5.0.")
94-
kubeAPIBurst = flag.Int("kube-api-burst", 10, "Burst to use while communicating with the kubernetes apiserver. Defaults to 10.")
95-
9676
maxGRPCLogLength = flag.Int("max-grpc-log-length", -1, "The maximum amount of characters logged for every grpc responses. Defaults to no limit")
9777

9878
featureGates map[string]bool
@@ -111,6 +91,7 @@ func main() {
11191
c := logsapi.NewLoggingConfiguration()
11292
logsapi.AddGoFlags(c, flag.CommandLine)
11393
logs.InitLogs()
94+
standardflags.RegisterCommonFlags(flag.CommandLine)
11495
standardflags.AddAutomaxprocs(klog.Infof)
11596
flag.Parse()
11697
logger := klog.Background()
@@ -124,29 +105,27 @@ func main() {
124105
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
125106
}
126107

127-
if *showVersion {
108+
if standardflags.Configuration.ShowVersion {
128109
fmt.Println(os.Args[0], version)
129110
return
130111
}
131112
logger.Info("Version", "version", version)
132113

133-
if *metricsAddress != "" && *httpEndpoint != "" {
114+
if standardflags.Configuration.MetricsAddress != "" && standardflags.Configuration.HttpEndpoint != "" {
134115
logger.Error(nil, "Only one of `--metrics-address` and `--http-endpoint` can be set")
135116
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
136117
}
137-
addr := *metricsAddress
118+
addr := standardflags.Configuration.MetricsAddress
138119
if addr == "" {
139-
addr = *httpEndpoint
120+
addr = standardflags.Configuration.HttpEndpoint
140121
}
141122

142123
// Create the client config. Use kubeconfig if given, otherwise assume in-cluster.
143-
config, err := buildConfig(*kubeconfig)
124+
config, err := libconfig.BuildConfig(standardflags.Configuration.KubeConfig, standardflags.Configuration)
144125
if err != nil {
145126
logger.Error(err, "Failed to build a Kubernetes config")
146127
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
147128
}
148-
config.QPS = (float32)(*kubeAPIQPS)
149-
config.Burst = *kubeAPIBurst
150129
config.ContentType = runtime.ContentTypeProtobuf
151130

152131
if *workerThreads == 0 {
@@ -167,9 +146,9 @@ func main() {
167146
// Connect to CSI.
168147
connection.SetMaxGRPCLogLength(*maxGRPCLogLength)
169148
ctx := context.Background()
170-
csiConn, err := connection.Connect(ctx, *csiAddress, metricsManager, connection.OnConnectionLoss(connection.ExitOnConnectionLoss()))
149+
csiConn, err := connection.Connect(ctx, standardflags.Configuration.CSIAddress, metricsManager, connection.OnConnectionLoss(connection.ExitOnConnectionLoss()))
171150
if err != nil {
172-
logger.Error(err, "Failed to connect to the CSI driver", "csiAddress", *csiAddress)
151+
logger.Error(err, "Failed to connect to the CSI driver", "csiAddress", standardflags.Configuration.CSIAddress)
173152
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
174153
}
175154

@@ -194,9 +173,9 @@ func main() {
194173
translator := csitrans.New()
195174
if translator.IsMigratedCSIDriverByName(csiAttacher) {
196175
metricsManager = metrics.NewCSIMetricsManagerWithOptions(csiAttacher, metrics.WithMigration())
197-
migratedCsiClient, err := connection.Connect(ctx, *csiAddress, metricsManager, connection.OnConnectionLoss(connection.ExitOnConnectionLoss()))
176+
migratedCsiClient, err := connection.Connect(ctx, standardflags.Configuration.CSIAddress, metricsManager, connection.OnConnectionLoss(connection.ExitOnConnectionLoss()))
198177
if err != nil {
199-
logger.Error(err, "Failed to connect to the CSI driver", "csiAddress", *csiAddress, "migrated", true)
178+
logger.Error(err, "Failed to connect to the CSI driver", "csiAddress", standardflags.Configuration.CSIAddress, "migrated", true)
200179
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
201180
}
202181
csiConn.Close()
@@ -216,13 +195,13 @@ func main() {
216195
// Prepare http endpoint for metrics + leader election healthz
217196
mux := http.NewServeMux()
218197
if addr != "" {
219-
metricsManager.RegisterToServer(mux, *metricsPath)
198+
metricsManager.RegisterToServer(mux, standardflags.Configuration.MetricsPath)
220199
metricsManager.SetDriverName(csiAttacher)
221200
go func() {
222-
logger.Info("ServeMux listening", "address", addr, "metricsPath", *metricsPath)
201+
logger.Info("ServeMux listening", "address", addr, "metricsPath", standardflags.Configuration.MetricsPath)
223202
err := http.ListenAndServe(addr, mux)
224203
if err != nil {
225-
logger.Error(err, "Failed to start HTTP server at specified address and metrics path", "address", addr, "metricsPath", *metricsPath)
204+
logger.Error(err, "Failed to start HTTP server at specified address and metrics path", "address", addr, "metricsPath", standardflags.Configuration.MetricsPath)
226205
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
227206
}
228207
}()
@@ -291,8 +270,8 @@ func main() {
291270
handler,
292271
factory.Storage().V1().VolumeAttachments(),
293272
factory.Core().V1().PersistentVolumes(),
294-
workqueue.NewTypedItemExponentialFailureRateLimiter[string](*retryIntervalStart, *retryIntervalMax),
295-
workqueue.NewTypedItemExponentialFailureRateLimiter[string](*retryIntervalStart, *retryIntervalMax),
273+
workqueue.NewTypedItemExponentialFailureRateLimiter[string](standardflags.Configuration.RetryIntervalStart, standardflags.Configuration.RetryIntervalMax),
274+
workqueue.NewTypedItemExponentialFailureRateLimiter[string](standardflags.Configuration.RetryIntervalStart, standardflags.Configuration.RetryIntervalMax),
296275
supportsListVolumesPublishedNodes,
297276
*reconcileSync,
298277
)
@@ -332,49 +311,15 @@ func main() {
332311
}
333312
}
334313

335-
if !*enableLeaderElection {
336-
run(klog.NewContext(context.Background(), logger))
337-
} else {
338-
// Create a new clientset for leader election. When the attacher
339-
// gets busy and its client gets throttled, the leader election
340-
// can proceed without issues.
341-
leClientset, err := kubernetes.NewForConfig(config)
342-
if err != nil {
343-
logger.Error(err, "Failed to create leaderelection client")
344-
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
345-
}
346-
347-
// Name of config map with leader election lock
348-
lockName := "external-attacher-leader-" + csiAttacher
349-
le := leaderelection.NewLeaderElection(leClientset, lockName, run)
350-
if *httpEndpoint != "" {
351-
le.PrepareHealthCheck(mux, leaderelection.DefaultHealthCheckTimeout)
352-
}
353-
354-
if *leaderElectionNamespace != "" {
355-
le.WithNamespace(*leaderElectionNamespace)
356-
}
357-
358-
le.WithLeaseDuration(*leaderElectionLeaseDuration)
359-
le.WithRenewDeadline(*leaderElectionRenewDeadline)
360-
le.WithRetryPeriod(*leaderElectionRetryPeriod)
361-
if utilfeature.DefaultFeatureGate.Enabled(features.ReleaseLeaderElectionOnExit) {
362-
le.WithReleaseOnCancel(true)
363-
le.WithContext(ctx)
364-
}
365-
366-
if err := le.Run(); err != nil {
367-
logger.Error(err, "Failed to initialize leader election")
368-
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
369-
}
370-
}
371-
}
372-
373-
func buildConfig(kubeconfig string) (*rest.Config, error) {
374-
if kubeconfig != "" {
375-
return clientcmd.BuildConfigFromFlags("", kubeconfig)
376-
}
377-
return rest.InClusterConfig()
314+
leaderelection.RunWithLeaderElection(
315+
ctx,
316+
config,
317+
standardflags.Configuration,
318+
run,
319+
"external-attacher-leader-" + csiAttacher,
320+
mux,
321+
utilfeature.DefaultFeatureGate.Enabled(features.ReleaseLeaderElectionOnExit),
322+
)
378323
}
379324

380325
func supportsControllerCapabilities(ctx context.Context, csiConn *grpc.ClientConn) (bool, bool, bool, bool, error) {

0 commit comments

Comments
 (0)