-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathkafka_producer_test.go
101 lines (82 loc) · 2.34 KB
/
kafka_producer_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
package consumer
import (
"bytes"
"context"
"sync"
"testing"
"time"
"github.com/go-test/deep"
"github.com/inviqa/kafka-consumer-go/data/failure/model"
"github.com/inviqa/kafka-consumer-go/log"
"github.com/inviqa/kafka-consumer-go/test/saramatest"
)
func TestNewKafkaFailureProducer(t *testing.T) {
deep.CompareUnexportedFields = true
defer func() {
deep.CompareUnexportedFields = false
}()
sp := saramatest.NewMockSyncProducer()
fch := make(<-chan model.Failure)
logger := log.NullLogger{}
exp := &kafkaFailureProducer{
producer: sp,
fch: fch,
logger: logger,
}
got := newKafkaFailureProducer(sp, fch, logger)
if diff := deep.Equal(exp, got); diff != nil {
t.Error(diff)
}
}
func TestNewFailureProducer_WithNilLogger(t *testing.T) {
if newKafkaFailureProducer(saramatest.NewMockSyncProducer(), make(<-chan model.Failure), nil) == nil {
t.Errorf("expected a producer but got nil")
}
}
func TestFailureProducer_ListenForFailures(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
sp := saramatest.NewMockSyncProducer()
fch := make(chan model.Failure, 10)
prod := newKafkaFailureProducer(sp, fch, log.NullLogger{})
prod.listenForFailures(ctx, &sync.WaitGroup{})
msg1 := []byte("hello")
msg2 := []byte("world")
f1 := model.Failure{
Reason: "something bad happened",
Message: msg1,
NextTopic: "test",
}
f2 := model.Failure{
Reason: "something bad happened",
Message: msg2,
NextTopic: "test2",
}
fch <- f1
fch <- f2
<-time.After(time.Millisecond * 5)
cancel()
act1 := sp.GetLastMessageReceived("test")
act2 := sp.GetLastMessageReceived("test2")
if bytes.Compare(msg1, act1) != 0 {
t.Errorf("expected '%s' to be published, but got '%s'", msg1, act1)
}
if bytes.Compare(msg2, act2) != 0 {
t.Errorf("expected '%s' to be published, but got '%s'", msg2, act2)
}
}
func TestFailureProducer_ListenForFailuresWithProducerError(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
sp := saramatest.NewMockSyncProducer()
sp.ReturnErrorOnSend()
fch := make(chan model.Failure, 10)
prod := newKafkaFailureProducer(sp, fch, log.NullLogger{})
prod.listenForFailures(ctx, &sync.WaitGroup{})
failure := model.Failure{
Reason: "something else happened",
Message: []byte{},
NextTopic: "foo",
}
fch <- failure
<-time.After(time.Millisecond * 5)
cancel()
}