-
Notifications
You must be signed in to change notification settings - Fork 70
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
memberlist: Use separate queue for CAS messages and only wait for CAS messages queue to be empty when stopping #539
Changes from 6 commits
3e1e0d3
6924d2f
1c1f876
a892e97
979c031
82bb78a
23449c0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -157,7 +157,8 @@ type KVConfig struct { | |
LeftIngestersTimeout time.Duration `yaml:"left_ingesters_timeout" category:"advanced"` | ||
|
||
// Timeout used when leaving the memberlist cluster. | ||
LeaveTimeout time.Duration `yaml:"leave_timeout" category:"advanced"` | ||
LeaveTimeout time.Duration `yaml:"leave_timeout" category:"advanced"` | ||
BroadcastTimeoutForLocalUpdatesOnShutdown time.Duration `yaml:"broadcast_timeout_for_local_updates_on_shutdown" category:"advanced"` | ||
|
||
// How much space to use to keep received and sent messages in memory (for troubleshooting). | ||
MessageHistoryBufferBytes int `yaml:"message_history_buffer_bytes" category:"advanced"` | ||
|
@@ -198,6 +199,7 @@ func (cfg *KVConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) { | |
f.IntVar(&cfg.AdvertisePort, prefix+"memberlist.advertise-port", mlDefaults.AdvertisePort, "Gossip port to advertise to other members in the cluster. Used for NAT traversal.") | ||
f.StringVar(&cfg.ClusterLabel, prefix+"memberlist.cluster-label", mlDefaults.Label, "The cluster label is an optional string to include in outbound packets and gossip streams. Other members in the memberlist cluster will discard any message whose label doesn't match the configured one, unless the 'cluster-label-verification-disabled' configuration option is set to true.") | ||
f.BoolVar(&cfg.ClusterLabelVerificationDisabled, prefix+"memberlist.cluster-label-verification-disabled", mlDefaults.SkipInboundLabelCheck, "When true, memberlist doesn't verify that inbound packets and gossip streams have the cluster label matching the configured one. This verification should be disabled while rolling out the change to the configured cluster label in a live memberlist cluster.") | ||
f.DurationVar(&cfg.BroadcastTimeoutForLocalUpdatesOnShutdown, prefix+"memberlist.broadcast-timeout-for-local-updates-on-shutdown", 10*time.Second, "Timeout for broadcasting all remaining locally-generated updates to other nodes when shutting down. Only used if there are nodes left in the memberlist cluster, and only applies to locally-generated updates, not to broadcast messages that are result of incoming gossip updates. 0 = no timeout, wait until all locally-generated updates are sent.") | ||
|
||
cfg.TCPTransport.RegisterFlagsWithPrefix(f, prefix) | ||
} | ||
|
@@ -231,10 +233,11 @@ type KV struct { | |
// dns discovery provider | ||
provider DNSProvider | ||
|
||
// Protects access to memberlist and broadcasts fields. | ||
delegateReady atomic.Bool | ||
memberlist *memberlist.Memberlist | ||
broadcasts *memberlist.TransmitLimitedQueue | ||
// Protects access to memberlist and broadcast queues. | ||
delegateReady atomic.Bool | ||
memberlist *memberlist.Memberlist | ||
localBroadcasts *memberlist.TransmitLimitedQueue // queue for messages generated locally | ||
gossipBroadcasts *memberlist.TransmitLimitedQueue // queue for messages that we forward from other nodes | ||
|
||
// KV Store. | ||
storeMu sync.Mutex | ||
|
@@ -273,7 +276,8 @@ type KV struct { | |
numberOfPushes prometheus.Counter | ||
totalSizeOfPulls prometheus.Counter | ||
totalSizeOfPushes prometheus.Counter | ||
numberOfBroadcastMessagesInQueue prometheus.GaugeFunc | ||
numberOfGossipMessagesInQueue prometheus.GaugeFunc | ||
numberOfLocalMessagesInQueue prometheus.GaugeFunc | ||
totalSizeOfBroadcastMessagesInQueue prometheus.Gauge | ||
numberOfBroadcastMessagesDropped prometheus.Counter | ||
casAttempts prometheus.Counter | ||
|
@@ -456,7 +460,11 @@ func (m *KV) starting(ctx context.Context) error { | |
} | ||
// Finish delegate initialization. | ||
m.memberlist = list | ||
m.broadcasts = &memberlist.TransmitLimitedQueue{ | ||
m.localBroadcasts = &memberlist.TransmitLimitedQueue{ | ||
NumNodes: list.NumMembers, | ||
RetransmitMult: mlCfg.RetransmitMult, | ||
} | ||
m.gossipBroadcasts = &memberlist.TransmitLimitedQueue{ | ||
NumNodes: list.NumMembers, | ||
RetransmitMult: mlCfg.RetransmitMult, | ||
} | ||
|
@@ -719,20 +727,24 @@ func (m *KV) discoverMembers(ctx context.Context, members []string) []string { | |
func (m *KV) stopping(_ error) error { | ||
level.Info(m.logger).Log("msg", "leaving memberlist cluster") | ||
|
||
// Wait until broadcast queue is empty, but don't wait for too long. | ||
// Wait until queue with locally-generated messages is empty, but don't wait for too long. | ||
// Also don't wait if there is just one node left. | ||
// Problem is that broadcast queue is also filled up by state changes received from other nodes, | ||
// so it may never be empty in a busy cluster. However, we generally only care about messages | ||
// generated on this node via CAS, and those are disabled now (via casBroadcastsEnabled), and should be able | ||
// to get out in this timeout. | ||
// Note: Once we enter Stopping state, we don't queue more locally-generated messages. | ||
|
||
waitTimeout := time.Now().Add(10 * time.Second) | ||
for m.broadcasts.NumQueued() > 0 && m.memberlist.NumMembers() > 1 && time.Now().Before(waitTimeout) { | ||
deadline := time.Now().Add(m.cfg.BroadcastTimeoutForLocalUpdatesOnShutdown) | ||
|
||
msgs := m.localBroadcasts.NumQueued() | ||
nodes := m.memberlist.NumMembers() | ||
for msgs > 0 && nodes > 1 && (m.cfg.BroadcastTimeoutForLocalUpdatesOnShutdown <= 0 || time.Now().Before(deadline)) { | ||
level.Info(m.logger).Log("msg", "waiting for locally-generated broadcast messages to be sent out", "count", msgs, "nodes", nodes) | ||
time.Sleep(250 * time.Millisecond) | ||
|
||
msgs = m.localBroadcasts.NumQueued() | ||
nodes = m.memberlist.NumMembers() | ||
} | ||
|
||
if cnt := m.broadcasts.NumQueued(); cnt > 0 { | ||
level.Warn(m.logger).Log("msg", "broadcast messages left in queue", "count", cnt, "nodes", m.memberlist.NumMembers()) | ||
if msgs > 0 { | ||
level.Warn(m.logger).Log("msg", "locally-generated broadcast messages left the queue", "count", msgs, "nodes", nodes) | ||
} | ||
|
||
err := m.memberlist.Leave(m.cfg.LeaveTimeout) | ||
|
@@ -972,11 +984,7 @@ outer: | |
m.casSuccesses.Inc() | ||
m.notifyWatchers(key) | ||
|
||
if m.State() == services.Running { | ||
m.broadcastNewValue(key, change, newver, codec) | ||
} else { | ||
level.Warn(m.logger).Log("msg", "skipped broadcasting CAS update because memberlist KV is shutting down", "key", key) | ||
} | ||
m.broadcastNewValue(key, change, newver, codec, true) | ||
} | ||
|
||
return nil | ||
|
@@ -1034,7 +1042,12 @@ func (m *KV) trySingleCas(key string, codec codec.Codec, f func(in interface{}) | |
return change, newver, retry, nil | ||
} | ||
|
||
func (m *KV) broadcastNewValue(key string, change Mergeable, version uint, codec codec.Codec) { | ||
func (m *KV) broadcastNewValue(key string, change Mergeable, version uint, codec codec.Codec, locallyGenerated bool) { | ||
if locallyGenerated && m.State() != services.Running { | ||
level.Warn(m.logger).Log("msg", "skipped broadcasting of locally-generated update because memberlist KV is shutting down", "key", key) | ||
return | ||
} | ||
|
||
data, err := codec.Encode(change) | ||
if err != nil { | ||
level.Error(m.logger).Log("msg", "failed to encode change", "key", key, "version", version, "err", err) | ||
|
@@ -1058,7 +1071,25 @@ func (m *KV) broadcastNewValue(key string, change Mergeable, version uint, codec | |
Changes: change.MergeContent(), | ||
}) | ||
|
||
m.queueBroadcast(key, change.MergeContent(), version, pairData) | ||
l := len(pairData) | ||
b := ringBroadcast{ | ||
key: key, | ||
content: change.MergeContent(), | ||
version: version, | ||
msg: pairData, | ||
finished: func(ringBroadcast) { | ||
m.totalSizeOfBroadcastMessagesInQueue.Sub(float64(l)) | ||
}, | ||
logger: m.logger, | ||
} | ||
|
||
m.totalSizeOfBroadcastMessagesInQueue.Add(float64(l)) | ||
|
||
if locallyGenerated { | ||
m.localBroadcasts.QueueBroadcast(b) | ||
} else { | ||
m.gossipBroadcasts.QueueBroadcast(b) | ||
} | ||
} | ||
|
||
// NodeMeta is method from Memberlist Delegate interface | ||
|
@@ -1153,7 +1184,7 @@ func (m *KV) processValueUpdate(workerCh <-chan valueUpdate, key string) { | |
m.notifyWatchers(key) | ||
|
||
// Don't resend original message, but only changes. | ||
m.broadcastNewValue(key, mod, version, update.codec) | ||
m.broadcastNewValue(key, mod, version, update.codec, false) | ||
} | ||
|
||
case <-m.shutdown: | ||
|
@@ -1163,32 +1194,25 @@ func (m *KV) processValueUpdate(workerCh <-chan valueUpdate, key string) { | |
} | ||
} | ||
|
||
func (m *KV) queueBroadcast(key string, content []string, version uint, message []byte) { | ||
l := len(message) | ||
|
||
b := ringBroadcast{ | ||
key: key, | ||
content: content, | ||
version: version, | ||
msg: message, | ||
finished: func(ringBroadcast) { | ||
m.totalSizeOfBroadcastMessagesInQueue.Sub(float64(l)) | ||
}, | ||
logger: m.logger, | ||
} | ||
|
||
m.totalSizeOfBroadcastMessagesInQueue.Add(float64(l)) | ||
m.broadcasts.QueueBroadcast(b) | ||
} | ||
|
||
// GetBroadcasts is method from Memberlist Delegate interface | ||
// It returns all pending broadcasts (within the size limit) | ||
func (m *KV) GetBroadcasts(overhead, limit int) [][]byte { | ||
if !m.delegateReady.Load() { | ||
return nil | ||
} | ||
|
||
return m.broadcasts.GetBroadcasts(overhead, limit) | ||
// Prioritize locally-generated messages | ||
msgs := m.localBroadcasts.GetBroadcasts(overhead, limit) | ||
|
||
// Decrease limit for each message we got from locally-generated broadcasts. | ||
for _, m := range msgs { | ||
limit -= overhead + len(m) | ||
} | ||
|
||
if limit > 0 { | ||
msgs = append(msgs, m.gossipBroadcasts.GetBroadcasts(overhead, limit)...) | ||
} | ||
return msgs | ||
} | ||
|
||
// LocalState is method from Memberlist Delegate interface | ||
|
@@ -1335,7 +1359,7 @@ func (m *KV) MergeRemoteState(data []byte, _ bool) { | |
level.Error(m.logger).Log("msg", "failed to store received value", "key", kvPair.Key, "err", err) | ||
} else if newver > 0 { | ||
m.notifyWatchers(kvPair.Key) | ||
m.broadcastNewValue(kvPair.Key, change, newver, codec) | ||
m.broadcastNewValue(kvPair.Key, change, newver, codec, false) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These messages we're persisting to the local KV that come from external broadcasts are non-CAS by definition? (Only the local mutations are called CAS?) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Correct. Currently only CAS operation can modify KV store. There is also Delete operation in KV client interface, but memberlist implementation doesn't support it yet. But we should implement it eventually, so perhaps it would be better to call it "local updates", instead of "cas updates". WDYT? I think I'll rename it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sounds good! CAS is an implementation detail and doesn't really explain that it's a local mod. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've done this change in 979c031. I've also updated |
||
} | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -718,7 +718,7 @@ func testMultipleClientsWithConfigGenerator(t *testing.T, members int, configGen | |
|
||
startTime := time.Now() | ||
firstKv := clients[0] | ||
ctx, cancel := context.WithTimeout(context.Background(), casInterval*3/2) // Watch for 1.5 cas intervals. | ||
ctx, cancel := context.WithTimeout(context.Background(), casInterval*3) // Watch for 3x cas intervals. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This change is unrelated, only done to avoid test flakiness. |
||
updates := 0 | ||
firstKv.WatchKey(ctx, key, func(in interface{}) bool { | ||
updates++ | ||
|
@@ -1647,3 +1647,66 @@ func (p *delayedDNSProviderMock) Resolve(_ context.Context, addrs []string) erro | |
func (p delayedDNSProviderMock) Addresses() []string { | ||
return p.resolved | ||
} | ||
|
||
func TestGetBroadcastsPrefersLocalUpdates(t *testing.T) { | ||
codec := dataCodec{} | ||
|
||
cfg := KVConfig{ | ||
TCPTransport: TCPTransportConfig{ | ||
BindAddrs: getLocalhostAddrs(), | ||
}, | ||
} | ||
|
||
// We will be checking for number of messages in the broadcast queue, so make sure to use known retransmit factor. | ||
cfg.RetransmitMult = 1 | ||
cfg.Codecs = append(cfg.Codecs, codec) | ||
|
||
reg := prometheus.NewRegistry() | ||
kv := NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{}, reg) | ||
require.NoError(t, services.StartAndAwaitRunning(context.Background(), kv)) | ||
defer services.StopAndAwaitTerminated(context.Background(), kv) //nolint:errcheck | ||
|
||
now := time.Now() | ||
|
||
smallUpdate := &data{Members: map[string]member{"a": {Timestamp: now.Unix(), State: JOINING}}} | ||
bigUpdate := &data{Members: map[string]member{"b": {Timestamp: now.Unix(), State: JOINING}, "c": {Timestamp: now.Unix(), State: JOINING}, "d": {Timestamp: now.Unix(), State: JOINING}}} | ||
mediumUpdate := &data{Members: map[string]member{"d": {Timestamp: now.Unix(), State: JOINING}, "e": {Timestamp: now.Unix(), State: JOINING}}} | ||
|
||
// No broadcast messages from KV at the beginning. | ||
require.Equal(t, 0, len(kv.GetBroadcasts(0, math.MaxInt32))) | ||
|
||
// Check that locally-generated broadcast messages will be prioritized and sent out first, even if they are enqueued later or are smaller than other messages in the queue. | ||
kv.broadcastNewValue("non-local", smallUpdate, 1, codec, false) | ||
kv.broadcastNewValue("non-local", bigUpdate, 2, codec, false) | ||
kv.broadcastNewValue("local", smallUpdate, 1, codec, true) | ||
kv.broadcastNewValue("local", bigUpdate, 2, codec, true) | ||
kv.broadcastNewValue("local", mediumUpdate, 3, codec, true) | ||
|
||
err := testutil.GatherAndCompare(reg, bytes.NewBufferString(` | ||
# HELP memberlist_client_messages_in_broadcast_queue Number of user messages in the broadcast queue | ||
# TYPE memberlist_client_messages_in_broadcast_queue gauge | ||
memberlist_client_messages_in_broadcast_queue{queue="gossip"} 2 | ||
memberlist_client_messages_in_broadcast_queue{queue="local"} 3 | ||
`), "memberlist_client_messages_in_broadcast_queue") | ||
require.NoError(t, err) | ||
|
||
msgs := kv.GetBroadcasts(0, 10000) | ||
require.Len(t, msgs, 5) // we get all 4 messages | ||
require.Equal(t, "local", getKey(t, msgs[0])) | ||
require.Equal(t, "local", getKey(t, msgs[1])) | ||
require.Equal(t, "local", getKey(t, msgs[2])) | ||
require.Equal(t, "non-local", getKey(t, msgs[3])) | ||
require.Equal(t, "non-local", getKey(t, msgs[4])) | ||
|
||
// Check that TransmitLimitedQueue.GetBroadcasts preferred larger messages (it does that). | ||
require.True(t, len(msgs[0]) > len(msgs[1])) // Bigger local message is returned before medium one. | ||
require.True(t, len(msgs[1]) > len(msgs[2])) // Medium local message is returned before small one. | ||
require.True(t, len(msgs[3]) > len(msgs[4])) // Bigger non-local message is returned before smaller one | ||
} | ||
|
||
func getKey(t *testing.T, msg []byte) string { | ||
kvPair := KeyValuePair{} | ||
err := kvPair.Unmarshal(msg) | ||
require.NoError(t, err) | ||
return kvPair.Key | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
inlined into
broadcastNewValue