Skip to content
This repository was archived by the owner on Apr 29, 2024. It is now read-only.

Commit 4afbe79

Browse files
Aestek authored and mkeeler committed
Improve blocking queries on services that do not exist (hashicorp#4810)
## Background When making a blocking query on a missing service (one that was never registered, or is not registered anymore) the query returns as soon as any service is updated. On clusters with frequent updates (5~10 updates/s in our DCs) these queries virtually do not block, and clients with no protection against this waste resources on the agent and server side. Clients that do protect against this get updates later than they should because of the backoff time they implement between requests. ## Implementation While reducing the number of unnecessary updates we still want: * Clients to be notified as soon as the last instance of a service disappears. * Clients to be notified whenever there is an update for the service. * Clients to be notified as soon as the first instance of the requested service is added. To reduce the number of unnecessary updates we need to block when a request to a missing service is made. However, simply blocking would miss the change in the following case: 1. Client `client1` makes a query for service `foo`, gets back a node and `X-Consul-Index: 42` 2. `foo` is unregistered 3. `client1` makes a query for `foo` with `index=42` -> `foo` does not exist, the query blocks and `client1` is not notified of the change on `foo` We could store the last raft index when each service was last alive to know whether we should block on the incoming query or not, but that list could grow indefinitely. We instead store the last raft index when a service was unregistered and use it when a query targets a service that does not exist. When a service `srv` is unregistered this "missing service index" is always greater than any X-Consul-Index held by the clients while `srv` was up, allowing us to immediately notify them. 1. Client `client1` makes a query for service `foo`, gets back a node and `X-Consul-Index: 42` 2. `foo` is unregistered, we set the "missing service index" to 43 3. 
`client1` makes a blocking query for `foo` with `index=42` -> `foo` does not exist, we check against the "missing service index" and return immediately with `X-Consul-Index: 43` 4. `client1` makes a blocking query for `foo` with `index=43` -> we block 5. Other changes happen in the cluster, but `foo` still doesn't exist and the "missing service index" hasn't changed, so the query is still blocked 6. `foo` is registered again on index 62 -> `foo` exists and its index is greater than 43, we unblock the query
1 parent 4db60f8 commit 4afbe79

File tree

2 files changed

+104
-26
lines changed

2 files changed

+104
-26
lines changed

agent/consul/state/catalog.go

Lines changed: 48 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,10 @@ import (
1313

1414
const (
1515
servicesTableName = "services"
16+
17+
// serviceLastExtinctionIndexName keeps track of the last raft index when the last instance
18+
// of any service was unregistered. This is used by blocking queries on missing services.
19+
serviceLastExtinctionIndexName = "service_last_extinction"
1620
)
1721

1822
// nodesTableSchema returns a new table schema used for storing node
@@ -841,19 +845,30 @@ func (s *Store) ServicesByNodeMeta(ws memdb.WatchSet, filters map[string]string)
841845
}
842846

843847
// maxIndexForService return the maximum Raft Index for a service
844-
// If the index is not set for the service, it will return:
845-
// - maxIndex(nodes, services) if checks is false
846-
// - maxIndex(nodes, services, checks) if checks is true
847-
func maxIndexForService(tx *memdb.Txn, serviceName string, checks bool) uint64 {
848-
transaction, err := tx.First("index", "id", serviceIndexName(serviceName))
849-
if err == nil {
850-
if idx, ok := transaction.(*IndexEntry); ok {
851-
return idx.Value
852-
}
848+
// If the index is not set for the service, it will return the missing
849+
// service index.
850+
// The service_last_extinction is set to the last raft index when a service
851+
// was unregistered (or 0 if no services were ever unregistered). This
852+
// allows blocking queries to
853+
// * return when the last instance of a service is removed
854+
// * block until an instance for this service is available, or another
855+
// service is unregistered.
856+
func maxIndexForService(tx *memdb.Txn, serviceName string, serviceExists, checks bool) uint64 {
857+
if !serviceExists {
858+
res, err := tx.First("index", "id", serviceLastExtinctionIndexName)
859+
if missingIdx, ok := res.(*IndexEntry); ok && err == nil {
860+
return missingIdx.Value
861+
}
862+
}
863+
864+
res, err := tx.First("index", "id", serviceIndexName(serviceName))
865+
if idx, ok := res.(*IndexEntry); ok && err == nil {
866+
return idx.Value
853867
}
854868
if checks {
855869
return maxIndexTxn(tx, "nodes", "services", "checks")
856870
}
871+
857872
return maxIndexTxn(tx, "nodes", "services")
858873
}
859874

@@ -873,9 +888,6 @@ func (s *Store) serviceNodes(ws memdb.WatchSet, serviceName string, connect bool
873888
tx := s.db.Txn(false)
874889
defer tx.Abort()
875890

876-
// Get the table index.
877-
idx := maxIndexForService(tx, serviceName, false)
878-
879891
// Function for lookup
880892
var f func() (memdb.ResultIterator, error)
881893
if !connect {
@@ -905,6 +917,10 @@ func (s *Store) serviceNodes(ws memdb.WatchSet, serviceName string, connect bool
905917
if err != nil {
906918
return 0, nil, fmt.Errorf("failed parsing service nodes: %s", err)
907919
}
920+
921+
// Get the table index.
922+
idx := maxIndexForService(tx, serviceName, len(results) > 0, false)
923+
908924
return idx, results, nil
909925
}
910926

@@ -914,9 +930,6 @@ func (s *Store) ServiceTagNodes(ws memdb.WatchSet, service string, tags []string
914930
tx := s.db.Txn(false)
915931
defer tx.Abort()
916932

917-
// Get the table index.
918-
idx := maxIndexForService(tx, service, false)
919-
920933
// List all the services.
921934
services, err := tx.Get("services", "service", service)
922935
if err != nil {
@@ -925,9 +938,11 @@ func (s *Store) ServiceTagNodes(ws memdb.WatchSet, service string, tags []string
925938
ws.Add(services.WatchCh())
926939

927940
// Gather all the services and apply the tag filter.
941+
serviceExists := false
928942
var results structs.ServiceNodes
929943
for service := services.Next(); service != nil; service = services.Next() {
930944
svc := service.(*structs.ServiceNode)
945+
serviceExists = true
931946
if !serviceTagsFilter(svc, tags) {
932947
results = append(results, svc)
933948
}
@@ -938,6 +953,9 @@ func (s *Store) ServiceTagNodes(ws memdb.WatchSet, service string, tags []string
938953
if err != nil {
939954
return 0, nil, fmt.Errorf("failed parsing service nodes: %s", err)
940955
}
956+
// Get the table index.
957+
idx := maxIndexForService(tx, service, serviceExists, false)
958+
941959
return idx, results, nil
942960
}
943961

@@ -1214,6 +1232,11 @@ func (s *Store) deleteServiceTxn(tx *memdb.Txn, idx uint64, nodeName, serviceID
12141232
return fmt.Errorf("[FAILED] deleting serviceIndex %s: %s", svc.ServiceName, err)
12151233
}
12161234
}
1235+
1236+
if err := tx.Insert("index", &IndexEntry{serviceLastExtinctionIndexName, idx}); err != nil {
1237+
return fmt.Errorf("failed updating missing service index: %s", err)
1238+
}
1239+
12171240
}
12181241
} else {
12191242
return fmt.Errorf("Could not find any service %s: %s", svc.ServiceName, err)
@@ -1438,7 +1461,7 @@ func (s *Store) ServiceChecksByNodeMeta(ws memdb.WatchSet, serviceName string,
14381461
defer tx.Abort()
14391462

14401463
// Get the table index.
1441-
idx := maxIndexForService(tx, serviceName, true)
1464+
idx := maxIndexForService(tx, serviceName, true, true)
14421465
// Return the checks.
14431466
iter, err := tx.Get("checks", "service", serviceName)
14441467
if err != nil {
@@ -1627,9 +1650,6 @@ func (s *Store) checkServiceNodes(ws memdb.WatchSet, serviceName string, connect
16271650
tx := s.db.Txn(false)
16281651
defer tx.Abort()
16291652

1630-
// Get the table index.
1631-
idx := maxIndexForService(tx, serviceName, true)
1632-
16331653
// Function for lookup
16341654
var f func() (memdb.ResultIterator, error)
16351655
if !connect {
@@ -1654,6 +1674,10 @@ func (s *Store) checkServiceNodes(ws memdb.WatchSet, serviceName string, connect
16541674
for service := iter.Next(); service != nil; service = iter.Next() {
16551675
results = append(results, service.(*structs.ServiceNode))
16561676
}
1677+
1678+
// Get the table index.
1679+
idx := maxIndexForService(tx, serviceName, len(results) > 0, true)
1680+
16571681
return s.parseCheckServiceNodes(tx, ws, idx, serviceName, results, err)
16581682
}
16591683

@@ -1663,9 +1687,6 @@ func (s *Store) CheckServiceTagNodes(ws memdb.WatchSet, serviceName string, tags
16631687
tx := s.db.Txn(false)
16641688
defer tx.Abort()
16651689

1666-
// Get the table index.
1667-
idx := maxIndexForService(tx, serviceName, true)
1668-
16691690
// Query the state store for the service.
16701691
iter, err := tx.Get("services", "service", serviceName)
16711692
if err != nil {
@@ -1674,13 +1695,18 @@ func (s *Store) CheckServiceTagNodes(ws memdb.WatchSet, serviceName string, tags
16741695
ws.Add(iter.WatchCh())
16751696

16761697
// Return the results, filtering by tag.
1698+
serviceExists := false
16771699
var results structs.ServiceNodes
16781700
for service := iter.Next(); service != nil; service = iter.Next() {
16791701
svc := service.(*structs.ServiceNode)
1702+
serviceExists = true
16801703
if !serviceTagsFilter(svc, tags) {
16811704
results = append(results, svc)
16821705
}
16831706
}
1707+
1708+
// Get the table index.
1709+
idx := maxIndexForService(tx, serviceName, serviceExists, true)
16841710
return s.parseCheckServiceNodes(tx, ws, idx, serviceName, results, err)
16851711
}
16861712

agent/consul/state/catalog_test.go

Lines changed: 56 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2848,16 +2848,68 @@ func TestIndexIndependence(t *testing.T) {
28482848
ensureServiceVersion(t, s, ws, "service_shared", 17, 0)
28492849

28502850
testRegisterService(t, s, 18, "node1", "service_new")
2851-
// Since service does not exists anymore, its index should be last insert
2852-
// The behaviour is the same as all non-existing services, meaning
2853-
// we properly did collect garbage
2854-
ensureServiceVersion(t, s, ws, "service_shared", 18, 0)
2851+
2852+
// Since service does not exists anymore, its index should be that of
2853+
// the last deleted service
2854+
ensureServiceVersion(t, s, ws, "service_shared", 17, 0)
2855+
28552856
// No index should exist anymore, it must have been garbage collected
28562857
ensureIndexForService(t, s, ws, "service_shared", 0)
28572858
if !watchFired(ws) {
28582859
t.Fatalf("bad")
28592860
}
2861+
}
2862+
2863+
func TestMissingServiceIndex(t *testing.T) {
2864+
s := testStateStore(t)
2865+
2866+
// Querying with no matches gives an empty response
2867+
ws := memdb.NewWatchSet()
2868+
idx, res, err := s.CheckServiceNodes(ws, "service1")
2869+
require.Nil(t, err)
2870+
require.Nil(t, res)
2871+
2872+
// index should be 0 for a non existing service at startup
2873+
require.Equal(t, uint64(0), idx)
2874+
2875+
testRegisterNode(t, s, 0, "node1")
2876+
2877+
// node operations should not affect missing service index
2878+
ensureServiceVersion(t, s, ws, "service1", 0, 0)
2879+
2880+
testRegisterService(t, s, 10, "node1", "service1")
2881+
ensureServiceVersion(t, s, ws, "service1", 10, 1)
28602882

2883+
s.DeleteService(11, "node1", "service1")
2884+
// service1 is now missing, its index is now that of the last index a service was
2885+
// deleted at
2886+
ensureServiceVersion(t, s, ws, "service1", 11, 0)
2887+
2888+
testRegisterService(t, s, 12, "node1", "service2")
2889+
ensureServiceVersion(t, s, ws, "service2", 12, 1)
2890+
2891+
// missing service index does not change even though another service have been
2892+
// registered
2893+
ensureServiceVersion(t, s, ws, "service1", 11, 0)
2894+
ensureServiceVersion(t, s, ws, "i_do_not_exist", 11, 0)
2895+
2896+
// registering a service on another node does not affect missing service
2897+
// index
2898+
testRegisterNode(t, s, 13, "node2")
2899+
testRegisterService(t, s, 14, "node2", "service3")
2900+
ensureServiceVersion(t, s, ws, "service3", 14, 1)
2901+
ensureServiceVersion(t, s, ws, "service1", 11, 0)
2902+
2903+
// unregistering a service bumps missing service index
2904+
s.DeleteService(15, "node2", "service3")
2905+
ensureServiceVersion(t, s, ws, "service3", 15, 0)
2906+
ensureServiceVersion(t, s, ws, "service2", 12, 1)
2907+
ensureServiceVersion(t, s, ws, "service1", 15, 0)
2908+
ensureServiceVersion(t, s, ws, "i_do_not_exist", 15, 0)
2909+
2910+
// registering again a missing service correctly updates its index
2911+
testRegisterService(t, s, 16, "node1", "service1")
2912+
ensureServiceVersion(t, s, ws, "service1", 16, 1)
28612913
}
28622914

28632915
func TestStateStore_CheckServiceNodes(t *testing.T) {

0 commit comments

Comments
 (0)