diff --git a/app/cmd/controller.go b/app/cmd/controller.go index 2a68ce028..df71cd015 100644 --- a/app/cmd/controller.go +++ b/app/cmd/controller.go @@ -210,7 +210,9 @@ func startController(c *cli.Context) error { control.GRPCAddress = util.GetGRPCAddress(listen) control.GRPCServer = controllerrpc.GetControllerGRPCServer(volumeName, engineInstanceName, control) - control.StartGRPCServer() + if err = control.StartGRPCServer(); err != nil { + return err + } return control.WaitForShutdown() } diff --git a/app/cmd/restore_to_file.go b/app/cmd/restore_to_file.go index 0497c7095..e32c27bad 100644 --- a/app/cmd/restore_to_file.go +++ b/app/cmd/restore_to_file.go @@ -229,7 +229,6 @@ func CleanupTempFiles(outputFile string, files ...string) { continue } } - return } func ConvertImage(srcFilepath, dstFilepath, format string) error { diff --git a/main.go b/main.go index 6272dcff7..3c3fd859a 100644 --- a/main.go +++ b/main.go @@ -76,7 +76,9 @@ func longhornCli() { if err != nil { log.Fatal(err) } - pprof.StartCPUProfile(f) + if err = pprof.StartCPUProfile(f); err != nil { + logrus.Fatal(err) + } defer pprof.StopCPUProfile() } diff --git a/pkg/controller/control.go b/pkg/controller/control.go index 570e684d3..7ff3fa756 100644 --- a/pkg/controller/control.go +++ b/pkg/controller/control.go @@ -1208,7 +1208,9 @@ func (c *Controller) monitoring(address string, backend types.Backend) { err := <-monitorChan if err != nil { logrus.WithError(err).Errorf("Backend %v monitoring failed, mark as ERR", address) - c.SetReplicaMode(address, types.ERR) + if err = c.SetReplicaMode(address, types.ERR); err != nil { + logrus.WithError(err).Errorf("Failed to set replica %v to ERR", address) + } } logrus.Infof("Monitoring stopped %v", address) } diff --git a/pkg/dataconn/client.go b/pkg/dataconn/client.go index fdf5bf71e..67d18fbb0 100644 --- a/pkg/dataconn/client.go +++ b/pkg/dataconn/client.go @@ -199,7 +199,12 @@ func (c *Client) nextSeq() uint32 { } func (c *Client) replyError(req *Message, err error) { - journal.RemovePendingOp(req.ID, false) + if opErr := journal.RemovePendingOp(req.ID, false); opErr != nil { + logrus.WithError(opErr).WithFields(logrus.Fields{ + "seq": req.Seq, + "id": req.ID, + }).Error("Error removing pending operation") + } delete(c.messages, req.Seq) req.Type = TypeError req.Data = []byte(err.Error()) @@ -226,7 +231,13 @@ func (c *Client) handleRequest(req *Message) { func (c *Client) handleResponse(resp *Message) { if req, ok := c.messages[resp.Seq]; ok { - journal.RemovePendingOp(req.ID, true) + err := journal.RemovePendingOp(req.ID, true) + if err != nil { + logrus.WithError(err).WithFields(logrus.Fields{ + "seq": resp.Seq, + "id": req.ID, + }).Error("Error removing pending operation") + } delete(c.messages, resp.Seq) req.Type = resp.Type req.Size = resp.Size diff --git a/pkg/dataconn/server.go b/pkg/dataconn/server.go index fee717ac6..029685d0b 100644 --- a/pkg/dataconn/server.go +++ b/pkg/dataconn/server.go @@ -132,7 +132,9 @@ func (s *Server) write() { Type: TypeClose, } //Best effort to notify client to close connection - s.wire.Write(msg) + if err := s.wire.Write(msg); err != nil { + logrus.WithError(err).Error("Failed to write") + } } } } diff --git a/pkg/frontend/rest/frontend.go b/pkg/frontend/rest/frontend.go index ab34555cd..2c48671ac 100644 --- a/pkg/frontend/rest/frontend.go +++ b/pkg/frontend/rest/frontend.go @@ -66,10 +66,11 @@ func (d *Device) start() error { log.Infof("Rest Frontend listening on %s", listen) + var err error go func() { - http.ListenAndServe(listen, router) + err = http.ListenAndServe(listen, router) }() - return nil + return err } func (d *Device) stop() error { diff --git a/pkg/frontend/socket/frontend.go b/pkg/frontend/socket/frontend.go index ef1818582..85f0b6242 100644 --- a/pkg/frontend/socket/frontend.go +++ b/pkg/frontend/socket/frontend.go @@ -104,9 +104,12 @@ func (t *Socket) startSocketServer(rwu types.ReaderWriterUnmapperAt) error { } } + var err error t.socketPath = socketPath - go t.startSocketServerListen(rwu) - return nil + go func() { + err = t.startSocketServerListen(rwu) + }() + return err } func (t *Socket) startSocketServerListen(rwu types.ReaderWriterUnmapperAt) error { diff --git a/pkg/replica/backup.go b/pkg/replica/backup.go index c2184cc85..88c03cc99 100644 --- a/pkg/replica/backup.go +++ b/pkg/replica/backup.go @@ -78,10 +78,7 @@ func (rb *BackupStatus) HasSnapshot(snapID, volumeID string) bool { } id := diskutil.GenerateSnapshotDiskName(snapID) to := rb.findIndex(id) - if to < 0 { - return false - } - return true + return to >= 0 } func (rb *BackupStatus) OpenSnapshot(snapID, volumeID string) error { diff --git a/pkg/replica/backup_test.go b/pkg/replica/backup_test.go index 23fc006bf..ad2e0a1be 100644 --- a/pkg/replica/backup_test.go +++ b/pkg/replica/backup_test.go @@ -173,6 +173,7 @@ func (s *TestSuite) testBackupWithBackups(c *C, backingFile *backingfile.Backing // Test 003 -> 002 err = rb.OpenSnapshot(snap3, volume) + c.Assert(err, IsNil) mappings, err = rb.CompareSnapshot(snap3, snap2, volume) c.Assert(err, IsNil) c.Assert(len(mappings.Mappings), Equals, 2) @@ -182,6 +183,7 @@ func (s *TestSuite) testBackupWithBackups(c *C, backingFile *backingfile.Backing c.Assert(mappings.Mappings[1].Offset, Equals, int64(8*mb)) c.Assert(mappings.Mappings[1].Size, Equals, int64(2*mb)) err = rb.CloseSnapshot(snap3, volume) + c.Assert(err, IsNil) // Test 002 -> 001 err = rb.OpenSnapshot(snap2, volume) diff --git a/pkg/replica/diff_disk.go b/pkg/replica/diff_disk.go index 9f63a9a6b..1134a7fad 100644 --- a/pkg/replica/diff_disk.go +++ b/pkg/replica/diff_disk.go @@ -58,7 +58,6 @@ func (d *diffDisk) Expand(size int64) { d.location = append(d.location, make([]byte, newLocationSize-len(d.location))...) d.size = size - return } func (d *diffDisk) WriteAt(buf []byte, offset int64) (int, error) { diff --git a/pkg/replica/hash.go b/pkg/replica/hash.go index f0c0cea4d..a32dc42f6 100644 --- a/pkg/replica/hash.go +++ b/pkg/replica/hash.go @@ -51,8 +51,6 @@ type SnapshotHashJob struct { SnapshotName string Rehash bool - file *os.File - SnapshotHashStatus } @@ -95,7 +93,12 @@ func (t *SnapshotHashJob) LockFile() (fileLock *flock.Flock, err error) { } func (t *SnapshotHashJob) UnlockFile(fileLock *flock.Flock) { - fileLock.Unlock() + if err := fileLock.Unlock(); err != nil { + logrus.WithError(err).WithFields(logrus.Fields{ + "snapshot": t.SnapshotName, + "file": fileLock.Path(), + }).Error("Failed to unlock the file lock") + } } func (t *SnapshotHashJob) Execute() (err error) { @@ -120,15 +123,21 @@ func (t *SnapshotHashJob) Execute() (err error) { return } - SetSnapshotHashInfoToChecksumFile(t.SnapshotName, &xattrType.SnapshotHashInfo{ + if err = SetSnapshotHashInfoToChecksumFile(t.SnapshotName, &xattrType.SnapshotHashInfo{ Method: defaultHashMethod, Checksum: checksum, ChangeTime: changeTime, LastHashedAt: lastHashedAt, SilentlyCorrupted: silentlyCorrupted, - }) + }); err != nil { + logrus.WithError(err).Errorf("failed to set snapshot %s hash info to checksum file", t.SnapshotName) + t.State = ProgressStateError + t.Error = err.Error() + return + } - remain, err := t.isChangeTimeRemain(changeTime) + var remain bool + remain, err = t.isChangeTimeRemain(changeTime) if !remain { if err == nil { err = fmt.Errorf("snapshot %v modification time is changed", t.SnapshotName) @@ -136,7 +145,12 @@ func (t *SnapshotHashJob) Execute() (err error) { // Do the best to delete the useless checksum file. // The deletion failure is acceptable, because the mismatching timestamps // will trigger the rehash in the next hash request. - DeleteSnapshotHashInfoChecksumFile(t.SnapshotName) + if deleteErr := DeleteSnapshotHashInfoChecksumFile(t.SnapshotName); deleteErr != nil { + logrus.WithError(deleteErr).Errorf("failed to delete snapshot %v hash info checksum file", t.SnapshotName) + t.State = ProgressStateError + t.Error = deleteErr.Error() + return + } } } @@ -172,8 +186,8 @@ func (t *SnapshotHashJob) Execute() (err error) { return nil } - requireRehash := true if !t.Rehash { + var requireRehash bool requireRehash, checksum, err = t.isRehashRequired(changeTime) if err != nil { return err diff --git a/pkg/replica/replica.go b/pkg/replica/replica.go index 13771e2bc..d7afb983d 100644 --- a/pkg/replica/replica.go +++ b/pkg/replica/replica.go @@ -870,12 +870,16 @@ func (r *Replica) revertDisk(parentDiskFileName, created string) (*Replica, erro info.Parent = newHeadDisk.Parent if _, err := r.encodeToFile(&info, volumeMetaData); err != nil { - r.encodeToFile(&r.info, volumeMetaData) + if _, err = r.encodeToFile(&r.info, volumeMetaData); err != nil { + return nil, err + } return nil, err } // Need to execute before r.Reload() to update r.diskChildrenMap - r.rmDisk(oldHead) + if err = r.rmDisk(oldHead); err != nil { + return nil, err + } rNew, err := r.Reload() if err != nil { @@ -911,7 +915,7 @@ func (r *Replica) createDisk(name string, userCreated bool, created string, labe rollbackFuncList := []func() error{} defer func() { if err == nil { - r.rmDisk(oldHead) + err = r.rmDisk(oldHead) return } @@ -1176,7 +1180,9 @@ func (r *Replica) Delete() error { for name := range r.diskData { if name != r.info.BackingFilePath { - r.rmDisk(name) + if err := r.rmDisk(name); err != nil { + return err + } } } diff --git a/pkg/replica/replica_test.go b/pkg/replica/replica_test.go index 7890f7e33..315d915c6 100644 --- a/pkg/replica/replica_test.go +++ b/pkg/replica/replica_test.go @@ -833,11 +833,13 @@ func (s *TestSuite) TestSnapshotReadWrite(c *C) { fill(buf[b:2*b], 2) count, err = r.WriteAt(buf[b:2*b], b) c.Assert(count, Equals, b) + c.Assert(err, IsNil) err = r.Snapshot("001", true, getNow(), nil) c.Assert(err, IsNil) fill(buf[:b], 1) count, err = r.WriteAt(buf[:b], 0) + c.Assert(err, IsNil) c.Assert(count, Equals, b) err = r.Snapshot("002", true, getNow(), nil) c.Assert(err, IsNil) diff --git a/pkg/replica/rest/replica.go b/pkg/replica/rest/replica.go index fad6517de..98d6bc73f 100644 --- a/pkg/replica/rest/replica.go +++ b/pkg/replica/rest/replica.go @@ -1,9 +1,10 @@ package rest import ( + "net/http" + "github.com/rancher/go-rancher/api" "github.com/rancher/go-rancher/client" - "net/http" ) func (s *Server) ListReplicas(rw http.ResponseWriter, req *http.Request) error { @@ -19,13 +20,3 @@ func (s *Server) Replica(apiContext *api.ApiContext) *Replica { state, info := s.s.Status() return NewReplica(apiContext, state, info, s.s.Replica()) } - -func (s *Server) doOp(req *http.Request, err error) error { - if err != nil { - return err - } - - apiContext := api.GetApiContext(req) - apiContext.Write(s.Replica(apiContext)) - return nil -} diff --git a/pkg/replica/rest/router.go b/pkg/replica/rest/router.go index 52bb143ea..1803097b6 100644 --- a/pkg/replica/rest/router.go +++ b/pkg/replica/rest/router.go @@ -6,6 +6,7 @@ import ( "github.com/gorilla/mux" "github.com/rancher/go-rancher/api" "github.com/rancher/go-rancher/client" + "github.com/sirupsen/logrus" // add pprof endpoint _ "net/http/pprof" @@ -37,7 +38,9 @@ func NewRouter(s *Server) *mux.Router { f := HandleError router.Methods("GET").Path("/ping").Handler(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { - rw.Write([]byte("pong")) + if _, err := rw.Write([]byte("pong")); err != nil { + logrus.WithError(err).Error("Failed to write response") + } })) // API framework routes diff --git a/pkg/replica/rpc/dataserver.go b/pkg/replica/rpc/dataserver.go index 6251760ff..bedb99ff2 100644 --- a/pkg/replica/rpc/dataserver.go +++ b/pkg/replica/rpc/dataserver.go @@ -58,7 +58,9 @@ func (s *DataServer) listenAndServeTCP() error { go func(conn net.Conn) { server := dataconn.NewServer(conn, s.s) - server.Handle() + if err = server.Handle(); err != nil { + logrus.WithError(err).Error("failed to handle data server") + } }(conn) } } @@ -83,7 +85,9 @@ func (s *DataServer) listenAndServeUNIX() error { logrus.Infof("New connection from: %v", conn.RemoteAddr()) go func(conn net.Conn) { server := dataconn.NewServer(conn, s.s) - server.Handle() + if err = server.Handle(); err != nil { + logrus.WithError(err).Error("failed to handle data server") + } }(conn) } } diff --git a/pkg/replica/server.go b/pkg/replica/server.go index a4ebbc26b..a781385d6 100644 --- a/pkg/replica/server.go +++ b/pkg/replica/server.go @@ -195,8 +195,6 @@ func (s *Server) SetUnmapMarkDiskChainRemoved(enabled bool) { if s.r != nil { s.r.SetUnmapMarkDiskChainRemoved(enabled) } - - return } func (s *Server) SetSnapshotMaxCount(count int) { diff --git a/pkg/sync/rpc/list_test.go b/pkg/sync/rpc/list_test.go index 543c131a0..dcdd6e287 100644 --- a/pkg/sync/rpc/list_test.go +++ b/pkg/sync/rpc/list_test.go @@ -23,9 +23,10 @@ func (s *TestSuite) TestSnapshotHashListCRUD(c *C) { snapshotName := "snapshot0" ctx, cancel := context.WithCancel(context.Background()) task := replica.NewSnapshotHashJob(ctx, cancel, snapshotName, false) - list.Add(snapshotName, task) + err := list.Add(snapshotName, task) + c.Assert(err, IsNil) - _, err := list.Get(snapshotName) + _, err = list.Get(snapshotName) c.Assert(err, IsNil) _, err = list.Get("nonexistence") @@ -49,7 +50,8 @@ func (s *TestSuite) TestSnapshotHashListRefreshTriggerByAdd(c *C) { task := replica.NewSnapshotHashJob(ctx, cancel, snapshotName, false) task.State = replica.ProgressStateComplete - list.Add(snapshotName, task) + err := list.Add(snapshotName, task) + c.Assert(err, IsNil) size := list.GetSize() if i < MaxSnapshotHashJobSize { @@ -68,7 +70,8 @@ func (s *TestSuite) TestSnapshotHashListRefreshTriggerByGet(c *C) { snapshotName := "snapshot" + strconv.Itoa(i) ctx, cancel := context.WithCancel(context.Background()) task := replica.NewSnapshotHashJob(ctx, cancel, snapshotName, false) - list.Add(snapshotName, task) + err := list.Add(snapshotName, task) + c.Assert(err, IsNil) } for i := 0; i < numSnapshots; i++ { diff --git a/pkg/sync/rpc/server.go b/pkg/sync/rpc/server.go index ab76b88ca..b5dfb5004 100644 --- a/pkg/sync/rpc/server.go +++ b/pkg/sync/rpc/server.go @@ -836,7 +836,11 @@ func (s *SyncAgentServer) BackupRestore(ctx context.Context, req *enginerpc.Back return nil, errors.Wrapf(err, "error starting backup restore") } - go s.completeBackupRestore() + go func() { + if completeErr := s.completeBackupRestore(); completeErr != nil { + logrus.WithError(completeErr).Error("Failed to complete backup restore") + } + }() return &emptypb.Empty{}, nil } @@ -1032,7 +1036,11 @@ func (s *SyncAgentServer) SnapshotPurge(ctx context.Context, req *emptypb.Empty) return nil, err } - go s.purgeSnapshots() + go func() { + if purgeErr := s.purgeSnapshots(); purgeErr != nil { + logrus.WithError(purgeErr).Error("Failed to purge snapshots") + } + }() return &emptypb.Empty{}, nil } @@ -1503,7 +1511,13 @@ func (s *SyncAgentServer) SnapshotHashLockState(ctx context.Context, req *emptyp if err != nil { return nil, errors.Wrapf(err, "failed to try lock %v", fileLock) } - defer fileLock.Unlock() + defer func() { + if unlockErr := fileLock.Unlock(); unlockErr != nil { + logrus.WithError(unlockErr).WithFields(logrus.Fields{ + "file": fileLock.Path(), + }).Error("Failed to unlock file") + } + }() return &enginerpc.SnapshotHashLockStateResponse{ IsLocked: !isLocked, diff --git a/pkg/sync/sync.go b/pkg/sync/sync.go index 7277f6ae9..0874f6a61 100644 --- a/pkg/sync/sync.go +++ b/pkg/sync/sync.go @@ -372,26 +372,37 @@ func (t *Task) AddRestoreReplica(volumeSize, volumeCurrentSize int64, address, i return nil } -func (t *Task) checkRestoreReplicaSize(address, instanceName string, volumeSize int64) error { - replicaCli, err := replicaClient.NewReplicaClient(address, t.client.VolumeName, instanceName) +func (t *Task) checkRestoreReplicaSize(address, instanceName string, volumeSize int64) (err error) { + var ( + replicaCli *replicaClient.ReplicaClient + replicaInfo *types.ReplicaInfo + ) + replicaCli, err = replicaClient.NewReplicaClient(address, t.client.VolumeName, instanceName) if err != nil { - return err + return } - defer replicaCli.CloseReplica() + defer func() { + if closeErr := replicaCli.CloseReplica(); closeErr != nil { + if err == nil { + err = closeErr + } else { + err = fmt.Errorf("original error: %w, close error: %v", err, closeErr) + } + } + }() - replicaInfo, err := replicaCli.GetReplica() + replicaInfo, err = replicaCli.GetReplica() if err != nil { - return err + return } replicaSize, err := strconv.ParseInt(replicaInfo.Size, 10, 64) if err != nil { - return err + return } if replicaSize != volumeSize { - return fmt.Errorf("rebuilding replica size %v is not the same as volume size %v", replicaSize, volumeSize) + err = fmt.Errorf("rebuilding replica size %v is not the same as volume size %v", replicaSize, volumeSize) } - - return nil + return } func (t *Task) VerifyRebuildReplica(address, instanceName string) error { @@ -1028,10 +1039,10 @@ func (t *Task) HashSnapshotStatus(snapshotName string) (map[string]*SnapshotHash } if ok, err := t.isRebuilding(r); err != nil { - err = errors.Wrapf(err, "cannot get snapshot hashing status of %v", r.Address) + err = errors.Wrapf(err, "cannot get snapshot hashing status of %v", r.Address) // nolint: ineffassign,staticcheck return } else if ok { - err = fmt.Errorf("replica %v is rebuilding", r.Address) + err = fmt.Errorf("replica %v is rebuilding", r.Address) // nolint: ineffassign,staticcheck return } diff --git a/pkg/util/util.go b/pkg/util/util.go index 7b5d1c73f..b97b8de2b 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -50,25 +50,17 @@ func ParseAddresses(name string) (string, string, string, int, error) { } func GetGRPCAddress(address string) string { - if strings.HasPrefix(address, "tcp://") { - address = strings.TrimPrefix(address, "tcp://") - } + address = strings.TrimPrefix(address, "tcp://") - if strings.HasPrefix(address, "http://") { - address = strings.TrimPrefix(address, "http://") - } + address = strings.TrimPrefix(address, "http://") - if strings.HasSuffix(address, "/v1") { - address = strings.TrimSuffix(address, "/v1") - } + address = strings.TrimSuffix(address, "/v1") return address } func GetPortFromAddress(address string) (int, error) { - if strings.HasSuffix(address, "/v1") { - address = strings.TrimSuffix(address, "/v1") - } + address = strings.TrimSuffix(address, "/v1") _, strPort, err := net.SplitHostPort(address) if err != nil { diff --git a/pkg/util/validation.go b/pkg/util/validation.go index 341d0934f..ba44b42f8 100644 --- a/pkg/util/validation.go +++ b/pkg/util/validation.go @@ -67,8 +67,6 @@ func IsQualifiedName(value string) []string { const dns1123LabelFmt string = "[a-z0-9]([-a-z0-9]*[a-z0-9])?" -var dns1123LabelRegexp = regexp.MustCompile("^" + dns1123LabelFmt + "$") - const dns1123SubdomainFmt = dns1123LabelFmt + "(\\." + dns1123LabelFmt + ")*" const dns1123SubdomainErrorMsg string = "a DNS-1123 subdomain must consist of lower case alphanumeric characters, '-' or '.', and must start and end with an alphanumeric character" diff --git a/scripts/validate b/scripts/validate index bf3045155..dcf27ec12 100755 --- a/scripts/validate +++ b/scripts/validate @@ -11,20 +11,8 @@ echo Packages: ${PACKAGES} echo Running: go vet go vet ${PACKAGES} -if [ ! -z "${DRONE_REPO}" ] && [ ! -z "${DRONE_PULL_REQUEST}" ]; then - wget https://github.com/$DRONE_REPO/pull/$DRONE_PULL_REQUEST.patch - echo "Running: golangci-lint run --new-from-patch=${DRONE_PULL_REQUEST}.patch" - golangci-lint run --new-from-patch="${DRONE_PULL_REQUEST}.patch" - rm "${DRONE_PULL_REQUEST}.patch" -elif [ ! -z "${DRONE_COMMIT_REF}" ]; then - echo "Running: golangci-lint run --new-from-rev=${DRONE_COMMIT_REF}" - golangci-lint run --new-from-rev=${DRONE_COMMIT_REF} -else - git symbolic-ref -q HEAD && REV="origin/HEAD" || REV="HEAD^" - headSHA=$(git rev-parse --short=12 ${REV}) - echo "Running: golangci-lint run --new-from-rev=${headSHA}" - golangci-lint run --new-from-rev=${headSHA} -fi +echo "Running: golangci-lint" +golangci-lint run --timeout=5m echo Running: go fmt test -z "$(go fmt ${PACKAGES} | tee /dev/stderr)"