Skip to content

Commit b5df789

Browse files
committed
Remove CAS/WatchKey layers from notification delay tests.
1 parent 8e2a4a0 commit b5df789

File tree

2 files changed

+94
-114
lines changed

2 files changed

+94
-114
lines changed

kv/memberlist/memberlist_client.go

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -497,7 +497,7 @@ func (m *KV) running(ctx context.Context) error {
497497
// Start delayed key notifications.
498498
notifTicker := time.NewTicker(m.cfg.NotifyInterval)
499499
defer notifTicker.Stop()
500-
go m.sendKeyNotifications(ctx, notifTicker.C)
500+
go m.monitorKeyNotifications(ctx, notifTicker.C)
501501
}
502502

503503
var tickerChan <-chan time.Time
@@ -932,14 +932,26 @@ func (m *KV) notifyWatchers(key string) {
932932
m.keyNotifications[key] = struct{}{}
933933
}
934934

935-
// sendKeyNotifications sends accumulated notifications to all watchers of
935+
// monitorKeyNotifications sends accumulated notifications to all watchers of
936936
// respective keys when the given channel ticks.
937-
func (m *KV) sendKeyNotifications(ctx context.Context, tickChan <-chan time.Time) {
937+
func (m *KV) monitorKeyNotifications(ctx context.Context, tickChan <-chan time.Time) {
938938
if m.cfg.NotifyInterval <= 0 {
939939
panic("sendNotifications called with NotifyInterval <= 0")
940940
}
941941

942+
for {
943+
select {
944+
case <-tickChan:
945+
m.sendKeyNotifications()
946+
case <-ctx.Done():
947+
return
948+
}
949+
}
950+
}
951+
952+
func (m *KV) sendKeyNotifications() {
942953
newNotifs := func() map[string]struct{} {
954+
// Grab and clear accumulated notifications.
943955
m.notifMu.Lock()
944956
defer m.notifMu.Unlock()
945957

@@ -952,15 +964,8 @@ func (m *KV) sendKeyNotifications(ctx context.Context, tickChan <-chan time.Time
952964
return notifs
953965
}
954966

955-
for {
956-
select {
957-
case <-tickChan:
958-
for key := range newNotifs() {
959-
m.notifyWatchersSync(key)
960-
}
961-
case <-ctx.Done():
962-
return
963-
}
967+
for key := range newNotifs() {
968+
m.notifyWatchersSync(key)
964969
}
965970
}
966971

kv/memberlist/memberlist_client_test.go

Lines changed: 77 additions & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -1801,116 +1801,91 @@ func TestNotificationDelay(t *testing.T) {
18011801
require.NoError(t, services.StopAndAwaitTerminated(context.Background(), kv))
18021802
})
18031803

1804-
cli, err := NewClient(kv, codec)
1805-
require.NoError(t, err)
1806-
1807-
casInterval := 1 * time.Second
1808-
1809-
ctx, cancel := context.WithCancel(context.Background())
1810-
defer cancel()
1811-
1812-
// Arrange to do our own ticking.
1813-
tick := make(chan time.Time)
1814-
go kv.sendKeyNotifications(ctx, tick)
1804+
watchChan := make(chan string, 16)
18151805

1816-
// Mirror any WatchKey updates to our own map and verify that they
1817-
// eventually arrive.
1806+
// Add ourselves as a watcher.
1807+
kv.watchersMu.Lock()
1808+
kv.watchers["foo_123"] = append(kv.watchers["foo_123"], watchChan)
1809+
kv.watchers["foo_124"] = append(kv.watchers["foo_124"], watchChan)
1810+
kv.watchersMu.Unlock()
18181811

1819-
dbMu := sync.Mutex{}
1820-
db := make(map[string]*data)
1821-
keyCalls := make(map[string]int)
1812+
defer func() {
1813+
kv.watchersMu.Lock()
1814+
removeWatcherChannel("foo_123", watchChan, kv.watchers)
1815+
removeWatcherChannel("foo_124", watchChan, kv.watchers)
1816+
kv.watchersMu.Unlock()
1817+
}()
18221818

1823-
setData := func(key string, d *data) {
1824-
dbMu.Lock()
1825-
defer dbMu.Unlock()
1826-
db[key] = d
1827-
keyCalls[key]++
1828-
}
1829-
getData := func(key string) *data {
1830-
dbMu.Lock()
1831-
defer dbMu.Unlock()
1832-
return db[key]
1833-
}
1834-
callsForKey := func(key string) int {
1835-
dbMu.Lock()
1836-
defer dbMu.Unlock()
1837-
return keyCalls[key]
1838-
}
1839-
verifyVal := func(k, v string) bool {
1840-
d := getData(k)
1841-
if d == nil {
1842-
return false
1819+
verifyNotifs := func(expected map[string]int, comment string) {
1820+
observed := make(map[string]int, len(expected))
1821+
for kk := range expected {
1822+
observed[kk] = 0
1823+
}
1824+
loop:
1825+
for {
1826+
select {
1827+
case k := <-watchChan:
1828+
observed[k]++
1829+
default:
1830+
break loop
1831+
}
18431832
}
1844-
_, ok := d.Members[v]
1845-
return ok
1833+
require.Equal(t, expected, observed, comment)
18461834
}
18471835

1848-
watchData := func(key string) {
1849-
go cli.WatchKey(ctx, key, func(in any) bool {
1850-
setData(key, in.(*data))
1851-
return true
1852-
})
1836+
drainChan := func() {
1837+
for {
1838+
select {
1839+
case <-watchChan:
1840+
default:
1841+
return
1842+
}
1843+
}
18531844
}
18541845

1855-
watchData("foo_123")
1856-
watchData("foo_124")
1857-
1858-
assert.Equal(t, 0, callsForKey("foo_123"))
1859-
assert.Equal(t, 0, callsForKey("foo_124"))
1860-
1861-
err = cas(cli, "foo_123", updateFn("val1"))
1862-
require.NoError(t, err)
1863-
1864-
tick <- time.Now()
1865-
1866-
require.Eventually(t, func() bool {
1867-
return verifyVal("foo_123", "val1")
1868-
}, 3*casInterval, 25*time.Millisecond)
1869-
1870-
assert.Equal(t, 1, callsForKey("foo_123"))
1871-
assert.Equal(t, 0, callsForKey("foo_124"))
1846+
kv.notifyWatchers("foo_123")
1847+
kv.sendKeyNotifications()
1848+
verifyNotifs(map[string]int{"foo_123": 1}, "1 change 1 notification")
18721849

18731850
// Test coalescing of updates.
1874-
err = cas(cli, "foo_123", updateFn("val2"))
1875-
require.NoError(t, err)
1876-
err = cas(cli, "foo_123", updateFn("val3"))
1877-
require.NoError(t, err)
1878-
err = cas(cli, "foo_123", updateFn("val4"))
1879-
require.NoError(t, err)
1880-
1881-
assert.Equal(t, 1, callsForKey("foo_123"), "no flush -> no callback")
1882-
assert.Equal(t, 0, callsForKey("foo_124"), "no flush -> no callback")
1883-
1884-
tick <- time.Now()
1885-
1886-
require.Eventually(t, func() bool {
1887-
return verifyVal("foo_123", "val4")
1888-
}, 3*casInterval, 25*time.Millisecond, "multiple updates should be coalesced into the last one")
1889-
1890-
assert.Equal(t, 2, callsForKey("foo_123"))
1891-
assert.Equal(t, 0, callsForKey("foo_124"))
1892-
1893-
err = cas(cli, "foo_123", updateFn("val100"))
1894-
require.NoError(t, err)
1895-
err = cas(cli, "foo_124", updateFn("val101"))
1896-
require.NoError(t, err)
1897-
1898-
tick <- time.Now()
1899-
1900-
require.Eventually(t, func() bool {
1901-
return verifyVal("foo_123", "val100") && verifyVal("foo_124", "val101")
1902-
}, 3*casInterval, 25*time.Millisecond)
1903-
1904-
assert.Equal(t, 3, callsForKey("foo_123"))
1905-
assert.Equal(t, 1, callsForKey("foo_124"))
1906-
1907-
require.NotPanics(t, func() {
1908-
tick <- time.Now()
1909-
tick <- time.Now()
1910-
tick <- time.Now()
1911-
}, "shouldn't panic or anything like that when ticked without updates")
1912-
1913-
time.Sleep(100 * time.Millisecond)
1914-
assert.Equal(t, 3, callsForKey("foo_123"))
1915-
assert.Equal(t, 1, callsForKey("foo_124"))
1851+
drainChan()
1852+
verifyNotifs(map[string]int{"foo_123": 0}, "chan drained")
1853+
kv.notifyWatchers("foo_123")
1854+
verifyNotifs(map[string]int{"foo_123": 0}, "no flush -> no watcher notification")
1855+
kv.notifyWatchers("foo_123")
1856+
verifyNotifs(map[string]int{"foo_123": 0}, "no flush -> no watcher notification")
1857+
kv.notifyWatchers("foo_123")
1858+
verifyNotifs(map[string]int{"foo_123": 0}, "no flush -> no watcher notification")
1859+
kv.notifyWatchers("foo_123")
1860+
verifyNotifs(map[string]int{"foo_123": 0}, "no flush -> no watcher notification")
1861+
kv.notifyWatchers("foo_123")
1862+
verifyNotifs(map[string]int{"foo_123": 0}, "no flush -> no watcher notification")
1863+
kv.notifyWatchers("foo_123")
1864+
verifyNotifs(map[string]int{"foo_123": 0}, "no flush -> no watcher notification")
1865+
kv.sendKeyNotifications()
1866+
verifyNotifs(map[string]int{"foo_123": 1}, "flush should coalesce updates")
1867+
1868+
// multiple buffered updates
1869+
drainChan()
1870+
verifyNotifs(map[string]int{"foo_123": 0}, "chan drained")
1871+
kv.notifyWatchers("foo_123")
1872+
kv.sendKeyNotifications()
1873+
kv.notifyWatchers("foo_123")
1874+
kv.sendKeyNotifications()
1875+
verifyNotifs(map[string]int{"foo_123": 2}, "two buffered updates")
1876+
1877+
// multiple keys
1878+
drainChan()
1879+
kv.notifyWatchers("foo_123")
1880+
kv.notifyWatchers("foo_124")
1881+
kv.sendKeyNotifications()
1882+
verifyNotifs(map[string]int{"foo_123": 1, "foo_124": 1}, "2 changes 2 notifications")
1883+
kv.sendKeyNotifications()
1884+
verifyNotifs(map[string]int{"foo_123": 0, "foo_124": 0}, "no new notifications")
1885+
1886+
// and finally, sendKeyNotifications can be called repeatedly without new updates.
1887+
kv.sendKeyNotifications()
1888+
kv.sendKeyNotifications()
1889+
kv.sendKeyNotifications()
1890+
kv.sendKeyNotifications()
19161891
}

0 commit comments

Comments
 (0)