-
Notifications
You must be signed in to change notification settings - Fork 8
/
client.go
191 lines (150 loc) · 3.97 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
package zerorpc
import (
"errors"
"log"
)
// ZeroRPC client representation,
// it holds a pointer to the ZeroMQ socket
type Client struct {
socket *socket
}
// Connects to a ZeroRPC endpoint and returns a pointer to the new client
func NewClient(endpoint string) (*Client, error) {
s, err := connect(endpoint)
if err != nil {
return nil, err
}
c := Client{
socket: s,
}
return &c, nil
}
// Closes the ZeroMQ socket
func (c *Client) Close() error {
return c.socket.close()
}
/*
Invokes a ZeroRPC method,
name is the method name,
args are the method arguments
it returns the ZeroRPC response event on success
if the ZeroRPC server raised an exception,
it's name is returned as the err string along with the response event,
the additional exception text and traceback can be found in the response event args
it returns ErrLostRemote if the channel misses 2 heartbeat events,
default is 10 seconds
Usage example:
package main
import (
"fmt"
"github.com/bsphere/zerorpc"
)
func main() {
c, err := zerorpc.NewClient("tcp://0.0.0.0:4242")
if err != nil {
panic(err)
}
defer c.Close()
response, err := c.Invoke("hello", "John")
if err != nil {
panic(err)
}
fmt.Println(response)
}
It also supports first class exceptions, in case of an exception,
the error returned from Invoke() or InvokeStream() is the exception name
and the args of the returned event are the exception description and traceback.
The client sends heartbeat events every 5 seconds, if twp heartbeat events are missed,
the remote is considered as lost and an ErrLostRemote is returned.
*/
func (c *Client) Invoke(name string, args ...interface{}) (*Event, error) {
log.Printf("ZeroRPC client invoked %s with args %s", name, args)
ev, err := newEvent(name, args)
if err != nil {
return nil, err
}
ch := c.socket.newChannel("")
defer ch.close()
err = ch.sendEvent(ev)
if err != nil {
return nil, err
}
for {
select {
case response := <-ch.channelOutput:
if response.Name == "ERR" {
return response, errors.New(response.Args[0].(string))
} else {
return response, nil
}
case err := <-ch.channelErrors:
return nil, err
}
}
}
/*
Invokes a streaming ZeroRPC method,
name is the method name,
args are the method arguments
it returns an array of ZeroRPC response events on success
if the ZeroRPC server raised an exception,
it's name is returned as the err string along with the response event,
the additional exception text and traceback can be found in the response event args
it returns ErrLostRemote if the channel misses 2 heartbeat events,
default is 10 seconds
Usage example:
package main
import (
"fmt"
"github.com/bsphere/zerorpc"
)
func main() {
c, err := zerorpc.NewClient("tcp://0.0.0.0:4242")
if err != nil {
panic(err)
}
defer c.Close()
response, err := c.InvokeStream("streaming_range", 10, 20, 2)
if err != nil {
fmt.Println(err.Error())
}
for _, r := range response {
fmt.Println(r)
}
}
It also supports first class exceptions, in case of an exception,
the error returned from Invoke() or InvokeStream() is the exception name
and the args of the returned event are the exception description and traceback.
The client sends heartbeat events every 5 seconds, if twp heartbeat events are missed,
the remote is considered as lost and an ErrLostRemote is returned.
*/
func (c *Client) InvokeStream(name string, args ...interface{}) (chan *Event, error) {
log.Printf("ZeroRPC client invoked %s with args %s in streaming mode", name, args)
ev, err := newEvent(name, args)
if err != nil {
return nil, err
}
ch := c.socket.newChannel("")
err = ch.sendEvent(ev)
if err != nil {
return nil, err
}
out := make(chan *Event)
go func(out chan *Event, ch *channel) {
defer close(out)
defer ch.close()
for {
select {
case response := <-ch.channelOutput:
out <- response
if response.Name != "STREAM" {
return
}
case _ = <-ch.channelErrors:
return
}
}
return
}(out, ch)
return out, nil
}