Skip to content

Commit

Permalink
Merge pull request juju#18681 from SimonRichardson/allow-limiter-to-g…
Browse files Browse the repository at this point in the history
…ive-up

juju#18681

If the resource opener has a limit set, either globally or for an application, we want to be able to tell the opener to give up, either because a timeout expires or because the client closes the connection. The original implementation didn't allow this; it would just block forever. We can swap out the channels for a weighted semaphore, which allows a pending acquire to be cancelled cleanly via its context.

## QA steps


```sh
$ juju bootstrap lxd test
$ juju add-model default
$ juju controller-config controller-resource-download-limit=1
$ juju deploy juju-qa-test qa1 --resource foo-file=3
$ juju add-unit qa1 -n 10
```
  • Loading branch information
jujubot authored Jan 28, 2025
2 parents 01d9cbb + f63d06c commit 6b404c3
Show file tree
Hide file tree
Showing 7 changed files with 153 additions and 72 deletions.
26 changes: 21 additions & 5 deletions apiserver/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,9 @@ func newServer(ctx context.Context, cfg ServerConfig) (_ *Server, err error) {
healthStatus: "starting",
}
srv.updateAgentRateLimiter(controllerConfig)
srv.updateResourceDownloadLimiters(controllerConfig)
if err := srv.updateResourceDownloadLimiters(controllerConfig); err != nil {
return nil, errors.Trace(err)
}

// We are able to get the current controller config before subscribing to changes
// because the changes are only ever published in response to an API call,
Expand All @@ -408,8 +410,15 @@ func newServer(ctx context.Context, cfg ServerConfig) (_ *Server, err error) {
logger.Criticalf("programming error in %s message data: %v", topic, err)
return
}

srv.updateAgentRateLimiter(data.Config)
srv.updateResourceDownloadLimiters(data.Config)

// If the update fails, there is nothing else we can do but log the
// error. The server will continue to run with the old limits.
if err := srv.updateResourceDownloadLimiters(data.Config); err != nil {
logger.Errorf("failed to update resource download limiters: %v", err)
return
}
})
if err != nil {
logger.Criticalf("programming error in subscribe function: %v", err)
Expand Down Expand Up @@ -531,18 +540,25 @@ func (srv *Server) updateAgentRateLimiter(cfg controller.Config) {
srv.agentRateLimitRate = cfg.AgentRateLimitRate()
if srv.agentRateLimitMax > 0 {
srv.agentRateLimit = ratelimit.NewBucketWithClock(
srv.agentRateLimitRate, int64(srv.agentRateLimitMax), rateClock{srv.clock})
srv.agentRateLimitRate, int64(srv.agentRateLimitMax), rateClock{Clock: srv.clock})
} else {
srv.agentRateLimit = nil
}
}

func (srv *Server) updateResourceDownloadLimiters(cfg controller.Config) {
func (srv *Server) updateResourceDownloadLimiters(cfg controller.Config) error {
srv.mu.Lock()
defer srv.mu.Unlock()
globalLimit := cfg.ControllerResourceDownloadLimit()
appLimit := cfg.ApplicationResourceDownloadLimit()
srv.resourceLock = resource.NewResourceDownloadLimiter(globalLimit, appLimit)

var err error
srv.resourceLock, err = resource.NewResourceDownloadLimiter(globalLimit, appLimit)
if err != nil {
return errors.Trace(err)
}

return nil
}

func (srv *Server) getResourceDownloadLimiter() resource.ResourceDownloadLock {
Expand Down
106 changes: 82 additions & 24 deletions internal/resource/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,13 @@
package resource

import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"

"golang.org/x/sync/semaphore"
)

// ResourceDownloadLock is used to limit the number of concurrent
Expand All @@ -15,59 +20,73 @@ type ResourceDownloadLock interface {
// Acquire grabs the lock for a given application so long as the
// per application limit is not exceeded and total across all
// applications does not exceed the global limit.
Acquire(appName string)
Acquire(ctx context.Context, appName string) error

// Release releases the lock for the given application.
Release(appName string)
}

// resourceDownloadLimiter limits concurrent resource downloads using an
// optional global semaphore plus one semaphore-backed lock per application.
type resourceDownloadLimiter struct {
// globalLock bounds downloads across all applications. It is only set
// when the global limit is greater than zero; a nil value means no global
// limit is applied.
globalLock *semaphore.Weighted

// mu guards access to applicationLocks.
mu sync.Mutex
// applicationLimit is the capacity given to each per-application
// semaphore. A value <= 0 disables per-application limiting.
applicationLimit int64
// applicationLocks holds the lock for each application key. Entries are
// created on demand by Acquire.
applicationLocks map[string]appLock
}

// NewResourceDownloadLimiter creates a new resource download limiter.
func NewResourceDownloadLimiter(globalLimit, applicationLimit int) *resourceDownloadLimiter {
func NewResourceDownloadLimiter(globalLimit, applicationLimit int) (*resourceDownloadLimiter, error) {
if globalLimit < 0 || applicationLimit < 0 {
return nil, fmt.Errorf("resource download limits must be non-negative")
}
limiter := &resourceDownloadLimiter{
applicationLimit: applicationLimit,
applicationLocks: make(map[string]chan struct{}),
applicationLimit: int64(applicationLimit),
applicationLocks: make(map[string]appLock),
}
if globalLimit > 0 {
limiter.globalLock = make(chan struct{}, globalLimit)
limiter.globalLock = semaphore.NewWeighted(int64(globalLimit))
}
return limiter
return limiter, nil
}

type resourceDownloadLimiter struct {
globalLock chan struct{}

mu sync.Mutex
applicationLimit int
applicationLocks map[string]chan struct{}
}

// Acquire implements ResourceDownloadLock.
func (r *resourceDownloadLimiter) Acquire(appName string) {
// Acquire grabs the lock for a given application so long as the per application
// limit is not exceeded and total across all applications does not exceed the
// global limit.
func (r *resourceDownloadLimiter) Acquire(ctx context.Context, appName string) error {
if r.globalLock != nil {
start := time.Now()
r.globalLock <- struct{}{}
if err := r.globalLock.Acquire(ctx, 1); err != nil {
return err
}
resourceLogger.Debugf("acquire global resource download lock for %q, took %dms", appName, time.Now().Sub(start)/time.Millisecond)
}
if r.applicationLimit <= 0 {
return
return nil
}

r.mu.Lock()
lock, ok := r.applicationLocks[appName]
if !ok {
lock = make(chan struct{}, r.applicationLimit)
lock = appLock{
lock: semaphore.NewWeighted(r.applicationLimit),
size: 0,
}
r.applicationLocks[appName] = lock
}
r.mu.Unlock()

start := time.Now()
lock <- struct{}{}
if err := lock.Acquire(ctx); err != nil {
return err
}
resourceLogger.Debugf("acquire app resource download lock for %q, took %dms", appName, time.Now().Sub(start)/time.Millisecond)
return nil
}

// Release implements ResourceDownloadLock.
// Release releases the lock for the given application.
func (r *resourceDownloadLimiter) Release(appName string) {
if r.globalLock != nil {
<-r.globalLock
r.globalLock.Release(1)
}
if r.applicationLimit <= 0 {
return
Expand All @@ -79,8 +98,47 @@ func (r *resourceDownloadLimiter) Release(appName string) {
if !ok {
return
}
<-lock
if len(lock) == 0 {
lock.Release()

if lock.Size() == 0 {
delete(r.applicationLocks, appName)
}
}

// appLock pairs a per-application semaphore with a count of current holders,
// so the limiter can tell when an application's entry is idle.
//
// NOTE(review): the methods below use pointer receivers and update size
// atomically, but the limiter keeps these in a map[string]appLock (by value)
// and calls the methods on local copies. The semaphore pointer is shared
// between copies; the size counter is not, so increments and decrements made
// through a copy are not reflected in the map entry. Confirm whether the map
// should hold *appLock instead.
type appLock struct {
lock *semaphore.Weighted
size int64
}

// Acquire takes one unit of the application's semaphore, waiting until a unit
// is available or ctx is done. On success the holder count is incremented and
// nil is returned; on failure the semaphore's error is returned and the count
// is left unchanged. Only the per-application limit is handled here; the
// global limit is enforced by the caller.
func (a *appLock) Acquire(ctx context.Context) error {
if err := a.lock.Acquire(ctx, 1); err != nil {
return err
}
atomic.AddInt64(&a.size, 1)
return nil
}

// Release returns one unit to the application's semaphore and then decrements
// the holder count. It should be called once for each successful Acquire.
func (a *appLock) Release() {
a.lock.Release(1)
atomic.AddInt64(&a.size, -1)
}

// Size returns the holder count as recorded on this appLock value.
func (a *appLock) Size() int64 {
return atomic.LoadInt64(&a.size)
}

// noopDownloadResourceLocker is a no-op download resource locker. It enforces
// no limits.
type noopDownloadResourceLocker struct{}

// Acquire does nothing and always returns nil. The context and application
// name are ignored.
func (noopDownloadResourceLocker) Acquire(context.Context, string) error { return nil }

// Release does nothing.
func (noopDownloadResourceLocker) Release(string) {}
36 changes: 24 additions & 12 deletions internal/resource/limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package resource

import (
context "context"
"sync"
"sync/atomic"
"time"
Expand All @@ -25,10 +26,21 @@ var shortAttempt = &utils.AttemptStrategy{
Delay: 10 * time.Millisecond,
}

func (s *LimiterSuite) TestNoLimits(c *gc.C) {
// TestNoLimitsInvalidLimits checks that NewResourceDownloadLimiter returns an
// error when the global limit, the application limit, or both are negative.
func (s *LimiterSuite) TestNoLimitsInvalidLimits(c *gc.C) {
_, err := NewResourceDownloadLimiter(-1, 0)
c.Assert(err, gc.ErrorMatches, "resource download limits must be non-negative")

_, err = NewResourceDownloadLimiter(0, -1)
c.Assert(err, gc.ErrorMatches, "resource download limits must be non-negative")

_, err = NewResourceDownloadLimiter(-1, -1)
c.Assert(err, gc.ErrorMatches, "resource download limits must be non-negative")
}

func (s *LimiterSuite) TestNoLimits(c *gc.C) {
const totalToAcquire = 10
limiter := NewResourceDownloadLimiter(0, 0)
limiter, err := NewResourceDownloadLimiter(0, 0)
c.Assert(err, jc.ErrorIsNil)

totalAcquiredCount := int32(0)
trigger := make(chan struct{})
Expand All @@ -40,7 +52,7 @@ func (s *LimiterSuite) TestNoLimits(c *gc.C) {
go func() {
defer finished.Done()
started.Done()
limiter.Acquire("app1")
limiter.Acquire(context.Background(), "app1")
atomic.AddInt32(&totalAcquiredCount, 1)
<-trigger
limiter.Release("app1")
Expand Down Expand Up @@ -82,12 +94,12 @@ func (s *LimiterSuite) TestNoLimits(c *gc.C) {
}

func (s *LimiterSuite) TestGlobalLimit(c *gc.C) {

const (
globalLimit = 5
totalToAcquire = 10
)
limiter := NewResourceDownloadLimiter(globalLimit, 0)
limiter, err := NewResourceDownloadLimiter(globalLimit, 0)
c.Assert(err, jc.ErrorIsNil)

totalAcquiredCount := int32(0)
trigger := make(chan struct{})
Expand All @@ -99,7 +111,7 @@ func (s *LimiterSuite) TestGlobalLimit(c *gc.C) {
go func() {
defer finished.Done()
started.Done()
limiter.Acquire("app1")
limiter.Acquire(context.Background(), "app1")
atomic.AddInt32(&totalAcquiredCount, 1)
<-trigger
limiter.Release("app1")
Expand Down Expand Up @@ -149,13 +161,13 @@ func (s *LimiterSuite) TestGlobalLimit(c *gc.C) {
}

func (s *LimiterSuite) TestApplicationLimit(c *gc.C) {

const (
applicationLimit = 5
numApplications = 2
totalToAcquirePerApplication = 10
)
limiter := NewResourceDownloadLimiter(0, applicationLimit)
limiter, err := NewResourceDownloadLimiter(0, applicationLimit)
c.Assert(err, jc.ErrorIsNil)

totalAcquiredCount := int32(0)
trigger := make(chan struct{})
Expand All @@ -171,7 +183,7 @@ func (s *LimiterSuite) TestApplicationLimit(c *gc.C) {
go func(uui string) {
defer finished.Done()
started.Done()
limiter.Acquire(uuid)
limiter.Acquire(context.Background(), uuid)
atomic.AddInt32(&totalAcquiredCount, 1)
<-trigger
limiter.Release(uuid)
Expand Down Expand Up @@ -222,14 +234,14 @@ func (s *LimiterSuite) TestApplicationLimit(c *gc.C) {
}

func (s *LimiterSuite) TestGlobalAndApplicationLimit(c *gc.C) {

const (
globalLimit = 5
applicationLimit = 3
numApplications = 3
totalToAcquirePerApplication = 2
)
limiter := NewResourceDownloadLimiter(globalLimit, applicationLimit)
limiter, err := NewResourceDownloadLimiter(globalLimit, applicationLimit)
c.Assert(err, jc.ErrorIsNil)

totalAcquiredCount := int32(0)
trigger := make(chan struct{})
Expand All @@ -247,7 +259,7 @@ func (s *LimiterSuite) TestGlobalAndApplicationLimit(c *gc.C) {
go func(uui string) {
defer finished.Done()
started.Done()
limiter.Acquire(uuid)
limiter.Acquire(context.Background(), uuid)
atomic.AddInt32(&totalAcquiredCount, 1)
<-trigger
limiter.Release(uuid)
Expand Down
22 changes: 6 additions & 16 deletions internal/resource/opener.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,17 +192,6 @@ func newClientGetter(
return clientGetter
}

// noopDownloadResourceLocker is a no-op download resource locker.
type noopDownloadResourceLocker struct{}

// Acquire grabs the lock for a given application so long as the
// per-application limit is not exceeded and total across all
// applications does not exceed the global limit.
func (noopDownloadResourceLocker) Acquire(string) {}

// Release releases the lock for the given application.
func (noopDownloadResourceLocker) Release(appName string) {}

// ResourceOpener is a ResourceOpener for charmhub. It will first look on the
// controller for the requested resource.
type ResourceOpener struct {
Expand All @@ -222,15 +211,16 @@ type ResourceOpener struct {

// OpenResource implements server.ResourceOpener.
func (ro ResourceOpener) OpenResource(ctx context.Context, name string) (opener coreresource.Opened, err error) {
appKey := fmt.Sprintf("%s:%s", ro.modelUUID, ro.appName)
lock := ro.resourceDownloadLimiterFunc()
lock.Acquire(appKey)

done := func() {
lock.Release(appKey)
appKey := fmt.Sprintf("%s:%s", ro.modelUUID, ro.appName)
if err := lock.Acquire(ctx, appKey); err != nil {
return coreresource.Opened{}, errors.Errorf("acquiring resource download lock for %s: %w", appKey, err)
}

return ro.getResource(ctx, name, done)
return ro.getResource(ctx, name, func() {
lock.Release(appKey)
})
}

var resourceMutex = kmutex.New()
Expand Down
Loading

0 comments on commit 6b404c3

Please sign in to comment.