diff --git a/worker/dbaccessor/package_test.go b/worker/dbaccessor/package_test.go
index db1f37fe6f7..5e2f5447541 100644
--- a/worker/dbaccessor/package_test.go
+++ b/worker/dbaccessor/package_test.go
@@ -5,7 +5,7 @@ package dbaccessor
 
 import (
 	"testing"
-	time "time"
+	"time"
 
 	jujutesting "github.com/juju/testing"
 	"go.uber.org/mock/gomock"
@@ -97,7 +97,23 @@ func (s *baseSuite) expectTrackedDBKill() {
 	s.trackedDB.EXPECT().Wait().Return(nil).AnyTimes()
 }
 
+func (s *baseSuite) expectNodeStartupAndShutdown() {
+	appExp := s.dbApp.EXPECT()
+	appExp.Ready(gomock.Any()).Return(nil)
+	appExp.Client(gomock.Any()).Return(s.client, nil).MinTimes(1)
+	appExp.ID().Return(uint64(666))
+	appExp.Close().Return(nil)
+}
+
 type dbBaseSuite struct {
 	databasetesting.ControllerSuite
 	baseSuite
 }
+
+func ensureStartup(c *gc.C, w *dbWorker) {
+	select {
+	case <-w.dbReady:
+	case <-time.After(jujutesting.LongWait):
+		c.Fatal("timed out waiting for Dqlite node start")
+	}
+}
diff --git a/worker/dbaccessor/worker.go b/worker/dbaccessor/worker.go
index 0e5a4ec6c23..3754a1cf44f 100644
--- a/worker/dbaccessor/worker.go
+++ b/worker/dbaccessor/worker.go
@@ -15,16 +15,20 @@ import (
 	"github.com/juju/worker/v3/catacomb"
 	"github.com/juju/worker/v3/dependency"
 
-	coredatabase "github.com/juju/juju/core/database"
+	"github.com/juju/juju/core/database"
 	"github.com/juju/juju/database/app"
 	"github.com/juju/juju/database/dqlite"
 	"github.com/juju/juju/pubsub/apiserver"
 )
 
 const (
-	// ErrTryAgain indicates that the worker should try again to start the
-	// worker.
-	ErrTryAgain = errors.ConstError("DB node is nil, but worker is not dying; rescheduling TrackedDB start attempt")
+	// errTryAgain indicates that the worker should try
+	// again later to start a DB tracker worker.
+	errTryAgain = errors.ConstError("DB node is nil, but worker is not dying; rescheduling TrackedDB start attempt")
+
+	// errNotReady indicates that we successfully created a new Dqlite app,
+	// but the Ready call timed out, and we are waiting for broadcast info.
+	errNotReady = errors.ConstError("started DB app, but it failed to become ready; waiting for topology updates")
 )
 
 // nodeShutdownTimeout is the timeout that we add to the context passed
@@ -87,7 +91,7 @@ type DBGetter interface {
 	// GetDB returns a sql.DB reference for the dqlite-backed database that
 	// contains the data for the specified namespace.
 	// A NotFound error is returned if the worker is unaware of the requested DB.
-	GetDB(namespace string) (coredatabase.TrackedDB, error)
+	GetDB(namespace string) (database.TrackedDB, error)
 }
 
 // dbRequest is used to pass requests for TrackedDB
@@ -189,7 +193,7 @@ func newWorker(cfg WorkerConfig) (*dbWorker, error) {
 			// will be nil. In this case, we'll return ErrTryAgain. In this
 			// case we don't want to kill the worker. We'll force the
 			// worker to try again.
-			return !errors.Is(err, ErrTryAgain)
+			return !errors.Is(err, errTryAgain)
 		},
 		RestartDelay: time.Second * 10,
 	}),
@@ -331,7 +335,7 @@ func (w *dbWorker) Report() map[string]any {
 // TODO (stickupkid): Before handing out any DB for any namespace,
 // we should first validate it exists in the controller list.
 // This should only be required if it's not the controller DB.
-func (w *dbWorker) GetDB(namespace string) (coredatabase.TrackedDB, error) {
+func (w *dbWorker) GetDB(namespace string) (database.TrackedDB, error) {
 	// Ensure Dqlite is initialised.
 	select {
 	case <-w.dbReady:
@@ -360,7 +364,7 @@ func (w *dbWorker) GetDB(namespace string) (coredatabase.TrackedDB, error) {
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
-	return tracked.(coredatabase.TrackedDB), nil
+	return tracked.(database.TrackedDB), nil
 }
 
 // startExistingDqliteNode takes care of starting Dqlite
@@ -395,6 +399,30 @@ func (w *dbWorker) startExistingDqliteNode() error {
 }
 
 func (w *dbWorker) initialiseDqlite(options ...app.Option) error {
+	ctx, cancel := w.scopedContext()
+	defer cancel()
+
+	if err := w.startDqliteNode(ctx, options...); err != nil {
+		if errors.Is(err, errNotReady) {
+			return nil
+		}
+		return errors.Trace(err)
+	}
+
+	// Open up the default controller database.
+	// Other database namespaces are opened lazily via GetDB calls.
+	// We don't need to apply the database schema here as the
+	// controller database is created during bootstrap.
+	if err := w.openDatabase(database.ControllerNS); err != nil {
+		return errors.Annotate(err, "opening controller database")
+	}
+
+	// Begin handling external requests.
+	close(w.dbReady)
+	return nil
+}
+
+func (w *dbWorker) startDqliteNode(ctx context.Context, options ...app.Option) error {
 	w.mu.Lock()
 	defer w.mu.Unlock()
 
@@ -423,7 +451,7 @@ func (w *dbWorker) initialiseDqlite(options ...app.Option) error {
 	defer cCancel()
 
 	if err := w.dbApp.Ready(ctx); err != nil {
-		if err == context.DeadlineExceeded {
+		if errors.Is(err, context.DeadlineExceeded) {
 			// We don't know whether we were cancelled by tomb or by timeout.
 			// Request API server details in case we need to invoke a backstop
 			// scenario. If we are shutting down, this won't matter.
@@ -431,18 +459,16 @@ func (w *dbWorker) initialiseDqlite(options ...app.Option) error {
 				return errors.Trace(err)
 			}
 			w.dbApp = nil
-			return errors.Annotatef(w.requestAPIServerDetails(), "requesting API server details")
+
+			if err := w.requestAPIServerDetails(); err != nil {
+				return errors.Annotatef(err, "requesting API server details")
+			}
+			return errNotReady
 		}
 		return errors.Annotatef(err, "ensuring Dqlite is ready to process changes")
 	}
 
-	// Open up the default controller database. Other database namespaces can
-	// be opened up in a more lazy fashion.
-	if err := w.openDatabase(coredatabase.ControllerNS); err != nil {
-		return errors.Annotate(err, "opening initial databases")
-	}
-
-	w.cfg.Logger.Infof("initialized Dqlite application (ID: %v)", w.dbApp.ID())
+	w.cfg.Logger.Infof("serving Dqlite application (ID: %v)", w.dbApp.ID())
 
 	if c, err := w.dbApp.Client(ctx); err == nil {
 		if info, err := c.Cluster(ctx); err == nil {
@@ -450,7 +476,6 @@ func (w *dbWorker) initialiseDqlite(options ...app.Option) error {
 		}
 	}
 
-	close(w.dbReady)
 	return nil
 }
 
@@ -483,7 +508,7 @@ func (w *dbWorker) openDatabase(namespace string) error {
 		case <-w.catacomb.Dying():
 			return nil, w.catacomb.ErrDying()
 		default:
-			return nil, ErrTryAgain
+			return nil, errTryAgain
 		}
 	}
 
@@ -591,8 +616,10 @@ func (w *dbWorker) processAPIServerChange(apiDetails apiserver.Details) error {
 		}
 
 		// Otherwise there is no deterministic course of action.
-		// Play it safe and throw out.
-		return errors.Errorf("unable to reconcile current controller and Dqlite cluster status")
+		// We don't want to throw an error here, because it can result in churn
+		// when entering HA. Just try again to start.
+		w.cfg.Logger.Infof("unable to reconcile current controller and Dqlite cluster status; reattempting node start-up")
+		return errors.Trace(w.startExistingDqliteNode())
 	}
 
 	// Otherwise this is a node added by enabling HA,
diff --git a/worker/dbaccessor/worker_test.go b/worker/dbaccessor/worker_test.go
index e2f31b7dcc5..ab8663d6257 100644
--- a/worker/dbaccessor/worker_test.go
+++ b/worker/dbaccessor/worker_test.go
@@ -70,7 +70,7 @@ func (s *workerSuite) TestStartupTimeoutSingleControllerReconfigure(c *gc.C) {
 	c.Assert(errors.Is(err, dependency.ErrBounce), jc.IsTrue)
 }
 
-func (s *workerSuite) TestStartupTimeoutMultipleControllerError(c *gc.C) {
+func (s *workerSuite) TestStartupTimeoutMultipleControllerRetry(c *gc.C) {
 	defer s.setupMocks(c).Finish()
 
 	s.expectAnyLogs()
@@ -78,29 +78,32 @@ func (s *workerSuite) TestStartupTimeoutMultipleControllerError(c *gc.C) {
 	s.expectTrackedDBKill()
 
 	mgrExp := s.nodeManager.EXPECT()
-	mgrExp.EnsureDataDir().Return(c.MkDir(), nil)
+	mgrExp.EnsureDataDir().Return(c.MkDir(), nil).Times(2)
 	mgrExp.IsExistingNode().Return(true, nil).Times(2)
-	mgrExp.IsBootstrappedNode(gomock.Any()).Return(false, nil).Times(3)
-	mgrExp.WithTLSOption().Return(nil, nil)
-	mgrExp.WithLogFuncOption().Return(nil)
-	mgrExp.WithTracingOption().Return(nil)
+	mgrExp.IsBootstrappedNode(gomock.Any()).Return(false, nil).Times(4)
 
-	// App gets started, we time out waiting, then we close it.
+	// We expect 2 attempts to start.
+	mgrExp.WithTLSOption().Return(nil, nil).Times(2)
+	mgrExp.WithLogFuncOption().Return(nil).Times(2)
+	mgrExp.WithTracingOption().Return(nil).Times(2)
+
+	// App gets started, we time out waiting, then we close it both times.
 	appExp := s.dbApp.EXPECT()
-	appExp.Ready(gomock.Any()).Return(context.DeadlineExceeded)
-	appExp.Close().Return(nil)
+	appExp.Ready(gomock.Any()).Return(context.DeadlineExceeded).Times(2)
+	appExp.Close().Return(nil).Times(2)
 
 	// We expect to request API details.
 	s.hub.EXPECT().Subscribe(apiserver.DetailsTopic, gomock.Any()).Return(func() {}, nil)
-	s.hub.EXPECT().Publish(apiserver.DetailsRequestTopic, gomock.Any()).Return(func() {}, nil)
+	s.hub.EXPECT().Publish(apiserver.DetailsRequestTopic, gomock.Any()).Return(func() {}, nil).Times(2)
 
 	w := s.newWorker(c)
-	defer w.Kill()
+	defer workertest.CleanKill(c, w)
+	dbw := w.(*dbWorker)
 
 	// If there are multiple servers reported, we can't reason about our
 	// current state in a discrete fashion. The worker throws an error.
 	select {
-	case w.(*dbWorker).apiServerChanges <- apiserver.Details{
+	case dbw.apiServerChanges <- apiserver.Details{
 		Servers: map[string]apiserver.APIServer{
 			"0": {ID: "0", InternalAddress: "10.6.6.6:1234"},
 			"1": {ID: "1", InternalAddress: "10.6.6.7:1234"},
@@ -110,8 +113,13 @@ func (s *workerSuite) TestStartupTimeoutMultipleControllerError(c *gc.C) {
 		c.Fatal("timed out waiting for cluster change to be processed")
 	}
 
-	err := workertest.CheckKilled(c, w)
-	c.Assert(err, gc.ErrorMatches, "unable to reconcile current controller and Dqlite cluster status")
+	// At this point, the Dqlite node is not started.
+	// The worker is waiting for legitimate server detail messages.
+	select {
+	case <-dbw.dbReady:
+		c.Fatal("Dqlite node should not be started yet.")
+	case <-time.After(testing.ShortWait):
+	}
 }
 
 func (s *workerSuite) TestStartupNotExistingNodeThenCluster(c *gc.C) {
@@ -133,7 +141,8 @@ func (s *workerSuite) TestStartupNotExistingNodeThenCluster(c *gc.C) {
 
 	s.client.EXPECT().Cluster(gomock.Any()).Return(nil, nil)
 
-	sync := s.expectNodeStartupAndShutdown(true)
+	s.expectNodeStartupAndShutdown()
+	s.dbApp.EXPECT().Handover(gomock.Any()).Return(nil)
 
 	// When we are starting up as a new node,
 	// we request details immediately.
@@ -142,10 +151,11 @@ func (s *workerSuite) TestStartupNotExistingNodeThenCluster(c *gc.C) {
 
 	w := s.newWorker(c)
 	defer workertest.DirtyKill(c, w)
+	dbw := w.(*dbWorker)
 
 	// Without a bind address for ourselves we keep waiting.
 	select {
-	case w.(*dbWorker).apiServerChanges <- apiserver.Details{
+	case dbw.apiServerChanges <- apiserver.Details{
 		Servers: map[string]apiserver.APIServer{
 			"0": {ID: "0"},
 			"1": {ID: "1", InternalAddress: "10.6.6.7:1234"},
@@ -169,7 +179,7 @@ func (s *workerSuite) TestStartupNotExistingNodeThenCluster(c *gc.C) {
 	// At this point, the Dqlite node is not started.
 	// The worker is waiting for legitimate server detail messages.
 	select {
-	case <-sync:
+	case <-dbw.dbReady:
 		c.Fatal("Dqlite node should not be started yet.")
 	case <-time.After(testing.ShortWait):
 	}
@@ -187,11 +197,7 @@ func (s *workerSuite) TestStartupNotExistingNodeThenCluster(c *gc.C) {
 		c.Fatal("timed out waiting for cluster change to be processed")
 	}
 
-	select {
-	case <-sync:
-	case <-time.After(testing.LongWait):
-		c.Fatal("timed out waiting for Dqlite node start")
-	}
+	ensureStartup(c, dbw)
 
 	s.client.EXPECT().Leader(gomock.Any()).Return(&dqlite.NodeInfo{
 		ID: 1,
@@ -230,18 +236,15 @@ func (s *workerSuite) TestWorkerStartupExistingNode(c *gc.C) {
 
 	s.client.EXPECT().Cluster(gomock.Any()).Return(nil, nil)
 
-	sync := s.expectNodeStartupAndShutdown(true)
+	s.expectNodeStartupAndShutdown()
+	s.dbApp.EXPECT().Handover(gomock.Any()).Return(nil)
 
 	s.hub.EXPECT().Subscribe(apiserver.DetailsTopic, gomock.Any()).Return(func() {}, nil)
 
 	w := s.newWorker(c)
 	defer workertest.DirtyKill(c, w)
 
-	select {
-	case <-sync:
-	case <-time.After(testing.LongWait):
-		c.Fatal("timed out waiting for Dqlite node start")
-	}
+	ensureStartup(c, w.(*dbWorker))
 
 	workertest.CleanKill(c, w)
 }
@@ -266,24 +269,21 @@ func (s *workerSuite) TestWorkerStartupAsBootstrapNodeSingleServerNoRebind(c *gc
 
 	s.client.EXPECT().Cluster(gomock.Any()).Return(nil, nil)
 
-	sync := s.expectNodeStartupAndShutdown(false)
+	s.expectNodeStartupAndShutdown()
 
 	s.hub.EXPECT().Subscribe(apiserver.DetailsTopic, gomock.Any()).Return(func() {}, nil)
 
 	w := s.newWorker(c)
 	defer workertest.DirtyKill(c, w)
+	dbw := w.(*dbWorker)
 
-	select {
-	case <-sync:
-	case <-time.After(testing.LongWait):
-		c.Fatal("timed out waiting for Dqlite node start")
-	}
+	ensureStartup(c, dbw)
 
 	// At this point we have started successfully.
 	// Push a message onto the API details channel.
 	// A single server does not cause a binding change.
 	select {
-	case w.(*dbWorker).apiServerChanges <- apiserver.Details{
+	case dbw.apiServerChanges <- apiserver.Details{
 		Servers: map[string]apiserver.APIServer{
 			"0": {ID: "0", InternalAddress: "10.6.6.6:1234"},
 		},
@@ -295,7 +295,7 @@ func (s *workerSuite) TestWorkerStartupAsBootstrapNodeSingleServerNoRebind(c *gc
 	// Multiple servers still do not cause a binding change
 	// if there is no internal address to bind to.
 	select {
-	case w.(*dbWorker).apiServerChanges <- apiserver.Details{
+	case dbw.apiServerChanges <- apiserver.Details{
 		Servers: map[string]apiserver.APIServer{
 			"0": {ID: "0"},
 			"1": {ID: "1", InternalAddress: "10.6.6.7:1234"},
@@ -357,23 +357,20 @@ func (s *workerSuite) TestWorkerStartupAsBootstrapNodeThenReconfigure(c *gc.C) {
 	// Although the shut-down check for IsBootstrappedNode returns false,
 	// this call to shut-down is actually run before reconfiguring the node.
 	// When the loop exits, the node is already set to nil.
-	sync := s.expectNodeStartupAndShutdown(false)
+	s.expectNodeStartupAndShutdown()
 
 	s.hub.EXPECT().Subscribe(apiserver.DetailsTopic, gomock.Any()).Return(func() {}, nil)
 
 	w := s.newWorker(c)
 	defer workertest.DirtyKill(c, w)
+	dbw := w.(*dbWorker)
 
-	select {
-	case <-sync:
-	case <-time.After(testing.LongWait):
-		c.Fatal("timed out waiting for Dqlite node start")
-	}
+	ensureStartup(c, dbw)
 
 	// At this point we have started successfully.
 	// Push a message onto the API details channel to simulate a move into HA.
 	select {
-	case w.(*dbWorker).apiServerChanges <- apiserver.Details{
+	case dbw.apiServerChanges <- apiserver.Details{
 		Servers: map[string]apiserver.APIServer{
 			"0": {ID: "0", InternalAddress: "10.6.6.6:1234"},
 			"1": {ID: "1", InternalAddress: "10.6.6.7:1234"},
@@ -394,26 +391,6 @@ func (s *workerSuite) setupMocks(c *gc.C) *gomock.Controller {
 	return ctrl
 }
 
-func (s *workerSuite) expectNodeStartupAndShutdown(handover bool) chan struct{} {
-	sync := make(chan struct{})
-
-	appExp := s.dbApp.EXPECT()
-	appExp.Ready(gomock.Any()).Return(nil)
-	appExp.Client(gomock.Any()).Return(s.client, nil).MinTimes(1)
-	appExp.ID().DoAndReturn(func() uint64 {
-		close(sync)
-		return uint64(666)
-	})
-
-	if handover {
-		appExp.Handover(gomock.Any()).Return(nil)
-	}
-
-	appExp.Close().Return(nil)
-
-	return sync
-}
-
 func (s *workerSuite) newWorker(c *gc.C) worker.Worker {
 	cfg := WorkerConfig{
 		NodeManager: s.nodeManager,