Skip to content

Commit

Permalink
Merge pull request #63 from mailgun/maxim/develop
Browse files Browse the repository at this point in the history
PIP-683: Add NewElectionAsync
  • Loading branch information
horkhe authored Nov 29, 2019
2 parents 8bdc723 + d0a4bed commit 2296d2f
Show file tree
Hide file tree
Showing 4 changed files with 344 additions and 94 deletions.
160 changes: 90 additions & 70 deletions v3/etcdutil/election.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type LeaderElector interface {

var _ LeaderElector = &Election{}

type Event struct {
type ElectionEvent struct {
// True if our candidate is leader
IsLeader bool
// True if the election is shutdown and
Expand All @@ -39,16 +39,20 @@ type Event struct {
Err error
}

type EventObserver func(Event)
// Deprecated: use ElectionEvent instead.
type Event = ElectionEvent

type EventObserver func(ElectionEvent)

type Election struct {
observers map[string]EventObserver
observer EventObserver
election string
candidate string
backOff *backOffCounter
cancel context.CancelFunc
wg syncutil.WaitGroup
ctx context.Context
conf ElectionConfig
timeout time.Duration
ttl time.Duration
client *etcd.Client
session *Session
key string
Expand Down Expand Up @@ -76,7 +80,7 @@ type ElectionConfig struct {
// election := etcdutil.NewElection(client, etcdutil.ElectionConfig{
// Election: "presidential",
// Candidate: "donald",
// EventObserver: func(e etcdutil.Event) {
// EventObserver: func(e etcdutil.ElectionEvent) {
// fmt.Printf("Leader Data: %t\n", e.LeaderData)
// if e.IsLeader {
// // Do thing as leader
Expand All @@ -95,61 +99,80 @@ type ElectionConfig struct {
// election.Close()
//
func NewElection(ctx context.Context, client *etcd.Client, conf ElectionConfig) (*Election, error) {
if conf.Election == "" {
return nil, errors.New("ElectionConfig.Election can not be empty")
var initialElectionErr error
readyCh := make(chan struct{})
initialElection := true
userObserver := conf.EventObserver
// Wrap user's observer to intercept the initial election.
conf.EventObserver = func(event ElectionEvent) {
if userObserver != nil {
userObserver(event)
}
if initialElection {
initialElection = false
initialElectionErr = event.Err
close(readyCh)
return
}
}
e := NewElectionAsync(client, conf)
// Wait for results of the initial leader election.
select {
case <-readyCh:
case <-ctx.Done():
return nil, ctx.Err()
}
return e, errors.WithStack(initialElectionErr)
}

// Default to short 5 second leadership TTL
setter.SetDefault(&conf.TTL, int64(5))
// NewElectionAsync creates a new leader election and submits our candidate for
// leader. It does not wait for the election to complete. The caller must
// provide an election event observer to monitor the election outcome.
//
// client, _ := etcdutil.NewClient(nil)
//
// // Starts a leader election and returns immediately.
// election := etcdutil.NewElectionAsync(client, etcdutil.ElectionConfig{
// Election: "presidential",
// Candidate: "donald",
// EventObserver: func(e etcdutil.ElectionEvent) {
// fmt.Printf("Leader Data: %t\n", e.LeaderData)
// if e.IsLeader {
// // Do thing as leader
// }
// },
// TTL: 5,
// })
//
// // Cancels the election and concedes the election if we are leader.
// election.Close()
//
func NewElectionAsync(client *etcd.Client, conf ElectionConfig) *Election {
setter.SetDefault(&conf.Election, "null")
conf.Election = path.Join("/elections", conf.Election)

// Use the hostname if no candidate name provided
if host, err := os.Hostname(); err == nil {
setter.SetDefault(&conf.Candidate, host)
}
setter.SetDefault(&conf.TTL, int64(5))

e := &Election{
backOff: newBackOffCounter(time.Millisecond*500, time.Duration(conf.TTL)*time.Second, 2),
timeout: time.Duration(conf.TTL) * time.Second,
observers: make(map[string]EventObserver),
ttlDuration := time.Duration(conf.TTL) * time.Second
e := Election{
observer: conf.EventObserver,
election: conf.Election,
candidate: conf.Candidate,
ttl: ttlDuration,
backOff: newBackOffCounter(500*time.Millisecond, ttlDuration, 2),
client: client,
conf: conf,
}

e.ctx, e.cancel = context.WithCancel(context.Background())

// If an observer was provided
if conf.EventObserver != nil {
e.observers["conf"] = conf.EventObserver
}

var err error
ready := make(chan struct{})
// Register ourselves as an observer for the initial election, then remove before returning
e.observers["init"] = func(event Event) {
// If we get an error while waiting on the election results, pass that back to the caller
if event.Err != nil {
err = event.Err
}
delete(e.observers, "init")
close(ready)
}

// Create a new Session
if e.session, err = NewSession(e.client, SessionConfig{
Observer: e.onSessionChange,
TTL: e.conf.TTL,
}); err != nil {
return nil, err
}

// Wait for results of leader election
select {
case <-ready:
case <-ctx.Done():
return nil, ctx.Err()
e.session = &Session{
observer: e.onSessionChange,
ttl: e.ttl,
backOff: newBackOffCounter(500*time.Millisecond, ttlDuration, 2),
client: client,
}
return e, err
e.session.start()
return &e
}

func (e *Election) onSessionChange(leaseID etcd.LeaseID, err error) {
Expand Down Expand Up @@ -202,7 +225,7 @@ func (e *Election) onSessionChange(leaseID etcd.LeaseID, err error) {

// If delete takes longer than our TTL then lease is expired
// and we are no longer leader anyway.
ctx, cancel := context.WithTimeout(context.Background(), e.timeout)
ctx, cancel := context.WithTimeout(context.Background(), e.ttl)
// Withdraw our candidacy since an error occurred
if err := e.withDrawCampaign(ctx); err != nil {
e.onErr(err, "")
Expand All @@ -229,9 +252,9 @@ func (e *Election) withDrawCampaign(ctx context.Context) error {

func (e *Election) registerCampaign(id etcd.LeaseID) (revision int64, err error) {
// Create an entry under the election prefix with our lease ID as the key name
e.key = fmt.Sprintf("%s%x", e.conf.Election, id)
e.key = fmt.Sprintf("%s%x", e.election, id)
txn := e.client.Txn(e.ctx).If(etcd.Compare(etcd.CreateRevision(e.key), "=", 0))
txn = txn.Then(etcd.OpPut(e.key, e.conf.Candidate, etcd.WithLease(id)))
txn = txn.Then(etcd.OpPut(e.key, e.candidate, etcd.WithLease(id)))
txn = txn.Else(etcd.OpGet(e.key))
resp, err := txn.Commit()
if err != nil {
Expand All @@ -245,8 +268,8 @@ func (e *Election) registerCampaign(id etcd.LeaseID) (revision int64, err error)
if !resp.Succeeded {
kv := resp.Responses[0].GetResponseRange().Kvs[0]
revision = kv.CreateRevision
if string(kv.Value) != e.conf.Candidate {
if _, err = e.client.Put(e.ctx, e.key, e.conf.Candidate); err != nil {
if string(kv.Value) != e.candidate {
if _, err = e.client.Put(e.ctx, e.key, e.candidate); err != nil {
return 0, err
}
}
Expand All @@ -257,7 +280,7 @@ func (e *Election) registerCampaign(id etcd.LeaseID) (revision int64, err error)
// getLeader returns a KV pair for the current leader
func (e *Election) getLeader(ctx context.Context) (*mvccpb.KeyValue, error) {
// The leader is the first entry under the election prefix
resp, err := e.client.Get(ctx, e.conf.Election, etcd.WithFirstCreate()...)
resp, err := e.client.Get(ctx, e.election, etcd.WithFirstCreate()...)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -287,7 +310,7 @@ func (e *Election) watchCampaign(rev int64) error {
// We do this because watcher does not reliably return when errors occur on connect
// or when cancelled (See https://github.com/etcd-io/etcd/pull/10020)
go func() {
watchChan = watcher.Watch(etcd.WithRequireLeader(e.ctx), e.conf.Election,
watchChan = watcher.Watch(etcd.WithRequireLeader(e.ctx), e.election,
etcd.WithRev(int64(rev+1)), etcd.WithPrefix())
close(ready)
}()
Expand Down Expand Up @@ -342,7 +365,7 @@ func (e *Election) watchCampaign(rev int64) error {
_ = watcher.Close()
// If withdraw takes longer than our TTL then lease is expired
// and we are no longer leader anyway.
ctx, cancel := context.WithTimeout(context.Background(), e.timeout)
ctx, cancel := context.WithTimeout(context.Background(), e.ttl)

// Withdraw our candidacy because of shutdown
if err := e.withDrawCampaign(ctx); err != nil {
Expand All @@ -358,8 +381,7 @@ func (e *Election) watchCampaign(rev int64) error {
}

func (e *Election) onLeaderChange(kv *mvccpb.KeyValue) {
event := Event{}

event := ElectionEvent{}
if kv != nil {
if string(kv.Key) == e.key {
atomic.StoreInt32(&e.isLeader, 1)
Expand All @@ -372,22 +394,19 @@ func (e *Election) onLeaderChange(kv *mvccpb.KeyValue) {
} else {
event.IsDone = true
}

for _, v := range e.observers {
v(event)
if e.observer != nil {
e.observer(event)
}
}

// onErr reports errors to the observer
func (e *Election) onErr(err error, msg string) {
atomic.StoreInt32(&e.isLeader, 0)

if msg != "" {
err = errors.Wrap(err, msg)
}

for _, v := range e.observers {
v(Event{Err: err})
if e.observer != nil {
e.observer(ElectionEvent{Err: err})
}
}

Expand All @@ -406,7 +425,8 @@ func (e *Election) Close() {
e.onLeaderChange(nil)
}

// IsLeader returns true if we are leader
// IsLeader returns true if we are leader. The result is only meaningful once
// the initial election has completed, e.g. when the election was created with
// NewElection, which blocks until the initial election is over.
func (e *Election) IsLeader() bool {
return atomic.LoadInt32(&e.isLeader) == 1
}
Expand All @@ -422,8 +442,8 @@ func (e *Election) Concede() (bool, error) {
oldCampaignKey := e.key
e.session.Reset()

// Ensure there are no lingering candiates
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(e.conf.TTL)*time.Second)
// Ensure there are no lingering candidates
ctx, cancel := context.WithTimeout(context.Background(), e.ttl)
cancel()

_, err := e.client.Delete(ctx, oldCampaignKey)
Expand Down
Loading

0 comments on commit 2296d2f

Please sign in to comment.