Skip to content

Commit c5588f4

Browse files
Merge pull request #198 from jonasrichard/smaller-batch
Fix infinite loop in event batch iterator
2 parents aaae767 + d80cdd1 commit c5588f4

File tree

4 files changed

+74
-6
lines changed

4 files changed

+74
-6
lines changed

batch.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package eventhub
22

33
import (
4+
"errors"
5+
46
"github.com/Azure/azure-amqp-common-go/v3/uuid"
57
"github.com/Azure/go-amqp"
68
)
@@ -47,6 +49,9 @@ const (
4749
KeyOfNoPartitionKey = "NoPartitionKey"
4850
)
4951

52+
// ErrMessageIsTooBig represents the error when one single event in the batch is bigger than the maximum batch size
53+
var ErrMessageIsTooBig = errors.New("message is too big")
54+
5055
// BatchWithMaxSizeInBytes configures the EventBatchIterator to fill the batch to the specified max size in bytes
5156
func BatchWithMaxSizeInBytes(sizeInBytes int) BatchOption {
5257
return func(batchOption *BatchOptions) error {
@@ -106,7 +111,7 @@ func (ebi *EventBatchIterator) Next(eventID string, opts *BatchOptions) (*EventB
106111
}
107112
}
108113

109-
events := ebi.PartitionEventsMap[key]
114+
events := ebi.PartitionEventsMap[key][ebi.Cursors[key]:]
110115
eb := NewEventBatch(eventID, opts)
111116
if key != KeyOfNoPartitionKey && len(events) > 0 {
112117
eb.PartitionKey = events[0].PartitionKey
@@ -118,6 +123,11 @@ func (ebi *EventBatchIterator) Next(eventID string, opts *BatchOptions) (*EventB
118123
}
119124

120125
if !ok {
126+
if len(eb.marshaledMessages) == 0 {
127+
ebi.Cursors[key]++
128+
return nil, ErrMessageIsTooBig
129+
}
130+
121131
return eb, nil
122132
}
123133
ebi.Cursors[key]++

batch_test.go

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
package eventhub_test
22

33
import (
4+
"strconv"
45
"testing"
56

67
"github.com/stretchr/testify/assert"
78

8-
"github.com/Azure/azure-event-hubs-go/v3"
9+
eventhub "github.com/Azure/azure-event-hubs-go/v3"
910
)
1011

1112
func TestNewEventBatch(t *testing.T) {
@@ -28,6 +29,7 @@ func TestEventBatch_AddManyMessages(t *testing.T) {
2829
ok, err := eb.Add(event)
2930
assert.True(t, ok)
3031
assert.NoError(t, err)
32+
3133
msgSize := eb.Size() - wrapperSize
3234

3335
limit := ((int(eb.MaxSize) - 100) / msgSize) - 1
@@ -52,3 +54,56 @@ func TestEventBatch_Clear(t *testing.T) {
5254
eb.Clear()
5355
assert.Equal(t, 100, eb.Size())
5456
}
57+
58+
func TestHugeBatches(t *testing.T) {
59+
data := make([]byte, 500)
60+
events := make([]*eventhub.Event, 0)
61+
62+
for i := 0; i < 100; i++ {
63+
// 100 / 4 * 50000 = 1250000 bytes per partition
64+
partitionKey := strconv.Itoa(i % 4)
65+
evt := &eventhub.Event{
66+
Data: data,
67+
PartitionKey: &partitionKey,
68+
}
69+
70+
events = append(events, evt)
71+
}
72+
73+
opts := &eventhub.BatchOptions{
74+
MaxSize: 10000,
75+
}
76+
iter := eventhub.NewEventBatchIterator(events...)
77+
iterCount := 0
78+
79+
for !iter.Done() {
80+
_, err := iter.Next("batchId", opts)
81+
assert.NoError(t, err)
82+
83+
iterCount++
84+
85+
if iterCount > 101 {
86+
assert.Fail(t, "Too much iteration")
87+
}
88+
}
89+
90+
assert.Greater(t, iterCount, 5)
91+
}
92+
93+
func TestOneHugeEvent(t *testing.T) {
94+
data := make([]byte, 1100)
95+
events := []*eventhub.Event{
96+
{
97+
Data: data,
98+
},
99+
}
100+
opts := &eventhub.BatchOptions{
101+
MaxSize: 1000,
102+
}
103+
iter := eventhub.NewEventBatchIterator(events...)
104+
105+
for !iter.Done() {
106+
_, err := iter.Next("batchId", opts)
107+
assert.Equal(t, err, eventhub.ErrMessageIsTooBig)
108+
}
109+
}

changelog.md

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
# Change Log
22

3+
## `v3.3.3`
4+
- EventBatchIterator drops messages which bigger than 1MB with an error
5+
36
## `v3.3.2`
47
- passing a context to internal calls that use go-amqp that now expect a context
58
- updating dependencies in go.mod
@@ -50,7 +53,7 @@
5053
- cleanup connection after making management request
5154

5255
## `v1.3.0`
53-
- add `SystemProperties` to `Event` which contains immutable broker provided metadata (squence number, offset,
56+
- add `SystemProperties` to `Event` which contains immutable broker provided metadata (squence number, offset,
5457
enqueued time)
5558

5659
## `v1.2.0`
@@ -63,11 +66,11 @@
6366
- update to amqp 0.11.0 and change sender to use unsettled rather than receiver second mode
6467

6568
## `v1.1.3`
66-
- fix leak in partition persistence
69+
- fix leak in partition persistence
6770
- fix discarding event properties on batch sending
6871

6972
## `v1.1.2`
70-
- take dep on updated amqp common which has more permissive RPC status description parsing
73+
- take dep on updated amqp common which has more permissive RPC status description parsing
7174

7275
## `v1.1.1`
7376
- close sender when hub is closed

version.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,5 @@ package eventhub
22

33
const (
44
// Version is the semantic version number
5-
Version = "3.3.1"
5+
Version = "3.3.3"
66
)

0 commit comments

Comments
 (0)