From 1d3fdf93bbe5002c5023da50402368a817488691 Mon Sep 17 00:00:00 2001 From: Sarah Christoff Date: Wed, 2 Oct 2019 14:35:57 -0500 Subject: [PATCH] Update ForceLeave Prune (#580) * Prune flag on ForceLeave added. Refactor of `reap` function to call to `eraseNode`. --- serf/serf.go | 32 +++-- serf/serf_test.go | 226 ++++++++++++++++++++++++++++++++--- testutil/retry/retry.go | 207 ++++++++++++++++++++++++++++++++ testutil/retry/retry_test.go | 43 +++++++ 4 files changed, 486 insertions(+), 22 deletions(-) create mode 100644 testutil/retry/retry.go create mode 100644 testutil/retry/retry_test.go diff --git a/serf/serf.go b/serf/serf.go index 4460c8f68..9fe4cc3a7 100644 --- a/serf/serf.go +++ b/serf/serf.go @@ -1102,6 +1102,10 @@ func (s *Serf) handleNodeLeaveIntent(leaveMsg *messageLeave) bool { case StatusAlive: member.Status = StatusLeaving member.statusLTime = leaveMsg.LTime + + if leaveMsg.Prune { + s.handlePrune(member) + } return true case StatusFailed: member.Status = StatusLeft @@ -1127,26 +1131,38 @@ func (s *Serf) handleNodeLeaveIntent(leaveMsg *messageLeave) bool { } if leaveMsg.Prune { - s.logger.Printf("[INFO] serf: EventMemberReap (forced): %s %s", member.Name, member.Member.Addr) - s.leftMembers = removeOldMember(s.leftMembers, member.Name) - s.eraseNode(member) + s.handlePrune(member) } return true - case StatusLeft: + case StatusLeaving, StatusLeft: if leaveMsg.Prune { - s.logger.Printf("[INFO] serf: EventMemberReap (forced): %s %s", member.Name, member.Member.Addr) - s.leftMembers = removeOldMember(s.leftMembers, member.Name) - s.eraseNode(member) + s.handlePrune(member) } return true - default: return false } } +// handlePrune waits for nodes that are leaving and then forcibly +// erases a member from the list of members +func (s *Serf) handlePrune(member *memberState) { + if member.Status == StatusLeaving { + time.Sleep(s.config.BroadcastTimeout + s.config.LeavePropagateDelay) + } + + s.logger.Printf("[INFO] serf: EventMemberReap (forced): %s %s", member.Name, member.Member.Addr) + + //If we are leaving or left we may be in that list of members + if member.Status == StatusLeaving || member.Status == StatusLeft { + s.leftMembers = removeOldMember(s.leftMembers, member.Name) + } + s.eraseNode(member) + +} + // handleNodeJoinIntent is called when a node broadcasts a // join message to set the lamport time of its join func (s *Serf) handleNodeJoinIntent(joinMsg *messageJoin) bool { diff --git a/serf/serf_test.go b/serf/serf_test.go index 08bf50610..a2eb27dba 100644 --- a/serf/serf_test.go +++ b/serf/serf_test.go @@ -16,6 +16,7 @@ import ( "github.com/hashicorp/memberlist" "github.com/hashicorp/serf/coordinate" "github.com/hashicorp/serf/testutil" + "github.com/hashicorp/serf/testutil/retry" ) func testConfig() *Config { @@ -65,6 +66,27 @@ func testMember(t *testing.T, members []Member, name string, status MemberStatus panic(fmt.Sprintf("node not found: %s", name)) } +// testMemberStatus is testMember but returns an error +// instead of failing the test +func testMemberStatus(members []Member, name string, status MemberStatus) error { + for _, m := range members { + if m.Name == name { + if m.Status != status { + return fmt.Errorf("bad state for %s: %d", name, m.Status) + } + return nil + } + } + + if status == StatusNone { + // We didn't expect to find it + return nil + } + + return fmt.Errorf("node not found: %s", name) + +} + func TestCreate_badProtocolVersion(t *testing.T) { cases := []struct { version uint8 @@ -500,7 +522,7 @@ func TestSerf_leaveRejoinDifferentRole(t *testing.T) { t.Fatalf("s1 members: %d", len(s1.Members())) } - var member *Member = nil + var member *Member for _, m := range members { if m.Name == s3Config.NodeName { member = &m @@ -517,6 +539,182 @@ func TestSerf_leaveRejoinDifferentRole(t *testing.T) { } } +func TestSerf_forceLeaveFailed(t *testing.T) { + s1Config := testConfig() + s2Config := testConfig() + s3Config := testConfig() + + s1, err := Create(s1Config) + if err != nil { + t.Fatalf("err: %s", err) + } + defer s1.Shutdown() + + s2, err := Create(s2Config) + if err != nil { + t.Fatalf("err: %s", err) + } + + defer s2.Shutdown() + + s3, err := Create(s3Config) + if err != nil { + t.Fatalf("err: %s", err) + } + + defer s3.Shutdown() + + _, err = s1.Join([]string{s2Config.MemberlistConfig.BindAddr}, false) + if err != nil { + t.Fatalf("err: %s", err) + } + + _, err = s1.Join([]string{s3Config.MemberlistConfig.BindAddr}, false) + if err != nil { + t.Fatalf("err: %s", err) + } + + //Put s2 in failed state + s2.Shutdown() + + retry.Run(t, func(r *retry.R) { + if err := testMemberStatus(s1.Members(), s2Config.NodeName, StatusFailed); err != nil { + r.Fatal(err) + } + }) + s1.forceLeave(s2.config.NodeName, true) + + memberlen := len(s1.Members()) + if memberlen != 2 { + t.Fatalf("wanted 2 alive members, got %v", s1.Members()) + } + +} + +func TestSerf_forceLeaveLeaving(t *testing.T) { + s1Config := testConfig() + s2Config := testConfig() + s3Config := testConfig() + + //make it so it doesn't get reaped + // allow for us to see the leaving state + s1Config.TombstoneTimeout = 1 * time.Hour + s1Config.LeavePropagateDelay = 5 * time.Second + + s2Config.TombstoneTimeout = 1 * time.Hour + s2Config.LeavePropagateDelay = 5 * time.Second + + s3Config.TombstoneTimeout = 1 * time.Hour + s3Config.LeavePropagateDelay = 5 * time.Second + + s1, err := Create(s1Config) + if err != nil { + t.Fatalf("err: %s", err) + } + + defer s1.Shutdown() + + s2, err := Create(s2Config) + if err != nil { + t.Fatalf("err: %s", err) + } + defer s2.Shutdown() + + s3, err := Create(s3Config) + if err != nil { + t.Fatalf("err: %s", err) + } + defer s3.Shutdown() + + _, err = s1.Join([]string{s2Config.MemberlistConfig.BindAddr}, true) + if err != nil { + t.Fatalf("err: %s", err) + } + testutil.Yield() + + _, err = s1.Join([]string{s3Config.MemberlistConfig.BindAddr}, true) + if err != nil { + t.Fatalf("err: %s", err) + } + testutil.Yield() + + //Put s2 in left state + if err := s2.Leave(); err != nil { + t.Fatal(err) + } + + retry.Run(t, func(r *retry.R) { + if err := testMemberStatus(s1.Members(), s2Config.NodeName, 3); err != nil { + r.Fatal(err) + } + }) + s1.forceLeave(s2.config.NodeName, true) + + memberlen := len(s1.Members()) + if memberlen != 2 { + t.Fatalf("wanted 2 alive members, got %v", s1.Members()) + } +} + +func TestSerf_forceLeaveLeft(t *testing.T) { + s1Config := testConfig() + s2Config := testConfig() + s3Config := testConfig() + + //make it so it doesn't get reaped + s1Config.TombstoneTimeout = 1 * time.Hour + s2Config.TombstoneTimeout = 1 * time.Hour + s3Config.TombstoneTimeout = 1 * time.Hour + + s1, err := Create(s1Config) + if err != nil { + t.Fatalf("err: %s", err) + } + defer s1.Shutdown() + + s2, err := Create(s2Config) + if err != nil { + t.Fatalf("err: %s", err) + } + defer s2.Shutdown() + + s3, err := Create(s3Config) + if err != nil { + t.Fatalf("err: %s", err) + } + defer s3.Shutdown() + + _, err = s1.Join([]string{s2Config.MemberlistConfig.BindAddr}, true) + if err != nil { + t.Fatalf("err: %s", err) + } + testutil.Yield() + + _, err = s1.Join([]string{s3Config.MemberlistConfig.BindAddr}, true) + if err != nil { + t.Fatalf("err: %s", err) + } + testutil.Yield() + + //Put s2 in left state + if err := s2.Leave(); err != nil { + t.Fatal(err) + } + + retry.Run(t, func(r *retry.R) { + if err := testMemberStatus(s1.Members(), s2Config.NodeName, StatusLeft); err != nil { + r.Fatal(err) + } + }) + s1.forceLeave(s2.config.NodeName, true) + + memberlen := len(s1.Members()) + if memberlen != 2 { + t.Fatalf("wanted 2 alive members, got %v", s1.Members()) + } + +} + func TestSerf_reconnect(t *testing.T) { eventCh := make(chan Event, 64) s1Config := testConfig() @@ -672,7 +870,7 @@ func TestSerf_update(t *testing.T) { // Add a tag to force an update event, and add a version downgrade as // well (that alone won't trigger an update). - s2Config.ProtocolVersion -= 1 + s2Config.ProtocolVersion-- s2Config.Tags["foo"] = "bar" // We try for a little while to wait for s2 to fully shutdown since the @@ -1475,31 +1673,31 @@ func TestSerf_SetTags(t *testing.T) { // Verify the new tags m1m := s1.Members() - m1m_tags := make(map[string]map[string]string) + m1mTags := make(map[string]map[string]string) for _, m := range m1m { - m1m_tags[m.Name] = m.Tags + m1mTags[m.Name] = m.Tags } - if m := m1m_tags[s1.config.NodeName]; m["port"] != "8000" { - t.Fatalf("bad: %v", m1m_tags) + if m := m1mTags[s1.config.NodeName]; m["port"] != "8000" { + t.Fatalf("bad: %v", m1mTags) } - if m := m1m_tags[s2.config.NodeName]; m["datacenter"] != "east-aws" { - t.Fatalf("bad: %v", m1m_tags) + if m := m1mTags[s2.config.NodeName]; m["datacenter"] != "east-aws" { + t.Fatalf("bad: %v", m1mTags) } m2m := s2.Members() - m2m_tags := make(map[string]map[string]string) + m2mTags := make(map[string]map[string]string) for _, m := range m2m { - m2m_tags[m.Name] = m.Tags + m2mTags[m.Name] = m.Tags } - if m := m2m_tags[s1.config.NodeName]; m["port"] != "8000" { - t.Fatalf("bad: %v", m1m_tags) + if m := m2mTags[s1.config.NodeName]; m["port"] != "8000" { + t.Fatalf("bad: %v", m1mTags) } - if m := m2m_tags[s2.config.NodeName]; m["datacenter"] != "east-aws" { - t.Fatalf("bad: %v", m1m_tags) + if m := m2mTags[s2.config.NodeName]; m["datacenter"] != "east-aws" { + t.Fatalf("bad: %v", m1mTags) } } diff --git a/testutil/retry/retry.go b/testutil/retry/retry.go new file mode 100644 index 000000000..53c05a2b0 --- /dev/null +++ b/testutil/retry/retry.go @@ -0,0 +1,207 @@ +// Package retry provides support for repeating operations in tests. +// +// A sample retry operation looks like this: +// +// func TestX(t *testing.T) { +// retry.Run(t, func(r *retry.R) { +// if err := foo(); err != nil { +// r.Fatal("f: ", err) +// } +// }) +// } +// +package retry + +import ( + "bytes" + "fmt" + "runtime" + "strings" + "sync" + "time" +) + +// Failer is an interface compatible with testing.T. +type Failer interface { + // Log is called for the final test output + Log(args ...interface{}) + + // FailNow is called when the retrying is abandoned. + FailNow() +} + +// R provides context for the retryer. +type R struct { + fail bool + output []string +} + +func (r *R) FailNow() { + r.fail = true + runtime.Goexit() +} + +func (r *R) Fatal(args ...interface{}) { + r.log(fmt.Sprint(args...)) + r.FailNow() +} + +func (r *R) Fatalf(format string, args ...interface{}) { + r.log(fmt.Sprintf(format, args...)) + r.FailNow() +} + +func (r *R) Error(args ...interface{}) { + r.log(fmt.Sprint(args...)) + r.fail = true +} + +func (r *R) Errorf(format string, args ...interface{}) { + r.log(fmt.Sprintf(format, args...)) + r.fail = true +} + +func (r *R) Check(err error) { + if err != nil { + r.log(err.Error()) + r.FailNow() + } +} + +func (r *R) log(s string) { + r.output = append(r.output, decorate(s)) +} + +func decorate(s string) string { + _, file, line, ok := runtime.Caller(3) + if ok { + n := strings.LastIndex(file, "/") + if n >= 0 { + file = file[n+1:] + } + } else { + file = "???" + line = 1 + } + return fmt.Sprintf("%s:%d: %s", file, line, s) +} + +func Run(t Failer, f func(r *R)) { + run(DefaultFailer(), t, f) +} + +func RunWith(r Retryer, t Failer, f func(r *R)) { + run(r, t, f) +} + +func dedup(a []string) string { + if len(a) == 0 { + return "" + } + m := map[string]int{} + for _, s := range a { + m[s] = m[s] + 1 + } + var b bytes.Buffer + for _, s := range a { + if _, ok := m[s]; ok { + b.WriteString(s) + b.WriteRune('\n') + delete(m, s) + } + } + return b.String() +} + +func run(r Retryer, t Failer, f func(r *R)) { + rr := &R{} + fail := func() { + out := dedup(rr.output) + if out != "" { + t.Log(out) + } + t.FailNow() + } + for r.NextOr(fail) { + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + f(rr) + }() + wg.Wait() + if rr.fail { + rr.fail = false + continue + } + break + } +} + +// DefaultFailer provides default retry.Run() behavior for unit tests. +func DefaultFailer() *Timer { + return &Timer{Timeout: 7 * time.Second, Wait: 25 * time.Millisecond} +} + +// TwoSeconds repeats an operation for two seconds and waits 25ms in between. +func TwoSeconds() *Timer { + return &Timer{Timeout: 2 * time.Second, Wait: 25 * time.Millisecond} +} + +// ThreeTimes repeats an operation three times and waits 25ms in between. +func ThreeTimes() *Counter { + return &Counter{Count: 3, Wait: 25 * time.Millisecond} +} + +// Retryer provides an interface for repeating operations +// until they succeed or an exit condition is met. +type Retryer interface { + // NextOr returns true if the operation should be repeated. + // Otherwise, it calls fail and returns false. + NextOr(fail func()) bool +} + +// Counter repeats an operation a given number of +// times and waits between subsequent operations. +type Counter struct { + Count int + Wait time.Duration + + count int +} + +func (r *Counter) NextOr(fail func()) bool { + if r.count == r.Count { + fail() + return false + } + if r.count > 0 { + time.Sleep(r.Wait) + } + r.count++ + return true +} + +// Timer repeats an operation for a given amount +// of time and waits between subsequent operations. +type Timer struct { + Timeout time.Duration + Wait time.Duration + + // stop is the timeout deadline. + // Set on the first invocation of Next(). + stop time.Time +} + +func (r *Timer) NextOr(fail func()) bool { + if r.stop.IsZero() { + r.stop = time.Now().Add(r.Timeout) + return true + } + if time.Now().After(r.stop) { + fail() + return false + } + time.Sleep(r.Wait) + return true +} diff --git a/testutil/retry/retry_test.go b/testutil/retry/retry_test.go new file mode 100644 index 000000000..db9c4aa84 --- /dev/null +++ b/testutil/retry/retry_test.go @@ -0,0 +1,43 @@ +package retry + +import ( + "testing" + "time" +) + +// delta defines the time band a test run should complete in. +var delta = 25 * time.Millisecond + +func TestRetryer(t *testing.T) { + tests := []struct { + desc string + r Retryer + }{ + {"counter", &Counter{Count: 3, Wait: 100 * time.Millisecond}}, + {"timer", &Timer{Timeout: 200 * time.Millisecond, Wait: 100 * time.Millisecond}}, + } + + for _, tt := range tests { + t.Run(tt.desc, func(t *testing.T) { + var iters, fails int + fail := func() { fails++ } + start := time.Now() + for tt.r.NextOr(fail) { + iters++ + } + dur := time.Since(start) + if got, want := iters, 3; got != want { + t.Fatalf("got %d retries want %d", got, want) + } + if got, want := fails, 1; got != want { + t.Fatalf("got %d FailNow calls want %d", got, want) + } + // since the first iteration happens immediately + // the retryer waits only twice for three iterations. + // order of events: (true, (wait) true, (wait) true, false) + if got, want := dur, 200*time.Millisecond; got < (want-delta) || got > (want+delta) { + t.Fatalf("loop took %v want %v (+/- %v)", got, want, delta) + } + }) + } +}