diff --git a/aspen/internal/cluster/cluster.go b/aspen/internal/cluster/cluster.go index 50e4f16aeb..67ad6761d4 100644 --- a/aspen/internal/cluster/cluster.go +++ b/aspen/internal/cluster/cluster.go @@ -136,16 +136,30 @@ type Cluster struct { } // Key implements the Cluster interface. -func (c *Cluster) Key() uuid.UUID { return c.Store.PeekState().ClusterKey } +func (c *Cluster) Key() uuid.UUID { + s, release := c.Store.PeekState() + defer release() + return s.ClusterKey +} // Host implements the Cluster interface. -func (c *Cluster) Host() node.Node { return c.Store.GetHost() } +func (c *Cluster) Host() node.Node { + return c.Store.GetHost() +} // HostKey implements the Cluster interface. -func (c *Cluster) HostKey() node.Key { return c.Store.PeekState().HostKey } +func (c *Cluster) HostKey() node.Key { + s, release := c.Store.PeekState() + defer release() + return s.HostKey +} // Nodes implements the Cluster interface. -func (c *Cluster) Nodes() node.Group { return c.Store.PeekState().Nodes } +func (c *Cluster) Nodes() node.Group { + s, release := c.Store.PeekState() + defer release() + return s.Nodes +} // Node implements the Cluster interface. func (c *Cluster) Node(key node.Key) (node.Node, error) { diff --git a/aspen/internal/cluster/pledge/pledge_test.go b/aspen/internal/cluster/pledge/pledge_test.go index 63bb567245..386e72ae06 100644 --- a/aspen/internal/cluster/pledge/pledge_test.go +++ b/aspen/internal/cluster/pledge/pledge_test.go @@ -243,9 +243,11 @@ var _ = Describe("PledgeServer", func() { mu sync.Mutex nodes = make(node.Group) candidates = func(i int) func() node.Group { - mu.Lock() - defer mu.Unlock() - return func() node.Group { return nodes.Copy() } + return func() node.Group { + mu.Lock() + defer mu.Unlock() + return nodes.Copy() + } } numCandidates = 10 numPledges = 2 diff --git a/aspen/internal/cluster/store/store.go b/aspen/internal/cluster/store/store.go index 1be09eac7b..78e28f6dd4 100644 --- a/aspen/internal/cluster/store/store.go +++ b/aspen/internal/cluster/store/store.go @@ -104,24 +104,34 @@ type core struct { } // ClusterKey implements Store. -func (c *core) ClusterKey() uuid.UUID { return c.Observable.PeekState().ClusterKey } +func (c *core) ClusterKey() uuid.UUID { + s, release := c.Observable.PeekState() + ck := s.ClusterKey + release() + return ck +} // SetClusterKey implements Store. func (c *core) SetClusterKey(ctx context.Context, key uuid.UUID) { - s := c.Observable.PeekState() + s := c.Observable.CopyState() s.ClusterKey = key c.Observable.SetState(ctx, s) } // GetNode implements Store. func (c *core) GetNode(key node.Key) (node.Node, bool) { - n, ok := c.Observable.PeekState().Nodes[key] + state, release := c.Observable.PeekState() + defer release() + n, ok := state.Nodes[key] return n, ok } // GetHost implements Store. func (c *core) GetHost() node.Node { - n, _ := c.GetNode(c.Observable.PeekState().HostKey) + state, release := c.Observable.PeekState() + h := state.HostKey + release() + n, _ := c.GetNode(h) return n } diff --git a/aspen/internal/kv/gossip.go b/aspen/internal/kv/gossip.go index 78a4bcd023..6b5096a46a 100644 --- a/aspen/internal/kv/gossip.go +++ b/aspen/internal/kv/gossip.go @@ -79,7 +79,9 @@ func (g *operationReceiver) handle(ctx context.Context, req TxRequest) (TxReques return TxRequest{}, ctx.Err() case g.Out.Inlet() <- req: } - br := g.store.PeekState().toBatchRequest(ctx) + s, release := g.store.PeekState() + defer release() + br := s.toBatchRequest(ctx) br.Sender = g.Cluster.HostKey() return br, nil } diff --git a/aspen/internal/kv/store.go b/aspen/internal/kv/store.go index 7aeaa9db25..3d31583e96 100644 --- a/aspen/internal/kv/store.go +++ b/aspen/internal/kv/store.go @@ -57,7 +57,9 @@ func newStoreEmitter(s store, cfg Config) source { } func (e *storeEmitter) Emit(ctx context.Context) (TxRequest, error) { - return e.store.PeekState().toBatchRequest(ctx), nil + s, release := e.store.PeekState() + defer release() + return s.toBatchRequest(ctx), nil } type storeSink struct { diff --git a/synnax/pkg/distribution/cluster/ontology.go b/synnax/pkg/distribution/cluster/ontology.go index b71f51912d..70dccd306e 100644 --- a/synnax/pkg/distribution/cluster/ontology.go +++ b/synnax/pkg/distribution/cluster/ontology.go @@ -76,7 +76,7 @@ func (s *NodeOntologyService) ListenForChanges(ctx context.Context) { if err := s.Ontology.NewWriter(nil).DefineResource(ctx, NodeOntologyID(core.Free)); err != nil { s.L.Error("failed to define free node ontology resource", zap.Error(err)) } - s.update(ctx, s.Cluster.PeekState()) + s.update(ctx, s.Cluster.CopyState()) s.Cluster.OnChange(func(ctx context.Context, change core.ClusterChange) { s.update(ctx, change.State) }) @@ -102,7 +102,7 @@ func (s *NodeOntologyService) OnChange(f func(context.Context, iter.Nexter[schem // OpenNexter implements ontology.Service. func (s *NodeOntologyService) OpenNexter() (iter.NexterCloser[ontology.Resource], error) { return iter.NexterNopCloser( - iter.All(lo.MapToSlice(s.Cluster.PeekState().Nodes, func(_ core.NodeKey, n core.Node) ontology.Resource { + iter.All(lo.MapToSlice(s.Cluster.CopyState().Nodes, func(_ core.NodeKey, n core.Node) ontology.Resource { return newNodeResource(n) })), ), nil diff --git a/x/go/store/store.go b/x/go/store/store.go index e21f5b3fd0..0bdb60805e 100644 --- a/x/go/store/store.go +++ b/x/go/store/store.go @@ -31,7 +31,7 @@ type Reader[S any] interface { CopyState() S // PeekState returns a read-only view of the current state. // Modifications to the returned state may cause undefined behavior. - PeekState() S + PeekState() (S, func()) } // Writer is a writable Store. @@ -78,10 +78,9 @@ func (c *core[S]) CopyState() S { } // PeekState implements Store. -func (c *core[S]) PeekState() S { +func (c *core[S]) PeekState() (S, func()) { c.mu.RLock() - defer c.mu.RUnlock() - return c.state + return c.state, c.mu.RUnlock } // Observable is a wrapper around a Store that allows the caller to observe @@ -148,7 +147,8 @@ func WrapObservable[S, O any]( // SetState implements Store. func (o *observable[S, O]) SetState(ctx context.Context, state S) { - notify, shouldNotify := o.Transform(o.PeekState(), state) + prev := o.CopyState() + notify, shouldNotify := o.Transform(prev, state) if shouldNotify { lo.Ternary( *o.ObservableConfig.GoNotify,