Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[aspen] - fixed issues with concurrent modification of cluster store #772

Merged
merged 1 commit into from
Aug 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 18 additions & 4 deletions aspen/internal/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,16 +136,30 @@ type Cluster struct {
}

// Key implements the Cluster interface.
func (c *Cluster) Key() uuid.UUID { return c.Store.PeekState().ClusterKey }
func (c *Cluster) Key() uuid.UUID {
s, release := c.Store.PeekState()
defer release()
return s.ClusterKey
}

// Host implements the Cluster interface.
func (c *Cluster) Host() node.Node { return c.Store.GetHost() }
func (c *Cluster) Host() node.Node {
return c.Store.GetHost()
}

// HostKey implements the Cluster interface.
func (c *Cluster) HostKey() node.Key { return c.Store.PeekState().HostKey }
func (c *Cluster) HostKey() node.Key {
s, release := c.Store.PeekState()
defer release()
return s.HostKey
}

// Nodes implements the Cluster interface.
func (c *Cluster) Nodes() node.Group { return c.Store.PeekState().Nodes }
func (c *Cluster) Nodes() node.Group {
s, release := c.Store.PeekState()
defer release()
return s.Nodes
}

// Node implements the Cluster interface.
func (c *Cluster) Node(key node.Key) (node.Node, error) {
Expand Down
8 changes: 5 additions & 3 deletions aspen/internal/cluster/pledge/pledge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,9 +243,11 @@ var _ = Describe("PledgeServer", func() {
mu sync.Mutex
nodes = make(node.Group)
candidates = func(i int) func() node.Group {
mu.Lock()
defer mu.Unlock()
return func() node.Group { return nodes.Copy() }
return func() node.Group {
mu.Lock()
defer mu.Unlock()
return nodes.Copy()
}
}
numCandidates = 10
numPledges = 2
Expand Down
18 changes: 14 additions & 4 deletions aspen/internal/cluster/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,24 +104,34 @@
}

// ClusterKey implements Store.
func (c *core) ClusterKey() uuid.UUID { return c.Observable.PeekState().ClusterKey }
func (c *core) ClusterKey() uuid.UUID {
s, release := c.Observable.PeekState()
ck := s.ClusterKey
release()
return ck

Check warning on line 111 in aspen/internal/cluster/store/store.go

View check run for this annotation

Codecov / codecov/patch

aspen/internal/cluster/store/store.go#L107-L111

Added lines #L107 - L111 were not covered by tests
}

// SetClusterKey implements Store.
func (c *core) SetClusterKey(ctx context.Context, key uuid.UUID) {
s := c.Observable.PeekState()
s := c.Observable.CopyState()

Check warning on line 116 in aspen/internal/cluster/store/store.go

View check run for this annotation

Codecov / codecov/patch

aspen/internal/cluster/store/store.go#L116

Added line #L116 was not covered by tests
s.ClusterKey = key
c.Observable.SetState(ctx, s)
}

// GetNode implements Store.
func (c *core) GetNode(key node.Key) (node.Node, bool) {
n, ok := c.Observable.PeekState().Nodes[key]
state, release := c.Observable.PeekState()
defer release()
n, ok := state.Nodes[key]
return n, ok
}

// GetHost implements Store.
func (c *core) GetHost() node.Node {
n, _ := c.GetNode(c.Observable.PeekState().HostKey)
state, release := c.Observable.PeekState()
h := state.HostKey
release()
n, _ := c.GetNode(h)
return n
}

Expand Down
4 changes: 3 additions & 1 deletion aspen/internal/kv/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,9 @@ func (g *operationReceiver) handle(ctx context.Context, req TxRequest) (TxReques
return TxRequest{}, ctx.Err()
case g.Out.Inlet() <- req:
}
br := g.store.PeekState().toBatchRequest(ctx)
s, release := g.store.PeekState()
defer release()
br := s.toBatchRequest(ctx)
br.Sender = g.Cluster.HostKey()
return br, nil
}
Expand Down
4 changes: 3 additions & 1 deletion aspen/internal/kv/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@ func newStoreEmitter(s store, cfg Config) source {
}

func (e *storeEmitter) Emit(ctx context.Context) (TxRequest, error) {
return e.store.PeekState().toBatchRequest(ctx), nil
s, release := e.store.PeekState()
defer release()
return s.toBatchRequest(ctx), nil
}

type storeSink struct {
Expand Down
4 changes: 2 additions & 2 deletions synnax/pkg/distribution/cluster/ontology.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (s *NodeOntologyService) ListenForChanges(ctx context.Context) {
if err := s.Ontology.NewWriter(nil).DefineResource(ctx, NodeOntologyID(core.Free)); err != nil {
s.L.Error("failed to define free node ontology resource", zap.Error(err))
}
s.update(ctx, s.Cluster.PeekState())
s.update(ctx, s.Cluster.CopyState())
s.Cluster.OnChange(func(ctx context.Context, change core.ClusterChange) {
s.update(ctx, change.State)
})
Expand All @@ -102,7 +102,7 @@ func (s *NodeOntologyService) OnChange(f func(context.Context, iter.Nexter[schem
// OpenNexter implements ontology.Service.
func (s *NodeOntologyService) OpenNexter() (iter.NexterCloser[ontology.Resource], error) {
return iter.NexterNopCloser(
iter.All(lo.MapToSlice(s.Cluster.PeekState().Nodes, func(_ core.NodeKey, n core.Node) ontology.Resource {
iter.All(lo.MapToSlice(s.Cluster.CopyState().Nodes, func(_ core.NodeKey, n core.Node) ontology.Resource {
return newNodeResource(n)
})),
), nil
Expand Down
10 changes: 5 additions & 5 deletions x/go/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type Reader[S any] interface {
CopyState() S
// PeekState returns a read-only view of the current state.
// Modifications to the returned state may cause undefined behavior.
PeekState() S
PeekState() (S, func())
}

// Writer is a writable Store.
Expand Down Expand Up @@ -78,10 +78,9 @@ func (c *core[S]) CopyState() S {
}

// PeekState implements Store.
func (c *core[S]) PeekState() S {
func (c *core[S]) PeekState() (S, func()) {
c.mu.RLock()
defer c.mu.RUnlock()
return c.state
return c.state, c.mu.RUnlock
}

// Observable is a wrapper around a Store that allows the caller to observe
Expand Down Expand Up @@ -148,7 +147,8 @@ func WrapObservable[S, O any](

// SetState implements Store.
func (o *observable[S, O]) SetState(ctx context.Context, state S) {
notify, shouldNotify := o.Transform(o.PeekState(), state)
prev := o.CopyState()
notify, shouldNotify := o.Transform(prev, state)
if shouldNotify {
lo.Ternary(
*o.ObservableConfig.GoNotify,
Expand Down
Loading