diff --git a/client/client.go b/client/client.go index 46b39d925..8e3866fa1 100644 --- a/client/client.go +++ b/client/client.go @@ -337,7 +337,7 @@ func (c *Client) Attach(ctx context.Context, doc *document.Document, options ... c.attachments[doc.Key()].watchCtx = watchCtx c.attachments[doc.Key()].closeWatchStream = cancelFunc - if !opts.IsManual { + if opts.IsRealtime { err = c.runWatchLoop(watchCtx, doc) if err != nil { return err diff --git a/client/options.go b/client/options.go index fcb03b6a9..2d6204e4c 100644 --- a/client/options.go +++ b/client/options.go @@ -94,7 +94,7 @@ type AttachOptions struct { // Presence is the presence of the client. Presence innerpresence.Presence InitialRoot map[string]any - IsManual bool + IsRealtime bool } // WithPresence configures the presence of the client. @@ -111,9 +111,9 @@ func WithInitialRoot(root map[string]any) AttachOption { } } -// WithManualSync configures the manual sync of the client. -func WithManualSync() AttachOption { - return func(o *AttachOptions) { o.IsManual = true } +// WithRealtimeSync configures the manual sync of the client. +func WithRealtimeSync() AttachOption { + return func(o *AttachOptions) { o.IsRealtime = true } } // DetachOption configures DetachOptions. diff --git a/server/backend/sync/memory/publisher.go b/server/backend/sync/memory/publisher.go new file mode 100644 index 000000000..5beae0a5f --- /dev/null +++ b/server/backend/sync/memory/publisher.go @@ -0,0 +1,131 @@ +/* + * Copyright 2024 The Yorkie Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package memory + +import ( + "strconv" + gosync "sync" + "sync/atomic" + time "time" + + "go.uber.org/zap" + + "github.com/yorkie-team/yorkie/server/backend/sync" + "github.com/yorkie-team/yorkie/server/logging" +) + +var id loggerID + +type loggerID int32 + +func (c *loggerID) next() string { + next := atomic.AddInt32((*int32)(c), 1) + return "p" + strconv.Itoa(int(next)) +} + +// BatchPublisher is a publisher that publishes events in batch. +type BatchPublisher struct { + logger *zap.SugaredLogger + mutex gosync.Mutex + events []sync.DocEvent + + window time.Duration + closeChan chan struct{} + subs *Subscriptions +} + +// NewBatchPublisher creates a new BatchPublisher instance. +func NewBatchPublisher(subs *Subscriptions, window time.Duration) *BatchPublisher { + bp := &BatchPublisher{ + logger: logging.New(id.next()), + window: window, + closeChan: make(chan struct{}), + subs: subs, + } + + go bp.processLoop() + return bp +} + +// Publish adds the given event to the batch. If the batch is full, it publishes +// the batch. +func (bp *BatchPublisher) Publish(event sync.DocEvent) { + bp.mutex.Lock() + defer bp.mutex.Unlock() + + // TODO(hackerwins): If DocumentChangedEvent is already in the batch, we don't + // need to add it again. + bp.events = append(bp.events, event) +} + +func (bp *BatchPublisher) processLoop() { + ticker := time.NewTicker(bp.window) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + bp.publish() + case <-bp.closeChan: + return + } + } +} + +func (bp *BatchPublisher) publish() { + bp.mutex.Lock() + + if len(bp.events) == 0 { + bp.mutex.Unlock() + return + } + + events := bp.events + bp.events = nil + + bp.mutex.Unlock() + + if logging.Enabled(zap.DebugLevel) { + bp.logger.Infof( + "Publishing batch of %d events for document %s", + len(bp.events), + bp.subs.docKey, + ) + } + + for _, sub := range bp.subs.Values() { + for _, event := range events { + if sub.Subscriber().Compare(event.Publisher) == 0 { + continue + } + + if ok := sub.Publish(event); !ok { + bp.logger.Infof( + "Publish(%s,%s) to %s timeout or closed", + event.Type, + event.Publisher, + sub.Subscriber(), + ) + } + } + } +} + +// Close stops the batch publisher +func (bp *BatchPublisher) Close() { + close(bp.closeChan) +} diff --git a/server/backend/sync/memory/pubsub.go b/server/backend/sync/memory/pubsub.go index a4cd6e7cd..ad7a7879b 100644 --- a/server/backend/sync/memory/pubsub.go +++ b/server/backend/sync/memory/pubsub.go @@ -18,6 +18,7 @@ package memory import ( "context" + gotime "time" "go.uber.org/zap" @@ -32,13 +33,16 @@ import ( type Subscriptions struct { docKey types.DocRefKey internalMap *cmap.Map[string, *sync.Subscription] + publisher *BatchPublisher } func newSubscriptions(docKey types.DocRefKey) *Subscriptions { - return &Subscriptions{ + s := &Subscriptions{ docKey: docKey, internalMap: cmap.New[string, *sync.Subscription](), } + s.publisher = NewBatchPublisher(s, 100*gotime.Millisecond) + return s } // Set adds the given subscription. @@ -52,40 +56,8 @@ func (s *Subscriptions) Values() []*sync.Subscription { } // Publish publishes the given event. -func (s *Subscriptions) Publish(ctx context.Context, event sync.DocEvent) { - // TODO(hackerwins): Introduce batch publish to reduce lock contention. - // Problem: - // - High lock contention when publishing events frequently. - // - Redundant events being published in short time windows. - // Solution: - // - Collect events to publish in configurable time window. - // - Keep only the latest event for the same event type. - // - Run dedicated publish loop in a single goroutine. - // - Batch publish collected events when the time window expires. - for _, sub := range s.internalMap.Values() { - if sub.Subscriber().Compare(event.Publisher) == 0 { - continue - } - - if logging.Enabled(zap.DebugLevel) { - logging.From(ctx).Debugf( - `Publish %s(%s,%s) to %s`, - event.Type, - s.docKey, - event.Publisher, - sub.Subscriber(), - ) - } - - if ok := sub.Publish(event); !ok { - logging.From(ctx).Warnf( - `Publish(%s,%s) to %s timeout or closed`, - s.docKey, - event.Publisher, - sub.Subscriber(), - ) - } - } +func (s *Subscriptions) Publish(event sync.DocEvent) { + s.publisher.Publish(event) } // Delete deletes the subscription of the given id. @@ -103,6 +75,11 @@ func (s *Subscriptions) Len() int { return s.internalMap.Len() } +// Close closes the subscriptions. +func (s *Subscriptions) Close() { + s.publisher.Close() +} + // PubSub is the memory implementation of PubSub, used for single server. type PubSub struct { subscriptionsMap *cmap.Map[types.DocRefKey, *Subscriptions] @@ -170,6 +147,7 @@ func (m *PubSub) Unsubscribe( if subs.Len() == 0 { m.subscriptionsMap.Delete(docKey, func(subs *Subscriptions, exists bool) bool { + subs.Close() return exists }) } @@ -200,7 +178,7 @@ func (m *PubSub) Publish( } if subs, ok := m.subscriptionsMap.Get(docKey); ok { - subs.Publish(ctx, event) + subs.Publish(event) } if logging.Enabled(zap.DebugLevel) { diff --git a/server/backend/sync/pubsub.go b/server/backend/sync/pubsub.go index 7fdc3ba01..4f54d4ed0 100644 --- a/server/backend/sync/pubsub.go +++ b/server/backend/sync/pubsub.go @@ -26,6 +26,11 @@ import ( "github.com/yorkie-team/yorkie/pkg/document/time" ) +const ( + // publishTimeout is the timeout for publishing an event. + publishTimeout = 100 * gotime.Millisecond +) + // Subscription represents a subscription of a subscriber to documents. type Subscription struct { id string @@ -88,12 +93,12 @@ func (s *Subscription) Publish(event DocEvent) bool { return false } - // NOTE: When a subscription is being closed by a subscriber, + // NOTE(hackerwins): When a subscription is being closed by a subscriber, // the subscriber may not receive messages. select { case s.Events() <- event: return true - case <-gotime.After(100 * gotime.Millisecond): + case <-gotime.After(publishTimeout): return false } } diff --git a/test/bench/grpc_bench_test.go b/test/bench/grpc_bench_test.go index 55107eb6f..c972ed078 100644 --- a/test/bench/grpc_bench_test.go +++ b/test/bench/grpc_bench_test.go @@ -202,7 +202,7 @@ func BenchmarkRPC(b *testing.B) { ctx := context.Background() d1 := document.New(helper.TestDocKey(b)) - err := c1.Attach(ctx, d1) + err := c1.Attach(ctx, d1, client.WithRealtimeSync()) assert.NoError(b, err) testKey1 := "testKey1" err = d1.Update(func(root *json.Object, p *presence.Presence) error { @@ -212,7 +212,7 @@ func BenchmarkRPC(b *testing.B) { assert.NoError(b, err) d2 := document.New(helper.TestDocKey(b)) - err = c2.Attach(ctx, d2) + err = c2.Attach(ctx, d2, client.WithRealtimeSync()) assert.NoError(b, err) testKey2 := "testKey2" err = d2.Update(func(root *json.Object, p *presence.Presence) error { diff --git a/test/integration/admin_test.go b/test/integration/admin_test.go index 6d77773b8..ab0bfaceb 100644 --- a/test/integration/admin_test.go +++ b/test/integration/admin_test.go @@ -86,7 +86,7 @@ func TestAdmin(t *testing.T) { // 01. c1 attaches and watches d1. d1 := document.New(helper.TestDocKey(t)) - assert.NoError(t, c1.Attach(ctx, d1)) + assert.NoError(t, c1.Attach(ctx, d1, client.WithRealtimeSync())) wg := sync.WaitGroup{} wg.Add(1) rch, cancel, err := c1.Subscribe(d1) diff --git a/test/integration/auth_webhook_test.go b/test/integration/auth_webhook_test.go index a37a6bf41..c4ac808f0 100644 --- a/test/integration/auth_webhook_test.go +++ b/test/integration/auth_webhook_test.go @@ -174,7 +174,7 @@ func TestProjectAuthWebhook(t *testing.T) { assert.NoError(t, err) doc := document.New(helper.TestDocKey(t)) - err = cli.Attach(ctx, doc) + err = cli.Attach(ctx, doc, client.WithRealtimeSync()) assert.Equal(t, connect.CodeUnauthenticated, connect.CodeOf(err)) _, _, err = cli.Subscribe(doc) diff --git a/test/integration/document_test.go b/test/integration/document_test.go index 3b82dfe03..c397583e9 100644 --- a/test/integration/document_test.go +++ b/test/integration/document_test.go @@ -173,11 +173,11 @@ func TestDocument(t *testing.T) { _, _, err := c1.Subscribe(d1) assert.ErrorIs(t, err, client.ErrDocumentNotAttached) - err = c1.Attach(ctx, d1) + err = c1.Attach(ctx, d1, client.WithRealtimeSync()) assert.NoError(t, err) d2 := document.New(helper.TestDocKey(t)) - err = c2.Attach(ctx, d2) + err = c2.Attach(ctx, d2, client.WithRealtimeSync()) assert.NoError(t, err) wg := sync.WaitGroup{} @@ -431,7 +431,7 @@ func TestDocument(t *testing.T) { assert.ErrorIs(t, err, client.ErrDocumentNotAttached) // 02. c1 attaches d1 and watches it. - assert.NoError(t, c1.Attach(ctx, d1)) + assert.NoError(t, c1.Attach(ctx, d1, client.WithRealtimeSync())) _, _, err = c1.Subscribe(d1) assert.NoError(t, err) @@ -456,19 +456,19 @@ func TestDocument(t *testing.T) { } d1 := document.New(helper.TestDocKey(t)) - assert.NoError(t, c1.Attach(ctx, d1)) + assert.NoError(t, c1.Attach(ctx, d1, client.WithRealtimeSync())) rch1, _, err := c1.Subscribe(d1) assert.NoError(t, err) d1.SubscribeBroadcastEvent("mention", handler) d2 := document.New(helper.TestDocKey(t)) - assert.NoError(t, c2.Attach(ctx, d2)) + assert.NoError(t, c2.Attach(ctx, d2, client.WithRealtimeSync())) rch2, _, err := c2.Subscribe(d2) assert.NoError(t, err) d2.SubscribeBroadcastEvent("mention", handler) d3 := document.New(helper.TestDocKey(t)) - assert.NoError(t, c3.Attach(ctx, d3)) + assert.NoError(t, c3.Attach(ctx, d3, client.WithRealtimeSync())) rch3, _, err := c3.Subscribe(d3) assert.NoError(t, err) d3.SubscribeBroadcastEvent("mention", handler) @@ -518,13 +518,13 @@ func TestDocument(t *testing.T) { } d1 := document.New(helper.TestDocKey(t)) - assert.NoError(t, c1.Attach(ctx, d1)) + assert.NoError(t, c1.Attach(ctx, d1, client.WithRealtimeSync())) rch1, _, err := c1.Subscribe(d1) assert.NoError(t, err) d1.SubscribeBroadcastEvent("mention", handler) d2 := document.New(helper.TestDocKey(t)) - assert.NoError(t, c2.Attach(ctx, d2)) + assert.NoError(t, c2.Attach(ctx, d2, client.WithRealtimeSync())) rch2, _, err := c2.Subscribe(d2) assert.NoError(t, err) d2.SubscribeBroadcastEvent("mention", handler) @@ -576,14 +576,14 @@ func TestDocument(t *testing.T) { } d1 := document.New(helper.TestDocKey(t)) - assert.NoError(t, c1.Attach(ctx, d1)) + assert.NoError(t, c1.Attach(ctx, d1, client.WithRealtimeSync())) rch1, _, err := c1.Subscribe(d1) assert.NoError(t, err) d1.SubscribeBroadcastEvent("mention", handler) // c2 doesn't subscribe to the "mention" broadcast event. d2 := document.New(helper.TestDocKey(t)) - assert.NoError(t, c2.Attach(ctx, d2)) + assert.NoError(t, c2.Attach(ctx, d2, client.WithRealtimeSync())) rch2, _, err := c2.Subscribe(d2) assert.NoError(t, err) @@ -622,7 +622,7 @@ func TestDocument(t *testing.T) { ctx := context.Background() d1 := document.New(helper.TestDocKey(t)) - assert.NoError(t, c1.Attach(ctx, d1)) + assert.NoError(t, c1.Attach(ctx, d1, client.WithRealtimeSync())) _, _, err := c1.Subscribe(d1) assert.NoError(t, err) d1.SubscribeBroadcastEvent("mention", nil) @@ -645,13 +645,13 @@ func TestDocument(t *testing.T) { } d1 := document.New(helper.TestDocKey(t)) - assert.NoError(t, c1.Attach(ctx, d1)) + assert.NoError(t, c1.Attach(ctx, d1, client.WithRealtimeSync())) rch1, _, err := c1.Subscribe(d1) assert.NoError(t, err) d1.SubscribeBroadcastEvent("mention", handler) d2 := document.New(helper.TestDocKey(t)) - assert.NoError(t, c2.Attach(ctx, d2)) + assert.NoError(t, c2.Attach(ctx, d2, client.WithRealtimeSync())) rch2, _, err := c2.Subscribe(d2) assert.NoError(t, err) d2.SubscribeBroadcastEvent("mention", handler) @@ -729,7 +729,7 @@ func TestDocumentWithProjects(t *testing.T) { wg.Add(1) d1 := document.New(helper.TestDocKey(t)) - err = c1.Attach(ctx, d1) + err = c1.Attach(ctx, d1, client.WithRealtimeSync()) assert.NoError(t, err) rch, cancel1, err := c1.Subscribe(d1) defer cancel1() @@ -768,7 +768,7 @@ func TestDocumentWithProjects(t *testing.T) { }, }) d2 := document.New(helper.TestDocKey(t)) - assert.NoError(t, c2.Attach(ctx, d2)) + assert.NoError(t, c2.Attach(ctx, d2, client.WithRealtimeSync())) _, cancel2, err := c2.Subscribe(d2) assert.NoError(t, err) @@ -781,7 +781,7 @@ func TestDocumentWithProjects(t *testing.T) { // d3 is in another project, so c1 and c2 should not receive events. d3 := document.New(helper.TestDocKey(t)) - assert.NoError(t, c3.Attach(ctx, d3)) + assert.NoError(t, c3.Attach(ctx, d3, client.WithRealtimeSync())) _, cancel3, err := c3.Subscribe(d3) assert.NoError(t, err) assert.NoError(t, d3.Update(func(root *json.Object, p *presence.Presence) error { diff --git a/test/integration/presence_test.go b/test/integration/presence_test.go index 9f0a8e891..122bbcc3b 100644 --- a/test/integration/presence_test.go +++ b/test/integration/presence_test.go @@ -128,9 +128,10 @@ func TestPresence(t *testing.T) { ctx := context.Background() d1 := document.New(helper.TestDocKey(t)) d2 := document.New(helper.TestDocKey(t)) - assert.NoError(t, c1.Attach(ctx, d1)) + assert.NoError(t, c1.Attach(ctx, d1, client.WithRealtimeSync())) defer func() { assert.NoError(t, c1.Detach(ctx, d1)) }() - assert.NoError(t, c2.Attach(ctx, d2)) + assert.NoError(t, c2.Attach(ctx, d2, client.WithRealtimeSync())) + assert.NoError(t, c1.Sync(ctx)) defer func() { assert.NoError(t, c2.Detach(ctx, d2)) }() // 02. Watch the first client's document. @@ -143,9 +144,6 @@ func TestPresence(t *testing.T) { defer cancel1() assert.NoError(t, err) go func() { - defer func() { - wgEvents.Done() - }() for { select { case <-time.After(time.Second): @@ -161,6 +159,7 @@ func TestPresence(t *testing.T) { Type: wr.Type, Presences: wr.Presences, }) + wgEvents.Done() } if len(responsePairs) == 3 { return @@ -176,8 +175,11 @@ func TestPresence(t *testing.T) { c2.ID().String(): {}, }, }) + _, cancel2, err := c2.Subscribe(d2) assert.NoError(t, err) + wgEvents.Wait() + wgEvents.Add(1) // 04. Update the second client's presence. err = d2.Update(func(root *json.Object, p *presence.Presence) error { @@ -193,6 +195,8 @@ func TestPresence(t *testing.T) { }) assert.NoError(t, c2.Sync(ctx, client.WithDocKey(helper.TestDocKey(t)))) assert.NoError(t, c1.Sync(ctx, client.WithDocKey(helper.TestDocKey(t)))) + wgEvents.Wait() + wgEvents.Add(1) // 05. Unwatch the second client's document. expected = append(expected, watchResponsePair{ @@ -207,14 +211,136 @@ func TestPresence(t *testing.T) { assert.Equal(t, expected, responsePairs) }) + t.Run("watch after attach events test", func(t *testing.T) { + // 01. Create two clients and documents and attach them. + ctx := context.Background() + d1 := document.New(helper.TestDocKey(t)) + d2 := document.New(helper.TestDocKey(t)) + assert.NoError(t, c1.Attach(ctx, d1, client.WithRealtimeSync())) + defer func() { assert.NoError(t, c1.Detach(ctx, d1)) }() + assert.NoError(t, c2.Attach(ctx, d2, client.WithRealtimeSync())) + defer func() { assert.NoError(t, c2.Detach(ctx, d2)) }() + + // 02. Watch the first client's document. + var expected []watchResponsePair + var responsePairs []watchResponsePair + wgEvents := sync.WaitGroup{} + wgEvents.Add(1) + + wrch, cancel1, err := c1.Subscribe(d1) + defer cancel1() + assert.NoError(t, err) + go func() { + for { + select { + case <-time.After(time.Second): + assert.Fail(t, "timeout") + return + case wr := <-wrch: + if wr.Err != nil { + assert.Fail(t, "unexpected stream closing", wr.Err) + return + } + if wr.Type != client.DocumentChanged { + responsePairs = append(responsePairs, watchResponsePair{ + Type: wr.Type, + Presences: wr.Presences, + }) + wgEvents.Done() + } + if len(responsePairs) == 1 { + return + } + } + } + }() + + // 3. After c1 syncing the change of c2's attachment, c2 watches the document. + expected = append(expected, watchResponsePair{ + Type: client.DocumentWatched, + Presences: map[string]innerpresence.Presence{ + c2.ID().String(): {}, + }, + }) + assert.NoError(t, c1.Sync(ctx)) + _, cancel2, err := c2.Subscribe(d2) + assert.NoError(t, err) + defer cancel2() + + wgEvents.Wait() + assert.Equal(t, expected, responsePairs) + }) + + t.Run("attach after watch events test", func(t *testing.T) { + // 01. Create two clients and documents and attach them. + ctx := context.Background() + d1 := document.New(helper.TestDocKey(t)) + d2 := document.New(helper.TestDocKey(t)) + assert.NoError(t, c1.Attach(ctx, d1, client.WithRealtimeSync())) + defer func() { assert.NoError(t, c1.Detach(ctx, d1)) }() + assert.NoError(t, c2.Attach(ctx, d2, client.WithRealtimeSync())) + defer func() { assert.NoError(t, c2.Detach(ctx, d2)) }() + + // 02. Watch the first client's document. + var expected []watchResponsePair + var responsePairs []watchResponsePair + wgEvents := sync.WaitGroup{} + wgEvents.Add(1) + + wrch, cancel1, err := c1.Subscribe(d1) + defer cancel1() + assert.NoError(t, err) + go func() { + for { + select { + case <-time.After(time.Second): + assert.Fail(t, "timeout") + return + case wr := <-wrch: + if wr.Err != nil { + assert.Fail(t, "unexpected stream closing", wr.Err) + return + } + if wr.Type != client.DocumentChanged { + responsePairs = append(responsePairs, watchResponsePair{ + Type: wr.Type, + Presences: wr.Presences, + }) + wgEvents.Done() + } + if len(responsePairs) == 1 { + return + } + } + } + }() + + // 3. After c2 watches the document, c1 syncs the change of c2's attachment. + expected = append(expected, watchResponsePair{ + Type: client.DocumentWatched, + Presences: map[string]innerpresence.Presence{ + c2.ID().String(): {}, + }, + }) + _, cancel2, err := c2.Subscribe(d2) + assert.NoError(t, err) + defer cancel2() + time.Sleep(100 * time.Millisecond) // wait until the window time for batch publishing + assert.NoError(t, c1.Sync(ctx)) + + wgEvents.Wait() + assert.Equal(t, expected, responsePairs) + }) + t.Run("unwatch after detach events test", func(t *testing.T) { // 01. Create two clients and documents and attach them. ctx := context.Background() d1 := document.New(helper.TestDocKey(t)) d2 := document.New(helper.TestDocKey(t)) - assert.NoError(t, c1.Attach(ctx, d1)) + assert.NoError(t, c1.Attach(ctx, d1, client.WithRealtimeSync())) defer func() { assert.NoError(t, c1.Detach(ctx, d1)) }() - assert.NoError(t, c2.Attach(ctx, d2)) + assert.NoError(t, c2.Attach(ctx, d2, client.WithRealtimeSync())) + assert.NoError(t, c1.Sync(ctx)) // 02. Watch the first client's document. var expected []watchResponsePair @@ -226,9 +352,6 @@ func TestPresence(t *testing.T) { defer cancel1() assert.NoError(t, err) go func() { - defer func() { - wgEvents.Done() - }() for { select { case <-time.After(10 * time.Second): @@ -244,6 +367,7 @@ func TestPresence(t *testing.T) { Type: wr.Type, Presences: wr.Presences, }) + wgEvents.Done() } if len(responsePairs) == 3 { @@ -261,8 +385,9 @@ func TestPresence(t *testing.T) { }, }) _, cancel2, err := c2.Subscribe(d2) - defer cancel2() assert.NoError(t, err) + wgEvents.Wait() + wgEvents.Add(1) // 04. Update the second client's presence. err = d2.Update(func(root *json.Object, p *presence.Presence) error { @@ -278,6 +403,8 @@ func TestPresence(t *testing.T) { }) assert.NoError(t, c2.Sync(ctx, client.WithDocKey(helper.TestDocKey(t)))) assert.NoError(t, c1.Sync(ctx, client.WithDocKey(helper.TestDocKey(t)))) + wgEvents.Wait() + wgEvents.Add(1) // 05. Unwatch the second client's document. expected = append(expected, watchResponsePair{ @@ -299,9 +426,10 @@ func TestPresence(t *testing.T) { ctx := context.Background() d1 := document.New(helper.TestDocKey(t)) d2 := document.New(helper.TestDocKey(t)) - assert.NoError(t, c1.Attach(ctx, d1)) + assert.NoError(t, c1.Attach(ctx, d1, client.WithRealtimeSync())) defer func() { assert.NoError(t, c1.Detach(ctx, d1)) }() - assert.NoError(t, c2.Attach(ctx, d2)) + assert.NoError(t, c2.Attach(ctx, d2, client.WithRealtimeSync())) + assert.NoError(t, c1.Sync(ctx)) // 02. Watch the first client's document. var expected []watchResponsePair @@ -313,9 +441,6 @@ func TestPresence(t *testing.T) { defer cancel1() assert.NoError(t, err) go func() { - defer func() { - wgEvents.Done() - }() for { select { case <-time.After(10 * time.Second): @@ -331,6 +456,7 @@ func TestPresence(t *testing.T) { Type: wr.Type, Presences: wr.Presences, }) + wgEvents.Done() } if len(responsePairs) == 3 { @@ -348,8 +474,9 @@ func TestPresence(t *testing.T) { }, }) _, cancel2, err := c2.Subscribe(d2) - defer cancel2() assert.NoError(t, err) + wgEvents.Wait() + wgEvents.Add(1) // 04. Update the second client's presence. err = d2.Update(func(root *json.Object, p *presence.Presence) error { @@ -365,6 +492,8 @@ func TestPresence(t *testing.T) { }) assert.NoError(t, c2.Sync(ctx, client.WithDocKey(helper.TestDocKey(t)))) assert.NoError(t, c1.Sync(ctx, client.WithDocKey(helper.TestDocKey(t)))) + wgEvents.Wait() + wgEvents.Add(1) // 05. Unwatch the second client's document. expected = append(expected, watchResponsePair{ @@ -382,6 +511,130 @@ func TestPresence(t *testing.T) { assert.Equal(t, expected, responsePairs) }) + t.Run("watch after update events test", func(t *testing.T) { + // 01. Create two clients and documents and attach them. + ctx := context.Background() + d1 := document.New(helper.TestDocKey(t)) + d2 := document.New(helper.TestDocKey(t)) + assert.NoError(t, c1.Attach(ctx, d1, client.WithRealtimeSync())) + defer func() { assert.NoError(t, c1.Detach(ctx, d1)) }() + assert.NoError(t, c2.Attach(ctx, d2, client.WithRealtimeSync())) + defer func() { assert.NoError(t, c2.Detach(ctx, d2)) }() + + // 02. Watch the first client's document. + var expected []watchResponsePair + var responsePairs []watchResponsePair + wgEvents := sync.WaitGroup{} + wgEvents.Add(1) + + wrch, cancel1, err := c1.Subscribe(d1) + defer cancel1() + assert.NoError(t, err) + go func() { + for { + select { + case <-time.After(time.Second): + assert.Fail(t, "timeout") + return + case wr := <-wrch: + if wr.Err != nil { + assert.Fail(t, "unexpected stream closing", wr.Err) + return + } + if wr.Type != client.DocumentChanged { + responsePairs = append(responsePairs, watchResponsePair{ + Type: wr.Type, + Presences: wr.Presences, + }) + wgEvents.Done() + } + if len(responsePairs) == 1 { + return + } + } + } + }() + + // 3. After c1 syncing the change of c2's update, c2 watches the document. + expected = append(expected, watchResponsePair{ + Type: client.DocumentWatched, + Presences: map[string]innerpresence.Presence{ + c2.ID().String(): {"updated": "true"}, + }, + }) + err = d2.Update(func(root *json.Object, p *presence.Presence) error { + p.Set("updated", "true") + return nil + }) + assert.NoError(t, err) + assert.NoError(t, c2.Sync(ctx)) + assert.NoError(t, c1.Sync(ctx)) + _, cancel2, err := c2.Subscribe(d2) + assert.NoError(t, err) + defer cancel2() + + wgEvents.Wait() + assert.Equal(t, expected, responsePairs) + }) + + t.Run("watch after detach events test", func(t *testing.T) { + // 01. Create two clients and documents and attach them. + ctx := context.Background() + d1 := document.New(helper.TestDocKey(t)) + d2 := document.New(helper.TestDocKey(t)) + assert.NoError(t, c1.Attach(ctx, d1, client.WithRealtimeSync())) + defer func() { assert.NoError(t, c1.Detach(ctx, d1)) }() + assert.NoError(t, c2.Attach(ctx, d2, client.WithRealtimeSync())) + + // 02. Watch the first client's document. + var responsePairs []watchResponsePair + wgEvents := sync.WaitGroup{} + wgEvents.Add(1) + + wrch, cancel1, err := c1.Subscribe(d1) + defer cancel1() + assert.NoError(t, err) + go func() { + for { + select { + case <-time.After(time.Second): + wgEvents.Done() + return + case wr := <-wrch: + if wr.Err != nil { + assert.Fail(t, "unexpected stream closing", wr.Err) + return + } + if wr.Type != client.DocumentChanged { + responsePairs = append(responsePairs, watchResponsePair{ + Type: wr.Type, + Presences: wr.Presences, + }) + println("what???", wr.Type) + assert.Fail(t, "unexpected presence event") + } + } + } + }() + + // 3. After c1 syncing the change of c2's detachment, c2 watches the document. + err = d2.Update(func(root *json.Object, p *presence.Presence) error { + p.Set("updated", "true") + return nil + }) + assert.NoError(t, err) + assert.NoError(t, c1.Sync(ctx)) + + _, cancel2, err := c2.Subscribe(d2) // No events yet - before the window time for batch publishing + assert.NoError(t, err) + defer cancel2() + assert.NoError(t, c2.Detach(ctx, d2)) + assert.NoError(t, c1.Sync(ctx)) + + wgEvents.Wait() + assert.Equal(t, 0, len(responsePairs)) + }) + t.Run("watching multiple documents test", func(t *testing.T) { ctx := context.Background() @@ -390,7 +643,7 @@ func TestPresence(t *testing.T) { d1 := document.New(helper.TestDocKey(t)) d2 := document.New(helper.TestDocKey(t)) d3 := document.New(helper.TestDocKey(t) + "2") - assert.NoError(t, c1.Attach(ctx, d1)) + assert.NoError(t, c1.Attach(ctx, d1, client.WithRealtimeSync())) defer func() { assert.NoError(t, c1.Detach(ctx, d1)) }() // 02. Watch the first client's document. @@ -433,9 +686,9 @@ func TestPresence(t *testing.T) { // 03. The second client attaches a document with the same key as the first client's document // and another document with a different key. - assert.NoError(t, c2.Attach(ctx, d2)) + assert.NoError(t, c2.Attach(ctx, d2, client.WithRealtimeSync())) defer func() { assert.NoError(t, c2.Detach(ctx, d2)) }() - assert.NoError(t, c2.Attach(ctx, d3)) + assert.NoError(t, c2.Attach(ctx, d3, client.WithRealtimeSync())) defer func() { assert.NoError(t, c2.Detach(ctx, d3)) }() // 04. The second client watches the documents attached by itself. diff --git a/test/integration/server_test.go b/test/integration/server_test.go index 30dc03d5e..7003aad8a 100644 --- a/test/integration/server_test.go +++ b/test/integration/server_test.go @@ -42,7 +42,7 @@ func TestServer(t *testing.T) { assert.NoError(t, cli.Activate(ctx)) doc := document.New(helper.TestDocKey(t)) - assert.NoError(t, cli.Attach(ctx, doc)) + assert.NoError(t, cli.Attach(ctx, doc, client.WithRealtimeSync())) wg := sync.WaitGroup{} wrch, _, err := cli.Subscribe(doc)