Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improve readability of member event coalescer #739

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 27 additions & 16 deletions serf/coalesce_member.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,30 @@

package serf

type coalesceEvent struct {
import "reflect"

// event happens to a node
type nodeEvent struct {
Type EventType
Member *Member
}

func (n *nodeEvent) Equal(m *nodeEvent) bool {
if m == nil {
return false
}
if n.Type != m.Type {
return false
}
if n.Type != EventMemberUpdate {
return true
}
return reflect.DeepEqual(n.Member, m.Member)
}

type memberEventCoalescer struct {
lastEvents map[string]EventType
latestEvents map[string]coalesceEvent
lastEvents map[string]*nodeEvent // the last event happens to a node
newEvents map[string]*nodeEvent // most recent event for a node.
}

func (c *memberEventCoalescer) Handle(e Event) bool {
Expand All @@ -33,35 +49,30 @@ func (c *memberEventCoalescer) Handle(e Event) bool {
func (c *memberEventCoalescer) Coalesce(raw Event) {
e := raw.(MemberEvent)
for _, m := range e.Members {
c.latestEvents[m.Name] = coalesceEvent{
c.newEvents[m.Name] = &nodeEvent{ // overwrite the old events
Type: e.Type,
Member: &m,
}
}
}

func (c *memberEventCoalescer) Flush(outCh chan<- Event) {
// Coalesce the various events we got into a single set of events.
events := make(map[EventType]*MemberEvent)
for name, cevent := range c.latestEvents {
previous, ok := c.lastEvents[name]

// If we sent the same event before, then ignore
// unless it is a MemberUpdate
if ok && previous == cevent.Type && cevent.Type != EventMemberUpdate {
for name, e := range c.newEvents {
if e.Equal(c.lastEvents[name]) {
continue
}

// Update our last event
c.lastEvents[name] = cevent.Type
c.lastEvents[name] = e

// Add it to our event
newEvent, ok := events[cevent.Type]
event, ok := events[e.Type]
if !ok {
newEvent = &MemberEvent{Type: cevent.Type}
events[cevent.Type] = newEvent
event = &MemberEvent{Type: e.Type}
events[e.Type] = event
}
newEvent.Members = append(newEvent.Members, *cevent.Member)
event.Members = append(event.Members, *e.Member)
}

// Send out those events
Expand Down
86 changes: 82 additions & 4 deletions serf/coalesce_member_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ func TestMemberEventCoalesce_Basic(t *testing.T) {
defer close(shutdownCh)

c := &memberEventCoalescer{
lastEvents: make(map[string]EventType),
latestEvents: make(map[string]coalesceEvent),
lastEvents: make(map[string]*nodeEvent),
newEvents: make(map[string]*nodeEvent),
}

inCh := coalescedEventCh(outCh, shutdownCh,
Expand Down Expand Up @@ -125,8 +125,8 @@ func TestMemberEventCoalesce_TagUpdate(t *testing.T) {
defer close(shutdownCh)

c := &memberEventCoalescer{
lastEvents: make(map[string]EventType),
latestEvents: make(map[string]coalesceEvent),
lastEvents: make(map[string]*nodeEvent),
newEvents: make(map[string]*nodeEvent),
}

inCh := coalescedEventCh(outCh, shutdownCh,
Expand Down Expand Up @@ -186,3 +186,81 @@ func TestMemberEventCoalesce_passThrough(t *testing.T) {
}
}
}

func TestMemberEventCoalesce_MemberUpdateEvent(t *testing.T) {
outCh := make(chan Event, 64)
shutdownCh := make(chan struct{})
defer close(shutdownCh)

c := &memberEventCoalescer{
lastEvents: make(map[string]*nodeEvent),
newEvents: make(map[string]*nodeEvent),
}

inCh := coalescedEventCh(outCh, shutdownCh,
5*time.Millisecond, 5*time.Millisecond, c)

inCh <- MemberEvent{
Type: EventMemberUpdate,
Members: []Member{Member{Name: "zip", Tags: map[string]string{"role": "foo"}}},
}

time.Sleep(10 * time.Millisecond)

inCh <- MemberEvent{
Type: EventMemberUpdate,
Members: []Member{Member{Name: "zip", Tags: map[string]string{"role": "bar"}}},
}

time.Sleep(10 * time.Millisecond)

inCh <- MemberEvent{
Type: EventMemberUpdate,
Members: []Member{Member{Name: "zip", Tags: map[string]string{"role": "bar"}}},
}

events := []Event{}
timeout := time.After(10 * time.Millisecond)

MEMBEREVENTFORLOOP:
for {
select {
case e := <-outCh:
events = append(events, e)
case <-timeout: // wait until coalescer flush done
break MEMBEREVENTFORLOOP
}
}

if len(events) != 2 {
t.Fatalf("bad: %#v", events)
}

e, ok := events[0].(MemberEvent)
if !ok {
t.Fatalf("bad: %#v", e)
}
if len(e.Members) != 1 {
t.Fatalf("bad: %#v", e.Members)
}
if e.Members[0].Name != "zip" {
t.Fatalf("bad: %#v", e.Members)
}
if e.Members[0].Tags["role"] != "foo" {
t.Fatalf("bad %#v", e.Members[0].Tags)
}

e, ok = events[1].(MemberEvent)
if !ok {
t.Fatalf("bad: %#v", e)
}
if len(e.Members) != 1 {
t.Fatalf("bad: %#v", e.Members)
}
if e.Members[0].Name != "zip" {
t.Fatalf("bad: %#v", e.Members)
}
if e.Members[0].Tags["role"] != "bar" {
t.Fatalf("bad %#v", e.Members[0].Tags)
}
}
4 changes: 2 additions & 2 deletions serf/serf.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,8 +293,8 @@ func Create(conf *Config) (*Serf, error) {
// Check if serf member event coalescing is enabled
if conf.CoalescePeriod > 0 && conf.QuiescentPeriod > 0 && conf.EventCh != nil {
c := &memberEventCoalescer{
lastEvents: make(map[string]EventType),
latestEvents: make(map[string]coalesceEvent),
lastEvents: make(map[string]*nodeEvent),
newEvents: make(map[string]*nodeEvent),
}

conf.EventCh = coalescedEventCh(conf.EventCh, serf.shutdownCh,
Expand Down