Skip to content

Commit

Permalink
refactor: simply the usb device plugin again
Browse files Browse the repository at this point in the history
Signed-off-by: Jack Yu <jack.yu@suse.com>
  • Loading branch information
Yu-Jack committed Jul 11, 2024
1 parent e78dc0d commit 4b1cdc8
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 117 deletions.
95 changes: 20 additions & 75 deletions pkg/controller/usbdevice/usbdevice_claim_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"fmt"
"reflect"
"sync"
"time"

"github.com/sirupsen/logrus"
apierrors "k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -18,18 +17,12 @@ import (
)

type DevClaimHandler struct {
usbClaimClient ctldevicerv1beta1.USBDeviceClaimClient
usbClient ctldevicerv1beta1.USBDeviceClient
virtClient ctlkubevirtv1.KubeVirtClient
lock *sync.Mutex
usbDeviceCache ctldevicerv1beta1.USBDeviceCache
managedDeviceControllers map[string]*deviceController
}

type deviceController struct {
device *deviceplugins.USBDevicePlugin
stop chan struct{}
started bool
usbClaimClient ctldevicerv1beta1.USBDeviceClaimClient
usbClient ctldevicerv1beta1.USBDeviceClient
virtClient ctlkubevirtv1.KubeVirtClient
lock *sync.Mutex
usbDeviceCache ctldevicerv1beta1.USBDeviceCache
managedDevicePlugins map[string]*deviceplugins.USBDevicePlugin
}

func NewClaimHandler(
Expand All @@ -39,12 +32,12 @@ func NewClaimHandler(
virtClient ctlkubevirtv1.KubeVirtClient,
) *DevClaimHandler {
return &DevClaimHandler{
usbDeviceCache: usbDeviceCache,
usbClaimClient: usbClaimClient,
usbClient: usbClient,
virtClient: virtClient,
lock: &sync.Mutex{},
managedDeviceControllers: map[string]*deviceController{},
usbDeviceCache: usbDeviceCache,
usbClaimClient: usbClaimClient,
usbClient: usbClient,
virtClient: virtClient,
lock: &sync.Mutex{},
managedDevicePlugins: map[string]*deviceplugins.USBDevicePlugin{},
}
}

Expand Down Expand Up @@ -88,7 +81,7 @@ func (h *DevClaimHandler) OnUSBDeviceClaimChanged(_ string, usbDeviceClaim *v1be
return usbDeviceClaim, err
}

deviceControl, ok := h.managedDeviceControllers[usbDeviceClaim.Name]
devicePlugin, ok := h.managedDevicePlugins[usbDeviceClaim.Name]

if !ok {
usbDevicePlugin, err := deviceplugins.NewUSBDevicePlugin(*usbDevice)
Expand All @@ -98,15 +91,12 @@ func (h *DevClaimHandler) OnUSBDeviceClaimChanged(_ string, usbDeviceClaim *v1be
return usbDeviceClaim, err
}

dc := &deviceController{
device: usbDevicePlugin,
}
h.managedDeviceControllers[usbDeviceClaim.Name] = dc
deviceControl = dc
h.managedDevicePlugins[usbDeviceClaim.Name] = usbDevicePlugin
devicePlugin = usbDevicePlugin
}

if !deviceControl.started {
deviceControl.startDevicePlugin(usbDeviceClaim.Name)
if !devicePlugin.IsStarted() {
devicePlugin.StartDevicePlugin()
}

if !usbDevice.Status.Enabled {
Expand Down Expand Up @@ -179,9 +169,9 @@ func (h *DevClaimHandler) OnRemove(_ string, claim *v1beta1.USBDeviceClaim) (*v1
}
}

if deviceControl, ok := h.managedDeviceControllers[claim.Name]; ok {
deviceControl.stopDevicePlugin()
delete(h.managedDeviceControllers, claim.Name)
if devicePlugin, ok := h.managedDevicePlugins[claim.Name]; ok {
devicePlugin.StopDevicePlugin()
delete(h.managedDevicePlugins, claim.Name)
}

usbDeviceCp := usbDevice.DeepCopy()
Expand Down Expand Up @@ -230,48 +220,3 @@ func (h *DevClaimHandler) updateKubeVirt(virt *kubevirtv1.KubeVirt, usbDevice *v

return h.virtClient.Update(virtDp)
}

// startDevicePlugin starts the usb device plugin.
// In current device plugin design, we'll use stop to control the flow.
// It's different from `StopDevicePlugin` in usb_device_plugin.go.
//
// The stop from outside is to control the device plugin to start or stop.
// The done from inside `StopDevicePlugin` is to control the inner flow, such as grpc server.
//
// Besides, it already calls `defer StopDevicePlugin` when calling `dc.device.Start`.
// So, we won't call `StopDevicePlugin` again.
func (dc *deviceController) startDevicePlugin(deviceName string) {
if dc.started {
return
}

dc.stop = make(chan struct{})

go func() {
for {
// This will be blocked by a channel read inside function
if err := dc.device.Start(dc.stop); err != nil {
logrus.Errorf("Error starting %s device plugin", deviceName)
}

select {
case <-dc.stop:
return
case <-time.After(5 * time.Second):
// try to start device plugin again when getting error
continue
}
}
}()

dc.started = true
}

func (dc *deviceController) stopDevicePlugin() {
if !dc.started {
return
}

close(dc.stop)
dc.started = false
}
92 changes: 50 additions & 42 deletions pkg/deviceplugins/usb_device_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,17 +51,16 @@ var (
// The actual plugin
type USBDevicePlugin struct {
socketPath string
stop <-chan struct{}
stop chan struct{}
update chan struct{}
done chan struct{}
deregistered chan struct{}
server *grpc.Server
resourceName string
device *PluginDevice
logger *log.FilteredLogger

initialized bool
lock *sync.Mutex
started bool
lock *sync.Mutex
}

type PluginDevice struct {
Expand All @@ -84,12 +83,8 @@ func (pd *PluginDevice) toKubeVirtDevicePlugin() *pluginapi.Device {
}
}

func (plugin *USBDevicePlugin) Device() *PluginDevice {
return plugin.device
}

func (plugin *USBDevicePlugin) setDeviceHealth(isHealthy bool) {
pd := plugin.Device()
pd := plugin.device
isDifferent := pd.isHealthy != isHealthy
pd.isHealthy = isHealthy
if isDifferent {
Expand All @@ -101,32 +96,11 @@ func (plugin *USBDevicePlugin) devicesToKubeVirtDevicePlugin() []*pluginapi.Devi
return []*pluginapi.Device{plugin.device.toKubeVirtDevicePlugin()}
}

func (plugin *USBDevicePlugin) GetInitialized() bool {
plugin.lock.Lock()
defer plugin.lock.Unlock()
return plugin.initialized
}

func (plugin *USBDevicePlugin) setInitialized(initialized bool) {
plugin.lock.Lock()
plugin.initialized = initialized
plugin.lock.Unlock()
}

func (plugin *USBDevicePlugin) GetDeviceName() string {
func (plugin *USBDevicePlugin) DeviceName() string {
return plugin.resourceName
}

func (plugin *USBDevicePlugin) stopDevicePlugin() error {
defer func() {
select {
case <-plugin.done:
return
default:
close(plugin.done)
}
}()

// Give the device plugin one second to properly deregister
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
Expand All @@ -136,13 +110,10 @@ func (plugin *USBDevicePlugin) stopDevicePlugin() error {
}

plugin.server.Stop()
plugin.setInitialized(false)
return plugin.cleanup()
}

func (plugin *USBDevicePlugin) Start(stop <-chan struct{}) error {
plugin.stop = stop
plugin.done = make(chan struct{})
func (plugin *USBDevicePlugin) startDevicePlugin() error {
plugin.deregistered = make(chan struct{})

err := plugin.cleanup()
Expand Down Expand Up @@ -180,7 +151,6 @@ func (plugin *USBDevicePlugin) Start(stop <-chan struct{}) error {
errChan <- plugin.healthCheck()
}()

plugin.setInitialized(true)
plugin.logger.Infof("%s device plugin started", plugin.resourceName)
err = <-errChan

Expand Down Expand Up @@ -291,7 +261,7 @@ func (plugin *USBDevicePlugin) register() error {
reqt := &pluginapi.RegisterRequest{
Version: pluginapi.Version,
Endpoint: path.Base(plugin.socketPath),
ResourceName: plugin.GetDeviceName(),
ResourceName: plugin.DeviceName(),
}

_, err = client.Register(context.Background(), reqt)
Expand Down Expand Up @@ -353,7 +323,7 @@ func (plugin *USBDevicePlugin) Allocate(_ context.Context, allocRequest *plugina
for _, id := range request.DevicesIDs {
plugin.logger.V(2).Infof("usb device id: %s", id)

pluginDevice := plugin.Device()
pluginDevice := plugin.device
if pluginDevice == nil {
plugin.logger.V(2).Infof("usb disappeared: %s", id)
continue
Expand All @@ -370,7 +340,6 @@ func (plugin *USBDevicePlugin) Allocate(_ context.Context, allocRequest *plugina
return nil, fmt.Errorf("error setting the permission the socket %s: %v", pluginDevice.DevicePath, err)
}

// We might have more than one USB device per resource name
key := util.ResourceNameToEnvVar(v1.USBResourcePrefix, plugin.resourceName)
value := fmt.Sprintf("%d:%d", pluginDevice.Bus, pluginDevice.DeviceNumber)
if previous, exist := env[key]; exist {
Expand Down Expand Up @@ -421,9 +390,9 @@ func NewUSBDevicePlugin(usb v1beta1.USBDevice) (*USBDevicePlugin, error) {
DeviceNumber: deviceNumber,
isHealthy: true,
},
logger: log.Log.With("subcomponent", resourceID),
initialized: false,
lock: &sync.Mutex{},
logger: log.Log.With("subcomponent", resourceID),
started: false,
lock: &sync.Mutex{},
}, nil
}

Expand All @@ -446,3 +415,42 @@ func generateBusAndDevice(devicePath string) (int, int, error) {

return bus, deviceNumber, nil
}

func (plugin *USBDevicePlugin) StartDevicePlugin() {
if plugin.started {
return
}

plugin.stop = make(chan struct{})
plugin.started = true

go func() {
for {
// This will be blocked by a channel read inside function
if err := plugin.startDevicePlugin(); err != nil {
logrus.Errorf("Error starting %s device plugin", plugin.resourceName)
}

select {
case <-plugin.stop:
return
case <-time.After(5 * time.Second):
// try to start device plugin again when getting error
continue
}
}
}()
}

func (plugin *USBDevicePlugin) StopDevicePlugin() {
if !plugin.started {
return
}

close(plugin.stop)
plugin.started = false
}

func (plugin *USBDevicePlugin) IsStarted() bool {
return plugin.started
}
19 changes: 19 additions & 0 deletions pkg/deviceplugins/usb_device_plugin_helper.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,24 @@
package deviceplugins

/* This file was part of the KubeVirt project, copied to this project
* to get around private package issues.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Copyright 2024 SUSE, LLC.
*
*/

import (
"bufio"
"fmt"
Expand Down

0 comments on commit 4b1cdc8

Please sign in to comment.