Skip to content

Commit

Permalink
proxy: use context for proxy check and timeouts
Browse files Browse the repository at this point in the history
Use a context with deadline of proxyTimeout instead of a timer to handle checks
and timeouts.
As an additional effect, the check function will return early, and will not
update the proxy address, if the context is done.

Also handle signals to stop proxy.
  • Loading branch information
sgotti committed Feb 24, 2020
1 parent 350ddf6 commit 2f1a691
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 60 deletions.
156 changes: 96 additions & 60 deletions cmd/proxy/cmd/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ import (
"fmt"
"net"
"net/http"
"os"
"os/signal"
"sync"
"syscall"
"time"

"github.com/prometheus/client_golang/prometheus/promhttp"
Expand Down Expand Up @@ -163,15 +166,15 @@ func (c *ClusterChecker) stopPollonProxy() {
}
}

func (c *ClusterChecker) sendPollonConfData(confData pollon.ConfData) {
func (c *ClusterChecker) updateDestAddress(destAddr *net.TCPAddr) {
c.pollonMutex.Lock()
defer c.pollonMutex.Unlock()
if c.pp != nil {
c.pp.C <- confData
c.pp.C <- pollon.ConfData{DestAddr: destAddr}
}
}

func (c *ClusterChecker) SetProxyInfo(e store.Store, generation int64, proxyTimeout time.Duration) error {
func (c *ClusterChecker) setProxyInfo(ctx context.Context, e store.Store, generation int64, proxyTimeout time.Duration) error {
proxyInfo := &cluster.ProxyInfo{
InfoUID: common.UID(),
UID: c.uid,
Expand All @@ -180,15 +183,15 @@ func (c *ClusterChecker) SetProxyInfo(e store.Store, generation int64, proxyTime
}
log.Debugf("proxyInfo dump: %s", spew.Sdump(proxyInfo))

if err := c.e.SetProxyInfo(context.TODO(), proxyInfo, 2*proxyTimeout); err != nil {
if err := c.e.SetProxyInfo(ctx, proxyInfo, 2*proxyTimeout); err != nil {
return err
}
return nil
}

// Check reads the cluster data and applies the right pollon configuration.
func (c *ClusterChecker) Check() error {
cd, _, err := c.e.GetClusterData(context.TODO())
// check reads the cluster data and applies the right pollon configuration.
func (c *ClusterChecker) check(ctx context.Context) error {
cd, _, err := c.e.GetClusterData(ctx)
if err != nil {
return fmt.Errorf("cannot get cluster data: %v", err)
}
Expand All @@ -201,15 +204,15 @@ func (c *ClusterChecker) Check() error {
log.Debugf("cd dump: %s", spew.Sdump(cd))
if cd == nil {
log.Infow("no clusterdata available, closing connections to master")
c.sendPollonConfData(pollon.ConfData{DestAddr: nil})
c.updateDestAddress(nil)
return nil
}
if cd.FormatVersion != cluster.CurrentCDFormatVersion {
c.sendPollonConfData(pollon.ConfData{DestAddr: nil})
c.updateDestAddress(nil)
return fmt.Errorf("unsupported clusterdata format version: %d", cd.FormatVersion)
}
if err = cd.Cluster.Spec.Validate(); err != nil {
c.sendPollonConfData(pollon.ConfData{DestAddr: nil})
c.updateDestAddress(nil)
return fmt.Errorf("clusterdata validation failed: %v", err)
}

Expand All @@ -228,9 +231,9 @@ func (c *ClusterChecker) Check() error {
proxy := cd.Proxy
if proxy == nil {
log.Infow("no proxy object available, closing connections to master")
c.sendPollonConfData(pollon.ConfData{DestAddr: nil})
c.updateDestAddress(nil)
// ignore errors on setting proxy info
if err = c.SetProxyInfo(c.e, cluster.NoGeneration, proxyTimeout); err != nil {
if err = c.setProxyInfo(ctx, c.e, cluster.NoGeneration, proxyTimeout); err != nil {
log.Errorw("failed to update proxyInfo", zap.Error(err))
} else {
// update proxyCheckinterval and proxyTimeout only if we successfully updated our proxy info
Expand All @@ -245,9 +248,9 @@ func (c *ClusterChecker) Check() error {
db, ok := cd.DBs[proxy.Spec.MasterDBUID]
if !ok {
log.Infow("no db object available, closing connections to master", "db", proxy.Spec.MasterDBUID)
c.sendPollonConfData(pollon.ConfData{DestAddr: nil})
c.updateDestAddress(nil)
// ignore errors on setting proxy info
if err = c.SetProxyInfo(c.e, proxy.Generation, proxyTimeout); err != nil {
if err = c.setProxyInfo(ctx, c.e, proxy.Generation, proxyTimeout); err != nil {
log.Errorw("failed to update proxyInfo", zap.Error(err))
} else {
// update proxyCheckinterval and proxyTimeout only if we successfully updated our proxy info
Expand All @@ -259,14 +262,15 @@ func (c *ClusterChecker) Check() error {
return nil
}

// TODO(sgotti) use a resolver with a context if it exists
addr, err := net.ResolveTCPAddr("tcp", net.JoinHostPort(db.Status.ListenAddress, db.Status.Port))
if err != nil {
log.Errorw("cannot resolve db address", zap.Error(err))
c.sendPollonConfData(pollon.ConfData{DestAddr: nil})
c.updateDestAddress(nil)
return nil
}
log.Infow("master address", "address", addr)
if err = c.SetProxyInfo(c.e, proxy.Generation, proxyTimeout); err != nil {

if err = c.setProxyInfo(ctx, c.e, proxy.Generation, proxyTimeout); err != nil {
// if we failed to update our proxy info when a master is defined we
// cannot ignore this error since the sentinel won't know that we exist
// and are sending connections to a master so, when electing a new
Expand All @@ -282,84 +286,111 @@ func (c *ClusterChecker) Check() error {

// start proxing only if we are inside enabledProxies, this ensures that the
// sentinel has read our proxyinfo and knows we are alive
if util.StringInSlice(proxy.Spec.EnabledProxies, c.uid) {
log.Infow("proxying to master address", "address", addr)
c.sendPollonConfData(pollon.ConfData{DestAddr: addr})
} else {
if !util.StringInSlice(proxy.Spec.EnabledProxies, c.uid) {
log.Infow("not proxying to master address since we aren't in the enabled proxies list", "address", addr)
c.sendPollonConfData(pollon.ConfData{DestAddr: nil})
c.updateDestAddress(nil)
return nil
}

return nil
}

func (c *ClusterChecker) TimeoutChecker(checkOkCh chan struct{}) {
c.configMutex.Lock()
timeoutTimer := time.NewTimer(c.proxyTimeout)
c.configMutex.Unlock()

for {
select {
case <-timeoutTimer.C:
log.Infow("check timeout timer fired")
// if the check timeouts close all connections and stop listening
// (for example to avoid load balancers forward connections to us
// since we aren't ready or in a bad state)
c.sendPollonConfData(pollon.ConfData{DestAddr: nil})
if c.stopListening {
c.stopPollonProxy()
}
// before updating the pollon address, check that the context isn't timed
// out, usually if the context is timeout out one of the above calls will
// return an error but libkv stores doesn't handle contexts so we should
// check here.
select {
case <-ctx.Done():
log.Infow("not updating proxy address since context is done: %v", ctx.Err())
return nil
default:
}

case <-checkOkCh:
log.Debugw("check ok message received")
log.Infow("proxying to master address", "address", addr)
c.updateDestAddress(addr)

// ignore if stop succeeded or not due to timer already expired
timeoutTimer.Stop()
return nil
}

c.configMutex.Lock()
timeoutTimer = time.NewTimer(c.proxyTimeout)
c.configMutex.Unlock()
// timeoutChecker forcefully closes client connections once the check
// context expires, i.e. the check has been failing for longer than the
// proxy timeout. A plain cancellation (successful check) is a no-op.
func (c *ClusterChecker) timeoutChecker(ctx context.Context) {
	<-ctx.Done()
	if ctx.Err() != context.DeadlineExceeded {
		// context was cancelled, not timed out: nothing to do
		return
	}
	log.Infow("check timeout fired")
	// close all connections and, when configured, stop listening so that
	// load balancers stop forwarding connections to us while we aren't
	// ready or are in a bad state
	c.updateDestAddress(nil)
	if c.stopListening {
		c.stopPollonProxy()
	}
}

func (c *ClusterChecker) Start() error {
checkOkCh := make(chan struct{})
// checkLoop executes at predefined intervals the Check function. It'll force
// close connections when a check function continuously fails for more than a
// timeout.
func (c *ClusterChecker) checkLoop(pctx context.Context) error {
checkCh := make(chan error)
timerCh := time.NewTimer(0).C

// TODO(sgotti) TimeoutCecker is needed to forcefully close connection also
// if the Check method is blocked somewhere.
// The idomatic/cleaner solution will be to use a context instead of this
// TimeoutChecker but we have to change the libkv stores to support contexts.
go c.TimeoutChecker(checkOkCh)
c.configMutex.Lock()
ctx, cancel := context.WithTimeout(pctx, c.proxyTimeout)
c.configMutex.Unlock()

for {
select {
case <-pctx.Done():
cancel()
return nil
case <-timerCh:
// start a new context if it's already done, this happens when the
// context is timed out or cancelled.
select {
case <-ctx.Done():
c.configMutex.Lock()
ctx, cancel = context.WithTimeout(pctx, c.proxyTimeout)
c.configMutex.Unlock()
default:
}

go func() {
checkCh <- c.Check()
checkCh <- c.check(ctx)
}()
case err := <-checkCh:
if err != nil {
// don't report check ok since it returned an error
// if the check function returned an error then don't stop the
// context so if it times out the TimeoutChecker will close
// connections or it could be cancelled if the next check
// succeeds before the timeout
log.Infow("check function error", zap.Error(err))
} else {
// report that check was ok
checkOkCh <- struct{}{}
// check was ok, so cancel the context and start a new one with a new TimeoutChecker
cancel()

c.configMutex.Lock()
ctx, cancel = context.WithTimeout(pctx, c.proxyTimeout)
c.configMutex.Unlock()
go c.timeoutChecker(ctx)
}

c.configMutex.Lock()
timerCh = time.NewTimer(c.proxyCheckInterval).C
c.configMutex.Unlock()

case err := <-c.endPollonProxyCh:
if err != nil {
cancel()
return fmt.Errorf("proxy error: %v", err)
}
}
}
}

// sigHandler blocks until a termination signal arrives on sigs, then
// cancels the root context so the proxy can shut down cleanly.
func sigHandler(sigs chan os.Signal, cancel context.CancelFunc) {
	sig := <-sigs
	log.Debugw("got signal", "signal", sig)
	cancel()
}

func Execute() {
if err := flagutil.SetFlagsFromEnv(CmdProxy.PersistentFlags(), "STPROXY"); err != nil {
log.Fatal(err)
Expand Down Expand Up @@ -428,11 +459,16 @@ func proxy(c *cobra.Command, args []string) {
}()
}

ctx, cancel := context.WithCancel(context.Background())
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
go sigHandler(sigs, cancel)

clusterChecker, err := NewClusterChecker(uid, cfg)
if err != nil {
log.Fatalf("cannot create cluster checker: %v", err)
}
if err = clusterChecker.Start(); err != nil {
if err = clusterChecker.checkLoop(ctx); err != nil {
log.Fatalf("cluster checker ended with error: %v", err)
}
}
2 changes: 2 additions & 0 deletions tests/integration/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ func TestProxyListening(t *testing.T) {
Spec: &cluster.ClusterSpec{
InitMode: cluster.ClusterInitModeP(cluster.ClusterInitModeNew),
FailInterval: &cluster.Duration{Duration: 10 * time.Second},
// use a faster check interval for tests
ProxyCheckInterval: &cluster.Duration{Duration: 1 * time.Second},
},
Status: cluster.ClusterStatus{
CurrentGeneration: 1,
Expand Down

0 comments on commit 2f1a691

Please sign in to comment.