Skip to content

Commit

Permalink
feat: Dev stateless cni (#2276)
Browse files Browse the repository at this point in the history
* feat: 🌈 StatelessCNI: Adding getEndpoint and UpdateEndpoint API to CNS (#2102)

* Adding getEndpoint and UpdateEndpoint API to CNS with the respective clients in support of stateless CNI.

* Updating the unit tests and address the comments.

* Addressing the comments.

* Addressing the coments regarding CNS support for Stateless CNI

* Adddressing the PR comments

* 🌈 feat: adding flags for stateless cni (#2103)

feat: stateless cni

* 🌈 feat: StatelessCNI: Applying stateless CNI mode changes in network package. (#2197)

* Apllying stateless CNI mode in network package.

* Addresing the commetns.

* feat: create stateless cni binary for swift (#2275)

* enabling CNS telemetry

* Master rebase changes

* CNI Telemetry enabled on CNS

* Stateless CNI changes.

* making change to CNSendpointStorePath

* Updating makefile to avoid creating statless CNI release.

---------

Co-authored-by: Vipul Singh <vipul21sept@gmail.com>
  • Loading branch information
behzad-mir and vipul-21 authored Dec 12, 2023
1 parent 4034aad commit c4fe3a8
Show file tree
Hide file tree
Showing 19 changed files with 947 additions and 132 deletions.
1 change: 1 addition & 0 deletions cni/linux.Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ COPY . .
RUN GOOS=$OS CGO_ENABLED=0 go build -a -o /go/bin/azure-vnet -trimpath -ldflags "-X main.version="$VERSION"" -gcflags="-dwarflocationlists=true" cni/network/plugin/main.go
RUN GOOS=$OS CGO_ENABLED=0 go build -a -o /go/bin/azure-vnet-telemetry -trimpath -ldflags "-X main.version="$VERSION"" -gcflags="-dwarflocationlists=true" cni/telemetry/service/telemetrymain.go
RUN GOOS=$OS CGO_ENABLED=0 go build -a -o /go/bin/azure-vnet-ipam -trimpath -ldflags "-X main.version="$VERSION"" -gcflags="-dwarflocationlists=true" cni/ipam/plugin/main.go
RUN GOOS=$OS CGO_ENABLED=0 go build -a -o /go/bin/azurecni-stateless -trimpath -ldflags "-X main.version="$VERSION"" -gcflags="-dwarflocationlists=true" cni/network/stateless/main.go

FROM mcr.microsoft.com/cbl-mariner/base/core:2.0 AS compressor
ARG OS
Expand Down
98 changes: 98 additions & 0 deletions cni/network/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package network

import (
"encoding/json"
"io"
"os"
"reflect"

"github.com/Azure/azure-container-networking/cni"
"github.com/Azure/azure-container-networking/telemetry"
"github.com/containernetworking/cni/pkg/skel"
cniTypes "github.com/containernetworking/cni/pkg/types"
"github.com/pkg/errors"
"go.uber.org/zap"
)

// send error report to hostnetagent if CNI encounters any error.
func ReportPluginError(reportManager *telemetry.ReportManager, tb *telemetry.TelemetryBuffer, err error) {
logger.Error("Report plugin error")
reflect.ValueOf(reportManager.Report).Elem().FieldByName("ErrorMessage").SetString(err.Error())

if err := reportManager.SendReport(tb); err != nil {
logger.Error("SendReport failed", zap.Error(err))
}
}

func validateConfig(jsonBytes []byte) error {
var conf struct {
Name string `json:"name"`
}
if err := json.Unmarshal(jsonBytes, &conf); err != nil {
return errors.Wrapf(err, "error reading network config")
}
if conf.Name == "" {
return errors.New("missing network name")
}
return nil
}

func getCmdArgsFromEnv() (string, *skel.CmdArgs, error) {
logger.Info("Going to read from stdin")
stdinData, err := io.ReadAll(os.Stdin)
if err != nil {
return "", nil, errors.Wrapf(err, "error reading from stdin")
}

cmdArgs := &skel.CmdArgs{
ContainerID: os.Getenv("CNI_CONTAINERID"),
Netns: os.Getenv("CNI_NETNS"),
IfName: os.Getenv("CNI_IFNAME"),
Args: os.Getenv("CNI_ARGS"),
Path: os.Getenv("CNI_PATH"),
StdinData: stdinData,
}

cmd := os.Getenv("CNI_COMMAND")
return cmd, cmdArgs, nil
}

func HandleIfCniUpdate(update func(*skel.CmdArgs) error) (bool, error) {
isupdate := true

if os.Getenv("CNI_COMMAND") != cni.CmdUpdate {
return false, nil
}

logger.Info("CNI UPDATE received")

_, cmdArgs, err := getCmdArgsFromEnv()
if err != nil {
logger.Error("Received error while retrieving cmds from environment", zap.Error(err))
return isupdate, err
}

logger.Info("Retrieved command args for update", zap.Any("args", cmdArgs))
err = validateConfig(cmdArgs.StdinData)
if err != nil {
logger.Error("Failed to handle CNI UPDATE", zap.Error(err))
return isupdate, err
}

err = update(cmdArgs)
if err != nil {
logger.Error("Failed to handle CNI UPDATE", zap.Error(err))
return isupdate, err
}

return isupdate, nil
}

func PrintCNIError(msg string) {
logger.Error(msg)
cniErr := &cniTypes.Error{
Code: cniTypes.ErrTryAgainLater,
Msg: msg,
}
cniErr.Print()
}
11 changes: 7 additions & 4 deletions cni/network/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,7 @@ func (plugin *NetPlugin) Add(args *cniSkel.CmdArgs) error {
options := make(map[string]any)
networkID, err = plugin.getNetworkName(args.Netns, &ipamAddResult, nwCfg)

endpointID := GetEndpointID(args)
endpointID := plugin.nm.GetEndpointID(args.ContainerID, args.IfName)
policies := cni.GetPoliciesFromNwCfg(nwCfg.AdditionalArgs)

// Check whether the network already exists.
Expand Down Expand Up @@ -1041,12 +1041,15 @@ func (plugin *NetPlugin) Delete(args *cniSkel.CmdArgs) error {
// Log the error but return success if the network is not found.
// if cni hits this, mostly state file would be missing and it can be reboot scenario where
// container runtime tries to delete and create pods which existed before reboot.
// this condition will not apply to stateless CNI since the network struct will be crated on each call
err = nil
return err
if !plugin.nm.IsStatelessCNIMode() {
return err
}
}
}

endpointID := GetEndpointID(args)
endpointID := plugin.nm.GetEndpointID(args.ContainerID, args.IfName)
// Query the endpoint.
if epInfo, err = plugin.nm.GetEndpointInfo(networkID, endpointID); err != nil {
logger.Info("GetEndpoint",
Expand Down Expand Up @@ -1077,7 +1080,7 @@ func (plugin *NetPlugin) Delete(args *cniSkel.CmdArgs) error {
zap.String("endpointID", endpointID))
sendEvent(plugin, fmt.Sprintf("Deleting endpoint:%v", endpointID))
// Delete the endpoint.
if err = plugin.nm.DeleteEndpoint(networkID, endpointID); err != nil {
if err = plugin.nm.DeleteEndpoint(networkID, endpointID, epInfo); err != nil {
// return a retriable error so the container runtime will retry this DEL later
// the implementation of this function returns nil if the endpoint doens't exist, so
// we don't have to check that here
Expand Down
103 changes: 8 additions & 95 deletions cni/network/plugin/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,8 @@
package main

import (
"encoding/json"
"fmt"
"io"
"os"
"reflect"
"time"

"github.com/Azure/azure-container-networking/aitelemetry"
Expand All @@ -21,8 +18,6 @@ import (
"github.com/Azure/azure-container-networking/platform"
"github.com/Azure/azure-container-networking/store"
"github.com/Azure/azure-container-networking/telemetry"
"github.com/containernetworking/cni/pkg/skel"
cniTypes "github.com/containernetworking/cni/pkg/types"
"github.com/pkg/errors"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -57,96 +52,14 @@ func printVersion() {
fmt.Printf("Azure CNI Version %v\n", version)
}

// send error report to hostnetagent if CNI encounters any error.
func reportPluginError(reportManager *telemetry.ReportManager, tb *telemetry.TelemetryBuffer, err error) {
logger.Error("Report plugin error")
reflect.ValueOf(reportManager.Report).Elem().FieldByName("ErrorMessage").SetString(err.Error())

if err := reportManager.SendReport(tb); err != nil {
logger.Error("SendReport failed", zap.Error(err))
}
}

func validateConfig(jsonBytes []byte) error {
var conf struct {
Name string `json:"name"`
}
if err := json.Unmarshal(jsonBytes, &conf); err != nil {
return fmt.Errorf("error reading network config: %s", err)
}
if conf.Name == "" {
return fmt.Errorf("missing network name")
}
return nil
}

func getCmdArgsFromEnv() (string, *skel.CmdArgs, error) {
logger.Info("Going to read from stdin")
stdinData, err := io.ReadAll(os.Stdin)
if err != nil {
return "", nil, fmt.Errorf("error reading from stdin: %v", err)
}

cmdArgs := &skel.CmdArgs{
ContainerID: os.Getenv("CNI_CONTAINERID"),
Netns: os.Getenv("CNI_NETNS"),
IfName: os.Getenv("CNI_IFNAME"),
Args: os.Getenv("CNI_ARGS"),
Path: os.Getenv("CNI_PATH"),
StdinData: stdinData,
}

cmd := os.Getenv("CNI_COMMAND")
return cmd, cmdArgs, nil
}

func handleIfCniUpdate(update func(*skel.CmdArgs) error) (bool, error) {
isupdate := true

if os.Getenv("CNI_COMMAND") != cni.CmdUpdate {
return false, nil
}

logger.Info("CNI UPDATE received")

_, cmdArgs, err := getCmdArgsFromEnv()
if err != nil {
logger.Error("Received error while retrieving cmds from environment", zap.Error(err))
return isupdate, err
}

logger.Info("Retrieved command args for update", zap.Any("args", cmdArgs))
err = validateConfig(cmdArgs.StdinData)
if err != nil {
logger.Error("Failed to handle CNI UPDATE", zap.Error(err))
return isupdate, err
}

err = update(cmdArgs)
if err != nil {
logger.Error("Failed to handle CNI UPDATE", zap.Error(err))
return isupdate, err
}

return isupdate, nil
}

func printCNIError(msg string) {
logger.Error(msg)
cniErr := &cniTypes.Error{
Code: cniTypes.ErrTryAgainLater,
Msg: msg,
}
cniErr.Print()
}

func rootExecute() error {
var (
config common.PluginConfig
tb *telemetry.TelemetryBuffer
)

config.Version = version

reportManager := &telemetry.ReportManager{
HostNetAgentURL: hostNetAgentURL,
ContentType: telemetry.ContentType,
Expand All @@ -169,7 +82,7 @@ func rootExecute() error {
&network.Multitenancy{},
)
if err != nil {
printCNIError(fmt.Sprintf("Failed to create network plugin, err:%v.\n", err))
network.PrintCNIError(fmt.Sprintf("Failed to create network plugin, err:%v.\n", err))
return errors.Wrap(err, "Create plugin error")
}

Expand All @@ -190,15 +103,15 @@ func rootExecute() error {

// CNI Acquires lock
if err = netPlugin.Plugin.InitializeKeyValueStore(&config); err != nil {
printCNIError(fmt.Sprintf("Failed to initialize key-value store of network plugin: %v", err))
network.PrintCNIError(fmt.Sprintf("Failed to initialize key-value store of network plugin: %v", err))

tb = telemetry.NewTelemetryBuffer(logger)
if tberr := tb.Connect(); tberr != nil {
logger.Error("Cannot connect to telemetry service", zap.Error(tberr))
return errors.Wrap(err, "lock acquire error")
}

reportPluginError(reportManager, tb, err)
network.ReportPluginError(reportManager, tb, err)

if errors.Is(err, store.ErrTimeoutLockingStore) {
var cniMetric telemetry.AIMetric
Expand Down Expand Up @@ -239,8 +152,8 @@ func rootExecute() error {
cniReport.Timestamp = t.Format("2006-01-02 15:04:05")

if err = netPlugin.Start(&config); err != nil {
printCNIError(fmt.Sprintf("Failed to start network plugin, err:%v.\n", err))
reportPluginError(reportManager, tb, err)
network.PrintCNIError(fmt.Sprintf("Failed to start network plugin, err:%v.\n", err))
network.ReportPluginError(reportManager, tb, err)
panic("network plugin start fatal error")
}

Expand All @@ -263,7 +176,7 @@ func rootExecute() error {
}
}

handled, _ := handleIfCniUpdate(netPlugin.Update)
handled, _ := network.HandleIfCniUpdate(netPlugin.Update)
if handled {
logger.Info("CNI UPDATE finished.")
} else if err = netPlugin.Execute(cni.PluginApi(netPlugin)); err != nil {
Expand All @@ -277,7 +190,7 @@ func rootExecute() error {
netPlugin.Stop()

if err != nil {
reportPluginError(reportManager, tb, err)
network.ReportPluginError(reportManager, tb, err)
}

return errors.Wrap(err, "Execute netplugin failure")
Expand Down
Loading

0 comments on commit c4fe3a8

Please sign in to comment.