diff --git a/build/package/peerd-helm/templates/_helpers.tpl b/build/package/peerd-helm/templates/_helpers.tpl index 61d3ad8..f7fd591 100644 --- a/build/package/peerd-helm/templates/_helpers.tpl +++ b/build/package/peerd-helm/templates/_helpers.tpl @@ -6,8 +6,12 @@ Expand the name of the chart. {{- end }} {{- define "peerd.namespace" -}} +{{- if .Values.peerd.namespace.k8s }} +{{- .Values.peerd.namespace.k8s }} +{{- else }} {{ include "peerd.name" . }}-ns {{- end }} +{{- end }} {{- define "peerd.serviceAccountName" -}} {{ include "peerd.name" . }}-sa diff --git a/build/package/peerd-helm/templates/peerd-ds.yml b/build/package/peerd-helm/templates/peerd-ds.yml index 899b7b8..10ceb2a 100644 --- a/build/package/peerd-helm/templates/peerd-ds.yml +++ b/build/package/peerd-helm/templates/peerd-ds.yml @@ -25,9 +25,10 @@ spec: - image: "{{ .Values.peerd.image.ref }}" imagePullPolicy: "{{ .Values.peerd.image.pullPolicy }}" args: - - "--log-level=debug" + - "--log-level={{ .Values.peerd.logLevel }}" - "run" - "--http-addr=0.0.0.0:5000" + - "--k8s-namespace={{ .Values.peerd.namespace.k8s }}" - "--add-mirror-configuration={{ .Values.peerd.configureMirrors }}" {{- with .Values.peerd.hosts }} - --hosts diff --git a/build/package/peerd-helm/values.yaml b/build/package/peerd-helm/values.yaml index 28a5d1c..50275bd 100644 --- a/build/package/peerd-helm/values.yaml +++ b/build/package/peerd-helm/values.yaml @@ -3,6 +3,8 @@ peerd: ref: ghcr.io/azure/acr/dev/peerd:stable pullPolicy: IfNotPresent + logLevel: debug + # Whether to add configuration for mirrors specified below to /etc/containerd/certs.d. # https://github.com/containerd/containerd/blob/main/docs/hosts.md#registry-configuration---examples configureMirrors: true @@ -25,5 +27,9 @@ peerd: requests: cpu: "10m" + namespace: + # The kubernetes namespace for pod deployment and leader election. + k8s: peerd-ns + # Uncomment to add tolerations. # tolerations: diff --git a/cmd/proxy/cmd.go b/cmd/proxy/cmd.go index c0e93b6..e387140 100644 --- a/cmd/proxy/cmd.go +++ b/cmd/proxy/cmd.go @@ -9,6 +9,9 @@ type ServerCmd struct { PromAddr string `arg:"--prom-addr" help:"address of prometheus metrics endpoint" default:"0.0.0.0:5004"` PrefetchWorkers int `arg:"--prefetch-workers" help:"number of workers to prefetch content" default:"50"` + // Leader election namespace. + K8sNamespace string `arg:"--k8s-namespace" help:"namespace for leader election" default:"peerd-ns"` + // Mirror configuration. Hosts []string `arg:"--hosts" help:"list of hosts to mirror"` AddMirrorConfiguration bool `arg:"--add-mirror-configuration" help:"add mirror configuration to containerd host configuration" default:"false"` diff --git a/cmd/proxy/main.go b/cmd/proxy/main.go index f23df17..88c323f 100644 --- a/cmd/proxy/main.go +++ b/cmd/proxy/main.go @@ -85,7 +85,7 @@ func serverCommand(ctx context.Context, args *ServerCmd) (err error) { return err } - ctx, err = events.WithContext(ctx, clientset) + ctx, err = events.WithContext(ctx, clientset, args.K8sNamespace) if err != nil { return err } @@ -98,7 +98,7 @@ func serverCommand(ctx context.Context, args *ServerCmd) (err error) { eventsRecorder.Initializing() - r, err := routing.NewRouter(ctx, clientset, args.RouterAddr, httpsPort) + r, err := routing.NewRouter(ctx, clientset, args.RouterAddr, httpsPort, args.K8sNamespace) if err != nil { return err } diff --git a/internal/context/context.go b/internal/context/context.go index e1ffdec..bcbae63 100644 --- a/internal/context/context.go +++ b/internal/context/context.go @@ -59,7 +59,6 @@ const ( var ( NodeName, _ = os.Hostname() - Namespace = "peerd-ns" // KubeConfigPath is the path of the kubeconfig file. KubeConfigPath = "/opt/peerd/kubeconfig" diff --git a/internal/k8s/events/events.go b/internal/k8s/events/events.go index d17f66d..0feef34 100644 --- a/internal/k8s/events/events.go +++ b/internal/k8s/events/events.go @@ -21,7 +21,7 @@ var ( ) // NewRecorder creates a new event recorder. -func NewRecorder(ctx context.Context, k *k8s.ClientSet) (EventRecorder, error) { +func NewRecorder(ctx context.Context, k *k8s.ClientSet, k8sNamespace string) (EventRecorder, error) { clientset := k.Interface inPod := k.InPod @@ -38,7 +38,7 @@ func NewRecorder(ctx context.Context, k *k8s.ClientSet) (EventRecorder, error) { APIVersion: node.APIVersion, } } else { - pod, err := clientset.CoreV1().Pods(p2pcontext.Namespace).Get(ctx, p2pcontext.NodeName, metav1.GetOptions{}) + pod, err := clientset.CoreV1().Pods(k8sNamespace).Get(ctx, p2pcontext.NodeName, metav1.GetOptions{}) if err != nil { return nil, err } @@ -53,7 +53,7 @@ func NewRecorder(ctx context.Context, k *k8s.ClientSet) (EventRecorder, error) { broadcaster := record.NewBroadcaster() broadcaster.StartStructuredLogging(4) - broadcaster.StartRecordingToSink(&typedv1core.EventSinkImpl{Interface: clientset.CoreV1().Events(p2pcontext.Namespace)}) + broadcaster.StartRecordingToSink(&typedv1core.EventSinkImpl{Interface: clientset.CoreV1().Events(k8sNamespace)}) return &eventRecorder{ recorder: broadcaster.NewRecorder( @@ -65,8 +65,8 @@ func NewRecorder(ctx context.Context, k *k8s.ClientSet) (EventRecorder, error) { } // WithContext returns a new context with an event recorder. -func WithContext(ctx context.Context, clientset *k8s.ClientSet) (context.Context, error) { - er, err := NewRecorder(ctx, clientset) +func WithContext(ctx context.Context, clientset *k8s.ClientSet, k8sNamespace string) (context.Context, error) { + er, err := NewRecorder(ctx, clientset, k8sNamespace) if err != nil { return nil, err } diff --git a/internal/k8s/events/events_test.go b/internal/k8s/events/events_test.go index 6c79660..7a43290 100644 --- a/internal/k8s/events/events_test.go +++ b/internal/k8s/events/events_test.go @@ -16,11 +16,12 @@ import ( ) func TestWithContext(t *testing.T) { + ns := "test-ns" fcs := fake.NewSimpleClientset([]runtime.Object{ &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: p2pcontext.NodeName, - Namespace: p2pcontext.Namespace, + Namespace: ns, UID: "test-uid", }, }, @@ -28,7 +29,7 @@ func TestWithContext(t *testing.T) { cs := &k8s.ClientSet{Interface: fcs, InPod: true} - ctx, err := WithContext(context.Background(), cs) + ctx, err := WithContext(context.Background(), cs, ns) if err != nil { t.Fatal(err) } @@ -44,6 +45,8 @@ func TestWithContext(t *testing.T) { } func TestNewRecorderInNode(t *testing.T) { + ns := "test-ns" + fcs := fake.NewSimpleClientset([]runtime.Object{ &v1.Node{ ObjectMeta: metav1.ObjectMeta{ @@ -55,7 +58,7 @@ func TestNewRecorderInNode(t *testing.T) { cs := &k8s.ClientSet{Interface: fcs, InPod: false} - r, err := NewRecorder(context.Background(), cs) + r, err := NewRecorder(context.Background(), cs, ns) if err != nil { t.Fatal(err) } @@ -77,11 +80,13 @@ func TestNewRecorderInNode(t *testing.T) { } func TestNewRecorderInPod(t *testing.T) { + ns := "test-ns" + fcs := fake.NewSimpleClientset([]runtime.Object{ &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: p2pcontext.NodeName, - Namespace: p2pcontext.Namespace, + Namespace: ns, UID: "test-uid", }, }, @@ -89,7 +94,7 @@ func TestNewRecorderInPod(t *testing.T) { cs := &k8s.ClientSet{Interface: fcs, InPod: true} - r, err := NewRecorder(context.Background(), cs) + r, err := NewRecorder(context.Background(), cs, ns) if err != nil { t.Fatal(err) } @@ -105,8 +110,8 @@ func TestNewRecorderInPod(t *testing.T) { if er.objRef.Name != p2pcontext.NodeName { t.Errorf("expected name to be %s, got %s", p2pcontext.NodeName, er.objRef.Name) } - if er.objRef.Namespace != p2pcontext.Namespace { - t.Errorf("expected namespace to be %s, got %s", p2pcontext.Namespace, er.objRef.Namespace) + if er.objRef.Namespace != ns { + t.Errorf("expected namespace to be %s, got %s", ns, er.objRef.Namespace) } if er.objRef.UID != "test-uid" { t.Errorf("expected uid to be test-uid, got %s", er.objRef.UID) diff --git a/internal/routing/router.go b/internal/routing/router.go index df1cdbd..0325e85 100644 --- a/internal/routing/router.go +++ b/internal/routing/router.go @@ -27,12 +27,13 @@ import ( ) type router struct { - clientset *k8s.ClientSet - p2pnet peernet.Network - host host.Host - rd *routing.RoutingDiscovery - port string - lookupCache *ristretto.Cache + clientset *k8s.ClientSet + p2pnet peernet.Network + host host.Host + rd *routing.RoutingDiscovery + port string + lookupCache *ristretto.Cache + k8sNamespace string active atomic.Bool } @@ -44,7 +45,7 @@ type PeerNotFoundError struct { } // NewRouter creates a new router. -func NewRouter(ctx context.Context, clientset *k8s.ClientSet, routerAddr, serverPort string) (Router, error) { +func NewRouter(ctx context.Context, clientset *k8s.ClientSet, routerAddr, serverPort, k8sNamespace string) (Router, error) { log := zerolog.Ctx(ctx).With().Str("component", "router").Logger() h, p, err := net.SplitHostPort(routerAddr) @@ -82,7 +83,7 @@ func NewRouter(ctx context.Context, clientset *k8s.ClientSet, routerAddr, server self := fmt.Sprintf("%s/p2p/%s", host.Addrs()[0].String(), host.ID().String()) log.Info().Str("id", self).Msg("starting p2p router") - leaderElection := election.New(p2pcontext.Namespace, "peerd-leader-election", p2pcontext.KubeConfigPath) + leaderElection := election.New(k8sNamespace, "peerd-leader-election", p2pcontext.KubeConfigPath) err = leaderElection.RunOrDie(ctx, self) if err != nil { @@ -139,12 +140,13 @@ func NewRouter(ctx context.Context, clientset *k8s.ClientSet, routerAddr, server } return &router{ - clientset: clientset, - p2pnet: n, - host: host, - rd: rd, - port: serverPort, - lookupCache: c, + clientset: clientset, + p2pnet: n, + host: host, + rd: rd, + port: serverPort, + lookupCache: c, + k8sNamespace: k8sNamespace, }, nil } @@ -199,7 +201,7 @@ func (r *router) Resolve(ctx context.Context, key string, allowSelf bool, count peerCh <- PeerInfo{info.ID, fmt.Sprintf("https://%s:%s", v, r.port)} if r.active.CompareAndSwap(false, true) { - er, err := events.NewRecorder(ctx, r.clientset) + er, err := events.NewRecorder(ctx, r.clientset, r.k8sNamespace) if err != nil { log.Error().Err(err).Msg("could not create event recorder") } else {