Skip to content

Commit

Permalink
ensure consumerInfo request has timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
kung-foo committed May 16, 2023
1 parent 520adac commit b74356c
Showing 1 changed file with 11 additions and 4 deletions.
15 changes: 11 additions & 4 deletions capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (c *Capture[P, K]) Run(ctx context.Context, nc *nats.Conn) (err error) {
return err
}

cinfo, err := c.js.ConsumerInfo(c.opts.NATSStreamName, c.opts.NATSConsumerName)
cinfo, err := c.consumerInfo(ctx)
if err != nil {
return err
}
Expand Down Expand Up @@ -151,7 +151,7 @@ func (c *Capture[P, K]) Run(ctx context.Context, nc *nats.Conn) (err error) {

// we can't distinguish between a timeout due to no messages available, and
// timeout due to max pending reached. so we explicitly query and see if we are close.
ci, err := c.js.ConsumerInfo(c.opts.NATSStreamName, c.opts.NATSConsumerName, nats.Context(ctx))
ci, err := c.consumerInfo(ctx)
if err != nil {
return err
}
Expand All @@ -171,6 +171,13 @@ func (c *Capture[P, K]) Run(ctx context.Context, nc *nats.Conn) (err error) {
}
}

func (c *Capture[P, K]) consumerInfo(ctx context.Context) (*nats.ConsumerInfo, error) {
ctx, cancel := context.WithTimeout(ctx, time.Second*5)
defer cancel()

return c.js.ConsumerInfo(c.opts.NATSStreamName, c.opts.NATSConsumerName, nats.Context(ctx))
}

func (c *Capture[P, K]) sweepBlocks(ctx context.Context, forceFlush bool) {
for dk, v := range c.blocks {
var keep []*dataBlock[P]
Expand All @@ -193,7 +200,7 @@ func (c *Capture[P, K]) sweepBlocks(ctx context.Context, forceFlush bool) {

func (c *Capture[P, K]) finalizeBlock(ctx context.Context, block *dataBlock[P], dk K) error {
defer func() {
block.buffer.Remove()
_ = block.buffer.Remove()
}()

if err := block.close(); err != nil {
Expand Down Expand Up @@ -242,7 +249,7 @@ func (c *Capture[P, K]) fileSuffix() string {

func (c *Capture[P, K]) debugPrint(prefix string) {
if false {
cinfo, _ := c.js.ConsumerInfo(c.opts.NATSStreamName, c.opts.NATSConsumerName)
cinfo, _ := c.consumerInfo(context.Background())
log.Debugf("%s: NumAckPending=%d NumPending=%d", prefix, cinfo.NumAckPending, cinfo.NumPending)
}
}
Expand Down

0 comments on commit b74356c

Please sign in to comment.