Skip to content

Commit

Permalink
[aspen] - fixed issues with concurrent modification of cluster store (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
emilbon99 authored Aug 16, 2024
1 parent 6348767 commit cf5a49b
Show file tree
Hide file tree
Showing 7 changed files with 50 additions and 20 deletions.
22 changes: 18 additions & 4 deletions aspen/internal/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,16 +136,30 @@ type Cluster struct {
}

// Key implements the Cluster interface.
func (c *Cluster) Key() uuid.UUID { return c.Store.PeekState().ClusterKey }
func (c *Cluster) Key() uuid.UUID {
s, release := c.Store.PeekState()
defer release()
return s.ClusterKey
}

// Host implements the Cluster interface.
func (c *Cluster) Host() node.Node { return c.Store.GetHost() }
func (c *Cluster) Host() node.Node {
return c.Store.GetHost()
}

// HostKey implements the Cluster interface.
func (c *Cluster) HostKey() node.Key { return c.Store.PeekState().HostKey }
func (c *Cluster) HostKey() node.Key {
s, release := c.Store.PeekState()
defer release()
return s.HostKey
}

// Nodes implements the Cluster interface.
func (c *Cluster) Nodes() node.Group { return c.Store.PeekState().Nodes }
func (c *Cluster) Nodes() node.Group {
s, release := c.Store.PeekState()
defer release()
return s.Nodes
}

// Node implements the Cluster interface.
func (c *Cluster) Node(key node.Key) (node.Node, error) {
Expand Down
8 changes: 5 additions & 3 deletions aspen/internal/cluster/pledge/pledge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,9 +243,11 @@ var _ = Describe("PledgeServer", func() {
mu sync.Mutex
nodes = make(node.Group)
candidates = func(i int) func() node.Group {
mu.Lock()
defer mu.Unlock()
return func() node.Group { return nodes.Copy() }
return func() node.Group {
mu.Lock()
defer mu.Unlock()
return nodes.Copy()
}
}
numCandidates = 10
numPledges = 2
Expand Down
18 changes: 14 additions & 4 deletions aspen/internal/cluster/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,24 +104,34 @@ type core struct {
}

// ClusterKey implements Store.
func (c *core) ClusterKey() uuid.UUID { return c.Observable.PeekState().ClusterKey }
func (c *core) ClusterKey() uuid.UUID {
s, release := c.Observable.PeekState()
ck := s.ClusterKey
release()
return ck

Check warning on line 111 in aspen/internal/cluster/store/store.go

View check run for this annotation

Codecov / codecov/patch

aspen/internal/cluster/store/store.go#L107-L111

Added lines #L107 - L111 were not covered by tests
}

// SetClusterKey implements Store.
func (c *core) SetClusterKey(ctx context.Context, key uuid.UUID) {
s := c.Observable.PeekState()
s := c.Observable.CopyState()

Check warning on line 116 in aspen/internal/cluster/store/store.go

View check run for this annotation

Codecov / codecov/patch

aspen/internal/cluster/store/store.go#L116

Added line #L116 was not covered by tests
s.ClusterKey = key
c.Observable.SetState(ctx, s)
}

// GetNode implements Store.
func (c *core) GetNode(key node.Key) (node.Node, bool) {
n, ok := c.Observable.PeekState().Nodes[key]
state, release := c.Observable.PeekState()
defer release()
n, ok := state.Nodes[key]
return n, ok
}

// GetHost implements Store.
func (c *core) GetHost() node.Node {
n, _ := c.GetNode(c.Observable.PeekState().HostKey)
state, release := c.Observable.PeekState()
h := state.HostKey
release()
n, _ := c.GetNode(h)
return n
}

Expand Down
4 changes: 3 additions & 1 deletion aspen/internal/kv/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,9 @@ func (g *operationReceiver) handle(ctx context.Context, req TxRequest) (TxReques
return TxRequest{}, ctx.Err()
case g.Out.Inlet() <- req:
}
br := g.store.PeekState().toBatchRequest(ctx)
s, release := g.store.PeekState()
defer release()
br := s.toBatchRequest(ctx)
br.Sender = g.Cluster.HostKey()
return br, nil
}
Expand Down
4 changes: 3 additions & 1 deletion aspen/internal/kv/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@ func newStoreEmitter(s store, cfg Config) source {
}

func (e *storeEmitter) Emit(ctx context.Context) (TxRequest, error) {
return e.store.PeekState().toBatchRequest(ctx), nil
s, release := e.store.PeekState()
defer release()
return s.toBatchRequest(ctx), nil
}

type storeSink struct {
Expand Down
4 changes: 2 additions & 2 deletions synnax/pkg/distribution/cluster/ontology.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (s *NodeOntologyService) ListenForChanges(ctx context.Context) {
if err := s.Ontology.NewWriter(nil).DefineResource(ctx, NodeOntologyID(core.Free)); err != nil {
s.L.Error("failed to define free node ontology resource", zap.Error(err))
}
s.update(ctx, s.Cluster.PeekState())
s.update(ctx, s.Cluster.CopyState())
s.Cluster.OnChange(func(ctx context.Context, change core.ClusterChange) {
s.update(ctx, change.State)
})
Expand All @@ -102,7 +102,7 @@ func (s *NodeOntologyService) OnChange(f func(context.Context, iter.Nexter[schem
// OpenNexter implements ontology.Service.
func (s *NodeOntologyService) OpenNexter() (iter.NexterCloser[ontology.Resource], error) {
return iter.NexterNopCloser(
iter.All(lo.MapToSlice(s.Cluster.PeekState().Nodes, func(_ core.NodeKey, n core.Node) ontology.Resource {
iter.All(lo.MapToSlice(s.Cluster.CopyState().Nodes, func(_ core.NodeKey, n core.Node) ontology.Resource {
return newNodeResource(n)
})),
), nil
Expand Down
10 changes: 5 additions & 5 deletions x/go/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type Reader[S any] interface {
CopyState() S
// PeekState returns a read-only view of the current state.
// Modifications to the returned state may cause undefined behavior.
PeekState() S
PeekState() (S, func())
}

// Writer is a writable Store.
Expand Down Expand Up @@ -78,10 +78,9 @@ func (c *core[S]) CopyState() S {
}

// PeekState implements Store.
func (c *core[S]) PeekState() S {
func (c *core[S]) PeekState() (S, func()) {
c.mu.RLock()
defer c.mu.RUnlock()
return c.state
return c.state, c.mu.RUnlock
}

// Observable is a wrapper around a Store that allows the caller to observe
Expand Down Expand Up @@ -148,7 +147,8 @@ func WrapObservable[S, O any](

// SetState implements Store.
func (o *observable[S, O]) SetState(ctx context.Context, state S) {
notify, shouldNotify := o.Transform(o.PeekState(), state)
prev := o.CopyState()
notify, shouldNotify := o.Transform(prev, state)
if shouldNotify {
lo.Ternary(
*o.ObservableConfig.GoNotify,
Expand Down

0 comments on commit cf5a49b

Please sign in to comment.