Skip to content

Commit

Permalink
Merge pull request #45 from thrawn01/thrawn/develop
Browse files Browse the repository at this point in the history
Election rewrite
  • Loading branch information
thrawn01 authored May 15, 2019
2 parents 769f8d7 + 169e3bb commit e1aba53
Show file tree
Hide file tree
Showing 9 changed files with 834 additions and 290 deletions.
55 changes: 55 additions & 0 deletions backoff.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package holster

import (
"math"
"time"
)

var backoff = NewBackOff(time.Millisecond*300, time.Second*30, 2)

type BackOffCounter struct {
min, max time.Duration
factor float64
attempt int
}

func NewBackOff(min, max time.Duration, factor float64) *BackOffCounter {
return &BackOffCounter{
factor: factor,
min: min,
max: max,
}
}

// Next returns the next back off duration based on the number of
// times Next() was called. Each call to next returns the next factor
// of back off. Call Reset() to reset the back off attempts to zero.
func (b *BackOffCounter) Next() time.Duration {
d := b.BackOff(b.attempt)
b.attempt++
return d
}

// Reset sets the back off attempt counter to zero
func (b *BackOffCounter) Reset() {
b.attempt = 0
}

// BackOff calculates the back depending on the attempts provided
func (b *BackOffCounter) BackOff(attempt int) time.Duration {
d := time.Duration(float64(b.min) * math.Pow(b.factor, float64(attempt)))
if d > b.max {
return b.max
}
if d < b.min {
return b.min
}
return d
}

// BackOff is a convenience function which returns a back off duration
// with a default 300 millisecond minimum and a 30 second maximum with
// a factor of 2 for each attempt.
func BackOff(attempt int) time.Duration {
return backoff.BackOff(attempt)
}
17 changes: 17 additions & 0 deletions backoff_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package holster_test

import (
"testing"
"time"

"github.com/mailgun/holster"
"github.com/stretchr/testify/assert"
)

func TestBackoffFunc(t *testing.T) {
d := holster.BackOff(0)
assert.Equal(t, time.Millisecond*300, d)

d = holster.BackOff(1)
assert.Equal(t, time.Millisecond*600, d)
}
41 changes: 23 additions & 18 deletions cmd/election/main.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,25 @@
package main

import (
"context"
"fmt"
"os"

"os/signal"
"syscall"
"time"

"github.com/davecgh/go-spew/spew"
"github.com/mailgun/holster/etcdutil"
"github.com/sirupsen/logrus"
)

/*func checkErr(err error) {
if err != nil {
fmt.Printf("err: %s\n", err)
os.Exit(1)
}
}*/

func main() {
logrus.SetLevel(logrus.DebugLevel)

Expand All @@ -25,21 +34,18 @@ func main() {
os.Exit(1)
}

/*resp, err := client.Get(context.Background(), "/elections/cli-election", clientv3.WithPrefix())
if err != nil {
fmt.Printf("while creating a new etcd client: %s\n", err)
os.Exit(1)
}*/
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()

e := etcdutil.NewElection(client, etcdutil.ElectionConfig{
Election: "cli-election",
Candidate: os.Args[1],
LeaderChannelSize: 10,
ResumeLeaderOnReconnect: true,
TTL: 10,
leaderChan := make(chan etcdutil.Event, 5)
e, err := etcdutil.NewElection(ctx, client, etcdutil.ElectionConfig{
Election: "cli-election",
Candidate: os.Args[1],
EventObserver: func(e etcdutil.Event) {
leaderChan <- e
},
TTL: 5,
})

err = e.Start()
if err != nil {
fmt.Printf("during election start: %s\n", err)
os.Exit(1)
Expand All @@ -54,15 +60,14 @@ func main() {
switch sig {
case syscall.SIGINT:
fmt.Printf("[%s] Concede and exit\n", os.Args[1])
e.Stop()
e.Close()
os.Exit(1)
}
}
}
}()

for leader := range e.LeaderChan() {
fmt.Printf("[%s] Leader: %t\n", os.Args[1], leader)
for e := range leaderChan {
spew.Printf("[%s] %v\n", os.Args[1], e)
}

}
43 changes: 20 additions & 23 deletions etcdutil/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ Use etcd for leader election if you have several instances of a service running
and you only want one of the service instances to preform a task.

`LeaderElection` starts a goroutine which performs an election and maintains a leader
while services join and leave the election. Calling `Stop()` will `Concede()` leadership if
the service currently has it.
while candidates join and leave the election. Calling `Close()` will concede leadership if
the service currently has it and will withdraw the candidate from the election.

```go

Expand All @@ -16,33 +16,29 @@ import (
func main() {
var wg holster.WaitGroup

hostname, err := os.Hostname()
if err != nil {
fmt.Fprintf(os.Stderr, "while obtaining hostname: %s\n", err)
return
}

client, err := etcdutil.NewClient(nil)
if err != nil {
fmt.Fprintf(os.Stderr, "while creating etcd client: %s\n", err)
return
}

ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()

// Preform an election called 'my-service' with hostname as the candidate name
e := etcdutil.NewElection(client, etcdutil.ElectionConfig{
// Start a leader election and attempt to become leader, only returns after
// determining the current leader.
election := etcdutil.NewElection(ctx, client, etcdutil.ElectionConfig{
Election: "my-service",
Candidate: hostname,
LeaderChannelSize: 10,
ResumeLeaderOnReconnect: true,
Candidate: "my-candidate",
EventObserver: func(e etcdutil.Event) {
leaderChan <- e
if e.IsDone {
close(leaderChan)
}
},
TTL: 10,
})

// Start the election, will block until a leader is elected
if err = e.Start(); err != nil {
fmt.Printf("during election start: %s\n", err)
os.Exit(1)
}

// Handle graceful shutdown
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, os.Interrupt, os.Kill)
Expand All @@ -56,7 +52,8 @@ func main() {
if election.IsLeader() {
err := DoThing()
if err != nil {
// Have another instance DoThing(), we can't for some reason
// Have another instance run DoThing()
// since we can't for some reason.
election.Concede()
}
}
Expand All @@ -68,9 +65,9 @@ func main() {
})
wg.Wait()

// Or you can listen on a channel for leadership updates
for leader := range e.LeaderChan() {
fmt.Printf("Leader: %t\n", leader)
// Or you can pipe events to a channel
for leader := range leaderChan {
fmt.Printf("Leader: %v\n", leader)
}
}
```
Expand Down
28 changes: 28 additions & 0 deletions etcdutil/docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# This setup is for testing only, all communication with
# etcd is done through the proxy. You have to create a proxy
# with toxiproxy-cli before you can connect to etcd
#
# toxiproxy-cli create etcd --listen 0.0.0.0:2379 --upstream etcd:22379

version: '3.2'
services:
etcd:
image: quay.io/coreos/etcd:v3.2
command: >
/usr/local/bin/etcd
-name etcd0
-advertise-client-urls http://localhost:2379
-listen-client-urls http://0.0.0.0:22379
-initial-advertise-peer-urls http://0.0.0.0:2381
-listen-peer-urls http://0.0.0.0:2381
-initial-cluster-token etcd-cluster-1
-initial-cluster etcd0=http://0.0.0.0:2381
-initial-cluster-state new
-enable-v2=false
ports:
- "22379:22379"
# proxy:
# image: shopify/toxiproxy:latest
# ports:
# - "2379:2379"
# - "8474:8474"
Loading

0 comments on commit e1aba53

Please sign in to comment.