Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor the IMEX controller code #189

Merged
merged 2 commits into from
Oct 28, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 22 additions & 3 deletions cmd/nvidia-dra-controller/imex.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package main

import (
"context"
"errors"
"fmt"
"strings"
"sync"
Expand All @@ -41,8 +42,12 @@ const (
ImexDomainLabel = "nvidia.com/gpu.imex-domain"
ResourceSliceImexChannelLimit = 128
DriverImexChannelLimit = 2048
RetryTimeout = 1 * time.Minute
)

// transientError defines an error indicating that it is transient.
type transientError struct{ error }
ArangoGutierrez marked this conversation as resolved.
Show resolved Hide resolved

// imexDomainOffsets represents the offset for assigning IMEX channels
// to ResourceSlices for each <imex-domain, cliqueid> combination.
type imexDomainOffsets map[string]map[string]int
Expand All @@ -51,6 +56,7 @@ type ImexManager struct {
driverName string
resourceSliceImexChannelLimit int
driverImexChannelLimit int
retryTimeout time.Duration
waitGroup sync.WaitGroup
clientset kubernetes.Interface
imexDomainOffsets imexDomainOffsets
Expand Down Expand Up @@ -95,6 +101,7 @@ func StartIMEXManager(ctx context.Context, config *Config) (*ImexManager, error)
driverName: DriverName,
resourceSliceImexChannelLimit: ResourceSliceImexChannelLimit,
driverImexChannelLimit: DriverImexChannelLimit,
retryTimeout: RetryTimeout,
clientset: clientset,
owner: owner,
driverResources: driverResources,
Expand Down Expand Up @@ -133,14 +140,26 @@ func (m *ImexManager) manageResourceSlices(ctx context.Context) error {
klog.Infof("Adding channels for new IMEX domain: %v", addedDomain)
if err := m.addImexDomain(addedDomain); err != nil {
klog.Errorf("Error adding channels for IMEX domain %s: %v", addedDomain, err)
return
if errors.As(err, &transientError{}) {
ArangoGutierrez marked this conversation as resolved.
Show resolved Hide resolved
klog.Infof("Retrying adding channels for IMEX domain %s after %v", addedDomain, m.retryTimeout)
go func() {
time.Sleep(m.retryTimeout)
addedDomainsCh <- addedDomain
}()
}
}
controller.Update(m.driverResources)
case removedDomain := <-removedDomainsCh:
klog.Infof("Removing channels for removed IMEX domain: %v", removedDomain)
if err := m.removeImexDomain(removedDomain); err != nil {
klog.Errorf("Error removing channels for IMEX domain %s: %v", removedDomain, err)
return
if errors.As(err, &transientError{}) {
klog.Infof("Retrying removing channels for IMEX domain %s after %v", removedDomain, m.retryTimeout)
go func() {
time.Sleep(m.retryTimeout)
removedDomainsCh <- removedDomain
}()
}
}
controller.Update(m.driverResources)
case <-ctx.Done():
Expand Down Expand Up @@ -334,7 +353,7 @@ func (offsets imexDomainOffsets) add(imexDomainID string, cliqueID string, resou

// If we reach the limit, return an error
if offset == driverImexChannelLimit {
return -1, fmt.Errorf("channel limit reached")
return -1, transientError{fmt.Errorf("channel limit reached")}
ArangoGutierrez marked this conversation as resolved.
Show resolved Hide resolved
}
offsets[imexDomainID][cliqueID] = offset

Expand Down