Skip to content

Commit

Permalink
feat: Flags and stuff
Browse files Browse the repository at this point in the history
  • Loading branch information
ananthb committed Aug 7, 2024
1 parent 58ace81 commit f20e744
Show file tree
Hide file tree
Showing 5 changed files with 135 additions and 79 deletions.
78 changes: 78 additions & 0 deletions flags.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package main

import (
"flag"
"fmt"
"log/slog"
"net"
"net/url"
"os"
"path"
"strings"
)

var logLevel = flag.String("log-level", "info", "log level")
var listenAddr = flag.String("listen-addr", "/run/systemd/cri/cri.sock", "address to listen on")
var stateDir = flag.String("state-dir", "/var/lib/systemd/cri", "directory to store state")
var version = flag.Bool("version", false, "Print version and exit")

func init() {
flag.Parse()

var level slog.Level
if err := level.UnmarshalText([]byte(*logLevel)); err != nil {
slog.Error("error parsing log level", "error", err)
os.Exit(1)
}
handler := slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: level})
slog.SetDefault(slog.New(handler))

if sd, ok := os.LookupEnv("STATE_DIRECTORY"); *stateDir == "" && ok {
*stateDir = sd
}

if err := os.MkdirAll(*stateDir, 0755); err != nil {
slog.Error("error creating state directory", "error", err)
os.Exit(1)
}
}

func listen() (net.Listener, error) {
if *listenAddr == "" {
return nil, nil
}

addr, err := url.Parse(*listenAddr)
if err != nil {
return nil, err
}

network := "unix"
address := path.Join(addr.Host, addr.Path)

switch addr.Scheme {
case "unix":
case "tcp":
network = "tcp"
address = addr.Host
case "":
if strings.Contains(addr.Path, ":") {
network = "tcp"
address = addr.Path
}
default:
return nil, fmt.Errorf("unsupported scheme %s", addr.Scheme)
}

if network == "unix" {
if err := os.Remove(address); err != nil && !os.IsNotExist(err) {
return nil, err
}

if err := os.MkdirAll(path.Dir(address), 0755); err != nil {
return nil, err
}
}

return net.Listen(network, address)
}
56 changes: 50 additions & 6 deletions internal/crisvc/image.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,47 @@ package crisvc

import (
"context"
"os"
"path/filepath"

"github.com/containers/image/v5/copy"
"github.com/containers/image/v5/directory"
"github.com/containers/image/v5/signature"
"github.com/containers/image/v5/transports/alltransports"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
)

func (i *criService) ListImages(
context.Context,
*runtimeapi.ListImagesRequest,
ctx context.Context,
req *runtimeapi.ListImagesRequest,
) (*runtimeapi.ListImagesResponse, error) {
return nil, nil
dis, err := os.ReadDir(i.imagesDir())
if err != nil {
return nil, err
}

images := make([]*runtimeapi.Image, 0, len(dis))
for _, di := range dis {
if !di.IsDir() {
continue
}

image, err := directory.NewReference(filepath.Join(i.imagesDir(), di.Name()))
if err != nil {
return nil, err
}

images = append(images, &runtimeapi.Image{
Id: image.DockerReference().String(),
RepoTags: []string{image.DockerReference().String()},
RepoDigests: []string{image.DockerReference().String()},
Size_: 0,
})
}

return &runtimeapi.ListImagesResponse{
Images: images,
}, nil
}

func (i *criService) ImageStatus(
Expand All @@ -29,26 +58,33 @@ func (i *criService) PullImage(
) (*runtimeapi.PullImageResponse, error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

policyContext, err := signature.NewPolicyContext(&signature.Policy{
Default: []signature.PolicyRequirement{signature.NewPRInsecureAcceptAnything()},
})
if err != nil {
return nil, err
}

srcRef, err := alltransports.ParseImageName(req.Image.GetImage())
if err != nil {
return nil, err
}
destRef, err := alltransports.ParseImageName("docker://localhost:5000/alpine:latest")

destDir := i.imageDir(srcRef.DockerReference().String())

dir, err := directory.NewReference(destDir)
if err != nil {
return nil, err
}

options := &copy.Options{}
if _, err := copy.Image(ctx, policyContext, destRef, srcRef, options); err != nil {
if _, err := copy.Image(ctx, policyContext, dir, srcRef, options); err != nil {
return nil, err
}

response := &runtimeapi.PullImageResponse{
ImageRef: destRef.DockerReference().String(),
ImageRef: dir.DockerReference().String(),
}
return response, nil
}
Expand All @@ -66,3 +102,11 @@ func (i *criService) ImageFsInfo(
) (*runtimeapi.ImageFsInfoResponse, error) {
return nil, nil
}

func (c *criService) imagesDir() string {
return filepath.Join(c.stateDir, "images")
}

func (c *criService) imageDir(imageName string) string {
return filepath.Join(c.imagesDir(), imageName)
}
2 changes: 1 addition & 1 deletion internal/crisvc/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
)

type criService struct {
runtimeClient runtime.RuntimeServiceClient
stateDir string
}

func (r *criService) RuntimeConfig(ctx context.Context, req *runtime.RuntimeConfigRequest) (*runtime.RuntimeConfigResponse, error) {
Expand Down
6 changes: 4 additions & 2 deletions internal/crisvc/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ type CRIService interface {
runtimeapi.ImageServiceServer
}

func New() (CRIService, error) {
return &criService{}, nil
func New(stateDir string) (CRIService, error) {
return &criService{
stateDir: stateDir,
}, nil
}
72 changes: 2 additions & 70 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,41 +2,18 @@ package main

import (
"context"
"flag"
"fmt"
"log/slog"
"net"
"net/http"
"net/url"
"os"
"os/signal"
"path"
"runtime/debug"
"strings"

"github.com/ananthb/systemd-cri/internal/crisvc"
"github.com/coreos/go-systemd/v22/daemon"
"github.com/soheilhy/cmux"
"google.golang.org/grpc"
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
)

var logLevel = flag.String("log-level", "info", "log level")
var listenAddr = flag.String("listen-addr", "unix:///run/systemd-cri.sock", "address to listen on")
var version = flag.Bool("version", false, "Print version and exit")

func init() {
flag.Parse()

var level slog.Level
if err := level.UnmarshalText([]byte(*logLevel)); err != nil {
slog.Error("error parsing log level", "error", err)
os.Exit(1)
}
handler := slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: level})
slog.SetDefault(slog.New(handler))
}

func main() {
if *version {
v := "(devel)"
Expand All @@ -50,14 +27,13 @@ func main() {
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
defer cancel()

cri, err := crisvc.New()
cri, err := crisvc.New(*stateDir)
if err != nil {
slog.Error("error creating service", "error", err)
os.Exit(1)
}

grpcServer := grpc.NewServer()
httpServer := new(http.Server)

runtime.RegisterRuntimeServiceServer(grpcServer, cri)
runtime.RegisterImageServiceServer(grpcServer, cri)
Expand All @@ -68,23 +44,13 @@ func main() {
os.Exit(1)
}

m := cmux.New(lis)
grpcLis := m.MatchWithWriters(cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc"))
httpLis := m.Match(cmux.HTTP1Fast())

slog.Info("starting server", "address", *listenAddr)
go func() {
if err := grpcServer.Serve(grpcLis); err != nil {
if err := grpcServer.Serve(lis); err != nil {
slog.Error("error serving grpc", "error", err)
os.Exit(1)
}
}()
go func() {
if err := httpServer.Serve(httpLis); err != nil {
slog.Error("error serving http", "error", err)
os.Exit(1)
}
}()

_, _ = daemon.SdNotify(false, daemon.SdNotifyReady)

Expand All @@ -97,38 +63,4 @@ func main() {
}

grpcServer.GracefulStop()

ctx, cancel = context.WithTimeout(context.Background(), 5)
defer cancel()
if err := httpServer.Shutdown(ctx); err != nil {
slog.Error("error shutting down http", "error", err)
}
}

func listen() (net.Listener, error) {
if *listenAddr == "" {
return nil, nil
}

addr, err := url.Parse(*listenAddr)
if err != nil {
return nil, err
}

unixAddr := path.Join(addr.Host, addr.Path)

switch addr.Scheme {
case "unix":
return net.Listen("unix", unixAddr)
case "tcp":
return net.Listen("tcp", addr.Host)
case "":
if strings.Contains(addr.Path, ":") {
return net.Listen("tcp", addr.Host)
}

return net.Listen("unix", unixAddr)
default:
return nil, fmt.Errorf("unsupported scheme %s", addr.Scheme)
}
}

0 comments on commit f20e744

Please sign in to comment.