From c0e8dcb159a7365b25ea50507b609439a7793057 Mon Sep 17 00:00:00 2001 From: DeniskaRediska Date: Thu, 20 Feb 2020 12:03:53 +0300 Subject: [PATCH 1/4] remove `isp-log`; add panic recovery --- README.md | 1 - cluster/client.go | 35 +++++++----- cluster/const.go | 4 +- cluster/leader_client.go | 21 ++++---- cluster_handler.go | 22 +++++++- config/config.go | 3 ++ executor.go | 11 ++-- go.mod | 3 -- go.sum | 10 ---- raft.go | 17 +++--- raft/logger.go | 112 --------------------------------------- raft/manager.go | 10 ++-- scheduler.go | 18 ++++--- store/store.go | 22 +++++--- ws/cluster.go | 11 ++-- ws/root.go | 19 +++++-- 16 files changed, 132 insertions(+), 187 deletions(-) delete mode 100644 raft/logger.go diff --git a/README.md b/README.md index 764b1bd..f37dcb5 100644 --- a/README.md +++ b/README.md @@ -20,7 +20,6 @@ Note that all nodes in cluster must register the same JobExecutors because all n # Planned - [Project TODOs](https://todos.tickgit.com/browse?repo=https://github.com/integration-system/gds) -- Remove `isp-log` dependency, pass Raft logger to user API - Implement new Job types - Add sync.Pool to cluster.prepareCommand() - Library API improvements diff --git a/cluster/client.go b/cluster/client.go index 1310f55..1bdb3e5 100644 --- a/cluster/client.go +++ b/cluster/client.go @@ -2,9 +2,10 @@ package cluster import ( "errors" + "fmt" + "github.com/hashicorp/go-hclog" "github.com/integration-system/gds/raft" "github.com/integration-system/gds/utils" - log "github.com/integration-system/isp-log" jsoniter "github.com/json-iterator/go" "sync" "time" @@ -30,6 +31,8 @@ type Client struct { leaderClient *SocketLeaderClient prevLeaderIds []string leaderCh chan bool + + logger hclog.Logger } func (client *Client) Shutdown() error { @@ -125,31 +128,34 @@ func (client *Client) SyncApplyOnLeader(command []byte) (*ApplyLogResponse, erro } func (client *Client) SyncApplyHelper(command []byte, commandName string) (interface{}, error) { - // TODO remove - // 
log.Debugf(0, "peer %s applying %s to cluster: %s", client.LocalID(), commandName, string(command)) applyLogResponse, err := client.SyncApply(command) if err != nil { - log.Warnf(0, "apply %s: %v", commandName, err) + client.logger.Warn(fmt.Sprintf("apply %s: %v", commandName, err)) return nil, err } if applyLogResponse != nil && applyLogResponse.ApplyError != "" { - log.WithMetadata(map[string]interface{}{ - "result": string(applyLogResponse.Result), - "applyError": applyLogResponse.ApplyError, - "commandName": commandName, - }).Warnf(0, "apply command") + client.logger.Warn("apply command", + "result", string(applyLogResponse.Result), + "applyError", applyLogResponse.ApplyError, + "commandName", commandName, + ) return applyLogResponse.Result, errors.New(applyLogResponse.ApplyError) } return applyLogResponse.Result, nil } func (client *Client) listenLeaderNotifications() { - defer close(client.leaderCh) + defer func() { + if err := recover(); err != nil { + client.logger.Error(fmt.Sprintf("panic: %v", err)) + } + close(client.leaderCh) + }() for n := range client.r.LeaderNotificationsCh() { client.leaderMu.Lock() if client.leaderClient != nil { - log.Debugf(0, "close previous leader ws connection %s", client.leaderState.leaderAddr) + client.logger.Debug(fmt.Sprintf("close previous leader ws connection %s", client.leaderState.leaderAddr)) client.leaderClient.Close() client.leaderClient = nil } @@ -179,9 +185,9 @@ func (client *Client) listenLeaderNotifications() { _, _ = client.SyncApplyHelper(cmd, "AddPeerCommand") }(client.prevLeaderIds) } else { - leaderClient := NewSocketLeaderClient(n.CurrentLeaderAddress, client.r.LocalID()) + leaderClient := NewSocketLeaderClient(n.CurrentLeaderAddress, client.r.LocalID(), client.logger) if err := leaderClient.Dial(leaderConnectionTimeout); err != nil { - log.Errorf(0, "could not connect to leader: %v", err) + client.logger.Error(fmt.Sprintf("could not connect to leader: %v", err)) continue } client.leaderClient = 
leaderClient @@ -206,11 +212,12 @@ type leaderState struct { leaderAddr string } -func NewRaftClusterClient(r *raft.Raft) *Client { +func NewRaftClusterClient(r *raft.Raft, logger hclog.Logger) *Client { client := &Client{ r: r, leaderState: leaderState{}, leaderCh: make(chan bool, 5), + logger: logger, } go client.listenLeaderNotifications() diff --git a/cluster/const.go b/cluster/const.go index 38f2f23..9a01698 100644 --- a/cluster/const.go +++ b/cluster/const.go @@ -3,7 +3,7 @@ package cluster import ( "bytes" "encoding/binary" - log "github.com/integration-system/isp-log" + "fmt" "time" ) @@ -69,7 +69,7 @@ func prepareCommand(command uint64, payload interface{}) []byte { buf := bytes.NewBuffer(cmd) err := json.NewEncoder(buf).Encode(payload) if err != nil { - log.Fatalf(0, "prepare log command: %v", err) + panic(fmt.Errorf("prepare log command: %v", err)) } return buf.Bytes() } diff --git a/cluster/leader_client.go b/cluster/leader_client.go index 87b344b..626c9eb 100644 --- a/cluster/leader_client.go +++ b/cluster/leader_client.go @@ -4,8 +4,8 @@ import ( "context" "fmt" "github.com/cenkalti/backoff" + "github.com/hashicorp/go-hclog" etp "github.com/integration-system/isp-etp-go/client" - log "github.com/integration-system/isp-log" "net" "net/http" "net/url" @@ -17,6 +17,8 @@ type SocketLeaderClient struct { url string globalCtx context.Context cancel context.CancelFunc + + logger hclog.Logger } func (c *SocketLeaderClient) Ack(data []byte, timeout time.Duration) ([]byte, error) { @@ -40,18 +42,19 @@ func (c *SocketLeaderClient) Close() { c.cancel() err := c.client.Close() if err != nil { - log.Warnf(0, "leader client close err: %v", err) + c.logger.Warn(fmt.Sprintf("leader client close err: %v", err)) } - log.Debug(0, "leader client connection closed") + c.logger.Debug(fmt.Sprintf("leader client connection closed")) } -func NewSocketLeaderClient(leaderAddr, localID string) *SocketLeaderClient { +func NewSocketLeaderClient(leaderAddr, localID string, logger 
hclog.Logger) *SocketLeaderClient { etpConfig := etp.Config{ HttpClient: http.DefaultClient, } client := etp.NewClient(etpConfig) leaderClient := &SocketLeaderClient{ client: client, + logger: logger, url: getURL(leaderAddr, localID), } ctx, cancel := context.WithCancel(context.Background()) @@ -59,16 +62,16 @@ func NewSocketLeaderClient(leaderAddr, localID string) *SocketLeaderClient { leaderClient.cancel = cancel leaderClient.client.OnDisconnect(func(err error) { - log.WithMetadata(map[string]interface{}{ - "leaderAddr": leaderAddr, - }).Warn(0, "leader client disconnected") + logger.Warn("leader client disconnected", + "leaderAddr", leaderAddr, + ) }) leaderClient.client.OnError(func(err error) { - log.Warnf(0, "leader client on error: %v", err) + logger.Warn(fmt.Sprintf("leader client on error: %v", err)) }) leaderClient.client.OnConnect(func() { - log.Debug(0, "leader client connected") + logger.Debug(fmt.Sprintf("leader client connected")) }) return leaderClient } diff --git a/cluster_handler.go b/cluster_handler.go index c064ef8..92669d2 100644 --- a/cluster_handler.go +++ b/cluster_handler.go @@ -1,6 +1,8 @@ package gds import ( + "fmt" + "github.com/hashicorp/go-hclog" "sync" "time" @@ -30,6 +32,8 @@ type ClusterHandler struct { nextPeer *utils.RoundRobinStrings assignJobsChLock sync.RWMutex assignJobsCh chan []string + + logger hclog.Logger } func (cl *ClusterHandler) HandleAddPeerCommand(state store.WritableState, data []byte) (interface{}, error) { @@ -195,6 +199,11 @@ func (cl *ClusterHandler) listenLeaderCh(mainStore *store.Store) { // Used to detect changes in onlinePeers while there was no leader in cluster (e.g. another peer got down). 
func (cl *ClusterHandler) checkPeers(closeCh chan struct{}, visitState func(f func(store.ReadonlyState))) { + defer func() { + if err := recover(); err != nil { + cl.logger.Error(fmt.Sprintf("panic: %v", err)) + } + }() time.Sleep(200 * time.Millisecond) var oldServers []string @@ -240,6 +249,11 @@ func (cl *ClusterHandler) checkJobs(closeCh chan struct{}, visitState func(f fun } func (cl *ClusterHandler) backgroundAssigningJobs(closeCh chan struct{}, assignJobsCh chan []string) { + defer func() { + if err := recover(); err != nil { + cl.logger.Error(fmt.Sprintf("panic: %v", err)) + } + }() jobKeys := make([]string, 0, batchSize) ticker := time.NewTicker(assigningInterval) defer ticker.Stop() @@ -297,6 +311,11 @@ func (cl *ClusterHandler) assignJobs(keys []string) { } func (cl *ClusterHandler) handleExecutedJobs(executedJobsCh <-chan cluster.JobExecuted) { + defer func() { + if err := recover(); err != nil { + cl.logger.Error(fmt.Sprintf("panic: %v", err)) + } + }() for payload := range executedJobsCh { cmd := cluster.PrepareJobExecutedCommand(payload.JobKey, payload.Error, payload.ExecutedTime) // TODO handle errors. retry? 
@@ -304,10 +323,11 @@ func (cl *ClusterHandler) handleExecutedJobs(executedJobsCh <-chan cluster.JobEx } } -func NewClusterHandler(typeProvider provider.TypeProvider, executor executor) *ClusterHandler { +func NewClusterHandler(typeProvider provider.TypeProvider, executor executor, logger hclog.Logger) *ClusterHandler { return &ClusterHandler{ typeProvider: typeProvider, executor: executor, nextPeer: utils.NewRoundRobinStrings(make([]string, 0)), + logger: logger, } } diff --git a/config/config.go b/config/config.go index 3f8fc0d..320ef61 100644 --- a/config/config.go +++ b/config/config.go @@ -1,6 +1,7 @@ package config import ( + "github.com/hashicorp/go-hclog" "time" ) @@ -17,4 +18,6 @@ type ClusterConfiguration struct { OuterAddress string // Addresses of peers in cluster including OuterAddress in format address:port Peers []string + // Logger is a user-provided hclog logger. If nil, a default logger writing to Stderr is used. + Logger hclog.Logger } diff --git a/executor.go b/executor.go index 8678f96..5782494 100644 --- a/executor.go +++ b/executor.go @@ -4,10 +4,10 @@ import ( "context" "errors" "fmt" + "github.com/hashicorp/go-hclog" "github.com/integration-system/gds/cluster" "github.com/integration-system/gds/jobs" "github.com/integration-system/gds/utils" - log "github.com/integration-system/isp-log" "sync" "time" ) @@ -33,6 +33,8 @@ type defaultRuntimeExecutor struct { fMap map[string]*future executedJobsCh chan<- cluster.JobExecuted executionTimeout time.Duration + + logger hclog.Logger } func (e *defaultRuntimeExecutor) CancelJob(key string) bool { @@ -124,7 +126,7 @@ func (e *defaultRuntimeExecutor) makeF(f *future) func() { exec, ok := e.registry.GetExecutor(f.job.Type()) if !ok { - log.Errorf(0, "defaultRuntimeExecutor: not found executor for job type: %v", f.job.Type()) + e.logger.Error(fmt.Sprintf("defaultRuntimeExecutor: not found executor for job type: %v", f.job.Type())) return } @@ -156,7 +158,7 @@ func (e *defaultRuntimeExecutor) makeF(f *future) func() { 
if err != nil { errStr = err.Error() // TODO remove log - log.Warnf(0, "defaultRuntimeExecutor: job %s type %s has finished with err '%v'", f.job.Key(), f.job.Type(), err) + e.logger.Warn(fmt.Sprintf("defaultRuntimeExecutor: job %s type %s has finished with err '%v'", f.job.Key(), f.job.Type(), err)) } executedJob := cluster.JobExecuted{ @@ -169,7 +171,7 @@ func (e *defaultRuntimeExecutor) makeF(f *future) func() { } } -func newDefaultRuntimeExecutor(registry executorRegistry, executedJobsCh chan<- cluster.JobExecuted, executionTimeout time.Duration) executor { +func newDefaultRuntimeExecutor(registry executorRegistry, executedJobsCh chan<- cluster.JobExecuted, executionTimeout time.Duration, logger hclog.Logger) executor { if executionTimeout == 0 { executionTimeout = DefaultJobExecutionTimeout } @@ -178,5 +180,6 @@ func newDefaultRuntimeExecutor(registry executorRegistry, executedJobsCh chan<- fMap: make(map[string]*future), executedJobsCh: executedJobsCh, executionTimeout: executionTimeout, + logger: logger, } } diff --git a/go.mod b/go.mod index 4ab1717..12e2406 100644 --- a/go.mod +++ b/go.mod @@ -12,14 +12,11 @@ require ( github.com/hashicorp/raft v1.1.2-0.20191121160549-9ecdba6a067b github.com/hashicorp/raft-boltdb v0.0.0-20191021154308-4207f1bf0617 github.com/integration-system/isp-etp-go v2.0.2+incompatible - github.com/integration-system/isp-log v0.0.0-20191010105142-a22bf2e0b56a github.com/integration-system/net-mux v1.0.0 github.com/json-iterator/go v1.1.8 github.com/mattn/go-isatty v0.0.11 // indirect github.com/pkg/errors v0.8.1 github.com/robfig/cron/v3 v3.0.0 - github.com/sirupsen/logrus v1.4.2 - github.com/spf13/cast v1.3.1 github.com/stretchr/testify v1.4.0 golang.org/x/sys v0.0.0-20191224085550-c709ea063b76 // indirect gopkg.in/yaml.v2 v2.2.4 // indirect diff --git a/go.sum b/go.sum index edd6f16..f27a885 100644 --- a/go.sum +++ b/go.sum @@ -59,14 +59,10 @@ github.com/hashicorp/raft-boltdb v0.0.0-20191021154308-4207f1bf0617 h1:CJDRE/2tB 
github.com/hashicorp/raft-boltdb v0.0.0-20191021154308-4207f1bf0617/go.mod h1:aUF6HQr8+t3FC/ZHAC+pZreUBhTaxumuu3L+d37uRxk= github.com/integration-system/isp-etp-go v2.0.2+incompatible h1:+aQiqH/aMy9pUsifIxfWU/DD8cwbMo24AAIV7Ab6aP4= github.com/integration-system/isp-etp-go v2.0.2+incompatible/go.mod h1:NdgczkoFCer9SZOx674EVXCcKRIhFe8LYnwNwoIMDhI= -github.com/integration-system/isp-log v0.0.0-20191010105142-a22bf2e0b56a h1:rwJ0kgDrWaRWE4lB3L6/j1vThXSsu3KKwHqZnYCjBRQ= -github.com/integration-system/isp-log v0.0.0-20191010105142-a22bf2e0b56a/go.mod h1:zujfuIMzbVTJ3nCFI38Sfo4kxjAe0A+X9Kql1UPS4NA= github.com/integration-system/net-mux v1.0.0 h1:UY5y9cDq4ngfYaVzwqcIZ2FDw2l0H4slVDXp1E2gopc= github.com/integration-system/net-mux v1.0.0/go.mod h1:R2PRrXFCOQRbjYIeiJFA6y2/3CfMYEX20H6yrIN6c7g= github.com/json-iterator/go v1.1.8 h1:QiWkFLKq0T7mpzwOTu6BzNDbfTE8OLrYhVKYMLF46Ok= github.com/json-iterator/go v1.1.8/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= -github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk= -github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= @@ -96,12 +92,7 @@ github.com/prometheus/common v0.0.0-20181126121408-4724e9255275/go.mod h1:daVV7q github.com/prometheus/procfs v0.0.0-20181204211112-1dc9a6cbc91a/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/robfig/cron/v3 v3.0.0 h1:kQ6Cb7aHOHTSzNVNEhmp8EcWKLb4CbiMW9h9VyIhO4E= github.com/robfig/cron/v3 v3.0.0/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= -github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4= -github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= 
-github.com/spf13/cast v1.3.1 h1:nFm6S0SMdyzrzcmThSipiEubIDy8WEXKNZ0UOgiRpng= -github.com/spf13/cast v1.3.1/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk= @@ -114,7 +105,6 @@ go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/ golang.org/x/net v0.0.0-20181201002055-351d144fa1fc/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190523142557-0e01d883c5c5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190602015325-4c4f7f33c9ed/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191008105621-543471e840be h1:QAcqgptGM8IQBC9K/RC4o+O9YmqEm0diQn9QmZw/0mU= diff --git a/raft.go b/raft.go index 3b0664d..bf344cf 100644 --- a/raft.go +++ b/raft.go @@ -3,6 +3,7 @@ package gds import ( "context" "fmt" + "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-multierror" "github.com/integration-system/gds/cluster" "github.com/integration-system/gds/config" @@ -11,7 +12,6 @@ import ( "github.com/integration-system/gds/store" "github.com/integration-system/gds/ws" "github.com/integration-system/isp-etp-go" - log "github.com/integration-system/isp-log" mux "github.com/integration-system/net-mux" "net" 
"net/http" @@ -29,11 +29,14 @@ type RaftAdapter struct { HTTPServer *http.Server EtpServer etp.Server muxer mux.Mux + + logger hclog.Logger } -func NewRaftAdapter(cfg config.ClusterConfiguration, handler store.CommandsHandler, typeProvider provider.TypeProvider) (*RaftAdapter, error) { +func NewRaftAdapter(cfg config.ClusterConfiguration, handler store.CommandsHandler, typeProvider provider.TypeProvider, logger hclog.Logger) (*RaftAdapter, error) { adapter := &RaftAdapter{ Config: cfg, + logger: logger, } httpListener, raftListener, err := adapter.initMultiplexer(cfg.OuterAddress) @@ -66,7 +69,7 @@ func (ra *RaftAdapter) initMultiplexer(address string) (net.Listener, net.Listen go func() { if err := ra.muxer.Serve(); err != nil { - log.Errorf(0, "serve mux: %v", err) + ra.logger.Error(fmt.Sprintf("serve mux: %v", err)) } }() return httpListener, raftListener, nil @@ -78,14 +81,14 @@ func (ra *RaftAdapter) initWebsocket(ctx context.Context, listener net.Listener) ConnectionReadLimit: defaultWsConnectionReadLimit, } etpServer := etp.NewServer(ctx, etpConfig) - ws.NewSocketEventHandler(etpServer, ra.ClusterClient).SubscribeAll() + ws.NewSocketEventHandler(etpServer, ra.ClusterClient, ra.logger).SubscribeAll() httpMux := http.NewServeMux() httpMux.HandleFunc(cluster.WebsocketURLPath, etpServer.ServeHttp) httpServer := &http.Server{Handler: httpMux} go func() { if err := httpServer.Serve(listener); err != nil && err != http.ErrServerClosed { - log.Errorf(0, "http server closed: %v", err) + ra.logger.Error(fmt.Sprintf("http server closed: %v", err)) } }() ra.EtpServer = etpServer @@ -95,11 +98,11 @@ func (ra *RaftAdapter) initWebsocket(ctx context.Context, listener net.Listener) func (ra *RaftAdapter) initRaft(listener net.Listener, clusterCfg config.ClusterConfiguration, commandsHandler store.CommandsHandler, typeProvider provider.TypeProvider) error { raftStore := store.NewStore(commandsHandler, typeProvider) - r, err := raft.NewRaft(listener, clusterCfg, raftStore) + 
r, err := raft.NewRaft(listener, clusterCfg, raftStore, ra.logger) if err != nil { return fmt.Errorf("unable to create raft server: %v", err) } - clusterClient := cluster.NewRaftClusterClient(r) + clusterClient := cluster.NewRaftClusterClient(r, ra.logger) if clusterCfg.BootstrapCluster { err = r.BootstrapCluster() diff --git a/raft/logger.go b/raft/logger.go deleted file mode 100644 index f302606..0000000 --- a/raft/logger.go +++ /dev/null @@ -1,112 +0,0 @@ -package raft - -import ( - "fmt" - "github.com/hashicorp/go-hclog" - isplog "github.com/integration-system/isp-log" - "github.com/sirupsen/logrus" - "github.com/spf13/cast" - "io" - "log" -) - -type LoggerAdapter struct { - name string -} - -func (l *LoggerAdapter) Log(level logrus.Level, msg string, args ...interface{}) { - if l.name != "" { - msg = fmt.Sprintf("%s:%s", l.name, msg) - } - if len(args) != 0 { - metadata := make(map[string]interface{}, len(args)/2) - for i := 0; i < len(args)-1; i += 2 { - k := cast.ToString(args[i]) - metadata[k] = args[i+1] - } - isplog.WithMetadata(metadata).Log(level, 0, msg) - } else { - isplog.Log(level, 0, msg) - } -} - -func (l *LoggerAdapter) Trace(msg string, args ...interface{}) { - l.Log(logrus.TraceLevel, msg, args...) -} - -func (l *LoggerAdapter) Debug(msg string, args ...interface{}) { - l.Log(logrus.DebugLevel, msg, args...) -} - -func (l *LoggerAdapter) Info(msg string, args ...interface{}) { - l.Log(logrus.InfoLevel, msg, args...) -} - -func (l *LoggerAdapter) Warn(msg string, args ...interface{}) { - l.Log(logrus.WarnLevel, msg, args...) -} - -func (l *LoggerAdapter) Error(msg string, args ...interface{}) { - l.Log(logrus.ErrorLevel, msg, args...) 
-} - -func (l *LoggerAdapter) IsTrace() bool { - return isplog.IsLevelEnabled(logrus.TraceLevel) -} - -func (l *LoggerAdapter) IsDebug() bool { - return isplog.IsLevelEnabled(logrus.DebugLevel) -} - -func (l *LoggerAdapter) IsInfo() bool { - return isplog.IsLevelEnabled(logrus.InfoLevel) -} - -func (l *LoggerAdapter) IsWarn() bool { - return isplog.IsLevelEnabled(logrus.WarnLevel) -} - -func (l *LoggerAdapter) IsError() bool { - return isplog.IsLevelEnabled(logrus.ErrorLevel) -} - -// No need to implement that as Raft doesn't use this method. -func (l *LoggerAdapter) With(args ...interface{}) hclog.Logger { - return l -} - -func (l *LoggerAdapter) ImpliedArgs() []interface{} { - return nil -} - -func (l *LoggerAdapter) Named(name string) hclog.Logger { - sl := *l - if sl.name != "" { - sl.name = sl.name + "." + name - } else { - sl.name = name - } - return &sl -} - -func (l *LoggerAdapter) Name() string { - return l.name -} - -func (l *LoggerAdapter) ResetNamed(name string) hclog.Logger { - sl := *l - sl.name = name - return &sl -} - -// Skip -func (l *LoggerAdapter) SetLevel(level hclog.Level) { -} - -func (l *LoggerAdapter) StandardLogger(opts *hclog.StandardLoggerOptions) *log.Logger { - return log.New(isplog.GetOutput(), "", 0) -} - -func (l *LoggerAdapter) StandardWriter(opts *hclog.StandardLoggerOptions) io.Writer { - return isplog.GetOutput() -} diff --git a/raft/manager.go b/raft/manager.go index 055eef5..e9f22e2 100644 --- a/raft/manager.go +++ b/raft/manager.go @@ -1,6 +1,7 @@ package raft import ( + "github.com/hashicorp/go-hclog" "github.com/hashicorp/raft" raftboltdb "github.com/hashicorp/raft-boltdb" "github.com/integration-system/gds/config" @@ -104,13 +105,15 @@ func (r *Raft) listenLeader() { } } -func NewRaft(tcpListener net.Listener, configuration config.ClusterConfiguration, state raft.FSM) (*Raft, error) { +func NewRaft(tcpListener net.Listener, configuration config.ClusterConfiguration, state raft.FSM, logger hclog.Logger) (*Raft, error) { 
logStore, store, snapshotStore, err := makeStores(configuration) if err != nil { return nil, err } - netLogger := &LoggerAdapter{name: "RAFT-NET"} + raftNetLogger, raftLogger := &logger, &logger + netLogger := *raftNetLogger + netLogger.Named("RAFT-NET") streamLayer := &StreamLayer{Listener: tcpListener} config := &raft.NetworkTransportConfig{ Stream: streamLayer, @@ -122,7 +125,8 @@ func NewRaft(tcpListener net.Listener, configuration config.ClusterConfiguration trans := raft.NewNetworkTransportWithConfig(config) cfg := raft.DefaultConfig() - cfg.Logger = &LoggerAdapter{name: "RAFT"} + cfg.Logger = *raftLogger + cfg.Logger.Named("RAFT") cfg.LocalID = raft.ServerID(configuration.OuterAddress) r, err := raft.NewRaft(cfg, state, logStore, store, snapshotStore, trans) if err != nil { diff --git a/scheduler.go b/scheduler.go index 21975b4..508487e 100644 --- a/scheduler.go +++ b/scheduler.go @@ -3,6 +3,7 @@ package gds import ( "context" "fmt" + "github.com/hashicorp/go-hclog" "time" "github.com/hashicorp/go-multierror" @@ -12,7 +13,6 @@ import ( "github.com/integration-system/gds/provider" "github.com/integration-system/gds/store" "github.com/integration-system/gds/utils" - log "github.com/integration-system/isp-log" ) type Scheduler interface { @@ -128,18 +128,22 @@ func (s *scheduler) Shutdown(ctx context.Context) error { } func NewScheduler(config config.ClusterConfiguration) (Scheduler, error) { - // TODO remove - _ = log.SetLevel("fatal") - //_ = log.SetLevel("debug") + var logger hclog.Logger + if config.Logger != nil { + logger = config.Logger + } else { + logger = hclog.Default() + logger.Named("gds") + } executedJobsCh := make(chan cluster.JobExecuted, 100) typeProvider := provider.NewTypeProvider() executorRegistry := newDefaultExecutorRegistry() - executor := newDefaultRuntimeExecutor(executorRegistry, executedJobsCh, config.JobExecutionTimeout) + executor := newDefaultRuntimeExecutor(executorRegistry, executedJobsCh, config.JobExecutionTimeout, logger) - 
clusterHandler := NewClusterHandler(typeProvider, executor) + clusterHandler := NewClusterHandler(typeProvider, executor, logger) - raftAdapter, err := NewRaftAdapter(config, clusterHandler, typeProvider) + raftAdapter, err := NewRaftAdapter(config, clusterHandler, typeProvider, logger) if err != nil { return nil, err } diff --git a/store/store.go b/store/store.go index 1103b9e..4029c08 100644 --- a/store/store.go +++ b/store/store.go @@ -4,10 +4,10 @@ import ( "encoding/binary" json2 "encoding/json" "fmt" + "github.com/hashicorp/go-hclog" "github.com/hashicorp/raft" "github.com/integration-system/gds/cluster" "github.com/integration-system/gds/provider" - log "github.com/integration-system/isp-log" jsoniter "github.com/json-iterator/go" "github.com/pkg/errors" "io" @@ -23,17 +23,23 @@ type Store struct { lock sync.RWMutex handlers map[uint64]func(WritableState, []byte) (interface{}, error) typeProvider provider.TypeProvider + + logger hclog.Logger } func (s *Store) Apply(l *raft.Log) interface{} { s.lock.Lock() - defer s.lock.Unlock() + defer func() { + s.lock.Unlock() + if r := recover(); r != nil { + s.logger.Error(fmt.Sprintf("panic: %v", r)) + } + }() if len(l.Data) < 8 { - log.Errorf(0, "invalid log data command: %s", l.Data) + s.logger.Error(fmt.Sprintf("invalid log data command: %s", l.Data)) } command := binary.BigEndian.Uint64(l.Data[:8]) - //log.Debugf(0, "Apply %d command. 
Data: %s", command, l.Data[8:]) var ( result interface{} @@ -43,10 +49,10 @@ func (s *Store) Apply(l *raft.Log) interface{} { result, err = handler(s.state, l.Data[8:]) } else { err = fmt.Errorf("unknown log command %d", command) - log.WithMetadata(map[string]interface{}{ - "command": command, - "body": string(l.Data), - }).Error(0, "unknown log command") + s.logger.Error("unknown log command", + "command", command, + "body", string(l.Data), + ) } bytes, e := json.Marshal(result) diff --git a/ws/cluster.go b/ws/cluster.go index faee5e0..f06289d 100644 --- a/ws/cluster.go +++ b/ws/cluster.go @@ -1,12 +1,17 @@ package ws import ( + "fmt" "github.com/integration-system/gds/cluster" etp "github.com/integration-system/isp-etp-go" - log "github.com/integration-system/isp-log" ) func (h *SocketEventHandler) applyCommandOnLeader(_ etp.Conn, cmd []byte) []byte { + defer func() { + if err := recover(); err != nil { + h.logger.Error(fmt.Sprintf("panic: %v", err)) + } + }() cmdCopy := make([]byte, len(cmd)) copy(cmdCopy, cmd) obj, err := h.clusterClient.SyncApplyOnLeader(cmdCopy) @@ -15,13 +20,13 @@ func (h *SocketEventHandler) applyCommandOnLeader(_ etp.Conn, cmd []byte) []byte logResponse.ApplyError = err.Error() data, err := json.Marshal(obj) if err != nil { - log.Fatalf(0, "marshaling ApplyLogResponse: %v", err) + panic(fmt.Errorf("marshaling ApplyLogResponse: %v", err)) } return data } data, err := json.Marshal(obj) if err != nil { - log.Fatalf(0, "marshaling ApplyLogResponse: %v", err) + panic(fmt.Errorf("marshaling ApplyLogResponse: %v", err)) } return data } diff --git a/ws/root.go b/ws/root.go index 680478d..3b5bb89 100644 --- a/ws/root.go +++ b/ws/root.go @@ -1,9 +1,10 @@ package ws import ( + "fmt" + "github.com/hashicorp/go-hclog" "github.com/integration-system/gds/cluster" etp "github.com/integration-system/isp-etp-go" - log "github.com/integration-system/isp-log" jsoniter "github.com/json-iterator/go" ) @@ -14,6 +15,7 @@ var ( type SocketEventHandler struct { 
server etp.Server clusterClient *cluster.Client + logger hclog.Logger } func (h *SocketEventHandler) SubscribeAll() { @@ -25,6 +27,11 @@ func (h *SocketEventHandler) SubscribeAll() { } func (h *SocketEventHandler) handleConnect(conn etp.Conn) { + defer func() { + if err := recover(); err != nil { + h.logger.Error(fmt.Sprintf("panic: %v", err)) + } + }() peerID := GetPeerID(conn) if peerID == "" { return @@ -38,6 +45,11 @@ func (h *SocketEventHandler) handleConnect(conn etp.Conn) { } func (h *SocketEventHandler) handleDisconnect(conn etp.Conn, _ error) { + defer func() { + if err := recover(); err != nil { + h.logger.Error(fmt.Sprintf("panic: %v", err)) + } + }() peerID := GetPeerID(conn) if peerID == "" { return @@ -51,12 +63,13 @@ func (h *SocketEventHandler) handleDisconnect(conn etp.Conn, _ error) { } func (h *SocketEventHandler) handleError(_ etp.Conn, err error) { - log.Debugf(0, "isp-etp: %v", err) + h.logger.Debug(fmt.Sprintf("isp-etp: %v", err)) } -func NewSocketEventHandler(server etp.Server, client *cluster.Client) *SocketEventHandler { +func NewSocketEventHandler(server etp.Server, client *cluster.Client, logger hclog.Logger) *SocketEventHandler { return &SocketEventHandler{ server: server, clusterClient: client, + logger: logger, } } From 02dc27451cf3a1eb86dbf9077c56a5637b9107a3 Mon Sep 17 00:00:00 2001 From: DeniskaRediska Date: Thu, 20 Feb 2020 13:05:58 +0300 Subject: [PATCH 2/4] ref logger --- raft/manager.go | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/raft/manager.go b/raft/manager.go index e9f22e2..dac696a 100644 --- a/raft/manager.go +++ b/raft/manager.go @@ -111,9 +111,7 @@ func NewRaft(tcpListener net.Listener, configuration config.ClusterConfiguration return nil, err } - raftNetLogger, raftLogger := &logger, &logger - netLogger := *raftNetLogger - netLogger.Named("RAFT-NET") + netLogger := logger.Named("RAFT-NET") streamLayer := &StreamLayer{Listener: tcpListener} config := &raft.NetworkTransportConfig{ Stream: 
streamLayer, @@ -125,8 +123,7 @@ func NewRaft(tcpListener net.Listener, configuration config.ClusterConfiguration trans := raft.NewNetworkTransportWithConfig(config) cfg := raft.DefaultConfig() - cfg.Logger = *raftLogger - cfg.Logger.Named("RAFT") + cfg.Logger = logger.Named("RAFT") cfg.LocalID = raft.ServerID(configuration.OuterAddress) r, err := raft.NewRaft(cfg, state, logStore, store, snapshotStore, trans) if err != nil { From ac40824a5add70a1151dd57cf5ca5af6764b9aa7 Mon Sep 17 00:00:00 2001 From: DeniskaRediska Date: Thu, 20 Feb 2020 13:06:55 +0300 Subject: [PATCH 3/4] ref logger --- scheduler.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/scheduler.go b/scheduler.go index 508487e..1caca33 100644 --- a/scheduler.go +++ b/scheduler.go @@ -132,8 +132,7 @@ func NewScheduler(config config.ClusterConfiguration) (Scheduler, error) { if config.Logger != nil { logger = config.Logger } else { - logger = hclog.Default() - logger.Named("gds") + logger = hclog.Default().Named("gds") } executedJobsCh := make(chan cluster.JobExecuted, 100) From c956afd0918da12fe7768c1426611dfc85b5e624 Mon Sep 17 00:00:00 2001 From: DeniskaRediska Date: Thu, 20 Feb 2020 14:52:38 +0300 Subject: [PATCH 4/4] up description recovery panic --- cluster/client.go | 2 +- cluster_handler.go | 6 +++--- store/store.go | 2 +- ws/cluster.go | 2 +- ws/root.go | 4 ++-- 5 files changed, 8 insertions(+), 8 deletions(-) diff --git a/cluster/client.go b/cluster/client.go index 1bdb3e5..b9ede02 100644 --- a/cluster/client.go +++ b/cluster/client.go @@ -147,7 +147,7 @@ func (client *Client) SyncApplyHelper(command []byte, commandName string) (inter func (client *Client) listenLeaderNotifications() { defer func() { if err := recover(); err != nil { - client.logger.Error(fmt.Sprintf("panic: %v", err)) + client.logger.Error(fmt.Sprintf("panic on listen raft client: %v", err)) } close(client.leaderCh) }() diff --git a/cluster_handler.go b/cluster_handler.go index 92669d2..bcc117d 100644 --- 
a/cluster_handler.go +++ b/cluster_handler.go @@ -201,7 +201,7 @@ func (cl *ClusterHandler) listenLeaderCh(mainStore *store.Store) { func (cl *ClusterHandler) checkPeers(closeCh chan struct{}, visitState func(f func(store.ReadonlyState))) { defer func() { if err := recover(); err != nil { - cl.logger.Error(fmt.Sprintf("panic: %v", err)) + cl.logger.Error(fmt.Sprintf("panic on check peers: %v", err)) } }() time.Sleep(200 * time.Millisecond) @@ -251,7 +251,7 @@ func (cl *ClusterHandler) checkJobs(closeCh chan struct{}, visitState func(f fun func (cl *ClusterHandler) backgroundAssigningJobs(closeCh chan struct{}, assignJobsCh chan []string) { defer func() { if err := recover(); err != nil { - cl.logger.Error(fmt.Sprintf("panic: %v", err)) + cl.logger.Error(fmt.Sprintf("panic on assigning jobs: %v", err)) } }() jobKeys := make([]string, 0, batchSize) @@ -313,7 +313,7 @@ func (cl *ClusterHandler) assignJobs(keys []string) { func (cl *ClusterHandler) handleExecutedJobs(executedJobsCh <-chan cluster.JobExecuted) { defer func() { if err := recover(); err != nil { - cl.logger.Error(fmt.Sprintf("panic: %v", err)) + cl.logger.Error(fmt.Sprintf("panic on executed jobs: %v", err)) } }() for payload := range executedJobsCh { diff --git a/store/store.go b/store/store.go index 4029c08..87390c1 100644 --- a/store/store.go +++ b/store/store.go @@ -32,7 +32,7 @@ func (s *Store) Apply(l *raft.Log) interface{} { defer func() { s.lock.Unlock() if r := recover(); r != nil { - s.logger.Error(fmt.Sprintf("panic: %v", r)) + s.logger.Error(fmt.Sprintf("panic on raft store: %v", r)) } }() diff --git a/ws/cluster.go b/ws/cluster.go index f06289d..6fb038f 100644 --- a/ws/cluster.go +++ b/ws/cluster.go @@ -9,7 +9,7 @@ import ( func (h *SocketEventHandler) applyCommandOnLeader(_ etp.Conn, cmd []byte) []byte { defer func() { if err := recover(); err != nil { - h.logger.Error(fmt.Sprintf("panic: %v", err)) + h.logger.Error(fmt.Sprintf("panic on ws command: %v", err)) } }() cmdCopy := make([]byte, 
len(cmd)) diff --git a/ws/root.go b/ws/root.go index 3b5bb89..7c2ba48 100644 --- a/ws/root.go +++ b/ws/root.go @@ -29,7 +29,7 @@ func (h *SocketEventHandler) SubscribeAll() { func (h *SocketEventHandler) handleConnect(conn etp.Conn) { defer func() { if err := recover(); err != nil { - h.logger.Error(fmt.Sprintf("panic: %v", err)) + h.logger.Error(fmt.Sprintf("panic on ws connect: %v", err)) } }() peerID := GetPeerID(conn) @@ -47,7 +47,7 @@ func (h *SocketEventHandler) handleConnect(conn etp.Conn) { func (h *SocketEventHandler) handleDisconnect(conn etp.Conn, _ error) { defer func() { if err := recover(); err != nil { - h.logger.Error(fmt.Sprintf("panic: %v", err)) + h.logger.Error(fmt.Sprintf("panic on ws disconnect: %v", err)) } }() peerID := GetPeerID(conn)