fix_the_etcdv2_1000ErrorCodeEventIndexCleared_bug #1141

Open · wants to merge 1 commit into base: master
23 changes: 16 additions & 7 deletions objdb/etcdLock.go
@@ -1,7 +1,7 @@
 package objdb
 
 import (
-	"strings"
+	"reflect"
 	"sync"
 	"time"
 
@@ -353,12 +353,21 @@ func (lk *etcdLock) watchLock() {
 	}
 	for {
 		resp, err := watcher.Next(lk.watchCtx)
-		if err != nil && (err.Error() == client.ErrClusterUnavailable.Error() ||
-			strings.Contains(err.Error(), "context canceled")) {
-			log.Infof("Stopping watch on key %s", keyName)
-			return
-		} else if err != nil {
-			log.Errorf("Error watching the key %s, Err %v.", keyName, err)
+		if err != nil {
+			log.Infof("Watch %s next failed: %v %v", keyName, reflect.TypeOf(err), err)
+			switch err.(type) {
+			case *client.ClusterError:
+				// retry and wait for etcd cluster to recover!
+				time.Sleep(time.Second * 5)
+			case client.Error:
+				if err.(client.Error).Code == client.ErrorCodeEventIndexCleared {
+					watcher = lk.kapi.Watcher(keyName, nil)
+					log.Errorf("Watch next failed: reset watcher")
+				}
+			default:
+				// do nothing, just retry
+			}
+			continue
 		} else {
 			log.Debugf("Got Watch Resp: %+v", resp)
 
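The same recovery logic is added to all three watch loops in this PR: wait out a full-cluster outage, recreate the watcher when etcd v2 reports that the watched index has been cleared from its bounded event history (the "1000" in the PR title refers to that history window), and otherwise just retry. As a self-contained sketch of the pattern (the package name, watchWithRecovery, and its signature are illustrative, not code from this PR):

package watchretry

import (
	"context"
	"time"

	"github.com/coreos/etcd/client"
)

// watchWithRecovery sketches the retry pattern this PR applies in
// objdb/etcdLock.go, objdb/etcdService.go and state/etcdstatedriver.go.
func watchWithRecovery(ctx context.Context, kapi client.KeysAPI, key string, events chan<- *client.Response) {
	watcher := kapi.Watcher(key, &client.WatcherOptions{Recursive: true})
	for {
		resp, err := watcher.Next(ctx)
		if err != nil {
			switch e := err.(type) {
			case *client.ClusterError:
				// Whole cluster unreachable: back off, then retry until it recovers.
				time.Sleep(5 * time.Second)
			case client.Error:
				if e.Code == client.ErrorCodeEventIndexCleared {
					// Our watch index fell out of etcd's event history;
					// a fresh watcher resumes from the key's current state.
					watcher = kapi.Watcher(key, &client.WatcherOptions{Recursive: true})
				}
			default:
				// Transient error: simply retry the Next call.
			}
			continue
		}
		events <- resp
	}
}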
23 changes: 16 additions & 7 deletions objdb/etcdService.go
@@ -26,6 +26,7 @@ import (

log "github.com/Sirupsen/logrus"
"github.com/coreos/etcd/client"
"reflect"
)

// Service state
@@ -194,14 +195,22 @@ func (ep *EtcdClient) WatchService(name string, eventCh chan WatchServiceEvent,
 		for {
 			// Block till next watch event
 			etcdRsp, err := watcher.Next(watchCtx)
-			if err != nil && err.Error() == client.ErrClusterUnavailable.Error() {
-				log.Infof("Stopping watch on key %s", keyName)
-				return
-			} else if err != nil {
-				log.Errorf("Error %v during watch. Watch thread exiting", err)
-				return
+			if err != nil {
+				log.Infof("Watch %s next failed: %v %v", keyName, reflect.TypeOf(err), err)
+				switch err.(type) {
+				case *client.ClusterError:
+					// retry and wait for etcd cluster to recover!
+					time.Sleep(time.Second * 5)
+				case client.Error:
+					if err.(client.Error).Code == client.ErrorCodeEventIndexCleared {
+						watcher = ep.kapi.Watcher(keyName, &client.WatcherOptions{Recursive: true})
+						log.Errorf("Watch next failed: reset watcher")
+					}
+				default:
+					// do nothing, just retry
+				}
+				continue
 			}
 
 			// Send it to watch channel
 			watchCh <- etcdRsp
 		}
20 changes: 15 additions & 5 deletions state/etcdstatedriver.go
@@ -176,16 +176,26 @@ func (d *EtcdStateDriver) ReadAll(baseKey string) ([][]byte, error) {
 	return [][]byte{}, err
 }
 
-func (d *EtcdStateDriver) channelEtcdEvents(watcher client.Watcher, rsps chan [2][]byte) {
+func (d *EtcdStateDriver) channelEtcdEvents(watcher client.Watcher, rsps chan [2][]byte, baseKey string) {
 	for {
 		// block on change notifications
 		etcdRsp, err := watcher.Next(context.Background())
 		if err != nil {
-			log.Errorf("Error %v during watch", err)
-			time.Sleep(time.Second)
+			log.Infof("Watch %s next failed: %v %v", baseKey, reflect.TypeOf(err), err)
+			switch err.(type) {
+			case *client.ClusterError:
+				// retry and wait for etcd cluster to recover!
+				time.Sleep(time.Second * 5)

[Review thread on the time.Sleep line above; a sketch of naming this interval follows the hunk.]
Contributor: How did you choose this value for the sleep?
Author: The sleep can be set to any value, but it determines how often the watcher re-requests from etcd while it waits for the cluster to recover. Since the cluster may come back at any time after it breaks, I chose five seconds based on experience.

+			case client.Error:
+				if err.(client.Error).Code == client.ErrorCodeEventIndexCleared {
+					watcher = d.KeysAPI.Watcher(baseKey, &client.WatcherOptions{Recursive: true})
+					log.Errorf("Watch next failed: reset watcher")
+				}
+			default:
+				// do nothing, just retry
+			}
 			continue
 		}
 
 		// XXX: The logic below assumes that the node returned is always a node
 		// of interest. Eg: If we set a watch on /a/b/c, then we are mostly
 		// interested in changes in that directory i.e. changes to /a/b/c/d1..d2
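On the review question above: the 5-second value is repeated inline at all three call sites. One hypothetical follow-up, sketched here and not part of this PR, is to hoist it into a named constant so the retry policy lives in one tunable place:

package watchretry

import "time"

// clusterRetryInterval is a hypothetical constant (not in this PR) for the 5s
// back-off discussed in the review thread above.
const clusterRetryInterval = 5 * time.Second

// waitForCluster is what each *client.ClusterError case could call instead of
// an inline time.Sleep(time.Second * 5).
func waitForCluster() {
	time.Sleep(clusterRetryInterval)
}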
@@ -220,7 +230,7 @@ func (d *EtcdStateDriver) WatchAll(baseKey string, rsps chan [2][]byte) error {
 		return errors.New("etcd watch failed")
 	}
 
-	go d.channelEtcdEvents(watcher, rsps)
+	go d.channelEtcdEvents(watcher, rsps, baseKey)
 
 	return nil
 }