diff --git a/serf/coalesce_member.go b/serf/coalesce_member.go index b966393be..8705a5218 100644 --- a/serf/coalesce_member.go +++ b/serf/coalesce_member.go @@ -3,14 +3,30 @@ package serf -type coalesceEvent struct { +import "reflect" + +// event happens to a node +type nodeEvent struct { Type EventType Member *Member } +func (n *nodeEvent) Equal(m *nodeEvent) bool { + if m == nil { + return false + } + if n.Type != m.Type { + return false + } + if n.Type != EventMemberUpdate { + return true + } + return reflect.DeepEqual(n.Member, m.Member) +} + type memberEventCoalescer struct { - lastEvents map[string]EventType - latestEvents map[string]coalesceEvent + lastEvents map[string]*nodeEvent // the last event happens to a node + newEvents map[string]*nodeEvent // most recent event for a node. } func (c *memberEventCoalescer) Handle(e Event) bool { @@ -33,35 +49,30 @@ func (c *memberEventCoalescer) Handle(e Event) bool { func (c *memberEventCoalescer) Coalesce(raw Event) { e := raw.(MemberEvent) for _, m := range e.Members { - c.latestEvents[m.Name] = coalesceEvent{ + c.newEvents[m.Name] = &nodeEvent{ // overwrite the old events Type: e.Type, Member: &m, } } } - func (c *memberEventCoalescer) Flush(outCh chan<- Event) { // Coalesce the various events we got into a single set of events. events := make(map[EventType]*MemberEvent) - for name, cevent := range c.latestEvents { - previous, ok := c.lastEvents[name] - - // If we sent the same event before, then ignore - // unless it is a MemberUpdate - if ok && previous == cevent.Type && cevent.Type != EventMemberUpdate { + for name, e := range c.newEvents { + if e.Equal(c.lastEvents[name]) { continue } // Update our last event - c.lastEvents[name] = cevent.Type + c.lastEvents[name] = e // Add it to our event - newEvent, ok := events[cevent.Type] + event, ok := events[e.Type] if !ok { - newEvent = &MemberEvent{Type: cevent.Type} - events[cevent.Type] = newEvent + event = &MemberEvent{Type: e.Type} + events[e.Type] = event } - newEvent.Members = append(newEvent.Members, *cevent.Member) + event.Members = append(event.Members, *e.Member) } // Send out those events diff --git a/serf/coalesce_member_test.go b/serf/coalesce_member_test.go index 83fdbcf61..30e64f2bd 100644 --- a/serf/coalesce_member_test.go +++ b/serf/coalesce_member_test.go @@ -16,8 +16,8 @@ func TestMemberEventCoalesce_Basic(t *testing.T) { defer close(shutdownCh) c := &memberEventCoalescer{ - lastEvents: make(map[string]EventType), - latestEvents: make(map[string]coalesceEvent), + lastEvents: make(map[string]*nodeEvent), + newEvents: make(map[string]*nodeEvent), } inCh := coalescedEventCh(outCh, shutdownCh, @@ -125,8 +125,8 @@ func TestMemberEventCoalesce_TagUpdate(t *testing.T) { defer close(shutdownCh) c := &memberEventCoalescer{ - lastEvents: make(map[string]EventType), - latestEvents: make(map[string]coalesceEvent), + lastEvents: make(map[string]*nodeEvent), + newEvents: make(map[string]*nodeEvent), } inCh := coalescedEventCh(outCh, shutdownCh, @@ -186,3 +186,81 @@ func TestMemberEventCoalesce_passThrough(t *testing.T) { } } } + +func TestMemberEventCoalesce_MemberUpdateEvent(t *testing.T) { + outCh := make(chan Event, 64) + shutdownCh := make(chan struct{}) + defer close(shutdownCh) + + c := &memberEventCoalescer{ + lastEvents: make(map[string]*nodeEvent), + newEvents: make(map[string]*nodeEvent), + } + + inCh := coalescedEventCh(outCh, shutdownCh, + 5*time.Millisecond, 5*time.Millisecond, c) + + inCh <- MemberEvent{ + Type: EventMemberUpdate, + Members: []Member{Member{Name: "zip", Tags: map[string]string{"role": "foo"}}}, + } + + time.Sleep(10 * time.Millisecond) + + inCh <- MemberEvent{ + Type: EventMemberUpdate, + Members: []Member{Member{Name: "zip", Tags: map[string]string{"role": "bar"}}}, + } + + time.Sleep(10 * time.Millisecond) + + inCh <- MemberEvent{ + Type: EventMemberUpdate, + Members: []Member{Member{Name: "zip", Tags: map[string]string{"role": "bar"}}}, + } + + events := []Event{} + timeout := time.After(10 * time.Millisecond) + +MEMBEREVENTFORLOOP: + for { + select { + case e := <-outCh: + events = append(events, e) + case <-timeout: // wait until coalescer flush done + break MEMBEREVENTFORLOOP + } + } + + if len(events) != 2 { + t.Fatalf("bad: %#v", events) + } + + e, ok := events[0].(MemberEvent) + if !ok { + t.Fatalf("bad: %#v", e) + } + if len(e.Members) != 1 { + t.Fatalf("bad: %#v", e.Members) + } + if e.Members[0].Name != "zip" { + t.Fatalf("bad: %#v", e.Members) + } + if e.Members[0].Tags["role"] != "foo" { + t.Fatalf("bad %#v", e.Members[0].Tags) + } + + e, ok = events[1].(MemberEvent) + if !ok { + t.Fatalf("bad: %#v", e) + } + if len(e.Members) != 1 { + t.Fatalf("bad: %#v", e.Members) + } + if e.Members[0].Name != "zip" { + t.Fatalf("bad: %#v", e.Members) + } + if e.Members[0].Tags["role"] != "bar" { + t.Fatalf("bad %#v", e.Members[0].Tags) + } +} diff --git a/serf/serf.go b/serf/serf.go index a260c8738..aef1bd376 100644 --- a/serf/serf.go +++ b/serf/serf.go @@ -293,8 +293,8 @@ func Create(conf *Config) (*Serf, error) { // Check if serf member event coalescing is enabled if conf.CoalescePeriod > 0 && conf.QuiescentPeriod > 0 && conf.EventCh != nil { c := &memberEventCoalescer{ - lastEvents: make(map[string]EventType), - latestEvents: make(map[string]coalesceEvent), + lastEvents: make(map[string]*nodeEvent), + newEvents: make(map[string]*nodeEvent), } conf.EventCh = coalescedEventCh(conf.EventCh, serf.shutdownCh,