Skip to content

Commit

Permalink
Merge pull request juju#16133 from manadart/3.2-into-3.3
Browse files Browse the repository at this point in the history
juju#16133

Zero-conflict merge to bring forward a single patch:
juju#16129
  • Loading branch information
jujubot authored Aug 22, 2023
2 parents 3113a35 + e4e002b commit 1206b7d
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 85 deletions.
18 changes: 17 additions & 1 deletion worker/dbaccessor/package_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ package dbaccessor

import (
"testing"
time "time"
"time"

jujutesting "github.com/juju/testing"
"go.uber.org/mock/gomock"
Expand Down Expand Up @@ -97,7 +97,23 @@ func (s *baseSuite) expectTrackedDBKill() {
s.trackedDB.EXPECT().Wait().Return(nil).AnyTimes()
}

func (s *baseSuite) expectNodeStartupAndShutdown() {
appExp := s.dbApp.EXPECT()
appExp.Ready(gomock.Any()).Return(nil)
appExp.Client(gomock.Any()).Return(s.client, nil).MinTimes(1)
appExp.ID().Return(uint64(666))
appExp.Close().Return(nil)
}

type dbBaseSuite struct {
databasetesting.ControllerSuite
baseSuite
}

func ensureStartup(c *gc.C, w *dbWorker) {
select {
case <-w.dbReady:
case <-time.After(jujutesting.LongWait):
c.Fatal("timed out waiting for Dqlite node start")
}
}
69 changes: 48 additions & 21 deletions worker/dbaccessor/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,20 @@ import (
"github.com/juju/worker/v3/catacomb"
"github.com/juju/worker/v3/dependency"

coredatabase "github.com/juju/juju/core/database"
"github.com/juju/juju/core/database"
"github.com/juju/juju/database/app"
"github.com/juju/juju/database/dqlite"
"github.com/juju/juju/pubsub/apiserver"
)

const (
// ErrTryAgain indicates that the worker should try again to start the
// worker.
ErrTryAgain = errors.ConstError("DB node is nil, but worker is not dying; rescheduling TrackedDB start attempt")
// errTryAgain indicates that the worker should try
// again later to start a DB tracker worker.
errTryAgain = errors.ConstError("DB node is nil, but worker is not dying; rescheduling TrackedDB start attempt")

// errNotReady indicates that we successfully created a new Dqlite app,
// but the Ready call timed out, and we are waiting for broadcast info.
errNotReady = errors.ConstError("started DB app, but it failed to become ready; waiting for topology updates")
)

// nodeShutdownTimeout is the timeout that we add to the context passed
Expand Down Expand Up @@ -87,7 +91,7 @@ type DBGetter interface {
// GetDB returns a sql.DB reference for the dqlite-backed database that
// contains the data for the specified namespace.
// A NotFound error is returned if the worker is unaware of the requested DB.
GetDB(namespace string) (coredatabase.TrackedDB, error)
GetDB(namespace string) (database.TrackedDB, error)
}

// dbRequest is used to pass requests for TrackedDB
Expand Down Expand Up @@ -189,7 +193,7 @@ func newWorker(cfg WorkerConfig) (*dbWorker, error) {
// will be nil. In this case, we'll return ErrTryAgain. In this
// case we don't want to kill the worker. We'll force the
// worker to try again.
return !errors.Is(err, ErrTryAgain)
return !errors.Is(err, errTryAgain)
},
RestartDelay: time.Second * 10,
}),
Expand Down Expand Up @@ -331,7 +335,7 @@ func (w *dbWorker) Report() map[string]any {
// TODO (stickupkid): Before handing out any DB for any namespace,
// we should first validate it exists in the controller list.
// This should only be required if it's not the controller DB.
func (w *dbWorker) GetDB(namespace string) (coredatabase.TrackedDB, error) {
func (w *dbWorker) GetDB(namespace string) (database.TrackedDB, error) {
// Ensure Dqlite is initialised.
select {
case <-w.dbReady:
Expand Down Expand Up @@ -360,7 +364,7 @@ func (w *dbWorker) GetDB(namespace string) (coredatabase.TrackedDB, error) {
if err != nil {
return nil, errors.Trace(err)
}
return tracked.(coredatabase.TrackedDB), nil
return tracked.(database.TrackedDB), nil
}

// startExistingDqliteNode takes care of starting Dqlite
Expand Down Expand Up @@ -395,6 +399,30 @@ func (w *dbWorker) startExistingDqliteNode() error {
}

func (w *dbWorker) initialiseDqlite(options ...app.Option) error {
ctx, cancel := w.scopedContext()
defer cancel()

if err := w.startDqliteNode(ctx, options...); err != nil {
if errors.Is(err, errNotReady) {
return nil
}
return errors.Trace(err)
}

// Open up the default controller database.
// Other database namespaces are opened lazily via GetDB calls.
// We don't need to apply the database schema here as the
// controller database is created during bootstrap.
if err := w.openDatabase(database.ControllerNS); err != nil {
return errors.Annotate(err, "opening controller database")
}

// Begin handling external requests.
close(w.dbReady)
return nil
}

func (w *dbWorker) startDqliteNode(ctx context.Context, options ...app.Option) error {
w.mu.Lock()
defer w.mu.Unlock()

Expand Down Expand Up @@ -423,34 +451,31 @@ func (w *dbWorker) initialiseDqlite(options ...app.Option) error {
defer cCancel()

if err := w.dbApp.Ready(ctx); err != nil {
if err == context.DeadlineExceeded {
if errors.Is(err, context.DeadlineExceeded) {
// We don't know whether we were cancelled by tomb or by timeout.
// Request API server details in case we need to invoke a backstop
// scenario. If we are shutting down, this won't matter.
if err := w.dbApp.Close(); err != nil {
return errors.Trace(err)
}
w.dbApp = nil
return errors.Annotatef(w.requestAPIServerDetails(), "requesting API server details")

if err := w.requestAPIServerDetails(); err != nil {
return errors.Annotatef(err, "requesting API server details")
}
return errNotReady
}
return errors.Annotatef(err, "ensuring Dqlite is ready to process changes")
}

// Open up the default controller database. Other database namespaces can
// be opened up in a more lazy fashion.
if err := w.openDatabase(coredatabase.ControllerNS); err != nil {
return errors.Annotate(err, "opening initial databases")
}

w.cfg.Logger.Infof("initialized Dqlite application (ID: %v)", w.dbApp.ID())
w.cfg.Logger.Infof("serving Dqlite application (ID: %v)", w.dbApp.ID())

if c, err := w.dbApp.Client(ctx); err == nil {
if info, err := c.Cluster(ctx); err == nil {
w.cfg.Logger.Infof("current cluster: %#v", info)
}
}

close(w.dbReady)
return nil
}

Expand Down Expand Up @@ -483,7 +508,7 @@ func (w *dbWorker) openDatabase(namespace string) error {
case <-w.catacomb.Dying():
return nil, w.catacomb.ErrDying()
default:
return nil, ErrTryAgain
return nil, errTryAgain
}
}

Expand Down Expand Up @@ -591,8 +616,10 @@ func (w *dbWorker) processAPIServerChange(apiDetails apiserver.Details) error {
}

// Otherwise there is no deterministic course of action.
// Play it safe and throw out.
return errors.Errorf("unable to reconcile current controller and Dqlite cluster status")
// We don't want to throw an error here, because it can result in churn
// when entering HA. Just try again to start.
log.Infof("unable to reconcile current controller and Dqlite cluster status; reattempting node start-up")
return errors.Trace(w.startExistingDqliteNode())
}

// Otherwise this is a node added by enabling HA,
Expand Down
Loading

0 comments on commit 1206b7d

Please sign in to comment.