Skip to content

Commit

Permalink
*: Check errors (pingcap#826)
Browse files Browse the repository at this point in the history
*  Check errors
suzaku authored and IANTHEREAL committed Nov 27, 2019

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
1 parent 1fdfd69 commit 4e71dc0
Showing 9 changed files with 84 additions and 24 deletions.
7 changes: 5 additions & 2 deletions drainer/collector.go
Original file line number Diff line number Diff line change
@@ -339,7 +339,10 @@ func (c *Collector) handlePumpStatusUpdate(ctx context.Context, n *node.Status)
func (c *Collector) keepUpdatingStatus(ctx context.Context, fUpdate func(context.Context) error) {
// add all the pump to merger
c.merger.Stop()
fUpdate(ctx)
err := fUpdate(ctx)
if err != nil {
log.Error("Update collector status", zap.Error(err))
}
c.merger.Continue()

// update status when had pump notify or reach wait time
@@ -352,7 +355,7 @@ func (c *Collector) keepUpdatingStatus(ctx context.Context, fUpdate func(context
nr.wg.Done()
case <-time.After(c.interval):
if err := fUpdate(ctx); err != nil {
log.Error("Failed to update collector status", zap.Error(err))
log.Error("Update collector status", zap.Error(err))
}
case err := <-c.errCh:
log.Error("collector meets error", zap.Error(err))
15 changes: 13 additions & 2 deletions drainer/server.go
Original file line number Diff line number Diff line change
@@ -298,12 +298,23 @@ func (s *Server) Start() error {

// register drainer server with gRPC server and start to serve listener
binlog.RegisterCisternServer(s.gs, s)
go s.gs.Serve(grpcL)
go func() {
err := s.gs.Serve(grpcL)
if err != nil {
log.Error("grpc server stopped", zap.Error(err))
}
}()

router := s.initAPIRouter()
http.Handle("/", router)

go http.Serve(httpL, nil)
go func() {
err := http.Serve(httpL, nil)
if err != nil {
// http.Server always return non-nil error, so we don't have to use Error level here
log.Info("drainer http server stopped", zap.Error(err))
}
}()

log.Info("start to server request", zap.String("addr", s.tcpAddr))
if err := m.Serve(); !strings.Contains(err.Error(), "use of closed network connection") {
16 changes: 12 additions & 4 deletions pkg/loader/bench_test.go
Original file line number Diff line number Diff line change
@@ -98,7 +98,9 @@ func benchmarkDelete(b *testing.B, merge bool) {
}

b.ResetTimer()
deleteTable(r.db, r.loader, b.N)
if err := deleteTable(r.db, r.loader, b.N); err != nil {
b.Fatal(err)
}

r.close()
}
@@ -109,11 +111,17 @@ func benchmarkWrite(b *testing.B, merge bool) {
b.Fatal(err)
}

dropTable(r.db, r.loader)
createTable(r.db, r.loader)
if err := dropTable(r.db, r.loader); err != nil {
b.Fatal(err)
}
if err := createTable(r.db, r.loader); err != nil {
b.Fatal(err)
}

b.ResetTimer()
loadTable(r.db, r.loader, b.N)
if err := loadTable(r.db, r.loader, b.N); err != nil {
b.Fatal(err)
}

r.close()
}
6 changes: 4 additions & 2 deletions pkg/loader/executor.go
Original file line number Diff line number Diff line change
@@ -82,8 +82,10 @@ func (tx *tx) exec(query string, args ...interface{}) (gosql.Result, error) {
func (tx *tx) autoRollbackExec(query string, args ...interface{}) (res gosql.Result, err error) {
res, err = tx.exec(query, args...)
if err != nil {
log.Error("exec fail", zap.String("query", query), zap.Reflect("args", args), zap.Error(err))
tx.Rollback()
log.Error("Exec fail, will rollback", zap.String("query", query), zap.Reflect("args", args), zap.Error(err))
if rbErr := tx.Rollback(); rbErr != nil {
log.Error("Auto rollback", zap.Error(rbErr))
}
err = errors.Trace(err)
}
return
12 changes: 9 additions & 3 deletions pkg/loader/load.go
Original file line number Diff line number Diff line change
@@ -316,13 +316,17 @@ func (s *loaderImpl) execDDL(ddl *DDL) error {
if len(ddl.Database) > 0 && !isCreateDatabaseDDL(ddl.SQL) {
_, err = tx.Exec(fmt.Sprintf("use %s;", quoteName(ddl.Database)))
if err != nil {
tx.Rollback()
if rbErr := tx.Rollback(); rbErr != nil {
log.Error("Rollback failed", zap.Error(rbErr))
}
return err
}
}

if _, err = tx.Exec(ddl.SQL); err != nil {
tx.Rollback()
if rbErr := tx.Rollback(); rbErr != nil {
log.Error("Rollback failed", zap.String("sql", ddl.SQL), zap.Error(rbErr))
}
return err
}

@@ -381,7 +385,9 @@ func (s *loaderImpl) singleExec(executor *executor, dmls []*DML) error {
}
}

causality.Add(keys)
if err := causality.Add(keys); err != nil {
log.Error("Add keys to causality failed", zap.Error(err), zap.Strings("keys", keys))
}
key := causality.Get(keys[0])
idx := int(genHashKey(key)) % len(byHash)
byHash[idx] = append(byHash[idx], dml)
36 changes: 28 additions & 8 deletions pump/server.go
Original file line number Diff line number Diff line change
@@ -142,7 +142,10 @@ func NewServer(cfg *Config) (*Server, error) {
return nil, errors.Trace(err)
}

kvstore.Register("tikv", tikv.Driver{})
err = kvstore.Register("tikv", tikv.Driver{})
if err != nil {
return nil, errors.Trace(err)
}
tiPath := fmt.Sprintf("tikv://%s?disableGC=true", urlv.HostString())
tiStore, err := newKVStoreFn(tiPath)
if err != nil {
@@ -377,7 +380,12 @@ func (s *Server) Start() error {
binlog.RegisterPumpServer(s.gs, s)

if s.unixAddr != "" {
go s.gs.Serve(unixLis)
go func() {
err := s.gs.Serve(unixLis)
if err != nil {
log.Error("grpc server stopped", zap.Error(err))
}
}()
}

// grpc and http will use the same tcp connection
@@ -404,7 +412,12 @@ func (s *Server) Start() error {
prometheus.DefaultGatherer = registry
http.Handle("/metrics", promhttp.Handler())

go http.Serve(httpL, nil)
go func() {
err := http.Serve(httpL, nil)
if err != nil {
log.Info("HTTP server stopped", zap.Error(err))
}
}()

previousState := s.node.NodeStatus().State
// register this node
@@ -688,10 +701,14 @@ func (s *Server) BinlogByTS(w http.ResponseWriter, r *http.Request) {
fmt.Fprint(w, binlog.String())
if len(binlog.PrewriteValue) > 0 {
prewriteValue := new(pb.PrewriteValue)
prewriteValue.Unmarshal(binlog.PrewriteValue)

fmt.Fprint(w, "\n\n PrewriteValue: \n")
fmt.Fprint(w, prewriteValue.String())
err := prewriteValue.Unmarshal(binlog.PrewriteValue)
if err != nil {
log.Error("Failed to unmarshal prewriteValue", zap.Error(err))
fmt.Fprint(w, "\n\n PrewriteValue: <Unmarshallable>\n")
} else {
fmt.Fprint(w, "\n\n PrewriteValue: \n")
fmt.Fprint(w, prewriteValue.String())
}
}

if len(binlog.PrewriteKey) > 0 {
@@ -846,7 +863,10 @@ func (s *Server) commitStatus() {
case node.Pausing, node.Online:
state = node.Paused
case node.Closing:
s.waitSafeToOffline(context.Background())
err := s.waitSafeToOffline(context.Background())
if err != nil {
log.Error("Waiting to offline failed", zap.Error(err))
}
log.Info("safe to offline now")
state = node.Offline
default:
8 changes: 7 additions & 1 deletion pump/status.go
Original file line number Diff line number Diff line change
@@ -17,6 +17,9 @@ import (
"encoding/json"
"net/http"

"github.com/pingcap/log"
"go.uber.org/zap"

"github.com/pingcap/tidb-binlog/pkg/node"
pb "github.com/pingcap/tipb/go-binlog"
)
@@ -31,5 +34,8 @@ type HTTPStatus struct {

// Status implements http.ServeHTTP interface
func (s *HTTPStatus) Status(w http.ResponseWriter, r *http.Request) {
json.NewEncoder(w).Encode(s)
err := json.NewEncoder(w).Encode(s)
if err != nil {
log.Error("Encode JSON status", zap.Any("status", s), zap.Error(err))
}
}
4 changes: 3 additions & 1 deletion pump/storage/chaser_test.go
Original file line number Diff line number Diff line change
@@ -110,7 +110,9 @@ func (rs *runSuite) TestShouldRetryIfFailedToCatchUp(c *C) {
Offset: start.Offset + int64(i*10),
},
}
f(&next)
if err := f(&next); err != nil {
return err
}
}
if len(offsetRecords) == 3 {
close(finished)
4 changes: 3 additions & 1 deletion pump/storage/storage.go
Original file line number Diff line number Diff line change
@@ -660,7 +660,9 @@ func (a *Append) GC(ts int64) {
}

atomic.StoreInt64(&a.gcTS, ts)
a.saveGCTSToDB(ts)
if err := a.saveGCTSToDB(ts); err != nil {
log.Error("Failed to save GCTS", zap.Int64("ts", ts), zap.Error(err))
}
gcTSGauge.Set(float64(oracle.ExtractPhysical(uint64(ts))))

if !atomic.CompareAndSwapInt32(&a.gcWorking, 0, 1) {

0 comments on commit 4e71dc0

Please sign in to comment.