Skip to content

Commit

Permalink
puller: avoid oom during incremental scan (#998)
Browse files Browse the repository at this point in the history
  • Loading branch information
lidezhu authored Feb 17, 2025
1 parent 796d1bd commit 510d0a3
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 11 deletions.
33 changes: 31 additions & 2 deletions logservice/logpuller/region_event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package logpuller
import (
"encoding/hex"
"time"
"unsafe"

"github.com/pingcap/kvproto/pkg/cdcpb"
"github.com/pingcap/log"
Expand Down Expand Up @@ -45,6 +46,24 @@ type regionEvent struct {
resolvedTs uint64
}

func (event *regionEvent) getSize() int {
if event == nil {
return 0
}
size := int(unsafe.Sizeof(*event))
if event.entries != nil {
size += int(unsafe.Sizeof(*event.entries))
size += int(unsafe.Sizeof(*event.entries.Entries))
for _, row := range event.entries.Entries.GetEntries() {
size += int(unsafe.Sizeof(*row))
size += len(row.Key)
size += len(row.Value)
size += len(row.OldValue)
}
}
return size
}

type regionEventHandler struct {
subClient *SubscriptionClient
}
Expand Down Expand Up @@ -88,7 +107,10 @@ func (h *regionEventHandler) Handle(span *subscribedSpan, events ...regionEvent)
return false
}

func (h *regionEventHandler) GetSize(event regionEvent) int { return 0 }
func (h *regionEventHandler) GetSize(event regionEvent) int {
return event.getSize()
}

func (h *regionEventHandler) GetArea(path SubscriptionID, dest *subscribedSpan) int {
return 0
}
Expand Down Expand Up @@ -130,7 +152,14 @@ func (h *regionEventHandler) GetType(event regionEvent) dynstream.EventType {
return dynstream.DefaultEventType
}

func (h *regionEventHandler) OnDrop(event regionEvent) {}
func (h *regionEventHandler) OnDrop(event regionEvent) {
log.Warn("drop region event",
zap.Uint64("regionID", event.state.getRegionID()),
zap.Uint64("requestID", event.state.requestID),
zap.Uint64("workerID", event.worker.workerID),
zap.Bool("hasEntries", event.entries != nil),
zap.Bool("stateIsStale", event.state.isStale()))
}

func (h *regionEventHandler) handleRegionError(state *regionFeedState, worker *regionRequestWorker) {
stepsToRemoved := state.markRemoved()
Expand Down
8 changes: 4 additions & 4 deletions logservice/logpuller/region_request_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func newRegionRequestWorker(
state: state,
worker: worker,
}
worker.client.ds.Push(subID, regionEvent)
worker.client.pushRegionEventToDS(subID, regionEvent)
}
}
// The store may fail forever, so we need try to re-schedule all pending regions.
Expand Down Expand Up @@ -253,7 +253,7 @@ func (s *regionRequestWorker) dispatchRegionChangeEvents(events []*cdcpb.Event)
default:
log.Panic("unknown event type", zap.Any("event", event))
}
s.client.ds.Push(SubscriptionID(event.RequestId), regionEvent)
s.client.pushRegionEventToDS(SubscriptionID(event.RequestId), regionEvent)
} else {
log.Warn("region request worker receives a region event for an untracked region",
zap.Uint64("workerID", s.workerID),
Expand All @@ -269,7 +269,7 @@ func (s *regionRequestWorker) dispatchResolvedTsEvent(resolvedTsEvent *cdcpb.Res
s.client.metrics.batchResolvedSize.Observe(float64(len(resolvedTsEvent.Regions)))
for _, regionID := range resolvedTsEvent.Regions {
if state := s.getRegionState(subscriptionID, regionID); state != nil {
s.client.ds.Push(SubscriptionID(resolvedTsEvent.RequestId), regionEvent{
s.client.pushRegionEventToDS(SubscriptionID(resolvedTsEvent.RequestId), regionEvent{
state: state,
worker: s,
resolvedTs: resolvedTsEvent.Ts,
Expand Down Expand Up @@ -345,7 +345,7 @@ func (s *regionRequestWorker) processRegionSendTask(
state: state,
worker: s,
}
s.client.ds.Push(subID, regionEvent)
s.client.pushRegionEventToDS(subID, regionEvent)
}
} else if region.subscribedSpan.stopped.Load() {
// It can be skipped directly because there must be no pending states from
Expand Down
49 changes: 47 additions & 2 deletions logservice/logpuller/subscription_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,10 @@ type SubscriptionClient struct {
lockResolver txnutil.LockResolver

ds dynstream.DynamicStream[int, SubscriptionID, regionEvent, *subscribedSpan, *regionEventHandler]
// the following three fields are used to manage feedback from ds and notify other goroutines
mu sync.Mutex
cond *sync.Cond
paused atomic.Bool

// the credential to connect tikv
credential *security.Credential
Expand Down Expand Up @@ -227,14 +231,16 @@ func NewSubscriptionClient(

option := dynstream.NewOption()
option.BatchCount = 1024
option.UseBuffer = true
option.UseBuffer = false
option.EnableMemoryControl = true
ds := dynstream.NewParallelDynamicStream(
func(subID SubscriptionID) uint64 { return uint64(subID) },
&regionEventHandler{subClient: subClient},
option,
)
ds.Start()
subClient.ds = ds
subClient.cond = sync.NewCond(&subClient.mu)

subClient.initMetrics()
return subClient
Expand Down Expand Up @@ -308,7 +314,9 @@ func (s *SubscriptionClient) Subscribe(
s.totalSpans.spanMap[subID] = rt
s.totalSpans.Unlock()

s.ds.AddPath(rt.subID, rt, dynstream.AreaSettings{})
s.ds.AddPath(rt.subID, rt, dynstream.AreaSettings{
MaxPendingSize: 2 * 1024 * 1024 * 1024, // 2GB
})

s.rangeTaskCh <- rangeTask{span: span, subscribedSpan: rt}
}
Expand All @@ -335,6 +343,42 @@ func (s *SubscriptionClient) wakeSubscription(subID SubscriptionID) {
s.ds.Wake(subID)
}

func (s *SubscriptionClient) pushRegionEventToDS(subID SubscriptionID, event regionEvent) {
// fast path
if !s.paused.Load() {
s.ds.Push(subID, event)
return
}
// slow path: wait until paused is false
s.mu.Lock()
for s.paused.Load() {
s.cond.Wait()
}
s.mu.Unlock()
s.ds.Push(subID, event)
}

func (s *SubscriptionClient) handleDSFeedBack(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return nil
case feedback := <-s.ds.Feedback():
switch feedback.FeedbackType {
case dynstream.PauseArea:
s.paused.Store(true)
log.Info("subscription client pause push region event")
case dynstream.ResumeArea:
s.paused.Store(false)
s.cond.Broadcast()
log.Info("subscription client resume push region event")
case dynstream.PausePath, dynstream.ResumePath:
// Ignore it, because it is no need to pause and resume a path in puller.
}
}
}
}

// RegionCount returns subscribed region count for the span.
func (s *SubscriptionClient) RegionCount(subID SubscriptionID) uint64 {
s.totalSpans.RLock()
Expand All @@ -356,6 +400,7 @@ func (s *SubscriptionClient) Run(ctx context.Context) error {
g, ctx := errgroup.WithContext(ctx)

g.Go(func() error { return s.updateMetrics(ctx) })
g.Go(func() error { return s.handleDSFeedBack(ctx) })
g.Go(func() error { return s.handleRangeTasks(ctx) })
g.Go(func() error { return s.handleRegions(ctx, g) })
g.Go(func() error { return s.handleErrors(ctx) })
Expand Down
6 changes: 4 additions & 2 deletions utils/dynstream/memory_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,12 +197,14 @@ func (as *areaMemStat[A, P, T, D, H]) shouldPauseArea() (pause bool, resume bool
return
}

func (as *areaMemStat[A, P, T, D, H]) decPendingSize(size int64) {
func (as *areaMemStat[A, P, T, D, H]) decPendingSize(path *pathInfo[A, P, T, D, H], size int64) {
as.totalPendingSize.Add(int64(-size))
if as.totalPendingSize.Load() < 0 {
log.Warn("Total pending size is less than 0, reset it to 0", zap.Int64("totalPendingSize", as.totalPendingSize.Load()))
as.totalPendingSize.Store(0)
}
as.updatePathPauseState(path)
as.updateAreaPauseState(path)
}

// A memControl is used to control the memory usage of the dynamic stream.
Expand Down Expand Up @@ -249,7 +251,7 @@ func (m *memControl[A, P, T, D, H]) addPathToArea(path *pathInfo[A, P, T, D, H],
// This method is called after the path is removed.
func (m *memControl[A, P, T, D, H]) removePathFromArea(path *pathInfo[A, P, T, D, H]) {
area := path.areaMemStat
area.decPendingSize(int64(path.pendingSize.Load()))
area.decPendingSize(path, int64(path.pendingSize.Load()))

m.mutex.Lock()
defer m.mutex.Unlock()
Expand Down
2 changes: 1 addition & 1 deletion utils/dynstream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ func (pi *pathInfo[A, P, T, D, H]) popEvent() (eventWrap[A, P, T, D, H], bool) {
pi.pendingSize.Add(uint32(-e.eventSize))

if pi.areaMemStat != nil {
pi.areaMemStat.decPendingSize(int64(e.eventSize))
pi.areaMemStat.decPendingSize(pi, int64(e.eventSize))
}
return e, true
}
Expand Down

0 comments on commit 510d0a3

Please sign in to comment.