Skip to content

Commit

Permalink
Merge pull request #11730 from vegaprotocol/candle-thing
Browse files Browse the repository at this point in the history
fix: break out of correct loop when slow subscribers
  • Loading branch information
jeremyletang authored Oct 7, 2024
2 parents ac9993d + 8abe15c commit ba1d73a
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 3 deletions.
6 changes: 3 additions & 3 deletions datanode/candlesv2/candle_updates.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@ func (s *CandleUpdates) run(ctx context.Context) {
if len(candles) > 0 {
lastCandle = &candles[len(candles)-1]
}

subscriptions = s.sendCandlesToSubscribers(candles, subscriptions)
}
}
Expand Down Expand Up @@ -224,12 +223,13 @@ func (s *CandleUpdates) getCandleUpdates(ctx context.Context, lastCandle *entiti
func (s *CandleUpdates) sendCandlesToSubscribers(candles []entities.Candle, subscriptions map[string]chan entities.Candle) map[string]chan entities.Candle {
ret := subscriptions
for subscriptionID, outCh := range subscriptions {
loop:
for _, candle := range candles {
select {
case outCh <- candle:
default:
ret = removeSubscription(subscriptions, subscriptionID)
break
ret = removeSubscription(ret, subscriptionID)
break loop
}
}
}
Expand Down
31 changes: 31 additions & 0 deletions datanode/candlesv2/candle_updates_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,37 @@ func TestSubscribeAndUnSubscribeWithNonReturningSource(t *testing.T) {
updates.Unsubscribe(subID2)
}

func TestMultipleSlowConsumers(t *testing.T) {
nSends := 100
testCandleSource := &testCandleSource{candles: make(chan []entities.Candle, nSends), errorCh: make(chan error)}
// ensure the sub channels are buffered
updates := candlesv2.NewCandleUpdates(context.Background(), logging.NewTestLogger(), "testCandles",
testCandleSource, newTestCandleConfig(5).CandleUpdates)
startTime := time.Now()

updated := startTime
firstCandle := createCandle(startTime, updated, 1, 1, 1, 1, 10, 200)
lastCandle := firstCandle // just for the sake of types

wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
testCandleSource.candles <- []entities.Candle{firstCandle}

// keep updating the most recent candle
for i := 0; i < nSends; i++ {
updated = updated.Add(time.Second * time.Duration(i))
lastCandle = createCandle(startTime, updated, 1, 1, 1, 1, 10, 200)
testCandleSource.candles <- []entities.Candle{lastCandle, lastCandle, lastCandle}
}
}()
// ensure the first candle is sent
_, _, _ = updates.Subscribe()
_, _, _ = updates.Subscribe()
wg.Wait()
}

func newTestCandleConfig(subscribeBufferSize int) candlesv2.Config {
conf := candlesv2.NewDefaultConfig()
conf.CandleUpdates = candlesv2.CandleUpdatesConfig{
Expand Down

0 comments on commit ba1d73a

Please sign in to comment.