-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsubscribe.go
69 lines (59 loc) · 2.35 KB
/
subscribe.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
package ensign
import (
"context"
api "github.com/rotationalio/go-ensign/api/v1beta1"
"github.com/rotationalio/go-ensign/stream"
"google.golang.org/grpc"
)
// A Subscription object with a channel of events is returned when you subscribe to a
// topic or topics. Listen on the provided channel C in order to receive events from
// Ensign when they are published to your consumer group. It is the user's
// responsibility to Ack and Nack events when they are handled by using the methods on
// the event itself. Call Close when the subscription is no longer needed.
type Subscription struct {
// C is the receive-only channel on which converted events are delivered to the
// user; it is fed by the background event handler started in Subscribe.
C <-chan *Event
// events is the channel of raw event wrappers returned by stream.NewSubscriber.
events <-chan *api.EventWrapper
// stream is the underlying subscriber stream; it is attached to each delivered
// event (for acks/nacks) and closed by Close.
stream *stream.Subscriber
}
// Subscribe opens a subscription stream for the given topics and hands back a
// Subscription whose C channel yields the incoming events.
//
// The error reported by stream.NewSubscriber is returned as-is (with a nil
// Subscription) when the subscriber stream cannot be created. On success a
// background goroutine is started that converts wrappers from the stream into events
// and delivers them on C.
func (c *Client) Subscribe(topics ...string) (sub *Subscription, err error) {
	// Establish the internal subscriber stream first; nothing else is set up if
	// this step fails.
	sub = new(Subscription)
	sub.events, sub.stream, err = stream.NewSubscriber(c, topics, c.copts...)
	if err != nil {
		return nil, err
	}

	// The handler needs the send side of the channel while the user is only ever
	// exposed to the receive side through sub.C.
	delivered := make(chan *Event, 1)
	sub.C = delivered

	// Pump events from the stream to the user channel in the background.
	go sub.eventHandler(delivered)
	return sub, nil
}
// Close shuts down the underlying subscriber stream and returns whatever error the
// stream reports while closing. After Close the subscription should not be expected
// to deliver further events.
func (c *Subscription) Close() error {
	subscriber := c.stream
	return subscriber.Close()
}
// eventHandler is the background loop started by Subscribe. For every wrapper
// received from the internal events channel it builds an Event, attaches the
// subscriber stream so the event can send acks/nacks back, and forwards the event on
// out.
//
// The loop ends when the internal events channel is closed (presumably once the
// subscriber stream has been shut down); out is then closed as well so that code
// ranging over Subscription.C is told that no more events will arrive instead of
// blocking forever.
//
// NOTE: a wrapper that cannot be converted into an Event still panics, since there
// is currently no way to report the error to the subscriber.
func (c *Subscription) eventHandler(out chan<- *Event) {
	// Within this file this goroutine is the only sender on out (users only hold
	// the receive-only side), so it is the one responsible for closing it.
	defer close(out)

	for wrapper := range c.events {
		// Convert the event into an API event
		event := &Event{}
		if err := event.fromPB(wrapper, subscription); err != nil {
			// TODO: surface conversion errors to the caller instead of panicking.
			panic(err)
		}

		// Attach the stream to send acks/nacks back
		event.sub = c.stream
		out <- event
	}
}
// SubscribeStream opens a raw gRPC subscribe stream to Ensign by calling Subscribe
// on the underlying API client with the given context and call options, returning
// the resulting stream client and error unchanged.
//
// This manual mechanism is intended for advanced users who want to work with API
// events directly and is not recommended in production; prefer Subscribe or
// CreateSubscriber to establish a stream connection to Ensign.
func (c *Client) SubscribeStream(ctx context.Context, opts ...grpc.CallOption) (api.Ensign_SubscribeClient, error) {
	client, err := c.api.Subscribe(ctx, opts...)
	return client, err
}