Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Dev stateless cni #2276

Merged
merged 10 commits into from
Dec 12, 2023
1 change: 1 addition & 0 deletions cni/linux.Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ COPY . .
RUN GOOS=$OS CGO_ENABLED=0 go build -a -o /go/bin/azure-vnet -trimpath -ldflags "-X main.version="$VERSION"" -gcflags="-dwarflocationlists=true" cni/network/plugin/main.go
RUN GOOS=$OS CGO_ENABLED=0 go build -a -o /go/bin/azure-vnet-telemetry -trimpath -ldflags "-X main.version="$VERSION"" -gcflags="-dwarflocationlists=true" cni/telemetry/service/telemetrymain.go
RUN GOOS=$OS CGO_ENABLED=0 go build -a -o /go/bin/azure-vnet-ipam -trimpath -ldflags "-X main.version="$VERSION"" -gcflags="-dwarflocationlists=true" cni/ipam/plugin/main.go
RUN GOOS=$OS CGO_ENABLED=0 go build -a -o /go/bin/azurecni-stateless -trimpath -ldflags "-X main.version="$VERSION"" -gcflags="-dwarflocationlists=true" cni/network/stateless/main.go

FROM mcr.microsoft.com/cbl-mariner/base/core:2.0 AS compressor
ARG OS
Expand Down
98 changes: 98 additions & 0 deletions cni/network/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package network

import (
"encoding/json"
"io"
"os"
"reflect"

"github.com/Azure/azure-container-networking/cni"
"github.com/Azure/azure-container-networking/telemetry"
"github.com/containernetworking/cni/pkg/skel"
cniTypes "github.com/containernetworking/cni/pkg/types"
"github.com/pkg/errors"
"go.uber.org/zap"
)

// send error report to hostnetagent if CNI encounters any error.
func ReportPluginError(reportManager *telemetry.ReportManager, tb *telemetry.TelemetryBuffer, err error) {
logger.Error("Report plugin error")
reflect.ValueOf(reportManager.Report).Elem().FieldByName("ErrorMessage").SetString(err.Error())

if err := reportManager.SendReport(tb); err != nil {
logger.Error("SendReport failed", zap.Error(err))
}
}

func validateConfig(jsonBytes []byte) error {
var conf struct {
Name string `json:"name"`
}
if err := json.Unmarshal(jsonBytes, &conf); err != nil {
return errors.Wrapf(err, "error reading network config")
}
if conf.Name == "" {
return errors.New("missing network name")
}
return nil
}

func getCmdArgsFromEnv() (string, *skel.CmdArgs, error) {
logger.Info("Going to read from stdin")
stdinData, err := io.ReadAll(os.Stdin)
if err != nil {
return "", nil, errors.Wrapf(err, "error reading from stdin")
}

cmdArgs := &skel.CmdArgs{
ContainerID: os.Getenv("CNI_CONTAINERID"),
Netns: os.Getenv("CNI_NETNS"),
IfName: os.Getenv("CNI_IFNAME"),
Args: os.Getenv("CNI_ARGS"),
Path: os.Getenv("CNI_PATH"),
StdinData: stdinData,
}

cmd := os.Getenv("CNI_COMMAND")
return cmd, cmdArgs, nil
}

func HandleIfCniUpdate(update func(*skel.CmdArgs) error) (bool, error) {
isupdate := true

if os.Getenv("CNI_COMMAND") != cni.CmdUpdate {
return false, nil
}

logger.Info("CNI UPDATE received")

_, cmdArgs, err := getCmdArgsFromEnv()
if err != nil {
logger.Error("Received error while retrieving cmds from environment", zap.Error(err))
return isupdate, err
}

logger.Info("Retrieved command args for update", zap.Any("args", cmdArgs))
err = validateConfig(cmdArgs.StdinData)
if err != nil {
logger.Error("Failed to handle CNI UPDATE", zap.Error(err))
return isupdate, err
}

err = update(cmdArgs)
if err != nil {
logger.Error("Failed to handle CNI UPDATE", zap.Error(err))
return isupdate, err
}

return isupdate, nil
}

func PrintCNIError(msg string) {
logger.Error(msg)
cniErr := &cniTypes.Error{
Code: cniTypes.ErrTryAgainLater,
Msg: msg,
}
cniErr.Print()
}
11 changes: 7 additions & 4 deletions cni/network/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,7 @@ func (plugin *NetPlugin) Add(args *cniSkel.CmdArgs) error {
options := make(map[string]any)
networkID, err = plugin.getNetworkName(args.Netns, &ipamAddResult, nwCfg)

endpointID := GetEndpointID(args)
endpointID := plugin.nm.GetEndpointID(args.ContainerID, args.IfName)
policies := cni.GetPoliciesFromNwCfg(nwCfg.AdditionalArgs)

// Check whether the network already exists.
Expand Down Expand Up @@ -1041,12 +1041,15 @@ func (plugin *NetPlugin) Delete(args *cniSkel.CmdArgs) error {
// Log the error but return success if the network is not found.
// if cni hits this, mostly state file would be missing and it can be reboot scenario where
// container runtime tries to delete and create pods which existed before reboot.
// this condition will not apply to stateless CNI since the network struct will be crated on each call
err = nil
return err
if !plugin.nm.IsStatelessCNIMode() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need this check if it doesn't apply to stateless cni?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a comment on top of it :
// this condition will not apply to stateless CNI since the network struct will be crated on each call

return err
}
}
}

endpointID := GetEndpointID(args)
endpointID := plugin.nm.GetEndpointID(args.ContainerID, args.IfName)
// Query the endpoint.
if epInfo, err = plugin.nm.GetEndpointInfo(networkID, endpointID); err != nil {
logger.Info("GetEndpoint",
Expand Down Expand Up @@ -1077,7 +1080,7 @@ func (plugin *NetPlugin) Delete(args *cniSkel.CmdArgs) error {
zap.String("endpointID", endpointID))
sendEvent(plugin, fmt.Sprintf("Deleting endpoint:%v", endpointID))
// Delete the endpoint.
if err = plugin.nm.DeleteEndpoint(networkID, endpointID); err != nil {
if err = plugin.nm.DeleteEndpoint(networkID, endpointID, epInfo); err != nil {
// return a retriable error so the container runtime will retry this DEL later
// the implementation of this function returns nil if the endpoint doens't exist, so
// we don't have to check that here
Expand Down
103 changes: 8 additions & 95 deletions cni/network/plugin/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,8 @@
package main

import (
"encoding/json"
"fmt"
"io"
"os"
"reflect"
"time"

"github.com/Azure/azure-container-networking/aitelemetry"
Expand All @@ -21,8 +18,6 @@ import (
"github.com/Azure/azure-container-networking/platform"
"github.com/Azure/azure-container-networking/store"
"github.com/Azure/azure-container-networking/telemetry"
"github.com/containernetworking/cni/pkg/skel"
cniTypes "github.com/containernetworking/cni/pkg/types"
"github.com/pkg/errors"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -57,96 +52,14 @@ func printVersion() {
fmt.Printf("Azure CNI Version %v\n", version)
}

// send error report to hostnetagent if CNI encounters any error.
func reportPluginError(reportManager *telemetry.ReportManager, tb *telemetry.TelemetryBuffer, err error) {
logger.Error("Report plugin error")
reflect.ValueOf(reportManager.Report).Elem().FieldByName("ErrorMessage").SetString(err.Error())

if err := reportManager.SendReport(tb); err != nil {
logger.Error("SendReport failed", zap.Error(err))
}
}

func validateConfig(jsonBytes []byte) error {
var conf struct {
Name string `json:"name"`
}
if err := json.Unmarshal(jsonBytes, &conf); err != nil {
return fmt.Errorf("error reading network config: %s", err)
}
if conf.Name == "" {
return fmt.Errorf("missing network name")
}
return nil
}

func getCmdArgsFromEnv() (string, *skel.CmdArgs, error) {
logger.Info("Going to read from stdin")
stdinData, err := io.ReadAll(os.Stdin)
if err != nil {
return "", nil, fmt.Errorf("error reading from stdin: %v", err)
}

cmdArgs := &skel.CmdArgs{
ContainerID: os.Getenv("CNI_CONTAINERID"),
Netns: os.Getenv("CNI_NETNS"),
IfName: os.Getenv("CNI_IFNAME"),
Args: os.Getenv("CNI_ARGS"),
Path: os.Getenv("CNI_PATH"),
StdinData: stdinData,
}

cmd := os.Getenv("CNI_COMMAND")
return cmd, cmdArgs, nil
}

func handleIfCniUpdate(update func(*skel.CmdArgs) error) (bool, error) {
isupdate := true

if os.Getenv("CNI_COMMAND") != cni.CmdUpdate {
return false, nil
}

logger.Info("CNI UPDATE received")

_, cmdArgs, err := getCmdArgsFromEnv()
if err != nil {
logger.Error("Received error while retrieving cmds from environment", zap.Error(err))
return isupdate, err
}

logger.Info("Retrieved command args for update", zap.Any("args", cmdArgs))
err = validateConfig(cmdArgs.StdinData)
if err != nil {
logger.Error("Failed to handle CNI UPDATE", zap.Error(err))
return isupdate, err
}

err = update(cmdArgs)
if err != nil {
logger.Error("Failed to handle CNI UPDATE", zap.Error(err))
return isupdate, err
}

return isupdate, nil
}

func printCNIError(msg string) {
logger.Error(msg)
cniErr := &cniTypes.Error{
Code: cniTypes.ErrTryAgainLater,
Msg: msg,
}
cniErr.Print()
}

func rootExecute() error {
var (
config common.PluginConfig
tb *telemetry.TelemetryBuffer
)

config.Version = version

reportManager := &telemetry.ReportManager{
HostNetAgentURL: hostNetAgentURL,
ContentType: telemetry.ContentType,
Expand All @@ -169,7 +82,7 @@ func rootExecute() error {
&network.Multitenancy{},
)
if err != nil {
printCNIError(fmt.Sprintf("Failed to create network plugin, err:%v.\n", err))
network.PrintCNIError(fmt.Sprintf("Failed to create network plugin, err:%v.\n", err))
return errors.Wrap(err, "Create plugin error")
}

Expand All @@ -190,15 +103,15 @@ func rootExecute() error {

// CNI Acquires lock
if err = netPlugin.Plugin.InitializeKeyValueStore(&config); err != nil {
printCNIError(fmt.Sprintf("Failed to initialize key-value store of network plugin: %v", err))
network.PrintCNIError(fmt.Sprintf("Failed to initialize key-value store of network plugin: %v", err))

tb = telemetry.NewTelemetryBuffer(logger)
if tberr := tb.Connect(); tberr != nil {
logger.Error("Cannot connect to telemetry service", zap.Error(tberr))
return errors.Wrap(err, "lock acquire error")
}

reportPluginError(reportManager, tb, err)
network.ReportPluginError(reportManager, tb, err)

if errors.Is(err, store.ErrTimeoutLockingStore) {
var cniMetric telemetry.AIMetric
Expand Down Expand Up @@ -239,8 +152,8 @@ func rootExecute() error {
cniReport.Timestamp = t.Format("2006-01-02 15:04:05")

if err = netPlugin.Start(&config); err != nil {
printCNIError(fmt.Sprintf("Failed to start network plugin, err:%v.\n", err))
reportPluginError(reportManager, tb, err)
network.PrintCNIError(fmt.Sprintf("Failed to start network plugin, err:%v.\n", err))
network.ReportPluginError(reportManager, tb, err)
panic("network plugin start fatal error")
}

Expand All @@ -263,7 +176,7 @@ func rootExecute() error {
}
}

handled, _ := handleIfCniUpdate(netPlugin.Update)
handled, _ := network.HandleIfCniUpdate(netPlugin.Update)
if handled {
logger.Info("CNI UPDATE finished.")
} else if err = netPlugin.Execute(cni.PluginApi(netPlugin)); err != nil {
Expand All @@ -277,7 +190,7 @@ func rootExecute() error {
netPlugin.Stop()

if err != nil {
reportPluginError(reportManager, tb, err)
network.ReportPluginError(reportManager, tb, err)
}

return errors.Wrap(err, "Execute netplugin failure")
Expand Down
Loading
Loading