diff --git a/Makefile b/Makefile index 454b57079..fa23aa53f 100644 --- a/Makefile +++ b/Makefile @@ -105,7 +105,7 @@ endif check-static: tools/bin/golangci-lint $(GO) mod vendor - tools/bin/golangci-lint --disable errcheck run $$($(PACKAGE_DIRECTORIES)) + tools/bin/golangci-lint run $$($(PACKAGE_DIRECTORIES)) clean: go clean -i ./... diff --git a/drainer/server.go b/drainer/server.go index 4b7ff5046..ec3877255 100644 --- a/drainer/server.go +++ b/drainer/server.go @@ -335,13 +335,19 @@ func (s *Server) ApplyAction(w http.ResponseWriter, r *http.Request) { log.Info("receive apply action request", zap.String("nodeID", nodeID), zap.String("action", action)) if nodeID != s.ID { - rd.JSON(w, http.StatusOK, util.ErrResponsef("invalid nodeID %s, this pump's nodeID is %s", nodeID, s.ID)) + err := rd.JSON(w, http.StatusOK, util.ErrResponsef("invalid nodeID %s, this pump's nodeID is %s", nodeID, s.ID)) + if err != nil { + log.Error("Failed to render JSON response", zap.Error(err)) + } return } s.statusMu.RLock() if s.status.State != node.Online { - rd.JSON(w, http.StatusOK, util.ErrResponsef("this pump's state is %s, apply %s failed!", s.status.State, action)) + err := rd.JSON(w, http.StatusOK, util.ErrResponsef("this pump's state is %s, apply %s failed!", s.status.State, action)) + if err != nil { + log.Error("Failed to render JSON response", zap.Error(err)) + } s.statusMu.RUnlock() return } @@ -355,13 +361,19 @@ func (s *Server) ApplyAction(w http.ResponseWriter, r *http.Request) { s.status.State = node.Closing default: s.statusMu.Unlock() - rd.JSON(w, http.StatusOK, util.ErrResponsef("invalid action %s", action)) + err := rd.JSON(w, http.StatusOK, util.ErrResponsef("invalid action %s", action)) + if err != nil { + log.Error("Failed to render JSON response", zap.Error(err)) + } return } s.statusMu.Unlock() go s.Close() - rd.JSON(w, http.StatusOK, util.SuccessResponse(fmt.Sprintf("apply action %s success!", action), nil)) + err := rd.JSON(w, http.StatusOK, util.SuccessResponse(fmt.Sprintf("apply action %s success!", action), nil)) + if err != nil { + log.Error("Failed to render JSON response", zap.Error(err)) + } } // GetLatestTS returns the last binlog's commit ts which synced to downstream. @@ -370,7 +382,10 @@ func (s *Server) GetLatestTS(w http.ResponseWriter, r *http.Request) { IndentJSON: true, }) ts := s.syncer.GetLatestCommitTS() - rd.JSON(w, http.StatusOK, util.SuccessResponse("get drainer's latest ts success!", map[string]int64{"ts": ts})) + err := rd.JSON(w, http.StatusOK, util.SuccessResponse("get drainer's latest ts success!", map[string]int64{"ts": ts})) + if err != nil { + log.Error("Failed to render JSON response", zap.Error(err)) + } } // commitStatus commit the node's last status to pd when close the server. diff --git a/drainer/server_test.go b/drainer/server_test.go index bb3d87a6c..8ba065956 100644 --- a/drainer/server_test.go +++ b/drainer/server_test.go @@ -346,8 +346,9 @@ func (s *newServerSuite) TestInvalidDestDBType(c *C) { cfg.DataDir = path.Join(c.MkDir(), "drainer") cfg.ListenAddr = "http://" + cfg.ListenAddr cfg.SyncerCfg.DestDBType = "nothing" - cfg.adjustConfig() - _, err := NewServer(cfg) + err := cfg.adjustConfig() + c.Assert(err, IsNil) + _, err = NewServer(cfg) c.Assert(err, ErrorMatches, ".*unknown DestDBType.*") c.Assert(cfg.SyncerCfg.To.ClusterID, Equals, uint64(8012)) } diff --git a/drainer/status.go b/drainer/status.go index b620edd55..a08464980 100644 --- a/drainer/status.go +++ b/drainer/status.go @@ -16,6 +16,9 @@ package drainer import ( "encoding/json" "net/http" + + "github.com/pingcap/log" + "go.uber.org/zap" ) // HTTPStatus exposes current status of the collector via HTTP @@ -28,5 +31,7 @@ type HTTPStatus struct { // Status implements http.ServeHTTP interface func (s *HTTPStatus) Status(w http.ResponseWriter, r *http.Request) { - json.NewEncoder(w).Encode(s) + if err := json.NewEncoder(w).Encode(s); err != nil { + log.Error("Failed to encode status", zap.Error(err), zap.Any("status", *s)) + } } diff --git a/pump/node_test.go b/pump/node_test.go index 3b13faf81..6ab80cf87 100644 --- a/pump/node_test.go +++ b/pump/node_test.go @@ -22,6 +22,9 @@ import ( "strings" "time" + "github.com/pingcap/log" + "go.uber.org/zap" + . "github.com/pingcap/check" "github.com/pingcap/errors" "github.com/pingcap/tidb-binlog/pkg/etcd" @@ -195,7 +198,11 @@ func (s *mockDrainerServer) Start() (*grpc.Server, error) { } gs := grpc.NewServer() binlog.RegisterCisternServer(gs, s) - go gs.Serve(lis) + go func() { + if err := gs.Serve(lis); err != nil { + log.Error("Unexpected exit of gRPC server", zap.Error(err)) + } + }() return gs, nil } diff --git a/pump/server.go b/pump/server.go index e744cbbef..7be82bfd4 100644 --- a/pump/server.go +++ b/pump/server.go @@ -400,7 +400,11 @@ func (s *Server) Start() error { ) httpL := m.Match(cmux.HTTP1Fast()) - go s.gs.Serve(grpcL) + go func() { + if err := s.gs.Serve(grpcL); err != nil { + log.Error("Unexpected exit of gRPC server", zap.Error(err)) + } + }() router := mux.NewRouter() router.HandleFunc("/status", s.Status).Methods("GET") @@ -656,7 +660,9 @@ func (s *Server) startMetrics() { func (s *Server) AllDrainers(w http.ResponseWriter, r *http.Request) { node, ok := s.node.(*pumpNode) if !ok { - json.NewEncoder(w).Encode("can't provide service") + if err := json.NewEncoder(w).Encode("can't provide service"); err != nil { + log.Error("Failed to encode msg", zap.Error(err)) + } return } @@ -665,7 +671,9 @@ func (s *Server) AllDrainers(w http.ResponseWriter, r *http.Request) { log.Error("get pumps failed", zap.Error(err)) } - json.NewEncoder(w).Encode(pumps) + if err := json.NewEncoder(w).Encode(pumps); err != nil { + log.Error("Failed to encode pumps", zap.Error(err), zap.Any("pumps", pumps)) + } } // Status exposes pumps' status to HTTP handler. @@ -776,12 +784,18 @@ func (s *Server) ApplyAction(w http.ResponseWriter, r *http.Request) { log.Info("receive action", zap.String("nodeID", nodeID), zap.String("action", action)) if nodeID != s.node.NodeStatus().NodeID { - rd.JSON(w, http.StatusOK, util.ErrResponsef("invalide nodeID %s, this pump's nodeID is %s", nodeID, s.node.NodeStatus().NodeID)) + err := rd.JSON(w, http.StatusOK, util.ErrResponsef("invalide nodeID %s, this pump's nodeID is %s", nodeID, s.node.NodeStatus().NodeID)) + if err != nil { + log.Error("Failed to render JSON response", zap.Error(err)) + } return } if s.node.NodeStatus().State != node.Online { - rd.JSON(w, http.StatusOK, util.ErrResponsef("this pump's state is %s, apply %s failed!", s.node.NodeStatus().State, action)) + err := rd.JSON(w, http.StatusOK, util.ErrResponsef("this pump's state is %s, apply %s failed!", s.node.NodeStatus().State, action)) + if err != nil { + log.Error("Failed to render JSON response", zap.Error(err)) + } return } @@ -793,12 +807,18 @@ func (s *Server) ApplyAction(w http.ResponseWriter, r *http.Request) { log.Info("pump's state change to closing", zap.String("nodeID", nodeID)) s.node.NodeStatus().State = node.Closing default: - rd.JSON(w, http.StatusOK, util.ErrResponsef("invalide action %s", action)) + err := rd.JSON(w, http.StatusOK, util.ErrResponsef("invalide action %s", action)) + if err != nil { + log.Error("Failed to render JSON response", zap.Error(err)) + } return } go s.Close() - rd.JSON(w, http.StatusOK, util.SuccessResponse(fmt.Sprintf("apply action %s success!", action), nil)) + err := rd.JSON(w, http.StatusOK, util.SuccessResponse(fmt.Sprintf("apply action %s success!", action), nil)) + if err != nil { + log.Error("Failed to render JSON response", zap.Error(err)) + } } var utilGetTSO = util.GetTSO diff --git a/pump/server_test.go b/pump/server_test.go index 0957f3cf6..1550a1108 100644 --- a/pump/server_test.go +++ b/pump/server_test.go @@ -475,10 +475,10 @@ func (s *gcBinlogFileSuite) TestShouldGCMinDrainerTSO(c *C) { outAlertGCMS := millisecond + (earlyAlertGC+10*time.Minute).Nanoseconds()/1000/1000 outAlertGCTS := int64(oracle.EncodeTSO(outAlertGCMS)) - registry.UpdateNode(ctx, "drainers/1", &node.Status{MaxCommitTS: inAlertGCTS, State: node.Online}) - registry.UpdateNode(ctx, "drainers/2", &node.Status{MaxCommitTS: 1002, State: node.Online}) + mustUpdateNode(ctx, registry, "drainers/1", &node.Status{MaxCommitTS: inAlertGCTS, State: node.Online}) + mustUpdateNode(ctx, registry, "drainers/2", &node.Status{MaxCommitTS: 1002, State: node.Online}) // drainers/3 is set to be offline, so its MaxCommitTS is expected to be ignored - registry.UpdateNode(ctx, "drainers/3", &node.Status{MaxCommitTS: outAlertGCTS, State: node.Offline}) + mustUpdateNode(ctx, registry, "drainers/3", &node.Status{MaxCommitTS: outAlertGCTS, State: node.Offline}) // Set a shorter interval because we don't really want to wait 1 hour origInterval := gcInterval @@ -499,6 +499,12 @@ func (s *gcBinlogFileSuite) TestShouldGCMinDrainerTSO(c *C) { // todo: add in and out of alert test while binlog has failpoint } +func mustUpdateNode(pctx context.Context, r *node.EtcdRegistry, prefix string, status *node.Status) { + if err := r.UpdateNode(pctx, prefix, status); err != nil { + panic(err) + } +} + type waitCommitTSSuite struct{} var _ = Suite(&waitCommitTSSuite{}) @@ -745,7 +751,11 @@ func (s *startServerSuite) TestStartPumpServer(c *C) { close(sig) p.Close() }() - go p.Start() + go func() { + if err := p.Start(); err != nil { + c.Logf("Pump server stopped in error: %v", err) + } + }() // wait until the server is online timeEnd := time.After(5 * time.Second) diff --git a/pump/storage/log.go b/pump/storage/log.go index 0325c8746..e31e344ec 100644 --- a/pump/storage/log.go +++ b/pump/storage/log.go @@ -369,7 +369,11 @@ func seekToNextRecord(reader *bufio.Reader) (bytes int, err error) { return } - reader.Discard(1) + if _, err = reader.Discard(1); err != nil { + // If we reach here, we've already successfully called `Peek(4)` + // and `Discard(1)` should not fail. + panic(err) + } bytes++ } }