Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[sotw][linear] Fix missing watch cleanup in linear cache for sotw watches subscribing to multiple resources #854

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 29 additions & 22 deletions pkg/cache/v3/linear.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"github.com/envoyproxy/go-control-plane/pkg/server/stream/v3"
)

type watches = map[chan Response]struct{}
type watches = map[ResponseWatch]struct{}

// LinearCache supports collections of opaque resources. This cache has a
// single collection indexed by resource names and manages resource versions
Expand Down Expand Up @@ -113,7 +113,7 @@ func NewLinearCache(typeURL string, opts ...LinearCacheOption) *LinearCache {
return out
}

func (cache *LinearCache) respond(value chan Response, staleResources []string) {
func (cache *LinearCache) respond(watch ResponseWatch, staleResources []string) {
var resources []types.ResourceWithTTL
// TODO: optimize the resources slice creations across different clients
if len(staleResources) == 0 {
Expand All @@ -130,8 +130,8 @@ func (cache *LinearCache) respond(value chan Response, staleResources []string)
}
}
}
value <- &RawResponse{
Request: &Request{TypeUrl: cache.typeURL},
watch.Response <- &RawResponse{
Request: watch.Request,
Resources: resources,
Version: cache.getVersion(),
Ctx: context.Background(),
Expand All @@ -140,18 +140,18 @@ func (cache *LinearCache) respond(value chan Response, staleResources []string)

func (cache *LinearCache) notifyAll(modified map[string]struct{}) {
// de-duplicate watches that need to be responded
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment threw me off a bit when reading the code. It's not de-duplicating iiuc (there are no duplicates), it is constructing a mapping of watches -> list of resources names, no?

For linear cache/SOTW, here the list of strings in the mapping should simply be request.ResourceNames, no? Maybe it'd be clearer to just use this, since you have the request at heand in the ResponseWatch?

Copy link
Contributor Author

@valerian-roche valerian-roche Jan 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is deduplicating through the map to send a simple reply with n resources rather than n replies with one
On whether to use request.ResourceNames, this would yield a different behavior as:

  • this is not normalized for wildcard parts for instance
  • can target resources which don't exist in the cache, and the respond implementation might not expect that
  • for eds/rds, we don't want to return all resources but only the ones matching modified

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it makes a difference in theory, the request contains all the resources you may need to respond to, so

  1. for CDS/LDS and wildcard you need to send all resources for this type
  2. for RDS/EDS/* you need the intersection of resource names in modified and what is in request.ResourceNames

notifyList := make(map[chan Response][]string)
notifyList := make(map[ResponseWatch][]string)
for name := range modified {
for watch := range cache.watches[name] {
notifyList[watch] = append(notifyList[watch], name)
}
delete(cache.watches, name)
}
for value, stale := range notifyList {
cache.respond(value, stale)
for watch, stale := range notifyList {
cache.removeWatch(watch)
cache.respond(watch, stale)
}
for value := range cache.watchAll {
cache.respond(value, nil)
for watch := range cache.watchAll {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So watchAll are wildcards, right? I think it would be more clear to simply call that watchWildcards

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can update that in a separate PR

cache.respond(watch, nil)
}
cache.watchAll = make(watches)

Expand Down Expand Up @@ -318,6 +318,8 @@ func (cache *LinearCache) CreateWatch(request *Request, _ stream.StreamState, va
err = errors.New("mis-matched version prefix")
}

watch := ResponseWatch{Request: request, Response: value}

cache.mu.Lock()
defer cache.mu.Unlock()

Expand All @@ -337,16 +339,16 @@ func (cache *LinearCache) CreateWatch(request *Request, _ stream.StreamState, va
}
}
if stale {
cache.respond(value, staleResources)
cache.respond(watch, staleResources)
return nil
}
// Create open watches since versions are up to date.
if len(request.GetResourceNames()) == 0 {
cache.watchAll[value] = struct{}{}
cache.watchAll[watch] = struct{}{}
return func() {
cache.mu.Lock()
defer cache.mu.Unlock()
delete(cache.watchAll, value)
delete(cache.watchAll, watch)
}
}
for _, name := range request.GetResourceNames() {
Expand All @@ -355,19 +357,24 @@ func (cache *LinearCache) CreateWatch(request *Request, _ stream.StreamState, va
set = make(watches)
cache.watches[name] = set
}
set[value] = struct{}{}
set[watch] = struct{}{}
}
return func() {
cache.mu.Lock()
defer cache.mu.Unlock()
for _, name := range request.GetResourceNames() {
set, exists := cache.watches[name]
if exists {
delete(set, value)
}
if len(set) == 0 {
delete(cache.watches, name)
}
cache.removeWatch(watch)
}
}

// Must be called under lock
func (cache *LinearCache) removeWatch(watch ResponseWatch) {
// Make sure we clean the watch for ALL resources it might be associated with,
// as the channel will no longer be listened to
for _, resource := range watch.Request.ResourceNames {
resourceWatches := cache.watches[resource]
delete(resourceWatches, watch)
if len(resourceWatches) == 0 {
delete(cache.watches, resource)
}
}
}
Expand Down
90 changes: 89 additions & 1 deletion pkg/cache/v3/linear_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,14 @@ func testResource(s string) types.Resource {

func verifyResponse(t *testing.T, ch <-chan Response, version string, num int) {
t.Helper()
r := <-ch
var r Response
select {
case r = <-ch:
case <-time.After(1 * time.Second):
t.Error("failed to receive response after 1 second")
return
}

if r.GetRequest().GetTypeUrl() != testType {
t.Errorf("unexpected empty request type URL: %q", r.GetRequest().GetTypeUrl())
}
Expand All @@ -63,6 +70,9 @@ func verifyResponse(t *testing.T, ch <-chan Response, version string, num int) {
if out.GetTypeUrl() != testType {
t.Errorf("unexpected type URL: %q", out.GetTypeUrl())
}
if len(r.GetRequest().GetResourceNames()) != 0 && len(r.GetRequest().GetResourceNames()) < len(out.Resources) {
t.Errorf("received more resources (%d) than requested (%d)", len(r.GetRequest().GetResourceNames()), len(out.Resources))
}
}

type resourceInfo struct {
Expand Down Expand Up @@ -773,3 +783,81 @@ func TestLinearMixedWatches(t *testing.T) {
verifyResponse(t, w, c.getVersion(), 0)
verifyDeltaResponse(t, wd, nil, []string{"b"})
}

func TestLinearSotwWatches(t *testing.T) {
t.Run("watches are properly removed from all objects", func(t *testing.T) {
cache := NewLinearCache(testType)
a := &endpoint.ClusterLoadAssignment{ClusterName: "a"}
err := cache.UpdateResource("a", a)
require.NoError(t, err)
b := &endpoint.ClusterLoadAssignment{ClusterName: "b"}
err = cache.UpdateResource("b", b)
require.NoError(t, err)
assert.Equal(t, 2, cache.NumResources())

// A watch tracks three different objects.
// An update is done for the three objects in a row
// If the watches are no properly purged, all three updates will send responses in the channel, but only the first one is tracked
// The buffer will therefore saturate and the third request will deadlock the entire cache as occurring under the mutex
sotwState := stream.NewStreamState(false, nil)
w := make(chan Response, 1)
_ = cache.CreateWatch(&Request{ResourceNames: []string{"a", "b", "c"}, TypeUrl: testType, VersionInfo: cache.getVersion()}, sotwState, w)
mustBlock(t, w)
checkVersionMapNotSet(t, cache)

assert.Len(t, cache.watches["a"], 1)
assert.Len(t, cache.watches["b"], 1)
assert.Len(t, cache.watches["c"], 1)

// Update a and c without touching b
a = &endpoint.ClusterLoadAssignment{ClusterName: "a", Endpoints: []*endpoint.LocalityLbEndpoints{ // resource update
{Priority: 25},
}}
err = cache.UpdateResources(map[string]types.Resource{"a": a}, nil)
require.NoError(t, err)
verifyResponse(t, w, cache.getVersion(), 1)
checkVersionMapNotSet(t, cache)

assert.Empty(t, cache.watches["a"])
assert.Empty(t, cache.watches["b"])
assert.Empty(t, cache.watches["c"])

// c no longer watched
w = make(chan Response, 1)
_ = cache.CreateWatch(&Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: cache.getVersion()}, sotwState, w)
require.NoError(t, err)
mustBlock(t, w)
checkVersionMapNotSet(t, cache)

b = &endpoint.ClusterLoadAssignment{ClusterName: "b", Endpoints: []*endpoint.LocalityLbEndpoints{ // resource update
{Priority: 15},
}}
err = cache.UpdateResources(map[string]types.Resource{"b": b}, nil)

assert.Empty(t, cache.watches["a"])
assert.Empty(t, cache.watches["b"])
assert.Empty(t, cache.watches["c"])

require.NoError(t, err)
verifyResponse(t, w, cache.getVersion(), 1)
checkVersionMapNotSet(t, cache)

w = make(chan Response, 1)
_ = cache.CreateWatch(&Request{ResourceNames: []string{"c"}, TypeUrl: testType, VersionInfo: cache.getVersion()}, sotwState, w)
require.NoError(t, err)
mustBlock(t, w)
checkVersionMapNotSet(t, cache)

c := &endpoint.ClusterLoadAssignment{ClusterName: "c", Endpoints: []*endpoint.LocalityLbEndpoints{ // resource update
{Priority: 15},
}}
err = cache.UpdateResources(map[string]types.Resource{"c": c}, nil)
require.NoError(t, err)
verifyResponse(t, w, cache.getVersion(), 1)
checkVersionMapNotSet(t, cache)

assert.Empty(t, cache.watches["a"])
assert.Empty(t, cache.watches["b"])
assert.Empty(t, cache.watches["c"])
})
}