diff --git a/api/converter/to_bytes.go b/api/converter/to_bytes.go index 29fbd54b5..7313e9f51 100644 --- a/api/converter/to_bytes.go +++ b/api/converter/to_bytes.go @@ -29,7 +29,7 @@ import ( ) // SnapshotToBytes converts the given document to byte array. -func SnapshotToBytes(obj *crdt.Object, presences *innerpresence.Map) ([]byte, error) { +func SnapshotToBytes(obj *crdt.Object, presences map[string]innerpresence.Presence) ([]byte, error) { pbElem, err := toJSONElement(obj) if err != nil { return nil, err diff --git a/api/converter/to_pb.go b/api/converter/to_pb.go index 0867437e5..dcf4b1466 100644 --- a/api/converter/to_pb.go +++ b/api/converter/to_pb.go @@ -123,12 +123,11 @@ func ToDocumentSummary(summary *types.DocumentSummary) (*api.DocumentSummary, er } // ToPresences converts the given model to Protobuf format. -func ToPresences(presences *innerpresence.Map) map[string]*api.Presence { +func ToPresences(presences map[string]innerpresence.Presence) map[string]*api.Presence { pbPresences := make(map[string]*api.Presence) - presences.Range(func(k string, v innerpresence.Presence) bool { + for k, v := range presences { pbPresences[k] = ToPresence(v) - return true - }) + } return pbPresences } diff --git a/client/client.go b/client/client.go index 655433967..76cc16f9a 100644 --- a/client/client.go +++ b/client/client.go @@ -432,7 +432,7 @@ func (c *Client) Watch( clientIDs = append(clientIDs, id.String()) } - doc.SetOnlineClientSet(clientIDs...) + doc.SetOnlineClients(clientIDs...) return nil, nil case *api.WatchDocumentResponse_Event: eventType, err := converter.FromEventType(resp.Event.Type) @@ -450,19 +450,22 @@ func (c *Client) Watch( return &WatchResponse{Type: DocumentChanged}, nil case types.DocumentsWatchedEvent: doc.AddOnlineClient(cli.String()) - if doc.OnlinePresence(cli.String()) == nil { + if doc.Presence(cli.String()) == nil { return nil, nil } return &WatchResponse{ Type: DocumentWatched, Presences: map[string]innerpresence.Presence{ - cli.String(): doc.OnlinePresence(cli.String()), + cli.String(): doc.Presence(cli.String()), }, }, nil case types.DocumentsUnwatchedEvent: - p := doc.OnlinePresence(cli.String()) + p := doc.Presence(cli.String()) doc.RemoveOnlineClient(cli.String()) + if p == nil { + return nil, nil + } return &WatchResponse{ Type: DocumentUnwatched, @@ -521,6 +524,8 @@ func (c *Client) Watch( t := PresenceChanged if e.Type == document.WatchedEvent { t = DocumentWatched + } else if e.Type == document.UnwatchedEvent { + t = DocumentUnwatched } rch <- WatchResponse{Type: t, Presences: e.Presences} case <-ctx.Done(): diff --git a/pkg/document/document.go b/pkg/document/document.go index 985dd2f6d..d9ca6b2dd 100644 --- a/pkg/document/document.go +++ b/pkg/document/document.go @@ -43,6 +43,10 @@ const ( // enabling real-time synchronization. WatchedEvent DocEventType = "watched" + // UnwatchedEvent means that the client has disconnected from the server, + // disabling real-time synchronization. + UnwatchedEvent DocEventType = "unwatched" + // PresenceChangedEvent means that the presences of the clients who are editing // the document have changed. PresenceChangedEvent DocEventType = "presence-changed" @@ -282,48 +286,50 @@ func (d *Document) ensureClone() error { return nil } -// Presences returns the presence map of this document. -func (d *Document) Presences() map[string]innerpresence.Presence { - // TODO(hackerwins): We need to use client key instead of actor ID for exposing presence. - presences := make(map[string]innerpresence.Presence) - d.doc.presences.Range(func(key string, value innerpresence.Presence) bool { - presences[key] = value - return true - }) - return presences +// MyPresence returns the presence of the actor. +func (d *Document) MyPresence() innerpresence.Presence { + return d.doc.MyPresence() } // Presence returns the presence of the given client. +// If the client is not online, it returns nil. func (d *Document) Presence(clientID string) innerpresence.Presence { return d.doc.Presence(clientID) } -// MyPresence returns the presence of the actor. -func (d *Document) MyPresence() innerpresence.Presence { - return d.doc.MyPresence() +// PresenceForTest returns the presence of the given client +// regardless of whether the client is online or not. +func (d *Document) PresenceForTest(clientID string) innerpresence.Presence { + return d.doc.PresenceForTest(clientID) } -// SetOnlineClientSet sets the online client set. -func (d *Document) SetOnlineClientSet(clientIDs ...string) { - d.doc.SetOnlineClientSet(clientIDs...) +// Presences returns the presence map of online clients. +func (d *Document) Presences() map[string]innerpresence.Presence { + // TODO(hackerwins): We need to use client key instead of actor ID for exposing presence. + return d.doc.Presences() +} + +// AllPresences returns the presence map of all clients +// regardless of whether the client is online or not. +func (d *Document) AllPresences() map[string]innerpresence.Presence { + return d.doc.AllPresences() } -// AddOnlineClient adds the given client to the online client set. +// SetOnlineClients sets the online clients. +func (d *Document) SetOnlineClients(clientIDs ...string) { + d.doc.SetOnlineClients(clientIDs...) +} + +// AddOnlineClient adds the given client to the online clients. func (d *Document) AddOnlineClient(clientID string) { d.doc.AddOnlineClient(clientID) } -// RemoveOnlineClient removes the given client from the online client set. +// RemoveOnlineClient removes the given client from the online clients. func (d *Document) RemoveOnlineClient(clientID string) { d.doc.RemoveOnlineClient(clientID) } -// OnlinePresence returns the presence of the given client. If the client is not -// online, it returns nil. -func (d *Document) OnlinePresence(clientID string) innerpresence.Presence { - return d.doc.OnlinePresence(clientID) -} - // Events returns the events of this document. func (d *Document) Events() <-chan DocEvent { return d.events diff --git a/pkg/document/innerpresence/presence.go b/pkg/document/innerpresence/presence.go index 7376d0f02..fb1168a0b 100644 --- a/pkg/document/innerpresence/presence.go +++ b/pkg/document/innerpresence/presence.go @@ -50,13 +50,13 @@ func (m *Map) Range(f func(clientID string, presence Presence) bool) { } // Load returns the presence for the given clientID. -func (m *Map) Load(clientID string) (Presence, bool) { +func (m *Map) Load(clientID string) Presence { presence, ok := m.presences.Load(clientID) if !ok { - return nil, false + return nil } - return presence.(Presence), true + return presence.(Presence) } // LoadOrStore returns the existing presence if exists. @@ -140,6 +140,9 @@ func (p Presence) Clear() { // DeepCopy copies itself deeply. func (p Presence) DeepCopy() Presence { + if p == nil { + return nil + } clone := make(map[string]string) for k, v := range p { clone[k] = v diff --git a/pkg/document/internal_document.go b/pkg/document/internal_document.go index e4ce0d8fc..f71cbd43c 100644 --- a/pkg/document/internal_document.go +++ b/pkg/document/internal_document.go @@ -257,17 +257,29 @@ func (d *InternalDocument) ApplyChanges(changes ...*change.Change) ([]DocEvent, if c.PresenceChange() != nil { clientID := c.ID().ActorID().String() if _, ok := d.onlineClients.Load(clientID); ok { - event := DocEvent{ - Type: PresenceChangedEvent, - Presences: map[string]innerpresence.Presence{ - clientID: c.PresenceChange().Presence, - }, + switch c.PresenceChange().ChangeType { + case innerpresence.Put: + eventType := PresenceChangedEvent + if !d.presences.Has(clientID) { + eventType = WatchedEvent + } + event := DocEvent{ + Type: eventType, + Presences: map[string]innerpresence.Presence{ + clientID: c.PresenceChange().Presence, + }, + } + events = append(events, event) + case innerpresence.Clear: + event := DocEvent{ + Type: UnwatchedEvent, + Presences: map[string]innerpresence.Presence{ + clientID: d.Presence(clientID), + }, + } + events = append(events, event) + d.RemoveOnlineClient(clientID) } - - if !d.presences.Has(clientID) { - event.Type = WatchedEvent - } - events = append(events, event) } } @@ -283,34 +295,56 @@ func (d *InternalDocument) ApplyChanges(changes ...*change.Change) ([]DocEvent, // MyPresence returns the presence of the actor currently editing the document. func (d *InternalDocument) MyPresence() innerpresence.Presence { - p := d.presences.LoadOrStore(d.changeID.ActorID().String(), innerpresence.NewPresence()) + if d.status != StatusAttached { + return innerpresence.NewPresence() + } + p := d.presences.Load(d.changeID.ActorID().String()) return p.DeepCopy() } -// Presences returns the map of presences of the actors currently editing the document. -func (d *InternalDocument) Presences() *innerpresence.Map { - return d.presences -} - -// OnlinePresence returns the presence of the given client. If the client is not -// online, it returns nil. -func (d *InternalDocument) OnlinePresence(clientID string) innerpresence.Presence { +// Presence returns the presence of the given client. +// If the client is not online, it returns nil. +func (d *InternalDocument) Presence(clientID string) innerpresence.Presence { if _, ok := d.onlineClients.Load(clientID); !ok { return nil } - presence, _ := d.presences.Load(clientID) - return presence + return d.presences.Load(clientID).DeepCopy() } -// Presence returns the presence of the given client. -func (d *InternalDocument) Presence(clientID string) innerpresence.Presence { - presence, _ := d.presences.Load(clientID) - return presence +// PresenceForTest returns the presence of the given client +// regardless of whether the client is online or not. +func (d *InternalDocument) PresenceForTest(clientID string) innerpresence.Presence { + return d.presences.Load(clientID).DeepCopy() +} + +// Presences returns the presence map of online clients. +func (d *InternalDocument) Presences() map[string]innerpresence.Presence { + presences := make(map[string]innerpresence.Presence) + d.onlineClients.Range(func(key, value interface{}) bool { + p := d.presences.Load(key.(string)) + if p == nil { + return true + } + presences[key.(string)] = p.DeepCopy() + return true + }) + return presences +} + +// AllPresences returns the presence map of all clients +// regardless of whether the client is online or not. +func (d *InternalDocument) AllPresences() map[string]innerpresence.Presence { + presences := make(map[string]innerpresence.Presence) + d.presences.Range(func(key string, value innerpresence.Presence) bool { + presences[key] = value.DeepCopy() + return true + }) + return presences } -// SetOnlineClientSet sets the online client set. -func (d *InternalDocument) SetOnlineClientSet(ids ...string) { +// SetOnlineClients sets the online clients. +func (d *InternalDocument) SetOnlineClients(ids ...string) { d.onlineClients.Range(func(key, value interface{}) bool { d.onlineClients.Delete(key) return true @@ -321,12 +355,12 @@ func (d *InternalDocument) SetOnlineClientSet(ids ...string) { } } -// AddOnlineClient adds the given client to the online client set. +// AddOnlineClient adds the given client to the online clients. func (d *InternalDocument) AddOnlineClient(clientID string) { d.onlineClients.Store(clientID, true) } -// RemoveOnlineClient removes the given client from the online client set. +// RemoveOnlineClient removes the given client from the online clients. func (d *InternalDocument) RemoveOnlineClient(clientID string) { d.onlineClients.Delete(clientID) } diff --git a/server/backend/database/memory/database.go b/server/backend/database/memory/database.go index 93882c313..1a880cf27 100644 --- a/server/backend/database/memory/database.go +++ b/server/backend/database/memory/database.go @@ -947,7 +947,7 @@ func (d *DB) CreateSnapshotInfo( docID types.ID, doc *document.InternalDocument, ) error { - snapshot, err := converter.SnapshotToBytes(doc.RootObject(), doc.Presences()) + snapshot, err := converter.SnapshotToBytes(doc.RootObject(), doc.AllPresences()) if err != nil { return err } diff --git a/server/backend/database/mongo/client.go b/server/backend/database/mongo/client.go index bdcdc460a..6465c72bd 100644 --- a/server/backend/database/mongo/client.go +++ b/server/backend/database/mongo/client.go @@ -1031,7 +1031,7 @@ func (c *Client) CreateSnapshotInfo( if err != nil { return err } - snapshot, err := converter.SnapshotToBytes(doc.RootObject(), doc.Presences()) + snapshot, err := converter.SnapshotToBytes(doc.RootObject(), doc.AllPresences()) if err != nil { return err } diff --git a/server/packs/pushpull.go b/server/packs/pushpull.go index 2eb0b95ae..e3c5151e2 100644 --- a/server/packs/pushpull.go +++ b/server/packs/pushpull.go @@ -157,7 +157,7 @@ func pullSnapshot( } cpAfterPull := cpAfterPush.NextServerSeq(docInfo.ServerSeq) - snapshot, err := converter.SnapshotToBytes(doc.RootObject(), doc.Presences()) + snapshot, err := converter.SnapshotToBytes(doc.RootObject(), doc.AllPresences()) if err != nil { return nil, err } diff --git a/server/rpc/admin_server.go b/server/rpc/admin_server.go index 3487b1ee9..340cff513 100644 --- a/server/rpc/admin_server.go +++ b/server/rpc/admin_server.go @@ -254,7 +254,7 @@ func (s *adminServer) GetSnapshotMeta( return nil, err } - snapshot, err := converter.SnapshotToBytes(doc.RootObject(), doc.Presences()) + snapshot, err := converter.SnapshotToBytes(doc.RootObject(), doc.AllPresences()) if err != nil { return nil, err } diff --git a/test/integration/presence_test.go b/test/integration/presence_test.go index 1ff65809c..e2b37aa69 100644 --- a/test/integration/presence_test.go +++ b/test/integration/presence_test.go @@ -58,14 +58,14 @@ func TestPresence(t *testing.T) { p.Set("updated", "true") return nil })) - encoded, err := gojson.Marshal(d1.Presences()) + encoded, err := gojson.Marshal(d1.AllPresences()) assert.NoError(t, err) assert.Equal(t, fmt.Sprintf(`{"%s":{"updated":"true"}}`, c1.ID()), string(encoded)) // 03 Sync documents and check that the presence is updated on the other client assert.NoError(t, c1.Sync(ctx)) assert.NoError(t, c2.Sync(ctx)) - encoded, err = gojson.Marshal(d2.Presences()) + encoded, err = gojson.Marshal(d2.AllPresences()) assert.NoError(t, err) assert.Equal(t, fmt.Sprintf(`{"%s":{"updated":"true"},"%s":{}}`, c1.ID(), c2.ID()), string(encoded)) }) @@ -88,14 +88,14 @@ func TestPresence(t *testing.T) { return nil })) } - encoded, err := gojson.Marshal(d1.Presences()) + encoded, err := gojson.Marshal(d1.AllPresences()) assert.NoError(t, err) assert.Equal(t, fmt.Sprintf(`{"%s":{"updated":"9"}}`, c1.ID()), string(encoded)) // 03 Sync documents and check that the presence is updated on the other client assert.NoError(t, c1.Sync(ctx)) assert.NoError(t, c2.Sync(ctx)) - encoded, err = gojson.Marshal(d2.Presences()) + encoded, err = gojson.Marshal(d2.AllPresences()) assert.NoError(t, err) assert.Equal(t, fmt.Sprintf(`{"%s":{"updated":"9"},"%s":{}}`, c1.ID(), c2.ID()), string(encoded)) }) @@ -112,15 +112,15 @@ func TestPresence(t *testing.T) { // 02. Check that the presence is updated on the other client. assert.NoError(t, c1.Sync(ctx)) assert.Equal(t, innerpresence.Presence{"key": c1.Key()}, d1.MyPresence()) - assert.Equal(t, innerpresence.Presence{"key": c2.Key()}, d1.Presence(c2.ID().String())) + assert.Equal(t, innerpresence.Presence{"key": c2.Key()}, d1.PresenceForTest(c2.ID().String())) assert.Equal(t, innerpresence.Presence{"key": c2.Key()}, d2.MyPresence()) - assert.Equal(t, innerpresence.Presence{"key": c1.Key()}, d2.Presence(c1.ID().String())) + assert.Equal(t, innerpresence.Presence{"key": c1.Key()}, d2.PresenceForTest(c1.ID().String())) // 03. The first client detaches the document and check that the presence is updated on the other client. assert.NoError(t, c1.Detach(ctx, d1)) assert.NoError(t, c2.Sync(ctx)) assert.Equal(t, innerpresence.Presence{"key": c2.Key()}, d2.MyPresence()) - assert.Nil(t, d2.Presence(c1.ID().String())) + assert.Nil(t, d2.PresenceForTest(c1.ID().String())) }) t.Run("presence-related events test", func(t *testing.T) { @@ -249,7 +249,7 @@ func TestPresence(t *testing.T) { }) } - if len(responsePairs) == 4 { + if len(responsePairs) == 3 { return } } @@ -285,23 +285,16 @@ func TestPresence(t *testing.T) { // 05. Unwatch the second client's document. expected = append(expected, watchResponsePair{ - Type: client.PresenceChanged, + Type: client.DocumentUnwatched, Presences: map[string]innerpresence.Presence{ - c2.ID().String(): nil, + c2.ID().String(): d2.MyPresence(), }, }) assert.NoError(t, c2.Detach(ctx, d2)) assert.NoError(t, c1.Sync(ctx, client.WithDocKey(helper.TestDocKey(t)))) + wgEvents.Wait() - expected = append(expected, watchResponsePair{ - Type: client.DocumentUnwatched, - Presences: map[string]innerpresence.Presence{ - c2.ID().String(): nil, - }, - }) cancel2() - - wgEvents.Wait() assert.Equal(t, expected, responsePairs) })