Skip to content

Commit

Permalink
fix integration tests after updating to messenger v1.2.0
Browse files Browse the repository at this point in the history
  • Loading branch information
RiiD authored and tomcyr-promo committed May 29, 2023
1 parent d9f3a1d commit faeea8c
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 22 deletions.
43 changes: 21 additions & 22 deletions integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,11 @@ func TestIntegration(t *testing.T) {
t.Fatal("failed to prepare rmq: ", err)
}

var senderErr, receiverErr error

wg := sync.WaitGroup{}
wg.Add(2)
wg.Add(1)
go func() {
senderErr = startSender(ctx, messagesToSend)
wg.Done()
defer wg.Done()
_ = startSender(ctx, messagesToSend)
}()

r := make(chan messenger.Envelope)
Expand All @@ -61,9 +59,10 @@ func TestIntegration(t *testing.T) {
close(r)
}()

wg.Add(1)
go func() {
receiverErr = startReceiver(ctx, "receiver.messages", r)
wg.Done()
defer wg.Done()
_ = startReceiver(ctx, "receiver.messages", r)
}()

receivedMessages := make([]messenger.Envelope, numberOfMessages)
Expand Down Expand Up @@ -91,9 +90,6 @@ func TestIntegration(t *testing.T) {
assert.Equal(t, toSend.Message(), received.Message())
assert.Equal(t, toSend.Headers(), received.Headers())
}

assert.Same(t, context.Canceled, senderErr)
assert.Same(t, context.Canceled, receiverErr)
}

func TestIntegration_header_types(t *testing.T) {
Expand Down Expand Up @@ -282,28 +278,30 @@ func startReceiver(ctx context.Context, q string, received chan messenger.Envelo
b := bus.New(middleware.Stack(
middleware.HandleFunc(func(ctx context.Context, b messenger.Dispatcher, e messenger.Envelope) messenger.Envelope {
received <- e
return e
return envelope.WithAck(e)
}),
middleware.Ack(r, r),
), 32, 4)

br := bridge.New(r, b)

wg := sync.WaitGroup{}
wg.Add(2)
defer wg.Wait()

wg.Add(1)
go func() {
defer wg.Done()
_ = br.Run(ctx)
log.Println("receiver bridge done")
wg.Done()
}()

wg.Add(1)
go func() {
defer wg.Done()
_ = b.Run(ctx)
log.Println("receiver bus done")
wg.Done()
}()

wg.Wait()

return ctx.Err()
}

Expand Down Expand Up @@ -334,25 +332,26 @@ func startSender(ctx context.Context, messagesToSend []messenger.Envelope) error
b := bus.New(middleware.Match(
matcher.Type([]byte{}),
middleware.Send(s),
), 32, 4)
), len(messagesToSend)+1, 4)

wg := sync.WaitGroup{}
wg.Add(2)
defer wg.Wait()

wg.Add(1)
go func() {
defer wg.Done()
for _, e := range messagesToSend {
b.Dispatch(ctx, e)
}
log.Println("sent all messages")
wg.Done()
}()

wg.Add(1)
go func() {
defer wg.Done()
_ = b.Run(ctx)
log.Println("sender bus done")
wg.Done()
}()

wg.Wait()

return ctx.Err()
}
Binary file added oryxBuildBinary
Binary file not shown.

0 comments on commit faeea8c

Please sign in to comment.