-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathping_pong_test.go
121 lines (106 loc) · 3.24 KB
/
ping_pong_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
package main
import (
"context"
"fmt"
"testing"
"time"
"github.com/stretchr/testify/assert"
"go.gazette.dev/core/broker/client"
pb "go.gazette.dev/core/broker/protocol"
"go.gazette.dev/core/brokertest"
pc "go.gazette.dev/core/consumer/protocol"
"go.gazette.dev/core/consumertest"
"go.gazette.dev/core/etcdtest"
"go.gazette.dev/core/labels"
"go.gazette.dev/core/mainboilerplate/runconsumer"
"go.gazette.dev/core/message"
)
// This function will be invoked automatically by `go test`. It starts up an
// etcd process that will be used by the tests.
func TestMain(m *testing.M) {
etcdtest.TestMainWithEtcd(m)
}
func TestPingPong(t *testing.T) {
var etcd = etcdtest.TestClient()
defer etcdtest.Cleanup()
var testJournals, testShards = buildSpecFixtures(4)
// Start a broker & create journal fixtures.
var broker = brokertest.NewBroker(t, etcd, "local", "broker")
var rjc = pb.NewRoutedJournalClient(broker.Client(), pb.NoopDispatchRouter{})
brokertest.CreateJournals(t, broker, testJournals...)
var app = new(App)
var cfg = app.NewConfig()
cfg.(*config).PingPong.Players = 100
cfg.(*config).PingPong.Period = 0
// Start and serve a consumer, and create shard fixtures.
var cmr = consumertest.NewConsumer(consumertest.Args{
C: t,
Etcd: etcd,
Journals: rjc,
App: app,
})
cmr.Tasks.GoRun()
assert.NoError(t, app.InitApplication(
runconsumer.InitArgs{
Context: context.Background(),
Config: cfg,
Server: cmr.Server,
Service: cmr.Service,
}))
consumertest.CreateShards(t, cmr, testShards...)
// Start one ping-pong game.
var as = client.NewAppendService(context.Background(), broker.Client())
startOneGame(app.mapping, message.NewPublisher(as, nil), *cfg.(*config))
// Read one of the partitions, and expect to see an ongoing stream of volleys.
var it = message.NewReadCommittedIter(
client.NewRetryReader(context.Background(), broker.Client(),
pb.ReadRequest{Journal: testJournals[0].Name, Block: true},
),
func(spec *pb.JournalSpec) (i message.Message, e error) {
return new(Volley), nil
},
message.NewSequencer(nil, 512),
)
for i := 0; i != 10; i++ {
var env, err = it.Next()
assert.NoError(t, err)
_ = env.Message.(*Volley)
t.Log(env.Message.(*Volley))
}
// Shutdown.
cmr.Tasks.Cancel()
assert.NoError(t, cmr.Tasks.Wait())
broker.Tasks.Cancel()
assert.NoError(t, broker.Tasks.Wait())
}
func buildSpecFixtures(parts int) (journals []*pb.JournalSpec, shards []*pc.ShardSpec) {
for p := 0; p != parts; p++ {
var (
part = fmt.Sprintf("%02d", p)
shard = &pc.ShardSpec{
Id: pc.ShardID("part-" + part),
Sources: []pc.ShardSpec_Source{
{Journal: pb.Journal("volleys/part=" + part)},
},
RecoveryLogPrefix: "recovery/logs",
HintPrefix: "/gazette/hints",
MaxTxnDuration: time.Second,
}
)
journals = append(journals,
brokertest.Journal(pb.JournalSpec{
Name: shard.Sources[0].Journal,
LabelSet: pb.MustLabelSet(
labels.MessageType, "ping_pong.Volley",
labels.ContentType, labels.ContentType_JSONLines,
),
}),
brokertest.Journal(pb.JournalSpec{
Name: shard.RecoveryLog(),
LabelSet: pb.MustLabelSet(labels.ContentType, labels.ContentType_RecoveryLog),
}),
)
shards = append(shards, shard)
}
return
}