Skip to content

Commit

Permalink
Cleanup/deregister a mailbox if we fail to register (#172)
Browse files Browse the repository at this point in the history
* Cleanup/deregister a mailbox if we fail to register

* Fix the defers and mutex locking in mailboxes
  • Loading branch information
ajroetker authored Jan 18, 2022
1 parent 2768bfb commit aff5c5c
Showing 1 changed file with 28 additions and 26 deletions.
54 changes: 28 additions & 26 deletions mailbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,40 +121,18 @@ func newMailbox(s *Server, name, nsName string, size int) (*Mailbox, error) {
s.mu.Unlock()

s.mumb.Lock()
defer s.mumb.Unlock()

_, ok := s.mailboxes[nsName]
if ok {
return nil, fmt.Errorf("%w: nsName=%s", ErrAlreadyRegistered, nsName)
}
s.mumb.Unlock()

timeout, cancel := context.WithTimeout(context.Background(), s.cfg.Timeout)
err := s.registry.Register(timeout, nsName)
cancel()
// Check if the error is a particular fatal error
// from etcd. Some errors have no recovery. See
// the list of all possible errors here:
//
// https://github.com/etcd-io/etcd/blob/master/etcdserver/api/v3rpc/rpctypes/error.go
//
// They are unfortunately not classidied into
// recoverable or non-recoverable.
if err != nil && strings.Contains(err.Error(), "etcdserver: requested lease not found") {
s.reportFatalError(err)
return nil, err
}
if err != nil {
return nil, err
}

boxC := make(chan Request, size)
cleanup := func() {
s.mumb.Lock()
defer s.mumb.Unlock()

// Immediately delete the subscription so that no one
// can send to it, at least from this host.
delete(s.mailboxes, nsName)
s.mumb.Unlock()

// Deregister the name.
var err error
Expand All @@ -165,13 +143,37 @@ func newMailbox(s *Server, name, nsName string, size int) (*Mailbox, error) {
cancel()
return err != nil
})
// Ingnore ErrNotOwner because most likely the previous owner panic'ed or exited badly.
// Ingnore ErrNotOwner because most likely the previous owner panic'ed or exited badly.
// So we'll ignore the error and let the mailbox creator retry later. We don't want to panic
// in that case because it will take down more mailboxes and make it worse.
// in that case because it will take down more mailboxes and make it worse.
if err != nil && err != registry.ErrNotOwner {
panic(fmt.Errorf("%w: unable to deregister mailbox: %v, error: %v", errDeregisteredFailed, nsName, err))
}
}

timeout, cancel := context.WithTimeout(context.Background(), s.cfg.Timeout)
err := s.registry.Register(timeout, nsName)
cancel()
// Check if the error is a particular fatal error
// from etcd. Some errors have no recovery. See
// the list of all possible errors here:
//
// https://github.com/etcd-io/etcd/blob/master/etcdserver/api/v3rpc/rpctypes/error.go
//
// They are unfortunately not classidied into
// recoverable or non-recoverable.
if err != nil && strings.Contains(err.Error(), "etcdserver: requested lease not found") {
s.reportFatalError(err)
return nil, err
}
if err != nil {
cleanup()
return nil, err
}

s.mumb.Lock()
defer s.mumb.Unlock()
boxC := make(chan Request, size)
box := &Mailbox{
name: name,
nsName: nsName,
Expand Down

0 comments on commit aff5c5c

Please sign in to comment.