Skip to content

Commit

Permalink
Refactor BatchPublisher to improve event publishing and logging
Browse files Browse the repository at this point in the history
  • Loading branch information
hackerwins committed Nov 1, 2024
1 parent a49d291 commit c7f2db2
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 29 deletions.
56 changes: 33 additions & 23 deletions server/backend/sync/memory/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@
package memory

import (
"context"
"strconv"
gosync "sync"
"sync/atomic"
time "time"

"go.uber.org/zap"
Expand All @@ -27,21 +28,31 @@ import (
"github.com/yorkie-team/yorkie/server/logging"
)

// id is the package-level counter whose next() value names the logger of each
// BatchPublisher created in this package.
var id routineID

// routineID is an int32 counter that is only advanced through atomic adds.
type routineID int32

// next atomically bumps the counter by one and returns the new value rendered
// as "p<N>"; the first call on a zero-valued counter yields "p1".
func (c *routineID) next() string {
	counter := (*int32)(c)
	seq := atomic.AddInt32(counter, 1)
	return "p" + strconv.FormatInt(int64(seq), 10)
}

// BatchPublisher is a publisher that publishes events in batch: Publish
// appends events to a pending slice, and publish sends the accumulated events
// to the subscriptions returned by subs.Values().
//
// NOTE(review): this listing is a rendered commit diff without +/- markers, so
// the repeated events/mutex fields and maxBatch look like pre-change lines
// shown next to their replacements — confirm against the repository.
type BatchPublisher struct {
events []sync.DocEvent
mutex gosync.Mutex
// logger is the per-publisher logger; the constructor sets it to
// logging.New(id.next()).
logger *zap.SugaredLogger
// mutex is held by Publish while appending to events and by publish while
// swapping events out.
mutex gosync.Mutex
// events holds the events appended by Publish that have not been sent yet.
events []sync.DocEvent

// window is stored by the constructor; presumably the interval between
// periodic publishes — the ticker setup is not visible here, confirm.
window time.Duration
maxBatch int
// closeChan makes processLoop return once a receive on it succeeds.
closeChan chan struct{}
// subs supplies the subscriptions (Values) and the docKey used by publish.
subs *Subscriptions
}

// NewBatchPublisher creates a new BatchPublisher instance.
func NewBatchPublisher(window time.Duration, maxBatch int, subs *Subscriptions) *BatchPublisher {
func NewBatchPublisher(subs *Subscriptions, window time.Duration) *BatchPublisher {
bp := &BatchPublisher{
logger: logging.New(id.next()),
window: window,
maxBatch: maxBatch,
closeChan: make(chan struct{}),
subs: subs,
}
Expand All @@ -52,18 +63,13 @@ func NewBatchPublisher(window time.Duration, maxBatch int, subs *Subscriptions)

// Publish adds the given event to the batch. If the batch is full, it publishes
// the batch.
//
// NOTE(review): this is a rendered diff without +/- markers. The second
// signature below (no ctx) matches the updated callers in this commit
// (s.publisher.Publish(event)). The maxBatch check calls bp.publish(ctx) while
// publish() now locks bp.mutex itself, so that check looks like a removed
// line. If so, the "If the batch is full" sentence above is stale: the only
// other visible caller of publish is the ticker case in processLoop — confirm.
func (bp *BatchPublisher) Publish(ctx context.Context, event sync.DocEvent) {
func (bp *BatchPublisher) Publish(event sync.DocEvent) {
// Hold the mutex until return so the append below does not interleave with
// publish swapping out bp.events.
bp.mutex.Lock()
defer bp.mutex.Unlock()

// TODO(hackerwins): If the event is DocumentChangedEvent, we should merge
// the events to reduce the number of events to be published.
// TODO(hackerwins): If DocumentChangedEvent is already in the batch, we don't
// need to add it again.
bp.events = append(bp.events, event)

// TODO(hackerwins): Consider using processLoop to publish events.
if len(bp.events) >= bp.maxBatch {
bp.publish(ctx)
}
}

func (bp *BatchPublisher) processLoop() {
Expand All @@ -73,36 +79,42 @@ func (bp *BatchPublisher) processLoop() {
for {
select {
case <-ticker.C:
bp.mutex.Lock()
bp.publish(context.Background())
bp.mutex.Unlock()
bp.publish()
case <-bp.closeChan:
return
}
}
}

func (bp *BatchPublisher) publish(ctx context.Context) {
func (bp *BatchPublisher) publish() {
bp.mutex.Lock()

if len(bp.events) == 0 {
bp.mutex.Unlock()
return
}

events := bp.events
bp.events = nil

bp.mutex.Unlock()

if logging.Enabled(zap.DebugLevel) {
logging.From(ctx).Infof(
bp.logger.Infof(
"Publishing batch of %d events for document %s",
len(bp.events),
bp.subs.docKey,
)
}

for _, sub := range bp.subs.Values() {
for _, event := range bp.events {
for _, event := range events {
if sub.Subscriber().Compare(event.Publisher) == 0 {
continue
}

if ok := sub.Publish(event); !ok {
logging.From(ctx).Infof(
bp.logger.Infof(
"Publish(%s,%s) to %s timeout or closed",
event.Type,
event.Publisher,
Expand All @@ -111,8 +123,6 @@ func (bp *BatchPublisher) publish(ctx context.Context) {
}
}
}

bp.events = nil
}

// Close stops the batch publisher
Expand Down
8 changes: 4 additions & 4 deletions server/backend/sync/memory/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func newSubscriptions(docKey types.DocRefKey) *Subscriptions {
docKey: docKey,
internalMap: cmap.New[string, *sync.Subscription](),
}
s.publisher = NewBatchPublisher(100*gotime.Millisecond, 100, s)
s.publisher = NewBatchPublisher(s, 100*gotime.Millisecond)
return s
}

Expand All @@ -56,8 +56,8 @@ func (s *Subscriptions) Values() []*sync.Subscription {
}

// Publish publishes the given event by handing it to s.publisher, the
// BatchPublisher created in newSubscriptions.
//
// NOTE(review): this is a rendered diff without +/- markers; the ctx-taking
// signature and call appear to be the pre-change lines and the ctx-free pair
// their replacement — confirm against the repository.
func (s *Subscriptions) Publish(ctx context.Context, event sync.DocEvent) {
s.publisher.Publish(ctx, event)
func (s *Subscriptions) Publish(event sync.DocEvent) {
s.publisher.Publish(event)
}

// Delete deletes the subscription of the given id.
Expand Down Expand Up @@ -178,7 +178,7 @@ func (m *PubSub) Publish(
}

if subs, ok := m.subscriptionsMap.Get(docKey); ok {
subs.Publish(ctx, event)
subs.Publish(event)
}

if logging.Enabled(zap.DebugLevel) {
Expand Down
9 changes: 7 additions & 2 deletions server/backend/sync/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ import (
"github.com/yorkie-team/yorkie/pkg/document/time"
)

// publishTimeout is the longest Subscription.Publish waits for the
// subscriber's event channel to accept an event before it gives up and
// returns false.
const publishTimeout = 100 * gotime.Millisecond

// Subscription represents a subscription of a subscriber to documents.
type Subscription struct {
id string
Expand Down Expand Up @@ -88,12 +93,12 @@ func (s *Subscription) Publish(event DocEvent) bool {
return false
}

// NOTE: When a subscription is being closed by a subscriber,
// NOTE(hackerwins): When a subscription is being closed by a subscriber,
// the subscriber may not receive messages.
select {
case s.Events() <- event:
return true
case <-gotime.After(100 * gotime.Millisecond):
case <-gotime.After(publishTimeout):
return false
}
}

0 comments on commit c7f2db2

Please sign in to comment.