From 930e2a579e6ab105afebac150050373f378daa8e Mon Sep 17 00:00:00 2001 From: Jack Shaw Date: Tue, 25 Jul 2023 12:19:23 +0100 Subject: [PATCH] Add watcher to upgrade that notifies when upgrade can start Instead of, as we are with mongo, watching for any changes and then requiring the consumer to verify it's an actionable change, create a new watcher extending NamespaceWatcher which does this verification for you This combines Watch and AllProvisionedControllersReady from state/upgrade Test this by mocking the underlying watcher. Ideally, we would have some tests watching a real database, but we are not able to do this at the moment. --- domain/upgrade/service/package_mock_test.go | 42 +++++++- domain/upgrade/service/package_test.go | 2 +- domain/upgrade/service/service.go | 29 ++++-- domain/upgrade/service/service_test.go | 35 +++---- domain/upgrade/service/watcher.go | 105 ++++++++++++++++++++ domain/upgrade/service/watcher_test.go | 83 ++++++++++++++++ 6 files changed, 262 insertions(+), 34 deletions(-) create mode 100644 domain/upgrade/service/watcher.go create mode 100644 domain/upgrade/service/watcher_test.go diff --git a/domain/upgrade/service/package_mock_test.go b/domain/upgrade/service/package_mock_test.go index 4bcbc39e258..205828c9a81 100644 --- a/domain/upgrade/service/package_mock_test.go +++ b/domain/upgrade/service/package_mock_test.go @@ -1,5 +1,5 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: github.com/juju/juju/domain/upgrade/service (interfaces: State) +// Source: github.com/juju/juju/domain/upgrade/service (interfaces: State,WatcherFactory) // Package service is a generated GoMock package. package service @@ -8,6 +8,8 @@ import ( context "context" reflect "reflect" + changestream "github.com/juju/juju/core/changestream" + watcher "github.com/juju/juju/core/watcher" version "github.com/juju/version/v2" gomock "go.uber.org/mock/gomock" ) @@ -121,3 +123,41 @@ func (mr *MockStateMockRecorder) StartUpgrade(arg0, arg1 interface{}) *gomock.Ca mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StartUpgrade", reflect.TypeOf((*MockState)(nil).StartUpgrade), arg0, arg1) } + +// MockWatcherFactory is a mock of WatcherFactory interface. +type MockWatcherFactory struct { + ctrl *gomock.Controller + recorder *MockWatcherFactoryMockRecorder +} + +// MockWatcherFactoryMockRecorder is the mock recorder for MockWatcherFactory. +type MockWatcherFactoryMockRecorder struct { + mock *MockWatcherFactory +} + +// NewMockWatcherFactory creates a new mock instance. +func NewMockWatcherFactory(ctrl *gomock.Controller) *MockWatcherFactory { + mock := &MockWatcherFactory{ctrl: ctrl} + mock.recorder = &MockWatcherFactoryMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockWatcherFactory) EXPECT() *MockWatcherFactoryMockRecorder { + return m.recorder +} + +// NewNamespaceWatcher mocks base method. +func (m *MockWatcherFactory) NewNamespaceWatcher(arg0 string, arg1 changestream.ChangeType, arg2 string) (watcher.Watcher[[]string], error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "NewNamespaceWatcher", arg0, arg1, arg2) + ret0, _ := ret[0].(watcher.Watcher[[]string]) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// NewNamespaceWatcher indicates an expected call of NewNamespaceWatcher. +func (mr *MockWatcherFactoryMockRecorder) NewNamespaceWatcher(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewNamespaceWatcher", reflect.TypeOf((*MockWatcherFactory)(nil).NewNamespaceWatcher), arg0, arg1, arg2) +} diff --git a/domain/upgrade/service/package_test.go b/domain/upgrade/service/package_test.go index ae20e815862..3fa05c10ae7 100644 --- a/domain/upgrade/service/package_test.go +++ b/domain/upgrade/service/package_test.go @@ -9,7 +9,7 @@ import ( gc "gopkg.in/check.v1" ) -//go:generate go run go.uber.org/mock/mockgen -package service -destination package_mock_test.go github.com/juju/juju/domain/upgrade/service State +//go:generate go run go.uber.org/mock/mockgen -package service -destination package_mock_test.go github.com/juju/juju/domain/upgrade/service State,WatcherFactory func TestPackage(t *testing.T) { gc.TestingT(t) diff --git a/domain/upgrade/service/service.go b/domain/upgrade/service/service.go index a445cb32906..1ce35efbbd1 100644 --- a/domain/upgrade/service/service.go +++ b/domain/upgrade/service/service.go @@ -10,6 +10,8 @@ import ( "github.com/juju/errors" "github.com/juju/version/v2" + "github.com/juju/juju/core/changestream" + "github.com/juju/juju/core/watcher" "github.com/juju/juju/database" "github.com/juju/juju/domain" ) @@ -25,14 +27,22 @@ type State interface { ActiveUpgrade(context.Context) (string, error) } +// WatcherFactory describes methods for creating watchers. +type WatcherFactory interface { + // NewNamespaceWatcher returns a new namespace watcher + // for events based on the input change mask. + NewNamespaceWatcher(string, changestream.ChangeType, string) (watcher.StringsWatcher, error) +} + // Service provides the API for working with upgrade info type Service struct { - st State + st State + watcherFactory WatcherFactory } // NewService returns a new Service for interacting with the underlying state. -func NewService(st State) *Service { - return &Service{st: st} +func NewService(st State, wf WatcherFactory) *Service { + return &Service{st: st, watcherFactory: wf} } // CreateUpgrade creates an upgrade to and from specified versions @@ -59,13 +69,6 @@ func (s *Service) SetControllerReady(ctx context.Context, upgradeUUID, controlle return domain.CoerceError(err) } -// AllProvisionedControllersReady returns true if and only if all controllers -// that have been started by the provisioner have been set ready. -func (s *Service) AllProvisionedControllersReady(ctx context.Context, upgradeUUID string) (bool, error) { - allProvisioned, err := s.st.AllProvisionedControllersReady(ctx, upgradeUUID) - return allProvisioned, domain.CoerceError(err) -} - // StartUpgrade starts the current upgrade if it exists func (s *Service) StartUpgrade(ctx context.Context, upgradeUUID string) error { err := s.st.StartUpgrade(ctx, upgradeUUID) @@ -91,3 +94,9 @@ func (s *Service) ActiveUpgrade(ctx context.Context) (string, error) { } return activeUpgrades, domain.CoerceError(err) } + +// WatchForUpgradeReady creates a watcher which notifies when all controller +// nodes have been registered, meaning the upgrade is ready to start +func (s *Service) WatchForUpgradeReady(ctx context.Context, upgradeUUID string) (watcher.NotifyWatcher, error) { + return NewUpgradeReadyWatcher(ctx, s.st, s.watcherFactory, upgradeUUID) +} diff --git a/domain/upgrade/service/service_test.go b/domain/upgrade/service/service_test.go index 651ea329ff8..8befe716ca2 100644 --- a/domain/upgrade/service/service_test.go +++ b/domain/upgrade/service/service_test.go @@ -8,7 +8,6 @@ import ( "database/sql" "github.com/juju/errors" - "github.com/juju/testing" jc "github.com/juju/testing/checkers" "github.com/juju/utils/v3" "github.com/juju/version/v2" @@ -18,8 +17,9 @@ import ( ) type serviceSuite struct { - testing.IsolationSuite state *MockState + + srv *Service } var _ = gc.Suite(&serviceSuite{}) @@ -32,6 +32,7 @@ var ( func (s *serviceSuite) setupMocks(c *gc.C) *gomock.Controller { ctrl := gomock.NewController(c) s.state = NewMockState(ctrl) + s.srv = NewService(s.state, nil) return ctrl } @@ -40,7 +41,7 @@ func (s *serviceSuite) TestCreateUpgrade(c *gc.C) { s.state.EXPECT().CreateUpgrade(gomock.Any(), version.MustParse("3.0.0"), version.MustParse("3.0.1")).Return(testUUID1, nil) - upgradeUUID, err := NewService(s.state).CreateUpgrade(context.Background(), version.MustParse("3.0.0"), version.MustParse("3.0.1")) + upgradeUUID, err := s.srv.CreateUpgrade(context.Background(), version.MustParse("3.0.0"), version.MustParse("3.0.1")) c.Assert(err, jc.ErrorIsNil) c.Assert(upgradeUUID, gc.Equals, testUUID1) } @@ -51,15 +52,15 @@ func (s *serviceSuite) TestCreateUpgradeAlreadyExists(c *gc.C) { ucErr := sqlite3.Error{ExtendedCode: sqlite3.ErrConstraintUnique} s.state.EXPECT().CreateUpgrade(gomock.Any(), version.MustParse("3.0.0"), version.MustParse("3.0.1")).Return("", ucErr) - _, err := NewService(s.state).CreateUpgrade(context.Background(), version.MustParse("3.0.0"), version.MustParse("3.0.1")) + _, err := s.srv.CreateUpgrade(context.Background(), version.MustParse("3.0.0"), version.MustParse("3.0.1")) c.Assert(errors.IsAlreadyExists(err), jc.IsTrue) } func (s *serviceSuite) TestCreateUpgradeInvalidVersions(c *gc.C) { - _, err := NewService(s.state).CreateUpgrade(context.Background(), version.MustParse("3.0.1"), version.MustParse("3.0.0")) + _, err := s.srv.CreateUpgrade(context.Background(), version.MustParse("3.0.1"), version.MustParse("3.0.0")) c.Assert(errors.IsNotValid(err), jc.IsTrue) - _, err = NewService(s.state).CreateUpgrade(context.Background(), version.MustParse("3.0.1"), version.MustParse("3.0.1")) + _, err = s.srv.CreateUpgrade(context.Background(), version.MustParse("3.0.1"), version.MustParse("3.0.1")) c.Assert(errors.IsNotValid(err), jc.IsTrue) } @@ -68,7 +69,7 @@ func (s *serviceSuite) TestSetControllerReady(c *gc.C) { s.state.EXPECT().SetControllerReady(gomock.Any(), testUUID1, testUUID2).Return(nil) - err := NewService(s.state).SetControllerReady(context.Background(), testUUID1, testUUID2) + err := s.srv.SetControllerReady(context.Background(), testUUID1, testUUID2) c.Assert(err, jc.ErrorIsNil) } @@ -78,27 +79,17 @@ func (s *serviceSuite) TestSetControllerReadyForiegnKey(c *gc.C) { fkErr := sqlite3.Error{ExtendedCode: sqlite3.ErrConstraintForeignKey} s.state.EXPECT().SetControllerReady(gomock.Any(), testUUID1, testUUID2).Return(fkErr) - err := NewService(s.state).SetControllerReady(context.Background(), testUUID1, testUUID2) + err := s.srv.SetControllerReady(context.Background(), testUUID1, testUUID2) c.Log(err) c.Assert(errors.IsNotFound(err), jc.IsTrue) } -func (s *serviceSuite) TestAllProvisioneddControllerReadyTrue(c *gc.C) { - defer s.setupMocks(c).Finish() - - s.state.EXPECT().AllProvisionedControllersReady(gomock.Any(), testUUID1).Return(true, nil) - - allProvisioned, err := NewService(s.state).AllProvisionedControllersReady(context.Background(), testUUID1) - c.Assert(err, jc.ErrorIsNil) - c.Assert(allProvisioned, jc.IsTrue) -} - func (s *serviceSuite) TestStartUpgrade(c *gc.C) { defer s.setupMocks(c).Finish() s.state.EXPECT().StartUpgrade(gomock.Any(), testUUID1).Return(nil) - err := NewService(s.state).StartUpgrade(context.Background(), testUUID1) + err := s.srv.StartUpgrade(context.Background(), testUUID1) c.Assert(err, jc.ErrorIsNil) } @@ -107,7 +98,7 @@ func (s *serviceSuite) TestStartUpgradeBeforeCreated(c *gc.C) { s.state.EXPECT().StartUpgrade(gomock.Any(), testUUID1).Return(sql.ErrNoRows) - err := NewService(s.state).StartUpgrade(context.Background(), testUUID1) + err := s.srv.StartUpgrade(context.Background(), testUUID1) c.Assert(errors.IsNotFound(err), jc.IsTrue) } @@ -116,7 +107,7 @@ func (s *serviceSuite) TestActiveUpgrade(c *gc.C) { s.state.EXPECT().ActiveUpgrade(gomock.Any()).Return(testUUID1, nil) - activeUpgrade, err := NewService(s.state).ActiveUpgrade(context.Background()) + activeUpgrade, err := s.srv.ActiveUpgrade(context.Background()) c.Assert(err, jc.ErrorIsNil) c.Assert(activeUpgrade, gc.Equals, testUUID1) } @@ -126,6 +117,6 @@ func (s *serviceSuite) TestActiveUpgradeNoUpgrade(c *gc.C) { s.state.EXPECT().ActiveUpgrade(gomock.Any()).Return("", errors.Trace(sql.ErrNoRows)) - _, err := NewService(s.state).ActiveUpgrade(context.Background()) + _, err := s.srv.ActiveUpgrade(context.Background()) c.Assert(errors.IsNotFound(err), jc.IsTrue) } diff --git a/domain/upgrade/service/watcher.go b/domain/upgrade/service/watcher.go new file mode 100644 index 00000000000..04d5c420ead --- /dev/null +++ b/domain/upgrade/service/watcher.go @@ -0,0 +1,105 @@ +// Copyright 2023 Canonical Ltd. +// Licensed under the AGPLv3, see LICENCE file for details. + +package service + +import ( + "context" + + "github.com/juju/errors" + "github.com/juju/worker/v3/catacomb" + + "github.com/juju/juju/core/changestream" + "github.com/juju/juju/core/watcher" +) + +type upgradeReadyWatcher struct { + ctx context.Context + catacomb catacomb.Catacomb + + st State + upgradeUUID string + + in <-chan []string + out chan struct{} +} + +// NewUpgradeReadyWatcher creates a watcher which notifies when all controller +// nodes have been registered, meaning the upgrade is ready to start +func NewUpgradeReadyWatcher(ctx context.Context, st State, wf WatcherFactory, upgradeUUID string) (watcher.NotifyWatcher, error) { + namespaceWatcher, err := wf.NewNamespaceWatcher("upgrade_info_controller_node", changestream.Create|changestream.Update, "") + if err != nil { + return nil, errors.Trace(err) + } + + w := &upgradeReadyWatcher{ + ctx: ctx, + st: st, + upgradeUUID: upgradeUUID, + out: make(chan struct{}), + in: namespaceWatcher.Changes(), + } + + if err := catacomb.Invoke(catacomb.Plan{ + Site: &w.catacomb, + Work: w.loop, + }); err != nil { + return nil, errors.Trace(err) + } + + if err := w.catacomb.Add(namespaceWatcher); err != nil { + return nil, errors.Trace(err) + } + + return w, nil +} + +func (w *upgradeReadyWatcher) Kill() { + w.catacomb.Kill(nil) +} + +func (w *upgradeReadyWatcher) Wait() error { + return w.catacomb.Wait() +} + +func (w *upgradeReadyWatcher) Changes() <-chan struct{} { + return w.out +} + +func (w *upgradeReadyWatcher) loop() error { + defer close(w.out) + + // By reassigning the in and out channels, we effectively ticktock between + // read mode and dispatch mode. This ensures we always dispatch deltas that + // we received before reading more, and every channel read/write is guarded + // by checks of the tomb and subscription liveness. + // Start in read mode so we don't send an erroneous initial message + var out chan struct{} + in := w.in + + for { + select { + case <-w.catacomb.Dying(): + return w.catacomb.ErrDying() + case <-w.ctx.Done(): + w.catacomb.Kill(context.Cause(w.ctx)) + case _, ok := <-in: + if !ok { + return nil + } + ready, err := w.st.AllProvisionedControllersReady(w.ctx, w.upgradeUUID) + if err != nil { + return errors.Trace(err) + } + if ready { + // Tick over to dispatch mode. + in = nil + out = w.out + } + case out <- struct{}{}: + // We have dispatched. Tick over to read mode. + in = w.in + out = nil + } + } +} diff --git a/domain/upgrade/service/watcher_test.go b/domain/upgrade/service/watcher_test.go new file mode 100644 index 00000000000..d2a7f2f9f5d --- /dev/null +++ b/domain/upgrade/service/watcher_test.go @@ -0,0 +1,83 @@ +// Copyright 2023 Canonical Ltd. +// Licensed under the AGPLv3, see LICENCE file for details. + +package service + +import ( + "context" + "time" + + jc "github.com/juju/testing/checkers" + "go.uber.org/mock/gomock" + gc "gopkg.in/check.v1" + + "github.com/juju/juju/core/changestream" + "github.com/juju/juju/core/watcher" + "github.com/juju/juju/core/watcher/watchertest" + coretesting "github.com/juju/juju/testing" +) + +type watcherSuite struct { + state *MockState + nsCh chan []string + + w watcher.NotifyWatcher +} + +var _ = gc.Suite(&watcherSuite{}) + +func (s *watcherSuite) setupMocks(c *gc.C) *gomock.Controller { + ctrl := gomock.NewController(c) + s.state = NewMockState(ctrl) + s.nsCh = make(chan []string) + + nsWatcher := watchertest.NewMockStringsWatcher(s.nsCh) + wf := NewMockWatcherFactory(ctrl) + wf.EXPECT().NewNamespaceWatcher("upgrade_info_controller_node", changestream.Create|changestream.Update, "").Return(nsWatcher, nil) + var err error + s.w, err = NewUpgradeReadyWatcher(context.Background(), s.state, wf, testUUID1) + c.Assert(err, jc.ErrorIsNil) + + return ctrl +} + +func (s *watcherSuite) TestUpgradeReadyWatcherSingleNode(c *gc.C) { + defer s.setupMocks(c).Finish() + + ch := s.w.Changes() + + s.state.EXPECT().AllProvisionedControllersReady(gomock.Any(), testUUID1).Return(true, nil) + + s.nsCh <- []string{"blah"} + select { + case _, ok := <-ch: + c.Assert(ok, jc.IsTrue) + case <-time.After(coretesting.ShortWait): + c.Fatal("Timed out waiting for ready notification") + } +} + +func (s *watcherSuite) TestUpgradeReadyWatcherHA(c *gc.C) { + defer s.setupMocks(c).Finish() + + ch := s.w.Changes() + + s.state.EXPECT().AllProvisionedControllersReady(gomock.Any(), testUUID1).Return(false, nil).Times(2) + s.state.EXPECT().AllProvisionedControllersReady(gomock.Any(), testUUID1).Return(true, nil) + + s.nsCh <- []string{"blah"} + s.nsCh <- []string{"blah"} + select { + case _, _ = <-ch: + c.Fatal("Received unexpected ready notification") + case <-time.After(coretesting.ShortWait): + } + + s.nsCh <- []string{"blah"} + select { + case _, ok := <-ch: + c.Assert(ok, jc.IsTrue) + case <-time.After(coretesting.ShortWait): + c.Fatal("Timed out waiting for ready notification") + } +}