Introduce dedicated event publisher per document #1052

Merged (5 commits) on Nov 1, 2024
2 changes: 1 addition & 1 deletion client/client.go
@@ -337,7 +337,7 @@ func (c *Client) Attach(ctx context.Context, doc *document.Document, options ...
c.attachments[doc.Key()].watchCtx = watchCtx
c.attachments[doc.Key()].closeWatchStream = cancelFunc

-	if !opts.IsManual {
+	if opts.IsRealtime {
err = c.runWatchLoop(watchCtx, doc)
if err != nil {
return err
8 changes: 4 additions & 4 deletions client/options.go
@@ -94,7 +94,7 @@ type AttachOptions struct {
// Presence is the presence of the client.
Presence innerpresence.Presence
InitialRoot map[string]any
-	IsManual bool
+	IsRealtime bool

💡 Codebase verification

Update comment in WithRealtimeSync function

The migration from manual to realtime sync is complete, but there's a minor documentation issue. The comment for WithRealtimeSync incorrectly mentions "manual sync" instead of "realtime sync". The reference in install-crds.yaml is unrelated as it's for ArgoCD configuration.

  • client/options.go: Update the comment // WithRealtimeSync configures the manual sync of the client. to // WithRealtimeSync configures the realtime sync of the client.
🔗 Analysis chain

Verify removal of manual sync references.

The switch from manual to real-time synchronization is a significant behavioral change. Let's verify that all references to manual synchronization have been properly updated.


Let's check the content of the files that still contain manual sync references to understand the context.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check for any remaining references to manual sync that might need updating

# Test 1: Search for IsManual references
echo "Searching for IsManual references..."
rg "IsManual" -l

# Test 2: Search for WithManualSync references
echo "Searching for WithManualSync references..."
rg "WithManualSync" -l

# Test 3: Search for "manual sync" or "manual synchronization" in comments
echo "Searching for manual sync references in comments..."
rg -i "manual.?sync" -l

Length of output: 424


Script:

#!/bin/bash
# Check content of files with manual sync references
echo "Content of client/options.go with manual sync references:"
rg -i "manual.?sync" -C 3 client/options.go

echo -e "\nContent of build/charts/yorkie-argocd/crds/install-crds.yaml with manual sync references:"
rg -i "manual.?sync" -C 3 build/charts/yorkie-argocd/crds/install-crds.yaml

Length of output: 981

}

// WithPresence configures the presence of the client.
@@ -111,9 +111,9 @@ func WithInitialRoot(root map[string]any) AttachOption {
}
}

-// WithManualSync configures the manual sync of the client.
-func WithManualSync() AttachOption {
-	return func(o *AttachOptions) { o.IsManual = true }
+// WithRealtimeSync configures the manual sync of the client.
+func WithRealtimeSync() AttachOption {
+	return func(o *AttachOptions) { o.IsRealtime = true }
Comment on lines +114 to +116

⚠️ Potential issue

Fix incorrect documentation comment.

The documentation comment mentions "manual sync" but the function enables real-time sync. Please update it to accurately reflect the function's purpose.

-// WithRealtimeSync configures the manual sync of the client.
+// WithRealtimeSync enables real-time synchronization for the document.
+// When enabled, changes are automatically synchronized using batch publishing
+// instead of requiring manual synchronization.
 func WithRealtimeSync() AttachOption {
 	return func(o *AttachOptions) { o.IsRealtime = true }
 }
}

// DetachOption configures DetachOptions.
131 changes: 131 additions & 0 deletions server/backend/sync/memory/publisher.go
@@ -0,0 +1,131 @@
/*
* Copyright 2024 The Yorkie Authors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package memory

import (
"strconv"
gosync "sync"
"sync/atomic"
time "time"

"go.uber.org/zap"

"github.com/yorkie-team/yorkie/server/backend/sync"
"github.com/yorkie-team/yorkie/server/logging"
)

var id loggerID

type loggerID int32

func (c *loggerID) next() string {
next := atomic.AddInt32((*int32)(c), 1)
return "p" + strconv.Itoa(int(next))
}

// BatchPublisher is a publisher that publishes events in batch.
type BatchPublisher struct {
logger *zap.SugaredLogger
mutex gosync.Mutex
events []sync.DocEvent

window time.Duration
closeChan chan struct{}
subs *Subscriptions
}
Comment on lines +40 to +49

🛠️ Refactor suggestion

Consider adding capacity hints to the events slice

The events slice will be frequently appended to and cleared. Consider initializing it with a capacity hint based on expected batch sizes to reduce allocations.

 type BatchPublisher struct {
 	logger *zap.SugaredLogger
 	mutex  gosync.Mutex
-	events []sync.DocEvent
+	events []sync.DocEvent // TODO: Add field comment about typical capacity

 	window    time.Duration
 	closeChan chan struct{}
 	subs      *Subscriptions
 }

Committable suggestion skipped: line range outside the PR's diff.


// NewBatchPublisher creates a new BatchPublisher instance.
func NewBatchPublisher(subs *Subscriptions, window time.Duration) *BatchPublisher {
bp := &BatchPublisher{
logger: logging.New(id.next()),
window: window,
closeChan: make(chan struct{}),
subs: subs,
}

go bp.processLoop()
return bp
}
Comment on lines +51 to +62

⚠️ Potential issue

Add validation for window duration and consider error handling for goroutine startup

The constructor should validate the window duration and provide a way to handle potential startup errors.

-func NewBatchPublisher(subs *Subscriptions, window time.Duration) *BatchPublisher {
+func NewBatchPublisher(subs *Subscriptions, window time.Duration) (*BatchPublisher, error) {
+	if window <= 0 {
+		return nil, fmt.Errorf("window duration must be positive, got %v", window)
+	}
+	if subs == nil {
+		return nil, fmt.Errorf("subscriptions cannot be nil")
+	}
+
 	bp := &BatchPublisher{
 		logger:    logging.New(id.next()),
 		window:    window,
 		closeChan: make(chan struct{}),
 		subs:      subs,
+		events:    make([]sync.DocEvent, 0, 64), // Initial capacity hint
 	}
 
-	go bp.processLoop()
-	return bp
+	// Start processing loop with error channel for startup synchronization
+	errCh := make(chan error, 1)
+	go func() {
+		errCh <- bp.processLoop()
+	}()
+
+	// Wait for initial startup
+	select {
+	case err := <-errCh:
+		if err != nil {
+			return nil, fmt.Errorf("failed to start batch publisher: %w", err)
+		}
+	case <-time.After(time.Second):
+		return nil, fmt.Errorf("timeout waiting for batch publisher to start")
+	}
+
+	return bp, nil
 }

Committable suggestion skipped: line range outside the PR's diff.


// Publish adds the given event to the batch. If the batch is full, it publishes
// the batch.
func (bp *BatchPublisher) Publish(event sync.DocEvent) {
bp.mutex.Lock()
defer bp.mutex.Unlock()

// TODO(hackerwins): If DocumentChangedEvent is already in the batch, we don't
// need to add it again.
bp.events = append(bp.events, event)
}
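The TODO above proposes coalescing duplicate events within the batch window. A minimal, self-contained sketch of that idea — `DocEvent` here is a simplified stand-in for `sync.DocEvent`, and the field set is illustrative, not the actual Yorkie type:

```go
package main

import "fmt"

// DocEvent is a simplified stand-in for sync.DocEvent; fields are illustrative.
type DocEvent struct {
	Type      string
	Publisher string
}

// appendDeduped adds event to the pending batch unless an event with the same
// Type and Publisher is already queued, sketching the TODO in Publish.
func appendDeduped(batch []DocEvent, event DocEvent) []DocEvent {
	for _, e := range batch {
		if e.Type == event.Type && e.Publisher == event.Publisher {
			return batch // duplicate within the window: skip it
		}
	}
	return append(batch, event)
}

func main() {
	var batch []DocEvent
	batch = appendDeduped(batch, DocEvent{"document-changed", "client-a"})
	batch = appendDeduped(batch, DocEvent{"document-changed", "client-a"}) // skipped
	batch = appendDeduped(batch, DocEvent{"document-changed", "client-b"})
	fmt.Println(len(batch)) // 2
}
```

A real implementation would need to decide which event kinds are safe to coalesce; presence events, for instance, may not be.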

func (bp *BatchPublisher) processLoop() {
ticker := time.NewTicker(bp.window)
defer ticker.Stop()

for {
select {
case <-ticker.C:
bp.publish()
case <-bp.closeChan:
return
}
}
}

func (bp *BatchPublisher) publish() {
bp.mutex.Lock()

if len(bp.events) == 0 {
bp.mutex.Unlock()
return
}

events := bp.events
bp.events = nil

bp.mutex.Unlock()

Comment on lines +89 to +101

🛠️ Refactor suggestion

Consider optimizing lock handling

The lock is held while checking for empty events. Consider restructuring to minimize lock duration:

 func (bp *BatchPublisher) publish() {
 	bp.mutex.Lock()
+	events := bp.events
+	bp.events = nil
+	bp.mutex.Unlock()
 
-	if len(bp.events) == 0 {
-		bp.mutex.Unlock()
+	if len(events) == 0 {
 		return
 	}
-
-	events := bp.events
-	bp.events = nil
-
-	bp.mutex.Unlock()

if logging.Enabled(zap.DebugLevel) {
bp.logger.Infof(
"Publishing batch of %d events for document %s",
len(bp.events),
bp.subs.docKey,
)
}
Comment on lines +89 to +108

⚠️ Potential issue

Fix incorrect logging of batch size

The logging statement uses bp.events after it has been set to nil. It should use events instead.

 	if logging.Enabled(zap.DebugLevel) {
 		bp.logger.Infof(
 			"Publishing batch of %d events for document %s",
-			len(bp.events),
+			len(events),
 			bp.subs.docKey,
 		)
 	}


for _, sub := range bp.subs.Values() {
for _, event := range events {
if sub.Subscriber().Compare(event.Publisher) == 0 {
continue
}

if ok := sub.Publish(event); !ok {
bp.logger.Infof(
"Publish(%s,%s) to %s timeout or closed",
event.Type,
event.Publisher,
sub.Subscriber(),
)
}
}
}
}

// Close stops the batch publisher
func (bp *BatchPublisher) Close() {
close(bp.closeChan)
}
Comment on lines +128 to +131

🛠️ Refactor suggestion

Enhance Close method to ensure graceful shutdown

The current implementation might lose events that haven't been published yet. Consider implementing a more graceful shutdown:

+// Close stops the batch publisher and ensures all pending events are published
 func (bp *BatchPublisher) Close() {
+	bp.mutex.Lock()
+	if len(bp.events) > 0 {
+		// Publish remaining events
+		bp.publish()
+	}
+	bp.mutex.Unlock()
+
 	close(bp.closeChan)
 }

Committable suggestion skipped: line range outside the PR's diff.

Comment on lines +40 to +131

💡 Codebase verification


Thread safety concerns in BatchPublisher implementation

The implementation has potential issues that need to be addressed:

  1. The Close() method doesn't ensure all pending events are published before shutdown
  2. No unit tests exist specifically for BatchPublisher, while the codebase has race detection enabled (-race flag in Makefile)
  3. The processLoop() exits immediately on close signal without draining pending events

Recommendations:

  • Add a final publish call in Close() to flush pending events
  • Add proper unit tests to verify concurrent publishing and shutdown scenarios
  • Consider adding a sync.WaitGroup to track pending publish operations
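To illustrate the flush-on-close behavior these recommendations call for, here is a hedged sketch using a simplified model of the publisher. `miniBatcher` is hypothetical, not the actual Yorkie type; a real test would exercise `BatchPublisher` itself under the `-race` flag:

```go
package main

import (
	"fmt"
	"sync"
)

// miniBatcher is a hypothetical, simplified model of BatchPublisher's
// mutex-guarded batch; it is not the actual Yorkie type.
type miniBatcher struct {
	mu     sync.Mutex
	events []string
	closed bool
}

// Publish appends an event unless the batcher is already closed.
func (b *miniBatcher) Publish(e string) bool {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.closed {
		return false
	}
	b.events = append(b.events, e)
	return true
}

// Close drains pending events before shutdown (the flush the review asks for)
// and returns how many events were flushed.
func (b *miniBatcher) Close() int {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.closed = true
	flushed := len(b.events)
	b.events = nil
	return flushed
}

func main() {
	b := &miniBatcher{}
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			b.Publish(fmt.Sprintf("event-%d", n))
		}(i)
	}
	wg.Wait() // all publishers finish before Close, so no event is lost
	fmt.Println(b.Close()) // 8
}
```

The `closed` flag also shows how concurrent `Publish` calls during shutdown can be rejected cleanly instead of racing with the drain.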
🔗 Analysis chain

Verify thread safety and resource cleanup

While the implementation reduces lock contention through batching, there are several areas that need verification:

  1. Race conditions during shutdown
  2. Proper cleanup of resources
  3. Handling of concurrent Publish calls during shutdown
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for tests that verify concurrent publishing and shutdown scenarios
rg -l "TestBatchPublisher.*Concurrent|TestBatchPublisher.*Race" test/

Length of output: 71


Script:

#!/bin/bash
# Let's first check if there are any tests for BatchPublisher
rg -l "TestBatchPublisher" --type go

# Then look for the actual test implementations to understand the coverage
rg "func.*TestBatchPublisher" -A 10 --type go

# Check for any race-related test flags or configurations
rg -l "go test.*-race" 

# Look for any concurrent operations in the test files
rg "go func.*\(\)" -A 5 --type go --glob "*test.go"

Length of output: 9904


Script:

#!/bin/bash
# Let's check the Makefile for race detection configuration
cat Makefile | grep -A 5 "test"

# Look for the actual BatchPublisher implementation to analyze shutdown logic
rg -g "publisher.go" "func.*Close|func.*processLoop" -A 10

# Check if there are any existing tests for BatchPublisher in memory package
rg -g "*_test.go" "BatchPublisher" ./server/backend/sync/memory/

Length of output: 2165

50 changes: 14 additions & 36 deletions server/backend/sync/memory/pubsub.go
@@ -18,6 +18,7 @@ package memory

import (
"context"
gotime "time"

🛠️ Refactor suggestion

Use standard alias for the time package

Consider using the standard alias time instead of gotime for clarity and consistency across the codebase.

Apply this diff to update the import:

-import gotime "time"
+import "time"


"go.uber.org/zap"

@@ -32,13 +33,16 @@ import (
type Subscriptions struct {
docKey types.DocRefKey
internalMap *cmap.Map[string, *sync.Subscription]
publisher *BatchPublisher
}

func newSubscriptions(docKey types.DocRefKey) *Subscriptions {
-	return &Subscriptions{
+	s := &Subscriptions{
docKey: docKey,
internalMap: cmap.New[string, *sync.Subscription](),
}
s.publisher = NewBatchPublisher(s, 100*gotime.Millisecond)

🛠️ Refactor suggestion

Make batch interval configurable

The batch interval 100*gotime.Millisecond is hardcoded. Consider making this value configurable to allow flexibility for different use cases and environments.

For example, you might introduce a configuration parameter or pass it as an argument to NewBatchPublisher.

return s
}

// Set adds the given subscription.
@@ -52,40 +56,8 @@ func (s *Subscriptions) Values() []*sync.Subscription {
}

// Publish publishes the given event.
-func (s *Subscriptions) Publish(ctx context.Context, event sync.DocEvent) {
-	// TODO(hackerwins): Introduce batch publish to reduce lock contention.
-	// Problem:
-	// - High lock contention when publishing events frequently.
-	// - Redundant events being published in short time windows.
-	// Solution:
-	// - Collect events to publish in configurable time window.
-	// - Keep only the latest event for the same event type.
-	// - Run dedicated publish loop in a single goroutine.
-	// - Batch publish collected events when the time window expires.
-	for _, sub := range s.internalMap.Values() {
-		if sub.Subscriber().Compare(event.Publisher) == 0 {
-			continue
-		}
-
-		if logging.Enabled(zap.DebugLevel) {
-			logging.From(ctx).Debugf(
-				`Publish %s(%s,%s) to %s`,
-				event.Type,
-				s.docKey,
-				event.Publisher,
-				sub.Subscriber(),
-			)
-		}
-
-		if ok := sub.Publish(event); !ok {
-			logging.From(ctx).Warnf(
-				`Publish(%s,%s) to %s timeout or closed`,
-				s.docKey,
-				event.Publisher,
-				sub.Subscriber(),
-			)
-		}
-	}
+func (s *Subscriptions) Publish(event sync.DocEvent) {
+	s.publisher.Publish(event)
}

// Delete deletes the subscription of the given id.
@@ -103,6 +75,11 @@ func (s *Subscriptions) Len() int {
return s.internalMap.Len()
}

// Close closes the subscriptions.
func (s *Subscriptions) Close() {
s.publisher.Close()
}
Comment on lines +79 to +81

⚠️ Potential issue

Handle errors during publisher closure

The Close method calls s.publisher.Close() but does not handle potential errors. Consider checking for errors to ensure resources are released properly.

Apply this diff to handle the error:

 func (s *Subscriptions) Close() {
-	s.publisher.Close()
+	if err := s.publisher.Close(); err != nil {
+		// Handle the error appropriately, e.g., log it
+		logging.DefaultLogger().Error(err)
+	}
 }

Committable suggestion skipped: line range outside the PR's diff.


// PubSub is the memory implementation of PubSub, used for single server.
type PubSub struct {
subscriptionsMap *cmap.Map[types.DocRefKey, *Subscriptions]
@@ -170,6 +147,7 @@ func (m *PubSub) Unsubscribe(

if subs.Len() == 0 {
m.subscriptionsMap.Delete(docKey, func(subs *Subscriptions, exists bool) bool {
subs.Close()
return exists
})
}
@@ -200,7 +178,7 @@ func (m *PubSub) Publish(
}

if subs, ok := m.subscriptionsMap.Get(docKey); ok {
-		subs.Publish(ctx, event)
+		subs.Publish(event)
}

if logging.Enabled(zap.DebugLevel) {
9 changes: 7 additions & 2 deletions server/backend/sync/pubsub.go
@@ -26,6 +26,11 @@ import (
"github.com/yorkie-team/yorkie/pkg/document/time"
)

const (
// publishTimeout is the timeout for publishing an event.
publishTimeout = 100 * gotime.Millisecond
)
Comment on lines +29 to +32

🛠️ Refactor suggestion

Consider making publishTimeout configurable.

While extracting the timeout to a constant improves maintainability, consider making it configurable through environment variables or configuration settings. This would allow tuning the timeout for different deployment scenarios without code changes.

 const (
-	// publishTimeout is the timeout for publishing an event.
-	publishTimeout = 100 * gotime.Millisecond
+	// defaultPublishTimeout is the default timeout for publishing an event.
+	defaultPublishTimeout = 100 * gotime.Millisecond
 )
+
+// PublishTimeout returns the configured timeout for publishing events
+func PublishTimeout() time.Duration {
+	if val := os.Getenv("YORKIE_PUBLISH_TIMEOUT"); val != "" {
+		if duration, err := time.ParseDuration(val); err == nil {
+			return duration
+		}
+	}
+	return defaultPublishTimeout
+}

Committable suggestion skipped: line range outside the PR's diff.


// Subscription represents a subscription of a subscriber to documents.
type Subscription struct {
id string
@@ -88,12 +93,12 @@ func (s *Subscription) Publish(event DocEvent) bool {
return false
}

-	// NOTE: When a subscription is being closed by a subscriber,
+	// NOTE(hackerwins): When a subscription is being closed by a subscriber,
// the subscriber may not receive messages.
select {
case s.Events() <- event:
return true
-	case <-gotime.After(100 * gotime.Millisecond):
+	case <-gotime.After(publishTimeout):
return false
}
}
4 changes: 2 additions & 2 deletions test/bench/grpc_bench_test.go
@@ -202,7 +202,7 @@ func BenchmarkRPC(b *testing.B) {
ctx := context.Background()

d1 := document.New(helper.TestDocKey(b))
-	err := c1.Attach(ctx, d1)
+	err := c1.Attach(ctx, d1, client.WithRealtimeSync())
assert.NoError(b, err)
testKey1 := "testKey1"
err = d1.Update(func(root *json.Object, p *presence.Presence) error {
@@ -212,7 +212,7 @@
assert.NoError(b, err)

d2 := document.New(helper.TestDocKey(b))
-	err = c2.Attach(ctx, d2)
+	err = c2.Attach(ctx, d2, client.WithRealtimeSync())
assert.NoError(b, err)
testKey2 := "testKey2"
err = d2.Update(func(root *json.Object, p *presence.Presence) error {
2 changes: 1 addition & 1 deletion test/integration/admin_test.go
@@ -86,7 +86,7 @@ func TestAdmin(t *testing.T) {

// 01. c1 attaches and watches d1.
d1 := document.New(helper.TestDocKey(t))
-	assert.NoError(t, c1.Attach(ctx, d1))
+	assert.NoError(t, c1.Attach(ctx, d1, client.WithRealtimeSync()))
wg := sync.WaitGroup{}
wg.Add(1)
rch, cancel, err := c1.Subscribe(d1)
2 changes: 1 addition & 1 deletion test/integration/auth_webhook_test.go
@@ -174,7 +174,7 @@ func TestProjectAuthWebhook(t *testing.T) {
assert.NoError(t, err)

doc := document.New(helper.TestDocKey(t))
-	err = cli.Attach(ctx, doc)
+	err = cli.Attach(ctx, doc, client.WithRealtimeSync())
assert.Equal(t, connect.CodeUnauthenticated, connect.CodeOf(err))

_, _, err = cli.Subscribe(doc)