Skip to content

Commit

Permalink
Added Requester and async ack support. (#248)
Browse files Browse the repository at this point in the history
Requester supports request/response pattern.

Async ack is provided by WithFinish() - Sender does async send by
default (depends on QoS and binding), WithFinish() allows the caller
to be notified when Finish() is called.

Also added ChanSender/ChanReceiver - useful for testing so might as well make
them exported.

Fixes #227

Signed-off-by: Alan Conway <aconway@redhat.com>
  • Loading branch information
alanconway authored and n3wscott committed Oct 21, 2019
1 parent b0ed337 commit 6389a12
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 18 deletions.
52 changes: 52 additions & 0 deletions pkg/binding/chan.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package binding

import (
"context"
"errors"
"io"
)

// ChanSender implements Sender by sending on a channel.
type ChanSender chan<- Message

// ChanReceiver implements Receiver by receiving from a channel.
type ChanReceiver <-chan Message

func (s ChanSender) Send(ctx context.Context, m Message) (err error) {
defer func() {
err2 := m.Finish(err)
if err == nil {
err = err2
}
}()
select {
case <-ctx.Done():
return ctx.Err()
case s <- m:
return nil
}
}

func (s ChanSender) Close(ctx context.Context) (err error) {
defer func() {
if recover() != nil {
err = errors.New("send on closed channel")
}
}()
close(s)
return nil
}

func (r ChanReceiver) Receive(ctx context.Context) (Message, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
case m, ok := <-r:
if !ok {
return nil, io.EOF
}
return m, nil
}
}

func (r ChanReceiver) Close(ctx context.Context) error { return nil }
13 changes: 8 additions & 5 deletions pkg/binding/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@
Package binding defines interfaces for protocol bindings.
NOTE: Most applications that emit or consume events can use the client
package. This package is for implementing new protocol bindings and
intermediaries; processes that forward events between protocols, rather than
emitting or consuming events themselves.
NOTE: Most applications that emit or consume events should use the ../client
package, which provides a simpler API to the underlying binding.
The interfaces in this package provide extra encoding and protocol information
to allow efficient forwarding and end-to-end reliable delivery between a
Receiver and a Sender belonging to different bindings. This is useful for
intermediary applications that route or forward events, but not necessary for
most "endpoint" applications that emit or consume events.
Protocol Bindings
Expand Down
19 changes: 18 additions & 1 deletion pkg/binding/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,14 +81,31 @@ type Receiver interface {

// Sender sends messages.
type Sender interface {
// Send blocks till the message is sent or ctx expires.
// Send a message.
//
// Send returns when the "outbound" message has been sent. The Sender may
// still be expecting acknowledgment or holding other state for the message.
//
// m.Finish() is called when sending is finished: expected acknowledgments (or
// errors) have been received, the Sender is no longer holding any state for
// the message. m.Finish() may be called during or after Send().
//
// To support optimized forwading of structured-mode messages, Send()
// should use the encoding returned by m.Structured() if there is one.
// Otherwise m.Event() can be encoded as per the binding's rules.
Send(ctx context.Context, m Message) error
}

// Requester sends a message and receives a response
//
// Optional interface that may be implemented by protocols that support
// request/response correlation.
type Requester interface {
// Request sends m like Sender.Send() but also arranges to receive a response.
// The returned Receiver is used to receive the response.
Request(ctx context.Context, m Message) (Receiver, error)
}

// Closer is the common interface for things that can be closed
type Closer interface {
Close(ctx context.Context) error
Expand Down
20 changes: 20 additions & 0 deletions pkg/binding/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,23 @@ func Structured(m Message, f format.Format) ([]byte, error) {
}
return f.Marshal(e)
}

type finishMessage struct {
Message
finish func(error)
}

func (m finishMessage) Finish(err error) error {
err2 := m.Message.Finish(err) // Finish original message first
if m.finish != nil {
m.finish(err) // Notify callback
}
return err2
}

// WithFinish returns a wrapper for m that calls finish() and
// m.Finish() in its Finish().
// Allows code to be notified when a message is Finished.
func WithFinish(m Message, finish func(error)) Message {
return finishMessage{Message: m, finish: finish}
}
32 changes: 20 additions & 12 deletions pkg/binding/sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,9 @@ import (
"github.com/stretchr/testify/assert"
)

type chanSR chan binding.Message

func (ch chanSR) Send(_ context.Context, m binding.Message) error { ch <- m; return nil }
func (ch chanSR) Receive(_ context.Context) (binding.Message, error) { return <-ch, nil }

func TestVersionSender(t *testing.T) {
ch := make(chanSR, 1)
s := binding.VersionSender(ch, spec.V01)
ch := make(chan binding.Message, 1)
s := binding.VersionSender(binding.ChanSender(ch), spec.V01)
want := testEvent
want.Context = want.Context.AsV01()
assert.Equal(t, "1.0", testEvent.SpecVersion())
Expand All @@ -35,8 +30,8 @@ func TestVersionSender(t *testing.T) {
}

func TestStructSender(t *testing.T) {
ch := make(chanSR, 1)
s := binding.StructSender(ch, format.JSON)
ch := make(chan binding.Message, 1)
s := binding.StructSender(binding.ChanSender(ch), format.JSON)

_ = s.Send(context.Background(), binding.EventMessage(testEvent))
f, b := (<-ch).Structured()
Expand All @@ -50,8 +45,8 @@ func TestStructSender(t *testing.T) {
}

func TestBinarySender(t *testing.T) {
ch := make(chanSR, 1)
s := binding.BinarySender(ch)
ch := make(chan binding.Message, 1)
s := binding.BinarySender(binding.ChanSender(ch))

sm := &binding.StructMessage{Format: format.JSON.MediaType(), Bytes: []byte(testJSON)}
_ = s.Send(context.Background(), sm)
Expand All @@ -69,4 +64,17 @@ func TestBinarySender(t *testing.T) {
assert.Equal(t, em, m.(binding.EventMessage)) // Already structured, same message.
}

// FIXME(alanconway) verify callback to original message Finish.
func TestWithFinish(t *testing.T) {
done := make(chan error, 1)
m := binding.WithFinish(binding.EventMessage(testEvent), func(err error) {
done <- err
})
select {
case <-done:
assert.Fail(t, "done early")
default:
}
ch := make(chan binding.Message, 1)
assert.NoError(t, binding.ChanSender(ch).Send(context.Background(), m))
assert.NoError(t, <-done)
}

0 comments on commit 6389a12

Please sign in to comment.