Skip to content

Commit

Permalink
tests pass for new election changes
Browse files Browse the repository at this point in the history
  • Loading branch information
thrawn01 committed May 10, 2019
1 parent 9b2d716 commit d643280
Show file tree
Hide file tree
Showing 6 changed files with 67 additions and 89 deletions.
5 changes: 0 additions & 5 deletions cmd/election/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,6 @@ func main() {
os.Exit(1)
}

/*resp, err := client.Get(context.Background(), "/elections/cli-election", clientv3.WithPrefix())
if err != nil {
fmt.Printf("while creating a new etcd client: %s\n", err)
os.Exit(1)
}*/
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()

Expand Down
10 changes: 5 additions & 5 deletions etcdutil/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ services:
-enable-v2=false
ports:
- "22379:22379"
proxy:
image: shopify/toxiproxy:latest
ports:
- "2379:2379"
- "8474:8474"
# proxy:
# image: shopify/toxiproxy:latest
# ports:
# - "2379:2379"
# - "8474:8474"
120 changes: 56 additions & 64 deletions etcdutil/election.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,10 @@ import (
"sync/atomic"
"time"

"github.com/davecgh/go-spew/spew"

etcd "github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/mvcc/mvccpb"
"github.com/mailgun/holster"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)

type LeaderElector interface {
Expand Down Expand Up @@ -59,40 +56,35 @@ type ElectionConfig struct {
Candidate string
// Seconds to wait before giving up the election if leader disconnected
TTL int64
// Optional logger entry to be used
//Log *logrus.Entry
}

// Use leader election if you have several instances of a service running in production
// and you only want one of the service instances to preform a periodic task.
// NewElection creates a new leader election and submits our candidate for leader.
//
// client, _ := etcdutil.NewClient(nil)
//
// // Start a leader election and attempt to become leader, only returns after
// // determining the current leader.
// election := etcdutil.NewElection(client, etcdutil.ElectionConfig{
// Election: "election-name",
// Candidate: "",
// Election: "presidental",
// Candidate: "donald",
// EventObserver: func(e etcdutil.Event) {
// fmt.Printf("Leader Data: %t\n", e.LeaderData)
// if e.IsLeader {
// // Do thing as leader
// }
// },
// TTL: 5,
// })
//
// // Start the leader election and attempt to become leader
// if err := election.Start(ctx); err != nil {
// panic(err)
// }
//
// // Returns true if we are leader (thread safe)
// if election.IsLeader() {
// // Do periodic thing
// }
//
// select {
// case isLeader := <-election.LeaderChan():
// fmt.Printf("Leader: %t\n", isLeader)
// }
// // Concede the election if leader and cancel our candidacy
// // for the election.
// election.Close()
//
// NOTE: If this instance is elected leader and connection is interrupted to etcd,
// this library will continue to report it is leader until connection to etcd is resumed
// and a new leader is elected. If you wish to lose leadership on disconnect set
// `LoseLeaderOnDisconnect = true`
func NewElection(ctx context.Context, client *etcd.Client, conf ElectionConfig) (*Election, error) {
if conf.Election == "" {
return nil, errors.New("ElectionConfig.Election can not be empty")
Expand Down Expand Up @@ -151,7 +143,7 @@ func NewElection(ctx context.Context, client *etcd.Client, conf ElectionConfig)
}

func (e *Election) onSessionChange(leaseID etcd.LeaseID, err error) {
logrus.Debugf("Lease ID: %v running: %t err: %v", leaseID, e.isRunning, err)
//logrus.Debugf("Lease ID: %v running: %t err: %v", leaseID, e.isRunning, err)

// If we lost our lease, concede the campaign and stop
if leaseID == NoLease {
Expand All @@ -168,7 +160,7 @@ func (e *Election) onSessionChange(leaseID etcd.LeaseID, err error) {
}

if e.isRunning {
logrus.Debugf("already running '%v", leaseID)
//logrus.Debugf("already running '%v", leaseID)
return
}

Expand All @@ -178,8 +170,8 @@ func (e *Election) onSessionChange(leaseID etcd.LeaseID, err error) {
var err error
var rev int64

logrus.Debug("registering")
rev, err = e.enterCampaign(leaseID)
//logrus.Debug("registering")
rev, err = e.registerCampaign(leaseID)
if err != nil {
e.onErr(err, "during campaign registration")
select {
Expand All @@ -191,7 +183,7 @@ func (e *Election) onSessionChange(leaseID etcd.LeaseID, err error) {
}
e.backOff.Reset()

logrus.Debugf("watching rev %v", rev)
//logrus.Debugf("watching rev %v", rev)
if err := e.watchCampaign(rev); err != nil {
e.onErr(err, "during campaign watch")
select {
Expand All @@ -200,23 +192,23 @@ func (e *Election) onSessionChange(leaseID etcd.LeaseID, err error) {
case <-done:
}

// If delete takes longer than our TTL then lease is expired
// and we are no longer leader anyway.
ctx, cancel := context.WithTimeout(context.Background(), e.timeout)
// Withdraw our candidacy since an error occurred
if err := e.withDrawCampaign(); err != nil {
if err := e.withDrawCampaign(ctx); err != nil {
e.onErr(err, "")
}
cancel()
}
return false
})
}

func (e *Election) withDrawCampaign() error {
logrus.Debugf("withDrawCampaign(%s)", e.key)
// If delete takes longer than our TTL then lease is expired
// and we are no longer leader anyway.
ctx, cancel := context.WithTimeout(e.ctx, e.timeout)
func (e *Election) withDrawCampaign(ctx context.Context) error {
//logrus.Debugf("withDrawCampaign(%s)", e.key)
defer func() {
atomic.StoreInt32(&e.isLeader, 0)
cancel()
}()

_, err := e.client.Delete(ctx, e.key)
Expand All @@ -226,7 +218,7 @@ func (e *Election) withDrawCampaign() error {
return nil
}

func (e *Election) enterCampaign(id etcd.LeaseID) (revision int64, err error) {
func (e *Election) registerCampaign(id etcd.LeaseID) (revision int64, err error) {
// Create an entry under the election prefix with our lease ID as the key name
e.key = fmt.Sprintf("%s%x", e.conf.Election, id)
txn := e.client.Txn(e.ctx).If(etcd.Compare(etcd.CreateRevision(e.key), "=", 0))
Expand Down Expand Up @@ -275,14 +267,14 @@ func (e *Election) watchCampaign(rev int64) error {
return errors.Wrap(err, "while querying for current leader")
}

logrus.Debugf("Current Leader %v", string(leaderKV.Key))
//logrus.Debugf("Current Leader %v", string(leaderKV.Key))

watcher := etcd.NewWatcher(e.client)

// We do this because watcher does not reliably return when errors occur on connect
// or when cancelled (See https://github.com/etcd-io/etcd/pull/10020)
go func() {
logrus.Debugf("Watching Prefix: %s", e.conf.Election)
//logrus.Debugf("watching prefix: %s", e.conf.Election)
watchChan = watcher.Watch(etcd.WithRequireLeader(e.ctx), e.conf.Election,
etcd.WithRev(int64(rev+1)), etcd.WithPrefix())
close(ready)
Expand All @@ -298,7 +290,7 @@ func (e *Election) watchCampaign(rev int64) error {
e.onLeaderChange(leaderKV)

e.wg.Until(func(done chan struct{}) bool {
logrus.Debug("Watching...")
//logrus.Debug("Watching...")
select {
case resp := <-watchChan:
if resp.Canceled {
Expand All @@ -312,17 +304,15 @@ func (e *Election) watchCampaign(rev int64) error {

// Look for changes in leadership
for _, event := range resp.Events {
spew.Printf("Event: %v\n", event)
if event.Type == etcd.EventTypeDelete || event.Type == etcd.EventTypePut {
// Skip events that are about us
if string(event.Kv.Key) == e.key {
continue
}

logrus.Debug("PUT or DELETE")
// If the key is for our current leader
if bytes.Compare(event.Kv.Key, leaderKV.Key) == 0 {
logrus.Debug("Leader Changed")
//logrus.Debug("Leader Changed")
// Check our leadership status
resp, err := e.getLeader(e.ctx)
if err != nil {
Expand All @@ -338,12 +328,18 @@ func (e *Election) watchCampaign(rev int64) error {
}
}
case <-done:
logrus.Debug("done")
//logrus.Debug("done")
watcher.Close()
// If withdraw takes longer than our TTL then lease is expired
// and we are no longer leader anyway.
ctx, cancel := context.WithTimeout(context.Background(), e.timeout)

// Withdraw our candidacy because of shutdown
if err := e.withDrawCampaign(); err != nil {
if err := e.withDrawCampaign(ctx); err != nil {
e.onErr(err, "")
}
e.onLeaderChange(nil)
cancel()
return false
}
return true
Expand All @@ -352,22 +348,22 @@ func (e *Election) watchCampaign(rev int64) error {
}

func (e *Election) onLeaderChange(kv *mvccpb.KeyValue) {
logrus.Debug("onLeaderChange()")
var isLeader bool

if string(kv.Key) == e.key {
atomic.StoreInt32(&e.isLeader, 1)
isLeader = true
} else {
atomic.StoreInt32(&e.isLeader, 0)
//logrus.Debug("onLeaderChange()")
event := Event{}

if kv != nil {
if string(kv.Key) == e.key {
atomic.StoreInt32(&e.isLeader, 1)
event.IsLeader = true
} else {
atomic.StoreInt32(&e.isLeader, 0)
}
event.LeaderKey = string(kv.Key)
event.LeaderData = string(kv.Value)
}

for _, v := range e.observers {
v(Event{
IsLeader: isLeader,
LeaderKey: string(kv.Key),
LeaderData: string(kv.Value),
})
v(event)
}
}

Expand All @@ -394,7 +390,6 @@ func (e *Election) onFatalErr(err error, msg string) {
// Close cancels the election and concedes the election if we are leader
func (e *Election) Close() {
e.session.Close()
//e.cancel()
e.wg.Wait()
}

Expand Down Expand Up @@ -423,11 +418,8 @@ func (e *Election) Concede() (bool, error) {
return isLeader == 1, nil
}

/*
type LeaderElectionMock struct{}
type AlwaysLeaderMock struct{}

func (s *LeaderElectionMock) IsLeader() bool { return true }
func (s *LeaderElectionMock) LeaderChan() chan bool { return nil }
func (s *LeaderElectionMock) Concede() bool { return true }
func (s *LeaderElectionMock) Start() error { return nil }
func (s *LeaderElectionMock) Stop() {}*/
func (s *AlwaysLeaderMock) IsLeader() bool { return true }
func (s *AlwaysLeaderMock) Concede() (bool, error) { return true, nil }
func (s *AlwaysLeaderMock) Close() {}
6 changes: 2 additions & 4 deletions etcdutil/election_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package etcdutil_test

import (
"context"
"fmt"
"testing"
"time"

Expand Down Expand Up @@ -52,7 +51,6 @@ func TestTwoCampaigns(t *testing.T) {
c2Chan := make(chan bool, 5)
c2, err := etcdutil.NewElection(ctx, client, etcdutil.ElectionConfig{
EventObserver: func(e etcdutil.Event) {
fmt.Printf("Observed: %t err: %v\n", e.IsLeader, err)
if err != nil {
t.Fatal(err.Error())
}
Expand All @@ -71,9 +69,9 @@ func TestTwoCampaigns(t *testing.T) {
assert.Equal(t, false, c1.IsLeader())

// Second campaign should become leader
/*assert.Equal(t, false, <-c2Chan)
assert.Equal(t, false, <-c2Chan)
assert.Equal(t, true, <-c2Chan)

c2.Close()
assert.Equal(t, false, <-c2Chan)*/
assert.Equal(t, false, <-c2Chan)
}
14 changes: 4 additions & 10 deletions etcdutil/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,12 @@ package etcdutil

import (
"context"
"io/ioutil"
"sync"
"time"

etcd "github.com/coreos/etcd/clientv3"
"github.com/mailgun/holster"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)

const NoLease = etcd.LeaseID(-1)
Expand Down Expand Up @@ -41,9 +39,6 @@ type SessionConfig struct {
// as the lease ID. The Session will continue to try to gain another lease, once a new lease
// is gained SessionConfig.Observer is called again with the new lease id.
func NewSession(c *etcd.Client, conf SessionConfig) (*Session, error) {
null := logrus.New()
null.SetOutput(ioutil.Discard)

holster.SetDefault(&conf.TTL, int64(30))

if conf.Observer == nil {
Expand All @@ -62,7 +57,6 @@ func NewSession(c *etcd.Client, conf SessionConfig) (*Session, error) {
client: c,
}

logrus.Debug("New Session")
s.run()
return &s, nil
}
Expand Down Expand Up @@ -91,7 +85,7 @@ func (s *Session) run() {
select {
case _, ok := <-s.keepAlive:
if !ok {
logrus.Warn("heartbeat lost")
//logrus.Warn("heartbeat lost")
s.keepAlive = nil
} else {
//logrus.Debug("heartbeat received")
Expand All @@ -100,7 +94,7 @@ func (s *Session) run() {
case <-ticker.C:
// Ensure we are getting heartbeats regularly
if time.Now().Sub(s.lastKeepAlive) > s.timeout {
logrus.Warn("too long between heartbeats")
//logrus.Warn("too long between heartbeats")
s.keepAlive = nil
}
case <-done:
Expand Down Expand Up @@ -141,7 +135,7 @@ func (s *Session) Close() {
}

func (s *Session) gainLease(ctx context.Context) error {
logrus.Debug("attempting to grant new lease")
//logrus.Debug("attempting to grant new lease")
lease, err := s.client.Grant(ctx, s.conf.TTL)
if err != nil {
return errors.Wrapf(err, "during grant lease")
Expand All @@ -151,7 +145,7 @@ func (s *Session) gainLease(ctx context.Context) error {
if err != nil {
return err
}
logrus.Debugf("new lease %d", lease.ID)
//logrus.Debugf("new lease %d", lease.ID)
s.conf.Observer(lease.ID, nil)
return nil
}
Loading

0 comments on commit d643280

Please sign in to comment.