Skip to content

Commit a7ab766

Browse files
authored
Merge pull request #19249 from fuweid/35-fix-19179
[3.5] mvcc: restore tombstone index if it's first revision
2 parents eade1fa + c6fcd27 commit a7ab766

File tree

5 files changed

+156
-5
lines changed

5 files changed

+156
-5
lines changed

server/mvcc/key_index.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,15 @@ func (ki *keyIndex) restore(lg *zap.Logger, created, modified revision, ver int6
119119
keysGauge.Inc()
120120
}
121121

122+
// restoreTombstone is used to restore a tombstone revision, which is the only
123+
// revision so far for a key. We don't know the creating revision (i.e. already
124+
// compacted) of the key, so set it empty.
125+
func (ki *keyIndex) restoreTombstone(lg *zap.Logger, main, sub int64) {
126+
ki.restore(lg, revision{}, revision{main, sub}, 1)
127+
ki.generations = append(ki.generations, generation{})
128+
keysGauge.Dec()
129+
}
130+
122131
// tombstone puts a revision, pointing to a tombstone, to the keyIndex.
123132
// It also creates a new empty generation in the keyIndex.
124133
// It returns ErrRevisionNotFound when tombstone on an empty generation.

server/mvcc/key_index_test.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,51 @@ import (
1919
"testing"
2020

2121
"github.com/stretchr/testify/assert"
22+
"github.com/stretchr/testify/require"
2223
"go.uber.org/zap"
24+
"go.uber.org/zap/zaptest"
2325
)
2426

27+
func TestRestoreTombstone(t *testing.T) {
28+
lg := zaptest.NewLogger(t)
29+
30+
// restore from tombstone
31+
//
32+
// key: "foo"
33+
// modified: 16
34+
// "created": 16
35+
// generations:
36+
// {empty}
37+
// {{16, 0}(t)[0]}
38+
//
39+
ki := &keyIndex{key: []byte("foo")}
40+
ki.restoreTombstone(lg, 16, 0)
41+
42+
// get should return not found
43+
for retAt := 16; retAt <= 20; retAt++ {
44+
_, _, _, err := ki.get(lg, int64(retAt))
45+
require.ErrorIs(t, err, ErrRevisionNotFound)
46+
}
47+
48+
// doCompact should keep that tombstone
49+
availables := map[revision]struct{}{}
50+
ki.doCompact(16, availables)
51+
require.Len(t, availables, 1)
52+
_, ok := availables[revision{main: 16}]
53+
require.True(t, ok)
54+
55+
// should be able to put new revisions
56+
ki.put(lg, 17, 0)
57+
ki.put(lg, 18, 0)
58+
revs := ki.since(lg, 16)
59+
require.Equal(t, []revision{{16, 0}, {17, 0}, {18, 0}}, revs)
60+
61+
// compaction should remove restored tombstone
62+
ki.compact(lg, 17, map[revision]struct{}{})
63+
require.Len(t, ki.generations, 1)
64+
require.Equal(t, []revision{{17, 0}, {18, 0}}, ki.generations[0].revs)
65+
}
66+
2567
func TestKeyIndexGet(t *testing.T) {
2668
// key: "foo"
2769
// rev: 16

server/mvcc/kv_test.go

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,9 @@ import (
3131

3232
"github.com/prometheus/client_golang/prometheus"
3333
dto "github.com/prometheus/client_model/go"
34+
"github.com/stretchr/testify/assert"
3435
"go.uber.org/zap"
36+
"go.uber.org/zap/zaptest"
3537
)
3638

3739
// Functional tests for features implemented in v3 store. It treats v3 store
@@ -620,6 +622,8 @@ func TestKVHash(t *testing.T) {
620622
}
621623

622624
func TestKVRestore(t *testing.T) {
625+
compactBatchLimit := 5
626+
623627
tests := []func(kv KV){
624628
func(kv KV) {
625629
kv.Put([]byte("foo"), []byte("bar0"), 1)
@@ -637,10 +641,23 @@ func TestKVRestore(t *testing.T) {
637641
kv.Put([]byte("foo"), []byte("bar1"), 2)
638642
kv.Compact(traceutil.TODO(), 1)
639643
},
644+
func(kv KV) { // after restore, foo1 key only has tombstone revision
645+
kv.Put([]byte("foo1"), []byte("bar1"), 0)
646+
kv.Put([]byte("foo2"), []byte("bar2"), 0)
647+
kv.Put([]byte("foo3"), []byte("bar3"), 0)
648+
kv.Put([]byte("foo4"), []byte("bar4"), 0)
649+
kv.Put([]byte("foo5"), []byte("bar5"), 0)
650+
_, delAtRev := kv.DeleteRange([]byte("foo1"), nil)
651+
assert.Equal(t, int64(7), delAtRev)
652+
653+
// after compaction and restore, foo1 key only has tombstone revision
654+
ch, _ := kv.Compact(traceutil.TODO(), delAtRev)
655+
<-ch
656+
},
640657
}
641658
for i, tt := range tests {
642659
b, tmpPath := betesting.NewDefaultTmpBackend(t)
643-
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})
660+
s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{CompactionBatchLimit: compactBatchLimit})
644661
tt(s)
645662
var kvss [][]mvccpb.KeyValue
646663
for k := int64(0); k < 10; k++ {
@@ -651,8 +668,8 @@ func TestKVRestore(t *testing.T) {
651668
keysBefore := readGaugeInt(keysGauge)
652669
s.Close()
653670

654-
// ns should recover the the previous state from backend.
655-
ns := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})
671+
// ns should recover the previous state from backend.
672+
ns := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{CompactionBatchLimit: compactBatchLimit})
656673

657674
if keysRestore := readGaugeInt(keysGauge); keysBefore != keysRestore {
658675
t.Errorf("#%d: got %d key count, expected %d", i, keysRestore, keysBefore)

server/mvcc/kvstore.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -484,8 +484,12 @@ func restoreIntoIndex(lg *zap.Logger, idx index) (chan<- revKeyValue, <-chan int
484484
continue
485485
}
486486
ki.put(lg, rev.main, rev.sub)
487-
} else if !isTombstone(rkv.key) {
488-
ki.restore(lg, revision{rkv.kv.CreateRevision, 0}, rev, rkv.kv.Version)
487+
} else {
488+
if isTombstone(rkv.key) {
489+
ki.restoreTombstone(lg, rev.main, rev.sub)
490+
} else {
491+
ki.restore(lg, revision{rkv.kv.CreateRevision, 0}, rev, rkv.kv.Version)
492+
}
489493
idx.Insert(ki)
490494
kiCache[rkv.kstr] = ki
491495
}

tests/e2e/watch_test.go

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -482,3 +482,82 @@ func testStartWatcherFromCompactedRevision(t *testing.T, performCompactOnTombsto
482482
}
483483
}
484484
}
485+
486+
// TestResumeCompactionOnTombstone verifies whether a deletion event is preserved
// when etcd restarts and resumes compaction on a key that only has a tombstone revision.
//
// Scenario: exactly compactBatchLimit keys are written, then one is deleted,
// leaving it with a tombstone as its only revision. A failpoint makes the
// server panic mid-compaction (before the compaction is recorded as
// finished), so the restarted server restores the index from backend and
// resumes compaction — the case PR #19249 fixes. The delete event must still
// be observable from a watch after the restart.
func TestResumeCompactionOnTombstone(t *testing.T) {
	e2e.BeforeTest(t)

	ctx := context.Background()
	compactBatchLimit := 5

	cfg := e2e.EtcdProcessClusterConfig{
		GoFailEnabled:              true, // required to arm the compaction failpoint below
		ClusterSize:                1,
		IsClientAutoTLS:            true,
		ClientTLS:                  e2e.ClientTLS,
		CompactionBatchLimit:       compactBatchLimit,
		WatchProcessNotifyInterval: 100 * time.Millisecond,
	}
	clus, err := e2e.NewEtcdProcessCluster(t, &cfg)
	require.NoError(t, err)
	defer clus.Close()

	c1 := newClient(t, clus.EndpointsGRPC(), cfg.ClientTLS, cfg.IsClientAutoTLS)
	defer c1.Close()

	// Seed one put per key so revisions 2..6 are consumed; the delete below
	// then lands at revision 7.
	keyPrefix := "/key-"
	for i := 0; i < compactBatchLimit; i++ {
		key := fmt.Sprintf("%s%d", keyPrefix, i)
		value := fmt.Sprintf("%d", i)

		t.Logf("PUT key=%s, val=%s", key, value)
		_, err = c1.KV.Put(ctx, key, value)
		require.NoError(t, err)
	}

	// Delete the first key; its tombstone becomes its only revision once
	// compaction at delAtRev runs.
	firstKey := keyPrefix + "0"
	t.Logf("DELETE key=%s", firstKey)
	deleteResp, err := c1.KV.Delete(ctx, firstKey)
	require.NoError(t, err)

	// Capture the DELETE event before the restart so it can be compared
	// against what a fresh watch sees afterwards.
	var deleteEvent *clientv3.Event
	select {
	case watchResp := <-c1.Watch(ctx, firstKey, clientv3.WithRev(deleteResp.Header.Revision)):
		require.Len(t, watchResp.Events, 1)

		require.Equal(t, mvccpb.DELETE, watchResp.Events[0].Type)
		deletedKey := string(watchResp.Events[0].Kv.Key)
		require.Equal(t, firstKey, deletedKey)

		deleteEvent = watchResp.Events[0]
	case <-time.After(100 * time.Millisecond):
		t.Fatal("timed out getting watch response")
	}

	// Crash the server after compaction has done its work but before it is
	// marked finished, so the restarted server resumes the compaction.
	require.NoError(t, clus.Procs[0].Failpoints().SetupHTTP(ctx, "compactBeforeSetFinishedCompact", `panic`))

	// Physical compaction hits the failpoint and panics, so the RPC must fail.
	t.Logf("COMPACT rev=%d", deleteResp.Header.Revision)
	_, err = c1.KV.Compact(ctx, deleteResp.Header.Revision, clientv3.WithCompactPhysical())
	require.Error(t, err)

	require.Error(t, clus.Procs[0].Stop())
	// NOTE: the proc panicked with exit code 2, and Restart() refuses to
	// start a proc whose last exit code was non-zero. Calling IsRunning()
	// here clears that recorded status so Restart() below can succeed.
	require.False(t, clus.Procs[0].IsRunning())
	require.NoError(t, clus.Restart())

	c2 := newClient(t, clus.EndpointsGRPC(), cfg.ClientTLS, cfg.IsClientAutoTLS)
	defer c2.Close()

	// After restart + resumed compaction, a watch from the delete revision
	// must still deliver the original tombstone event.
	watchChan := c2.Watch(ctx, firstKey, clientv3.WithRev(deleteResp.Header.Revision))
	select {
	case watchResp := <-watchChan:
		require.Equal(t, []*clientv3.Event{deleteEvent}, watchResp.Events)
	case <-time.After(100 * time.Millisecond):
		// we care only about the first response, but have an
		// escape hatch in case the watch response is delayed.
		t.Fatal("timed out getting watch response")
	}
}

0 commit comments

Comments
 (0)