From b74356c6087009a47cef7b6bc2a7ce9cec88f279 Mon Sep 17 00:00:00 2001 From: jonathan Date: Tue, 16 May 2023 09:43:09 +0200 Subject: [PATCH] ensure consumerInfo request has timeout --- capture.go | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/capture.go b/capture.go index 85ae33c..00aab8a 100644 --- a/capture.go +++ b/capture.go @@ -92,7 +92,7 @@ func (c *Capture[P, K]) Run(ctx context.Context, nc *nats.Conn) (err error) { return err } - cinfo, err := c.js.ConsumerInfo(c.opts.NATSStreamName, c.opts.NATSConsumerName) + cinfo, err := c.consumerInfo(ctx) if err != nil { return err } @@ -151,7 +151,7 @@ func (c *Capture[P, K]) Run(ctx context.Context, nc *nats.Conn) (err error) { // we can't distinguish between a timeout due to no messages available, and // timeout due to max pending reached. so we explicitly query and see if we are close. - ci, err := c.js.ConsumerInfo(c.opts.NATSStreamName, c.opts.NATSConsumerName, nats.Context(ctx)) + ci, err := c.consumerInfo(ctx) if err != nil { return err } @@ -171,6 +171,13 @@ func (c *Capture[P, K]) Run(ctx context.Context, nc *nats.Conn) (err error) { } } +func (c *Capture[P, K]) consumerInfo(ctx context.Context) (*nats.ConsumerInfo, error) { + ctx, cancel := context.WithTimeout(ctx, time.Second*5) + defer cancel() + + return c.js.ConsumerInfo(c.opts.NATSStreamName, c.opts.NATSConsumerName, nats.Context(ctx)) +} + func (c *Capture[P, K]) sweepBlocks(ctx context.Context, forceFlush bool) { for dk, v := range c.blocks { var keep []*dataBlock[P] @@ -193,7 +200,7 @@ func (c *Capture[P, K]) sweepBlocks(ctx context.Context, forceFlush bool) { func (c *Capture[P, K]) finalizeBlock(ctx context.Context, block *dataBlock[P], dk K) error { defer func() { - block.buffer.Remove() + _ = block.buffer.Remove() }() if err := block.close(); err != nil { @@ -242,7 +249,7 @@ func (c *Capture[P, K]) fileSuffix() string { func (c *Capture[P, K]) debugPrint(prefix string) { if false { - cinfo, _ := c.js.ConsumerInfo(c.opts.NATSStreamName, c.opts.NATSConsumerName) + cinfo, _ := c.consumerInfo(context.Background()) log.Debugf("%s: NumAckPending=%d NumPending=%d", prefix, cinfo.NumAckPending, cinfo.NumPending) } }