Skip to content

Commit

Permalink
Add watcher to upgrade that notifies when upgrade can start
Browse files Browse the repository at this point in the history
Instead of, as we are with mongo, watching for any changes and then
requiring the consumer to verify it's an actionable change, create a new
watcher extending NamespaceWatcher which does this verification for you

This combines Watch and AllProvisionedControllersReady from
state/upgrade

Test this by mocking the underlying watcher. Ideally, we would have some
tests watching a real database, but we are not able to do this at the
moment.
  • Loading branch information
jack-w-shaw committed Jul 26, 2023
1 parent ee5d6dc commit 930e2a5
Show file tree
Hide file tree
Showing 6 changed files with 262 additions and 34 deletions.
42 changes: 41 additions & 1 deletion domain/upgrade/service/package_mock_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion domain/upgrade/service/package_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
gc "gopkg.in/check.v1"
)

//go:generate go run go.uber.org/mock/mockgen -package service -destination package_mock_test.go github.com/juju/juju/domain/upgrade/service State
//go:generate go run go.uber.org/mock/mockgen -package service -destination package_mock_test.go github.com/juju/juju/domain/upgrade/service State,WatcherFactory

func TestPackage(t *testing.T) {
gc.TestingT(t)
Expand Down
29 changes: 19 additions & 10 deletions domain/upgrade/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"github.com/juju/errors"
"github.com/juju/version/v2"

"github.com/juju/juju/core/changestream"
"github.com/juju/juju/core/watcher"
"github.com/juju/juju/database"
"github.com/juju/juju/domain"
)
Expand All @@ -25,14 +27,22 @@ type State interface {
ActiveUpgrade(context.Context) (string, error)
}

// WatcherFactory describes methods for creating watchers.
type WatcherFactory interface {
// NewNamespaceWatcher returns a new namespace watcher
// for events based on the input change mask.
NewNamespaceWatcher(string, changestream.ChangeType, string) (watcher.StringsWatcher, error)
}

// Service provides the API for working with upgrade info
type Service struct {
st State
st State
watcherFactory WatcherFactory
}

// NewService returns a new Service for interacting with the underlying state.
func NewService(st State) *Service {
return &Service{st: st}
func NewService(st State, wf WatcherFactory) *Service {
return &Service{st: st, watcherFactory: wf}
}

// CreateUpgrade creates an upgrade to and from specified versions
Expand All @@ -59,13 +69,6 @@ func (s *Service) SetControllerReady(ctx context.Context, upgradeUUID, controlle
return domain.CoerceError(err)
}

// AllProvisionedControllersReady returns true if and only if all controllers
// that have been started by the provisioner have been set ready.
func (s *Service) AllProvisionedControllersReady(ctx context.Context, upgradeUUID string) (bool, error) {
allProvisioned, err := s.st.AllProvisionedControllersReady(ctx, upgradeUUID)
return allProvisioned, domain.CoerceError(err)
}

// StartUpgrade starts the current upgrade if it exists
func (s *Service) StartUpgrade(ctx context.Context, upgradeUUID string) error {
err := s.st.StartUpgrade(ctx, upgradeUUID)
Expand All @@ -91,3 +94,9 @@ func (s *Service) ActiveUpgrade(ctx context.Context) (string, error) {
}
return activeUpgrades, domain.CoerceError(err)
}

// WatchForUpgradeReady creates a watcher which notifies when all controller
// nodes have been registered, meaning the upgrade is ready to start
func (s *Service) WatchForUpgradeReady(ctx context.Context, upgradeUUID string) (watcher.NotifyWatcher, error) {
return NewUpgradeReadyWatcher(ctx, s.st, s.watcherFactory, upgradeUUID)
}
35 changes: 13 additions & 22 deletions domain/upgrade/service/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"database/sql"

"github.com/juju/errors"
"github.com/juju/testing"
jc "github.com/juju/testing/checkers"
"github.com/juju/utils/v3"
"github.com/juju/version/v2"
Expand All @@ -18,8 +17,9 @@ import (
)

type serviceSuite struct {
testing.IsolationSuite
state *MockState

srv *Service
}

var _ = gc.Suite(&serviceSuite{})
Expand All @@ -32,6 +32,7 @@ var (
func (s *serviceSuite) setupMocks(c *gc.C) *gomock.Controller {
ctrl := gomock.NewController(c)
s.state = NewMockState(ctrl)
s.srv = NewService(s.state, nil)
return ctrl
}

Expand All @@ -40,7 +41,7 @@ func (s *serviceSuite) TestCreateUpgrade(c *gc.C) {

s.state.EXPECT().CreateUpgrade(gomock.Any(), version.MustParse("3.0.0"), version.MustParse("3.0.1")).Return(testUUID1, nil)

upgradeUUID, err := NewService(s.state).CreateUpgrade(context.Background(), version.MustParse("3.0.0"), version.MustParse("3.0.1"))
upgradeUUID, err := s.srv.CreateUpgrade(context.Background(), version.MustParse("3.0.0"), version.MustParse("3.0.1"))
c.Assert(err, jc.ErrorIsNil)
c.Assert(upgradeUUID, gc.Equals, testUUID1)
}
Expand All @@ -51,15 +52,15 @@ func (s *serviceSuite) TestCreateUpgradeAlreadyExists(c *gc.C) {
ucErr := sqlite3.Error{ExtendedCode: sqlite3.ErrConstraintUnique}
s.state.EXPECT().CreateUpgrade(gomock.Any(), version.MustParse("3.0.0"), version.MustParse("3.0.1")).Return("", ucErr)

_, err := NewService(s.state).CreateUpgrade(context.Background(), version.MustParse("3.0.0"), version.MustParse("3.0.1"))
_, err := s.srv.CreateUpgrade(context.Background(), version.MustParse("3.0.0"), version.MustParse("3.0.1"))
c.Assert(errors.IsAlreadyExists(err), jc.IsTrue)
}

func (s *serviceSuite) TestCreateUpgradeInvalidVersions(c *gc.C) {
_, err := NewService(s.state).CreateUpgrade(context.Background(), version.MustParse("3.0.1"), version.MustParse("3.0.0"))
_, err := s.srv.CreateUpgrade(context.Background(), version.MustParse("3.0.1"), version.MustParse("3.0.0"))
c.Assert(errors.IsNotValid(err), jc.IsTrue)

_, err = NewService(s.state).CreateUpgrade(context.Background(), version.MustParse("3.0.1"), version.MustParse("3.0.1"))
_, err = s.srv.CreateUpgrade(context.Background(), version.MustParse("3.0.1"), version.MustParse("3.0.1"))
c.Assert(errors.IsNotValid(err), jc.IsTrue)
}

Expand All @@ -68,7 +69,7 @@ func (s *serviceSuite) TestSetControllerReady(c *gc.C) {

s.state.EXPECT().SetControllerReady(gomock.Any(), testUUID1, testUUID2).Return(nil)

err := NewService(s.state).SetControllerReady(context.Background(), testUUID1, testUUID2)
err := s.srv.SetControllerReady(context.Background(), testUUID1, testUUID2)
c.Assert(err, jc.ErrorIsNil)
}

Expand All @@ -78,27 +79,17 @@ func (s *serviceSuite) TestSetControllerReadyForiegnKey(c *gc.C) {
fkErr := sqlite3.Error{ExtendedCode: sqlite3.ErrConstraintForeignKey}
s.state.EXPECT().SetControllerReady(gomock.Any(), testUUID1, testUUID2).Return(fkErr)

err := NewService(s.state).SetControllerReady(context.Background(), testUUID1, testUUID2)
err := s.srv.SetControllerReady(context.Background(), testUUID1, testUUID2)
c.Log(err)
c.Assert(errors.IsNotFound(err), jc.IsTrue)
}

func (s *serviceSuite) TestAllProvisioneddControllerReadyTrue(c *gc.C) {
defer s.setupMocks(c).Finish()

s.state.EXPECT().AllProvisionedControllersReady(gomock.Any(), testUUID1).Return(true, nil)

allProvisioned, err := NewService(s.state).AllProvisionedControllersReady(context.Background(), testUUID1)
c.Assert(err, jc.ErrorIsNil)
c.Assert(allProvisioned, jc.IsTrue)
}

func (s *serviceSuite) TestStartUpgrade(c *gc.C) {
defer s.setupMocks(c).Finish()

s.state.EXPECT().StartUpgrade(gomock.Any(), testUUID1).Return(nil)

err := NewService(s.state).StartUpgrade(context.Background(), testUUID1)
err := s.srv.StartUpgrade(context.Background(), testUUID1)
c.Assert(err, jc.ErrorIsNil)
}

Expand All @@ -107,7 +98,7 @@ func (s *serviceSuite) TestStartUpgradeBeforeCreated(c *gc.C) {

s.state.EXPECT().StartUpgrade(gomock.Any(), testUUID1).Return(sql.ErrNoRows)

err := NewService(s.state).StartUpgrade(context.Background(), testUUID1)
err := s.srv.StartUpgrade(context.Background(), testUUID1)
c.Assert(errors.IsNotFound(err), jc.IsTrue)
}

Expand All @@ -116,7 +107,7 @@ func (s *serviceSuite) TestActiveUpgrade(c *gc.C) {

s.state.EXPECT().ActiveUpgrade(gomock.Any()).Return(testUUID1, nil)

activeUpgrade, err := NewService(s.state).ActiveUpgrade(context.Background())
activeUpgrade, err := s.srv.ActiveUpgrade(context.Background())
c.Assert(err, jc.ErrorIsNil)
c.Assert(activeUpgrade, gc.Equals, testUUID1)
}
Expand All @@ -126,6 +117,6 @@ func (s *serviceSuite) TestActiveUpgradeNoUpgrade(c *gc.C) {

s.state.EXPECT().ActiveUpgrade(gomock.Any()).Return("", errors.Trace(sql.ErrNoRows))

_, err := NewService(s.state).ActiveUpgrade(context.Background())
_, err := s.srv.ActiveUpgrade(context.Background())
c.Assert(errors.IsNotFound(err), jc.IsTrue)
}
105 changes: 105 additions & 0 deletions domain/upgrade/service/watcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// Copyright 2023 Canonical Ltd.
// Licensed under the AGPLv3, see LICENCE file for details.

package service

import (
"context"

"github.com/juju/errors"
"github.com/juju/worker/v3/catacomb"

"github.com/juju/juju/core/changestream"
"github.com/juju/juju/core/watcher"
)

type upgradeReadyWatcher struct {
ctx context.Context
catacomb catacomb.Catacomb

st State
upgradeUUID string

in <-chan []string
out chan struct{}
}

// NewUpgradeReadyWatcher creates a watcher which notifies when all controller
// nodes have been registered, meaning the upgrade is ready to start
func NewUpgradeReadyWatcher(ctx context.Context, st State, wf WatcherFactory, upgradeUUID string) (watcher.NotifyWatcher, error) {
namespaceWatcher, err := wf.NewNamespaceWatcher("upgrade_info_controller_node", changestream.Create|changestream.Update, "")
if err != nil {
return nil, errors.Trace(err)
}

w := &upgradeReadyWatcher{
ctx: ctx,
st: st,
upgradeUUID: upgradeUUID,
out: make(chan struct{}),
in: namespaceWatcher.Changes(),
}

if err := catacomb.Invoke(catacomb.Plan{
Site: &w.catacomb,
Work: w.loop,
}); err != nil {
return nil, errors.Trace(err)
}

if err := w.catacomb.Add(namespaceWatcher); err != nil {
return nil, errors.Trace(err)
}

return w, nil
}

func (w *upgradeReadyWatcher) Kill() {
w.catacomb.Kill(nil)
}

func (w *upgradeReadyWatcher) Wait() error {
return w.catacomb.Wait()
}

func (w *upgradeReadyWatcher) Changes() <-chan struct{} {
return w.out
}

func (w *upgradeReadyWatcher) loop() error {
defer close(w.out)

// By reassigning the in and out channels, we effectively ticktock between
// read mode and dispatch mode. This ensures we always dispatch deltas that
// we received before reading more, and every channel read/write is guarded
// by checks of the tomb and subscription liveness.
// Start in read mode so we don't send an erroneous initial message
var out chan struct{}
in := w.in

for {
select {
case <-w.catacomb.Dying():
return w.catacomb.ErrDying()
case <-w.ctx.Done():
w.catacomb.Kill(context.Cause(w.ctx))
case _, ok := <-in:
if !ok {
return nil
}
ready, err := w.st.AllProvisionedControllersReady(w.ctx, w.upgradeUUID)
if err != nil {
return errors.Trace(err)
}
if ready {
// Tick over to dispatch mode.
in = nil
out = w.out
}
case out <- struct{}{}:
// We have dispatched. Tick over to read mode.
in = w.in
out = nil
}
}
}
Loading

0 comments on commit 930e2a5

Please sign in to comment.