forked from tarantool/go-tarantool
-
Notifications
You must be signed in to change notification settings - Fork 0
/
watch.go
147 lines (125 loc) · 3.94 KB
/
watch.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
package tarantool
import (
"context"
"io"
"github.com/tarantool/go-iproto"
"github.com/vmihailenco/msgpack/v5"
)
// BroadcastRequest helps to send broadcast messages. See:
// https://www.tarantool.io/en/doc/latest/reference/reference_lua/box_events/broadcast/
type BroadcastRequest struct {
call *CallRequest
key string
}
// NewBroadcastRequest returns a new broadcast request for a specified key.
func NewBroadcastRequest(key string) *BroadcastRequest {
req := new(BroadcastRequest)
req.key = key
req.call = NewCallRequest("box.broadcast").Args([]interface{}{key})
return req
}
// Value sets the value for the broadcast request.
// Note: default value is nil.
func (req *BroadcastRequest) Value(value interface{}) *BroadcastRequest {
req.call = req.call.Args([]interface{}{req.key, value})
return req
}
// Context sets a passed context to the broadcast request.
func (req *BroadcastRequest) Context(ctx context.Context) *BroadcastRequest {
req.call = req.call.Context(ctx)
return req
}
// Code returns IPROTO code for the broadcast request.
func (req *BroadcastRequest) Type() iproto.Type {
return req.call.Type()
}
// Body fills an msgpack.Encoder with the broadcast request body.
func (req *BroadcastRequest) Body(res SchemaResolver, enc *msgpack.Encoder) error {
return req.call.Body(res, enc)
}
// Ctx returns a context of the broadcast request.
func (req *BroadcastRequest) Ctx() context.Context {
return req.call.Ctx()
}
// Async returns is the broadcast request expects a response.
func (req *BroadcastRequest) Async() bool {
return req.call.Async()
}
// Response creates a response for a BroadcastRequest.
func (req *BroadcastRequest) Response(header Header, body io.Reader) (Response, error) {
return DecodeBaseResponse(header, body)
}
// watchRequest subscribes to the updates of a specified key defined on the
// server. After receiving the notification, you should send a new
// watchRequest to acknowledge the notification.
type watchRequest struct {
baseRequest
key string
ctx context.Context
}
// newWatchRequest returns a new watchRequest.
func newWatchRequest(key string) *watchRequest {
req := new(watchRequest)
req.rtype = iproto.IPROTO_WATCH
req.async = true
req.key = key
return req
}
// Body fills an msgpack.Encoder with the watch request body.
func (req *watchRequest) Body(res SchemaResolver, enc *msgpack.Encoder) error {
if err := enc.EncodeMapLen(1); err != nil {
return err
}
if err := enc.EncodeUint(uint64(iproto.IPROTO_EVENT_KEY)); err != nil {
return err
}
return enc.EncodeString(req.key)
}
// Context sets a passed context to the request.
func (req *watchRequest) Context(ctx context.Context) *watchRequest {
req.ctx = ctx
return req
}
// unwatchRequest unregisters a watcher subscribed to the given notification
// key.
type unwatchRequest struct {
baseRequest
key string
ctx context.Context
}
// newUnwatchRequest returns a new unwatchRequest.
func newUnwatchRequest(key string) *unwatchRequest {
req := new(unwatchRequest)
req.rtype = iproto.IPROTO_UNWATCH
req.async = true
req.key = key
return req
}
// Body fills an msgpack.Encoder with the unwatch request body.
func (req *unwatchRequest) Body(res SchemaResolver, enc *msgpack.Encoder) error {
if err := enc.EncodeMapLen(1); err != nil {
return err
}
if err := enc.EncodeUint(uint64(iproto.IPROTO_EVENT_KEY)); err != nil {
return err
}
return enc.EncodeString(req.key)
}
// Context sets a passed context to the request.
func (req *unwatchRequest) Context(ctx context.Context) *unwatchRequest {
req.ctx = ctx
return req
}
// WatchEvent is a watch notification event received from a server.
type WatchEvent struct {
Conn *Connection // A source connection.
Key string // A key.
Value interface{} // A value.
}
// Watcher is a subscription to broadcast events.
type Watcher interface {
// Unregister unregisters the watcher.
Unregister()
}
// WatchCallback is a callback to invoke when the key value is updated.
type WatchCallback func(event WatchEvent)