Skip to content

Commit

Permalink
Merge pull request #2 from integration-system/update_logger
Browse files Browse the repository at this point in the history
update logger
  • Loading branch information
pymq authored Feb 21, 2020
2 parents 43d48f2 + c956afd commit beb8877
Show file tree
Hide file tree
Showing 16 changed files with 128 additions and 187 deletions.
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ Note that all nodes in cluster must register the same JobExecutors because all n

# Planned
- [Project TODOs](https://todos.tickgit.com/browse?repo=https://github.com/integration-system/gds)
- Remove `isp-log` dependency, pass Raft logger to user API
- Implement new Job types
- Add sync.Pool to cluster.prepareCommand()
- Library API improvements
Expand Down
35 changes: 21 additions & 14 deletions cluster/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ package cluster

import (
"errors"
"fmt"
"github.com/hashicorp/go-hclog"
"github.com/integration-system/gds/raft"
"github.com/integration-system/gds/utils"
log "github.com/integration-system/isp-log"
jsoniter "github.com/json-iterator/go"
"sync"
"time"
Expand All @@ -30,6 +31,8 @@ type Client struct {
leaderClient *SocketLeaderClient
prevLeaderIds []string
leaderCh chan bool

logger hclog.Logger
}

func (client *Client) Shutdown() error {
Expand Down Expand Up @@ -125,31 +128,34 @@ func (client *Client) SyncApplyOnLeader(command []byte) (*ApplyLogResponse, erro
}

func (client *Client) SyncApplyHelper(command []byte, commandName string) (interface{}, error) {
// TODO remove
// log.Debugf(0, "peer %s applying %s to cluster: %s", client.LocalID(), commandName, string(command))
applyLogResponse, err := client.SyncApply(command)
if err != nil {
log.Warnf(0, "apply %s: %v", commandName, err)
client.logger.Warn(fmt.Sprintf("apply %s: %v", commandName, err))
return nil, err
}
if applyLogResponse != nil && applyLogResponse.ApplyError != "" {
log.WithMetadata(map[string]interface{}{
"result": string(applyLogResponse.Result),
"applyError": applyLogResponse.ApplyError,
"commandName": commandName,
}).Warnf(0, "apply command")
client.logger.Warn("apply command",
"result", string(applyLogResponse.Result),
"applyError", applyLogResponse.ApplyError,
"commandName", commandName,
)
return applyLogResponse.Result, errors.New(applyLogResponse.ApplyError)
}
return applyLogResponse.Result, nil
}

func (client *Client) listenLeaderNotifications() {
defer close(client.leaderCh)
defer func() {
if err := recover(); err != nil {
client.logger.Error(fmt.Sprintf("panic on listen raft client: %v", err))
}
close(client.leaderCh)
}()

for n := range client.r.LeaderNotificationsCh() {
client.leaderMu.Lock()
if client.leaderClient != nil {
log.Debugf(0, "close previous leader ws connection %s", client.leaderState.leaderAddr)
client.logger.Debug(fmt.Sprintf("close previous leader ws connection %s", client.leaderState.leaderAddr))
client.leaderClient.Close()
client.leaderClient = nil
}
Expand Down Expand Up @@ -179,9 +185,9 @@ func (client *Client) listenLeaderNotifications() {
_, _ = client.SyncApplyHelper(cmd, "AddPeerCommand")
}(client.prevLeaderIds)
} else {
leaderClient := NewSocketLeaderClient(n.CurrentLeaderAddress, client.r.LocalID())
leaderClient := NewSocketLeaderClient(n.CurrentLeaderAddress, client.r.LocalID(), client.logger)
if err := leaderClient.Dial(leaderConnectionTimeout); err != nil {
log.Errorf(0, "could not connect to leader: %v", err)
client.logger.Error(fmt.Sprintf("could not connect to leader: %v", err))
continue
}
client.leaderClient = leaderClient
Expand All @@ -206,11 +212,12 @@ type leaderState struct {
leaderAddr string
}

func NewRaftClusterClient(r *raft.Raft) *Client {
func NewRaftClusterClient(r *raft.Raft, logger hclog.Logger) *Client {
client := &Client{
r: r,
leaderState: leaderState{},
leaderCh: make(chan bool, 5),
logger: logger,
}
go client.listenLeaderNotifications()

Expand Down
4 changes: 2 additions & 2 deletions cluster/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package cluster
import (
"bytes"
"encoding/binary"
log "github.com/integration-system/isp-log"
"fmt"
"time"
)

Expand Down Expand Up @@ -69,7 +69,7 @@ func prepareCommand(command uint64, payload interface{}) []byte {
buf := bytes.NewBuffer(cmd)
err := json.NewEncoder(buf).Encode(payload)
if err != nil {
log.Fatalf(0, "prepare log command: %v", err)
panic(fmt.Errorf("prepare log command: %v", err))
}
return buf.Bytes()
}
21 changes: 12 additions & 9 deletions cluster/leader_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import (
"context"
"fmt"
"github.com/cenkalti/backoff"
"github.com/hashicorp/go-hclog"
etp "github.com/integration-system/isp-etp-go/client"
log "github.com/integration-system/isp-log"
"net"
"net/http"
"net/url"
Expand All @@ -17,6 +17,8 @@ type SocketLeaderClient struct {
url string
globalCtx context.Context
cancel context.CancelFunc

logger hclog.Logger
}

func (c *SocketLeaderClient) Ack(data []byte, timeout time.Duration) ([]byte, error) {
Expand All @@ -40,35 +42,36 @@ func (c *SocketLeaderClient) Close() {
c.cancel()
err := c.client.Close()
if err != nil {
log.Warnf(0, "leader client close err: %v", err)
c.logger.Warn(fmt.Sprintf("leader client close err: %v", err))
}
log.Debug(0, "leader client connection closed")
c.logger.Debug(fmt.Sprintf("leader client connection closed"))
}

func NewSocketLeaderClient(leaderAddr, localID string) *SocketLeaderClient {
func NewSocketLeaderClient(leaderAddr, localID string, logger hclog.Logger) *SocketLeaderClient {
etpConfig := etp.Config{
HttpClient: http.DefaultClient,
}
client := etp.NewClient(etpConfig)
leaderClient := &SocketLeaderClient{
client: client,
logger: logger,
url: getURL(leaderAddr, localID),
}
ctx, cancel := context.WithCancel(context.Background())
leaderClient.globalCtx = ctx
leaderClient.cancel = cancel

leaderClient.client.OnDisconnect(func(err error) {
log.WithMetadata(map[string]interface{}{
"leaderAddr": leaderAddr,
}).Warn(0, "leader client disconnected")
logger.Warn("leader client disconnected",
"leaderAddr", leaderAddr,
)
})

leaderClient.client.OnError(func(err error) {
log.Warnf(0, "leader client on error: %v", err)
logger.Warn(fmt.Sprintf("leader client on error: %v", err))
})
leaderClient.client.OnConnect(func() {
log.Debug(0, "leader client connected")
logger.Debug(fmt.Sprintf("leader client connected"))
})
return leaderClient
}
Expand Down
22 changes: 21 additions & 1 deletion cluster_handler.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package gds

import (
"fmt"
"github.com/hashicorp/go-hclog"
"sync"
"time"

Expand Down Expand Up @@ -30,6 +32,8 @@ type ClusterHandler struct {
nextPeer *utils.RoundRobinStrings
assignJobsChLock sync.RWMutex
assignJobsCh chan []string

logger hclog.Logger
}

func (cl *ClusterHandler) HandleAddPeerCommand(state store.WritableState, data []byte) (interface{}, error) {
Expand Down Expand Up @@ -195,6 +199,11 @@ func (cl *ClusterHandler) listenLeaderCh(mainStore *store.Store) {

// Used to detect changes in onlinePeers while there was no leader in cluster (e.g. another peer got down).
func (cl *ClusterHandler) checkPeers(closeCh chan struct{}, visitState func(f func(store.ReadonlyState))) {
defer func() {
if err := recover(); err != nil {
cl.logger.Error(fmt.Sprintf("panic on check peers: %v", err))
}
}()
time.Sleep(200 * time.Millisecond)

var oldServers []string
Expand Down Expand Up @@ -240,6 +249,11 @@ func (cl *ClusterHandler) checkJobs(closeCh chan struct{}, visitState func(f fun
}

func (cl *ClusterHandler) backgroundAssigningJobs(closeCh chan struct{}, assignJobsCh chan []string) {
defer func() {
if err := recover(); err != nil {
cl.logger.Error(fmt.Sprintf("panic on assigning jobs: %v", err))
}
}()
jobKeys := make([]string, 0, batchSize)
ticker := time.NewTicker(assigningInterval)
defer ticker.Stop()
Expand Down Expand Up @@ -297,17 +311,23 @@ func (cl *ClusterHandler) assignJobs(keys []string) {
}

func (cl *ClusterHandler) handleExecutedJobs(executedJobsCh <-chan cluster.JobExecuted) {
defer func() {
if err := recover(); err != nil {
cl.logger.Error(fmt.Sprintf("panic on executed jobs: %v", err))
}
}()
for payload := range executedJobsCh {
cmd := cluster.PrepareJobExecutedCommand(payload.JobKey, payload.Error, payload.ExecutedTime)
// TODO handle errors. retry?
_, _ = cl.cluster.SyncApplyHelper(cmd, "JobExecutedCommand")
}
}

func NewClusterHandler(typeProvider provider.TypeProvider, executor executor) *ClusterHandler {
func NewClusterHandler(typeProvider provider.TypeProvider, executor executor, logger hclog.Logger) *ClusterHandler {
return &ClusterHandler{
typeProvider: typeProvider,
executor: executor,
nextPeer: utils.NewRoundRobinStrings(make([]string, 0)),
logger: logger,
}
}
3 changes: 3 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package config

import (
"github.com/hashicorp/go-hclog"
"time"
)

Expand All @@ -17,4 +18,6 @@ type ClusterConfiguration struct {
OuterAddress string
// Addresses of peers in cluster including OuterAddress in format address:port
Peers []string
// Logger is a user-provided hc-log logger. If nil, a logger writing to Stderr
Logger hclog.Logger
}
11 changes: 7 additions & 4 deletions executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ import (
"context"
"errors"
"fmt"
"github.com/hashicorp/go-hclog"
"github.com/integration-system/gds/cluster"
"github.com/integration-system/gds/jobs"
"github.com/integration-system/gds/utils"
log "github.com/integration-system/isp-log"
"sync"
"time"
)
Expand All @@ -33,6 +33,8 @@ type defaultRuntimeExecutor struct {
fMap map[string]*future
executedJobsCh chan<- cluster.JobExecuted
executionTimeout time.Duration

logger hclog.Logger
}

func (e *defaultRuntimeExecutor) CancelJob(key string) bool {
Expand Down Expand Up @@ -124,7 +126,7 @@ func (e *defaultRuntimeExecutor) makeF(f *future) func() {

exec, ok := e.registry.GetExecutor(f.job.Type())
if !ok {
log.Errorf(0, "defaultRuntimeExecutor: not found executor for job type: %v", f.job.Type())
e.logger.Error(fmt.Sprintf("defaultRuntimeExecutor: not found executor for job type: %v", f.job.Type()))
return
}

Expand Down Expand Up @@ -156,7 +158,7 @@ func (e *defaultRuntimeExecutor) makeF(f *future) func() {
if err != nil {
errStr = err.Error()
// TODO remove log
log.Warnf(0, "defaultRuntimeExecutor: job %s type %s has finished with err '%v'", f.job.Key(), f.job.Type(), err)
e.logger.Warn(fmt.Sprintf("defaultRuntimeExecutor: job %s type %s has finished with err '%v'", f.job.Key(), f.job.Type(), err))
}

executedJob := cluster.JobExecuted{
Expand All @@ -169,7 +171,7 @@ func (e *defaultRuntimeExecutor) makeF(f *future) func() {
}
}

func newDefaultRuntimeExecutor(registry executorRegistry, executedJobsCh chan<- cluster.JobExecuted, executionTimeout time.Duration) executor {
func newDefaultRuntimeExecutor(registry executorRegistry, executedJobsCh chan<- cluster.JobExecuted, executionTimeout time.Duration, logger hclog.Logger) executor {
if executionTimeout == 0 {
executionTimeout = DefaultJobExecutionTimeout
}
Expand All @@ -178,5 +180,6 @@ func newDefaultRuntimeExecutor(registry executorRegistry, executedJobsCh chan<-
fMap: make(map[string]*future),
executedJobsCh: executedJobsCh,
executionTimeout: executionTimeout,
logger: logger,
}
}
3 changes: 0 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,11 @@ require (
github.com/hashicorp/raft v1.1.2-0.20191121160549-9ecdba6a067b
github.com/hashicorp/raft-boltdb v0.0.0-20191021154308-4207f1bf0617
github.com/integration-system/isp-etp-go v2.0.2+incompatible
github.com/integration-system/isp-log v0.0.0-20191010105142-a22bf2e0b56a
github.com/integration-system/net-mux v1.0.0
github.com/json-iterator/go v1.1.8
github.com/mattn/go-isatty v0.0.11 // indirect
github.com/pkg/errors v0.8.1
github.com/robfig/cron/v3 v3.0.0
github.com/sirupsen/logrus v1.4.2
github.com/spf13/cast v1.3.1
github.com/stretchr/testify v1.4.0
golang.org/x/sys v0.0.0-20191224085550-c709ea063b76 // indirect
gopkg.in/yaml.v2 v2.2.4 // indirect
Expand Down
10 changes: 0 additions & 10 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,10 @@ github.com/hashicorp/raft-boltdb v0.0.0-20191021154308-4207f1bf0617 h1:CJDRE/2tB
github.com/hashicorp/raft-boltdb v0.0.0-20191021154308-4207f1bf0617/go.mod h1:aUF6HQr8+t3FC/ZHAC+pZreUBhTaxumuu3L+d37uRxk=
github.com/integration-system/isp-etp-go v2.0.2+incompatible h1:+aQiqH/aMy9pUsifIxfWU/DD8cwbMo24AAIV7Ab6aP4=
github.com/integration-system/isp-etp-go v2.0.2+incompatible/go.mod h1:NdgczkoFCer9SZOx674EVXCcKRIhFe8LYnwNwoIMDhI=
github.com/integration-system/isp-log v0.0.0-20191010105142-a22bf2e0b56a h1:rwJ0kgDrWaRWE4lB3L6/j1vThXSsu3KKwHqZnYCjBRQ=
github.com/integration-system/isp-log v0.0.0-20191010105142-a22bf2e0b56a/go.mod h1:zujfuIMzbVTJ3nCFI38Sfo4kxjAe0A+X9Kql1UPS4NA=
github.com/integration-system/net-mux v1.0.0 h1:UY5y9cDq4ngfYaVzwqcIZ2FDw2l0H4slVDXp1E2gopc=
github.com/integration-system/net-mux v1.0.0/go.mod h1:R2PRrXFCOQRbjYIeiJFA6y2/3CfMYEX20H6yrIN6c7g=
github.com/json-iterator/go v1.1.8 h1:QiWkFLKq0T7mpzwOTu6BzNDbfTE8OLrYhVKYMLF46Ok=
github.com/json-iterator/go v1.1.8/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
Expand Down Expand Up @@ -96,12 +92,7 @@ github.com/prometheus/common v0.0.0-20181126121408-4724e9255275/go.mod h1:daVV7q
github.com/prometheus/procfs v0.0.0-20181204211112-1dc9a6cbc91a/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/robfig/cron/v3 v3.0.0 h1:kQ6Cb7aHOHTSzNVNEhmp8EcWKLb4CbiMW9h9VyIhO4E=
github.com/robfig/cron/v3 v3.0.0/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/spf13/cast v1.3.1 h1:nFm6S0SMdyzrzcmThSipiEubIDy8WEXKNZ0UOgiRpng=
github.com/spf13/cast v1.3.1/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
Expand All @@ -114,7 +105,6 @@ go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/
golang.org/x/net v0.0.0-20181201002055-351d144fa1fc/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190523142557-0e01d883c5c5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190602015325-4c4f7f33c9ed/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191008105621-543471e840be h1:QAcqgptGM8IQBC9K/RC4o+O9YmqEm0diQn9QmZw/0mU=
Expand Down
Loading

0 comments on commit beb8877

Please sign in to comment.