Skip to content

Commit 570ed7a

Browse files
committed
Fixes versions memberlist merging.
1 parent 9826831 commit 570ed7a

File tree

4 files changed

+127
-64
lines changed

4 files changed

+127
-64
lines changed

api/gen/proto/go/version/v1/version.pb.go

Lines changed: 46 additions & 36 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

api/gen/proto/go/version/v1/version_vtproto.pb.go

Lines changed: 37 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

api/version/v1/version.proto

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@ message InstanceVersion {
2121

2222
// Querier Service API version
2323
uint64 QuerierAPI = 4;
24+
25+
// Tells if the instance is running or has left cluster.
26+
bool left = 5;
2427
}
2528

2629
// Versions is the top-level type used to model version for all instances, containing information for individual instances.

pkg/api/version/version.go

Lines changed: 41 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ func (v *Versions) Marshal() ([]byte, error) {
6363

6464
// Merge merges two versions. This is used when CASing or merging versions from other nodes.
6565
// v is the local version and should be mutated to include the changes from incoming.
66-
// The returned value is the change to broadcast, in our case they are similar.
66+
// The returned value is the change to broadcast.
6767
func (v *Versions) Merge(incoming memberlist.Mergeable, localCAS bool) (memberlist.Mergeable, error) {
6868
if incoming == nil {
6969
return nil, nil
@@ -87,39 +87,41 @@ func (v *Versions) Merge(incoming memberlist.Mergeable, localCAS bool) (memberli
8787
if v.Instances == nil {
8888
v.Instances = make(map[string]*versionv1.InstanceVersion)
8989
}
90-
change := false
91-
// Delete all the instances that are not in the other.
92-
missing := []string{}
93-
for k := range v.Instances {
94-
if _, ok := other.Instances[k]; !ok {
95-
missing = append(missing, k)
96-
}
97-
}
98-
for _, k := range missing {
99-
change = true
100-
delete(v.Instances, k)
101-
}
90+
var updated []string
10291

10392
// Copy over all the instances with newer timestamps.
10493
for k, new := range other.Instances {
10594
current, ok := v.Instances[k]
106-
if !ok {
95+
if !ok || new.Timestamp > current.Timestamp {
10796
v.Instances[k] = new.CloneVT()
108-
change = true
109-
continue
110-
}
111-
if current.EqualVT(new) {
112-
continue
113-
}
114-
if new.Timestamp > current.Timestamp {
97+
updated = append(updated, k)
98+
} else if new.Timestamp == current.Timestamp && !current.Left && new.Left {
11599
v.Instances[k] = new.CloneVT()
116-
change = true
100+
updated = append(updated, k)
117101
}
102+
118103
}
119-
if !change {
104+
105+
if localCAS {
106+
// Mark left all the instances that are not in the other.
107+
for k, current := range v.Instances {
108+
if _, ok := other.Instances[k]; !ok && !current.Left {
109+
current.Left = true
110+
current.Timestamp = time.Now().UnixNano()
111+
updated = append(updated, k)
112+
}
113+
}
114+
}
115+
// No updated members, no need to broadcast.
116+
if len(updated) == 0 {
120117
return nil, nil
121118
}
122-
return v, nil
119+
// Return the changes to broadcast.
120+
changes := newVersions().(*Versions)
121+
for _, k := range updated {
122+
changes.Instances[k] = v.Instances[k].CloneVT()
123+
}
124+
return changes, nil
123125
}
124126

125127
// MergeContent describes content of this Mergeable.
@@ -133,14 +135,25 @@ func (d *Versions) MergeContent() []string {
133135
}
134136

135137
// RemoveTombstones is not required for version keys.
136-
func (c *Versions) RemoveTombstones(limit time.Time) (total, removed int) {
137-
return 0, 0
138+
func (v *Versions) RemoveTombstones(limit time.Time) (total, removed int) {
139+
for n, inst := range v.Instances {
140+
if inst.Left {
141+
if limit.IsZero() || time.Unix(inst.Timestamp, 0).Before(limit) {
142+
// remove it
143+
delete(v.Instances, n)
144+
removed++
145+
} else {
146+
total++
147+
}
148+
}
149+
}
150+
return
138151
}
139152

140153
// Implements memberlist.Mergeable.
141-
func (c *Versions) Clone() memberlist.Mergeable {
154+
func (v *Versions) Clone() memberlist.Mergeable {
142155
return &Versions{
143-
Versions: c.CloneVT(),
156+
Versions: v.CloneVT(),
144157
}
145158
}
146159

0 commit comments

Comments
 (0)