Skip to content

Commit

Permalink
feat: use fsnotify instead of interval checking
Browse files Browse the repository at this point in the history
Signed-off-by: Jack Yu <jack.yu@suse.com>
  • Loading branch information
Yu-Jack committed Jul 5, 2024
1 parent c41c51c commit 68e2669
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 26 deletions.
16 changes: 9 additions & 7 deletions pkg/controller/nodes/node_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,18 @@ import (

"github.com/jaypipes/ghw"
ctlcorev1 "github.com/rancher/wrangler/pkg/generated/controllers/core/v1"
"github.com/sirupsen/logrus"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"kubevirt.io/client-go/kubecli"

ctlnetworkv1beta1 "github.com/harvester/harvester-network-controller/pkg/generated/controllers/network.harvesterhci.io/v1beta1"

"github.com/harvester/pcidevices/pkg/config"
"github.com/harvester/pcidevices/pkg/controller/gpudevice"
"github.com/harvester/pcidevices/pkg/controller/usbdevice"

ctlnetworkv1beta1 "github.com/harvester/harvester-network-controller/pkg/generated/controllers/network.harvesterhci.io/v1beta1"

"github.com/harvester/pcidevices/pkg/apis/devices.harvesterhci.io/v1beta1"
"github.com/harvester/pcidevices/pkg/controller/pcidevice"
"github.com/harvester/pcidevices/pkg/controller/sriovdevice"
Expand Down Expand Up @@ -89,6 +90,12 @@ func Register(ctx context.Context, management *config.FactoryManager) error {
virtClient: virtClient,
}

usbHandler := usbdevice.NewHandler(h.usbCtl, h.usbClaimCtl, h.usbCtl.Cache(), h.usbClaimCtl.Cache())
if err := usbHandler.WatchUSBDevices(ctx); err != nil {
logrus.Errorf("error watching usb devices: %v", err)
return err
}

nodeCtl.OnChange(ctx, reconcilePCIDevices, h.reconcileNodeDevices)
return nil
}
Expand Down Expand Up @@ -117,11 +124,6 @@ func (h *handler) reconcileNodeDevices(name string, node *v1beta1.Node) (*v1beta
return nil, fmt.Errorf("error reconciling pcidevices for node %s: %v", h.nodeName, err)
}

usbHandler := usbdevice.NewHandler(h.usbCtl, h.usbClaimCtl, h.usbCtl.Cache(), h.usbClaimCtl.Cache())
if err := usbHandler.ReconcileUSBDevices(); err != nil {
return nil, fmt.Errorf("error reconciling usb devices for node %s: %v", h.nodeName, err)
}

// additional steps for sriov reconcile
sriovHelper := sriovdevice.NewHandler(h.ctx, h.sriovCache, h.sriovClient, h.nodeName, h.coreNodeCache, h.vlanConfigCache)
err = sriovHelper.SetupSriovDevices()
Expand Down
75 changes: 58 additions & 17 deletions pkg/controller/usbdevice/usbdevice_controller.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
package usbdevice

import (
"context"
"fmt"
"io/fs"
"path/filepath"
"strings"
"sync"

"github.com/fsnotify/fsnotify"
"github.com/rancher/wrangler/pkg/relatedresource"
"github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -22,26 +27,11 @@ type DevHandler struct {
usbClaimClient ctldevicerv1vbeta1.USBDeviceClaimClient
usbCache ctldevicerv1vbeta1.USBDeviceCache
usbClaimCache ctldevicerv1vbeta1.USBDeviceClaimCache
watchLock *sync.Mutex
}

var walkUSBDevices = deviceplugins.WalkUSBDevices

type USBDevice struct {
Name string
Manufacturer string
Vendor int
Product int
BCD int
Bus int
DeviceNumber int
Serial string
DevicePath string
}

func (dev *USBDevice) GetID() string {
return fmt.Sprintf("%04x:%04x-%02d:%02d", dev.Vendor, dev.Product, dev.Bus, dev.DeviceNumber)
}

func NewHandler(
usbClient ctldevicerv1vbeta1.USBDeviceClient,
usbClaimClient ctldevicerv1vbeta1.USBDeviceClaimClient,
Expand All @@ -53,6 +43,7 @@ func NewHandler(
usbClaimClient: usbClaimClient,
usbCache: usbCache,
usbClaimCache: usbClaimCache,
watchLock: &sync.Mutex{},
}
}

Expand Down Expand Up @@ -85,7 +76,57 @@ func (h *DevHandler) OnDeviceChange(_ string, _ string, obj runtime.Object) ([]r
return nil, nil
}

func (h *DevHandler) ReconcileUSBDevices() error {
func (h *DevHandler) WatchUSBDevices(ctx context.Context) error {
watcher, err := fsnotify.NewWatcher()
if err != nil {
return fmt.Errorf("failed to creating a fsnotify watcher: %v", err)
}

if err := filepath.WalkDir("/dev/bus/usb/", func(path string, info fs.DirEntry, err error) error {
if info.IsDir() {
if err := watcher.Add(path); err != nil {
return fmt.Errorf("failed to watch device %s parent directory: %s", path, err)
}
}

return nil
}); err != nil {
return fmt.Errorf("failed to walk /dev/bus/usb: %v", err)
}

go func() {
defer watcher.Close()

for {
select {
case <-ctx.Done():
return
case _, ok := <-watcher.Events:
if !ok {
return
}

// we need reconcile whatever there is a change in /dev/bus/usb/xxx
if err := h.reconcile(); err != nil {
logrus.Errorf("failed to reconcile USB devices: %v", err)
}
case err, ok := <-watcher.Errors:
if !ok {
return
}

logrus.Errorf("fsnotify watcher error: %v", err)
}
}
}()

return nil
}

func (h *DevHandler) reconcile() error {
h.watchLock.Lock()
defer h.watchLock.Unlock()

nodeName := cl.nodeName

localUSBDevices, err := walkUSBDevices()
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/usbdevice/usbdevice_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func Test_ReconcileUSBDevices(t *testing.T) {
fakeclients.USBDeviceClaimsCache(client.DevicesV1beta1().USBDeviceClaims),
)

err := usbHandler.ReconcileUSBDevices()
err := usbHandler.reconcile()
assert.NoError(t, err)

list, err := client.DevicesV1beta1().USBDevices().List(context.Background(), metav1.ListOptions{})
Expand All @@ -62,7 +62,7 @@ func Test_ReconcileUSBDevices(t *testing.T) {
// detect no usb device after few minutes, delete existing USBDevice CR
walkUSBDevices = func() (map[int][]*deviceplugins.USBDevice, error) { return map[int][]*deviceplugins.USBDevice{}, nil }

err = usbHandler.ReconcileUSBDevices()
err = usbHandler.reconcile()
assert.NoError(t, err)

list, err = client.DevicesV1beta1().USBDevices().List(context.Background(), metav1.ListOptions{})
Expand Down

0 comments on commit 68e2669

Please sign in to comment.