Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Let API subcommand accept the runtime config via stdin #5422

Merged
merged 2 commits into from
Jan 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 57 additions & 36 deletions cmd/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"os"
"path"
Expand All @@ -41,68 +42,88 @@ import (

apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
clientcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
tokenutil "k8s.io/cluster-bootstrap/token/util"
bootstraptokenv1 "k8s.io/kubernetes/cmd/kubeadm/app/apis/bootstraptoken/v1"

"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
)

type command struct {
*config.CLIOptions
client kubernetes.Interface
}

func NewAPICmd() *cobra.Command {
cmd := &cobra.Command{
Use: "api",
Short: "Run the controller API",
Args: cobra.NoArgs,
Long: `Run the controller API.
Reads the runtime configuration from standard input.`,
Args: cobra.NoArgs,
PersistentPreRunE: func(cmd *cobra.Command, args []string) error {
logrus.SetOutput(cmd.OutOrStdout())
internallog.SetInfoLevel()
return config.CallParentPersistentPreRun(cmd, args)
},
RunE: func(cmd *cobra.Command, _ []string) error {
opts, err := config.GetCmdOpts(cmd)
if err != nil {
var run func() error

if runtimeConfig, err := loadRuntimeConfig(cmd.InOrStdin()); err != nil {
return err
} else if run, err = buildServer(runtimeConfig.Spec.K0sVars, runtimeConfig.Spec.NodeConfig); err != nil {
return err
}
return (&command{CLIOptions: opts}).start()

return run()
},
}

cmd.Flags().AddFlagSet(config.GetPersistentFlagSet())
flags := cmd.Flags()
config.GetPersistentFlagSet().VisitAll(func(f *pflag.Flag) {
switch f.Name {
case "debug", "debugListenOn", "verbose":
flags.AddFlag(f)
}
})

return cmd
}

func (c *command) start() (err error) {
func loadRuntimeConfig(stdin io.Reader) (*config.RuntimeConfig, error) {
logrus.Info("Reading runtime configuration from standard input ...")
bytes, err := io.ReadAll(stdin)
if err != nil {
return nil, fmt.Errorf("failed to read from standard input: %w", err)
}

runtimeConfig, err := config.ParseRuntimeConfig(bytes)
if err != nil {
return nil, fmt.Errorf("failed to load runtime configuration: %w", err)
}

return runtimeConfig, nil
}

func buildServer(k0sVars *config.CfgVars, nodeConfig *v1beta1.ClusterConfig) (func() error, error) {
// Single kube client for whole lifetime of the API
c.client, err = kubeutil.NewClientFromFile(c.K0sVars.AdminKubeConfigPath)
client, err := kubeutil.NewClientFromFile(k0sVars.AdminKubeConfigPath)
if err != nil {
return err
return nil, err
}
secrets := client.CoreV1().Secrets("kube-system")

prefix := "/v1beta1"
mux := http.NewServeMux()
nodeConfig, err := c.K0sVars.NodeConfig()
if err != nil {
return err
}
storage := nodeConfig.Spec.Storage

if storage.Type == v1beta1.EtcdStorageType && !storage.Etcd.IsExternalClusterUsed() {
// Only mount the etcd handler if we're running on internal etcd storage
// by default the mux will return 404 back which the caller should handle
mux.Handle(prefix+"/etcd/members", mw.AllowMethods(http.MethodPost)(
c.authMiddleware(c.etcdHandler(), "controller-join")))
authMiddleware(etcdHandler(k0sVars.CertRootDir, k0sVars.EtcdCertDir), secrets, "controller-join")))
}

if storage.IsJoinable() {
mux.Handle(prefix+"/ca", mw.AllowMethods(http.MethodGet)(
c.authMiddleware(c.caHandler(), "controller-join")))
authMiddleware(caHandler(k0sVars.CertRootDir), secrets, "controller-join")))
}

srv := &http.Server{
Expand All @@ -116,13 +137,13 @@ func (c *command) start() (err error) {
ReadTimeout: 15 * time.Second,
}

return srv.ListenAndServeTLS(
filepath.Join(c.K0sVars.CertRootDir, "k0s-api.crt"),
filepath.Join(c.K0sVars.CertRootDir, "k0s-api.key"),
)
cert := filepath.Join(k0sVars.CertRootDir, "k0s-api.crt")
key := filepath.Join(k0sVars.CertRootDir, "k0s-api.key")

return func() error { return srv.ListenAndServeTLS(cert, key) }, nil
}

func (c *command) etcdHandler() http.Handler {
func etcdHandler(certRootDir, etcdCertDir string) http.Handler {
return http.HandlerFunc(func(resp http.ResponseWriter, req *http.Request) {
ctx := req.Context()
var etcdReq v1beta1.EtcdRequest
Expand All @@ -138,7 +159,7 @@ func (c *command) etcdHandler() http.Handler {
return
}

etcdClient, err := etcd.NewClient(c.K0sVars.CertRootDir, c.K0sVars.EtcdCertDir, nil)
etcdClient, err := etcd.NewClient(certRootDir, etcdCertDir, nil)
if err != nil {
sendError(err, resp)
return
Expand All @@ -154,7 +175,7 @@ func (c *command) etcdHandler() http.Handler {
InitialCluster: memberList,
}

etcdCaCertPath, etcdCaCertKey := filepath.Join(c.K0sVars.EtcdCertDir, "ca.crt"), filepath.Join(c.K0sVars.EtcdCertDir, "ca.key")
etcdCaCertPath, etcdCaCertKey := filepath.Join(etcdCertDir, "ca.crt"), filepath.Join(etcdCertDir, "ca.key")
etcdCACert, err := os.ReadFile(etcdCaCertPath)
if err != nil {
sendError(err, resp)
Expand All @@ -178,30 +199,30 @@ func (c *command) etcdHandler() http.Handler {
})
}

func (c *command) caHandler() http.Handler {
func caHandler(certRootDir string) http.Handler {
return http.HandlerFunc(func(resp http.ResponseWriter, req *http.Request) {
caResp := v1beta1.CaResponse{}
key, err := os.ReadFile(path.Join(c.K0sVars.CertRootDir, "ca.key"))
key, err := os.ReadFile(path.Join(certRootDir, "ca.key"))
if err != nil {
sendError(err, resp)
return
}
caResp.Key = key
crt, err := os.ReadFile(path.Join(c.K0sVars.CertRootDir, "ca.crt"))
crt, err := os.ReadFile(path.Join(certRootDir, "ca.crt"))
if err != nil {
sendError(err, resp)
return
}
caResp.Cert = crt

saKey, err := os.ReadFile(path.Join(c.K0sVars.CertRootDir, "sa.key"))
saKey, err := os.ReadFile(path.Join(certRootDir, "sa.key"))
if err != nil {
sendError(err, resp)
return
}
caResp.SAKey = saKey

saPub, err := os.ReadFile(path.Join(c.K0sVars.CertRootDir, "sa.pub"))
saPub, err := os.ReadFile(path.Join(certRootDir, "sa.pub"))
if err != nil {
sendError(err, resp)
return
Expand All @@ -223,14 +244,14 @@ func (c *command) caHandler() http.Handler {
// We need to validate:
// - that we find a secret with the ID
// - that the token matches whats inside the secret
func (c *command) isValidToken(ctx context.Context, rawTokenString string, usage string) bool {
func isValidToken(ctx context.Context, secrets clientcorev1.SecretInterface, rawTokenString, usage string) bool {
tokenString, err := bootstraptokenv1.NewBootstrapTokenString(rawTokenString)
if err != nil {
return false
}

secretName := tokenutil.BootstrapTokenSecretName(tokenString.ID)
secret, err := c.client.CoreV1().Secrets("kube-system").Get(ctx, secretName, metav1.GetOptions{})
secret, err := secrets.Get(ctx, secretName, metav1.GetOptions{})
if err != nil {
if !apierrors.IsNotFound(err) {
logrus.WithError(err).Error("Failed to get bootstrap token with ID ", tokenString.ID)
Expand Down Expand Up @@ -262,12 +283,12 @@ func (c *command) isValidToken(ctx context.Context, rawTokenString string, usage
}
}

func (c *command) authMiddleware(next http.Handler, usage string) http.Handler {
func authMiddleware(next http.Handler, secrets clientcorev1.SecretInterface, usage string) http.Handler {
unauthorizedErr := errors.New("go away")

return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
token, ok := strings.CutPrefix(r.Header.Get("Authorization"), "Bearer ")
if ok && c.isValidToken(r.Context(), token, usage) {
if ok && isValidToken(r.Context(), secrets, token, usage) {
next.ServeHTTP(w, r)
} else {
sendError(unauthorizedErr, w, http.StatusUnauthorized)
Expand Down
7 changes: 2 additions & 5 deletions cmd/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func (c *command) start(ctx context.Context) error {
return fmt.Errorf("failed to initialize runtime config: %w", err)
}
defer func() {
if err := rtc.Cleanup(); err != nil {
if err := rtc.Spec.Cleanup(); err != nil {
logrus.WithError(err).Warn("Failed to cleanup runtime config")
}
}()
Expand Down Expand Up @@ -329,10 +329,7 @@ func (c *command) start(ctx context.Context) error {
}

if !c.SingleNode && !slices.Contains(c.DisableComponents, constant.ControlAPIComponentName) {
nodeComponents.Add(ctx, &controller.K0SControlAPI{
ConfigPath: c.CfgFile,
K0sVars: c.K0sVars,
})
nodeComponents.Add(ctx, &controller.K0SControlAPI{RuntimeConfig: rtc})
}

if !slices.Contains(c.DisableComponents, constant.CsrApproverComponentName) {
Expand Down
23 changes: 15 additions & 8 deletions pkg/component/controller/k0scontrolapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,21 @@ limitations under the License.
package controller

import (
"bytes"
"context"
"io"
"os"

"github.com/k0sproject/k0s/pkg/component/manager"
"github.com/k0sproject/k0s/pkg/config"
"github.com/k0sproject/k0s/pkg/supervisor"
"sigs.k8s.io/yaml"
)

// K0SControlAPI implements the k0s control API component
type K0SControlAPI struct {
ConfigPath string
K0sVars *config.CfgVars
RuntimeConfig *config.RuntimeConfig

supervisor supervisor.Supervisor
}

Expand All @@ -48,15 +51,19 @@ func (m *K0SControlAPI) Start(_ context.Context) error {
if err != nil {
return err
}

runtimeConfig, err := yaml.Marshal(m.RuntimeConfig)
if err != nil {
return err
}

m.supervisor = supervisor.Supervisor{
Name: "k0s-control-api",
BinPath: selfExe,
RunDir: m.K0sVars.RunDir,
DataDir: m.K0sVars.DataDir,
Args: []string{
"api",
"--data-dir=" + m.K0sVars.DataDir,
},
RunDir: m.RuntimeConfig.Spec.K0sVars.RunDir,
DataDir: m.RuntimeConfig.Spec.K0sVars.DataDir,
Args: []string{"api"},
Stdin: func() io.Reader { return bytes.NewReader(runtimeConfig) },
}

return m.supervisor.Supervise()
Expand Down
46 changes: 28 additions & 18 deletions pkg/config/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ const (
var (
ErrK0sNotRunning = errors.New("k0s is not running")
ErrK0sAlreadyRunning = errors.New("an instance of k0s is already running")
ErrInvalidRuntimeConfig = errors.New("invalid runtime config")
ErrInvalidRuntimeConfig = errors.New("invalid runtime configuration")
)

// Runtime config is a static copy of the start up config and CfgVars that is used by
Expand All @@ -65,23 +65,11 @@ func LoadRuntimeConfig(path string) (*RuntimeConfigSpec, error) {
return nil, err
}

config := &RuntimeConfig{}
if err := yaml.Unmarshal(content, config); err != nil {
return nil, err
}

if config.APIVersion != v1beta1.ClusterConfigAPIVersion {
return nil, fmt.Errorf("%w: invalid api version: %s", ErrInvalidRuntimeConfig, config.APIVersion)
}

if config.Kind != RuntimeConfigKind {
return nil, fmt.Errorf("%w: invalid kind: %s", ErrInvalidRuntimeConfig, config.Kind)
config, err := ParseRuntimeConfig(content)
if err != nil {
return nil, fmt.Errorf("failed to parse runtime configuration: %w", err)
}

spec := config.Spec
if spec == nil {
return nil, fmt.Errorf("%w: spec is nil", ErrInvalidRuntimeConfig)
}

// If a pid is defined but there's no process found, the instance of k0s is
// expected to have died, in which case the existing config is removed and
Expand All @@ -97,7 +85,29 @@ func LoadRuntimeConfig(path string) (*RuntimeConfigSpec, error) {
return spec, nil
}

func NewRuntimeConfig(k0sVars *CfgVars) (*RuntimeConfigSpec, error) {
func ParseRuntimeConfig(content []byte) (*RuntimeConfig, error) {
var config RuntimeConfig

if err := yaml.Unmarshal(content, &config); err != nil {
return nil, err
}

if config.APIVersion != v1beta1.ClusterConfigAPIVersion {
return nil, fmt.Errorf("%w: invalid api version: %q", ErrInvalidRuntimeConfig, config.APIVersion)
}

if config.Kind != RuntimeConfigKind {
return nil, fmt.Errorf("%w: invalid kind: %q", ErrInvalidRuntimeConfig, config.Kind)
}

if config.Spec == nil {
return nil, fmt.Errorf("%w: spec is nil", ErrInvalidRuntimeConfig)
}

return &config, nil
}

func NewRuntimeConfig(k0sVars *CfgVars) (*RuntimeConfig, error) {
if _, err := LoadRuntimeConfig(k0sVars.RuntimeConfigPath); err == nil {
return nil, ErrK0sAlreadyRunning
}
Expand Down Expand Up @@ -135,7 +145,7 @@ func NewRuntimeConfig(k0sVars *CfgVars) (*RuntimeConfigSpec, error) {
return nil, fmt.Errorf("failed to write runtime config: %w", err)
}

return cfg.Spec, nil
return cfg, nil
}

func (r *RuntimeConfigSpec) Cleanup() error {
Expand Down
7 changes: 4 additions & 3 deletions pkg/config/runtime_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,15 +70,16 @@ func TestNewRuntimeConfig(t *testing.T) {
}

// create a new runtime config and check if it's valid
spec, err := NewRuntimeConfig(k0sVars)
cfg, err := NewRuntimeConfig(k0sVars)
spec := cfg.Spec
assert.NoError(t, err)
assert.NotNil(t, spec)
assert.Same(t, k0sVars, spec.K0sVars)
assert.Equal(t, os.Getpid(), spec.Pid)
assert.NotNil(t, spec.NodeConfig)
cfg, err := spec.K0sVars.NodeConfig()
nodeConfig, err := spec.K0sVars.NodeConfig()
assert.NoError(t, err)
assert.Equal(t, "10.0.0.1", cfg.Spec.API.Address)
assert.Equal(t, "10.0.0.1", nodeConfig.Spec.API.Address)
assert.FileExists(t, rtConfigPath)

// try to create a new runtime config when one is already active and check if it returns an error
Expand Down
Loading
Loading