This repository has been archived by the owner on Oct 6, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
client_test.go
86 lines (73 loc) · 2.3 KB
/
client_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
package kafkatest_test
import (
"testing"
"time"
"github.com/Shopify/sarama"
qt "github.com/frankban/quicktest"
"github.com/heetch/kafkatest"
)
func TestNew(t *testing.T) {
c := qt.New(t)
k, err := kafkatest.New()
c.Assert(err, qt.Equals, nil)
// Produce a message to a new topic.
cfg := k.Config()
cfg.Producer.Return.Successes = true
cfg.Producer.Return.Errors = true
producer, err := sarama.NewSyncProducer(k.Addrs(), cfg)
c.Assert(err, qt.Equals, nil)
defer producer.Close()
topic := k.NewTopic()
// Check that the topic has actually been created.
admin, err := sarama.NewClusterAdmin(k.Addrs(), cfg)
c.Assert(err, qt.Equals, nil)
defer admin.Close()
topics, err := admin.ListTopics()
c.Assert(err, qt.Equals, nil)
_, ok := topics[topic]
c.Assert(ok, qt.Equals, true)
// Produce a message to the topic.
_, offset, err := producer.SendMessage(&sarama.ProducerMessage{
Topic: topic,
Key: sarama.StringEncoder("key"),
Value: sarama.StringEncoder("value"),
})
c.Assert(err, qt.Equals, nil)
c.Assert(offset, qt.Equals, int64(0))
// Check that we can consume the message we just produced.
consumer, err := sarama.NewConsumer(k.Addrs(), cfg)
c.Assert(err, qt.Equals, nil)
defer consumer.Close()
pconsumer, err := consumer.ConsumePartition(topic, 0, sarama.OffsetOldest)
c.Assert(err, qt.Equals, nil)
defer pconsumer.Close()
select {
case m := <-pconsumer.Messages():
c.Check(string(m.Key), qt.Equals, "key")
c.Check(string(m.Value), qt.Equals, "value")
case <-time.After(10 * time.Second):
c.Fatal("timed out waiting for message")
}
// Close the Kafka instance and check that the
// new topic has been removed.
err = k.Close()
c.Assert(err, qt.Equals, nil)
topics, err = admin.ListTopics()
c.Assert(err, qt.Equals, nil)
_, ok = topics[topic]
c.Assert(ok, qt.Equals, false)
// Check we can call Close again.
err = k.Close()
c.Assert(err, qt.Equals, nil)
}
func TestDisabled(t *testing.T) {
c := qt.New(t)
c.Setenv("KAFKA_DISABLE", "1")
k, err := kafkatest.New()
c.Assert(err, qt.Equals, kafkatest.ErrDisabled)
c.Assert(k, qt.IsNil)
c.Setenv("KAFKA_DISABLE", "bad")
k, err = kafkatest.New()
c.Assert(err, qt.ErrorMatches, `bad value for \$KAFKA_DISABLE: invalid boolean value "bad" \(possible values are: 1, t, T, TRUE, true, True, 0, f, F, FALSE\)`)
c.Assert(k, qt.IsNil)
}