-
Notifications
You must be signed in to change notification settings - Fork 1
/
rpc_client.go
119 lines (113 loc) · 3.29 KB
/
rpc_client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package synapse
import (
"github.com/bitly/go-simplejson"
"github.com/streadway/amqp"
"time"
"fmt"
"encoding/json"
)
// clientQueue declares the queue that receives RPC callbacks for this client
// and binds it to the s.SysName exchange.
//
// The channel is obtained from s.CreateChannel and stored in
// s.rpcClientChannel. The queue is named "<SysName>_<AppName>_client_<AppId>"
// and is bound with the routing key "client.<AppName>.<AppId>". Failures are
// reported through Log at LogError level; nothing is returned to the caller.
func (s *Server) clientQueue() {
	s.rpcClientChannel = s.CreateChannel(0, "RpcClient")

	queueName := fmt.Sprintf("%s_%s_client_%s", s.SysName, s.AppName, s.AppId)
	q, err := s.rpcClientChannel.QueueDeclare(
		queueName,
		true,  // durable
		true,  // delete when unused
		false, // exclusive
		false, // no-wait
		nil,   // arguments
	)
	if err != nil {
		Log(fmt.Sprintf("Failed to declare rpcQueue: %s", err), LogError)
		// There is no declared queue to bind after a failure, so stop here
		// instead of issuing a bind that cannot succeed.
		return
	}

	routingKey := fmt.Sprintf("client.%s.%s", s.AppName, s.AppId)
	if err := s.rpcClientChannel.QueueBind(q.Name, routingKey, s.SysName, false, nil); err != nil {
		Log(fmt.Sprintf("Failed to Bind Rpc Exchange and Queue: %s", err), LogError)
	}
}
// rpcCallbackQueueListen registers a consumer on the RPC callback queue and
// stores the resulting delivery channel in s.cli.
//
// The consumer reads the queue "<SysName>_<AppName>_client_<AppId>" under the
// consumer tag "client.<AppName>.<AppId>", with auto-ack and exclusive access
// enabled. On success the configured timeout and a ready message are logged;
// on failure only the error is logged.
func (s *Server) rpcCallbackQueueListen() {
	queueName := fmt.Sprintf("%s_%s_client_%s", s.SysName, s.AppName, s.AppId)
	consumerTag := fmt.Sprintf("client.%s.%s", s.AppName, s.AppId)

	// err is kept local so this function does not write to any err variable
	// declared outside it.
	deliveries, err := s.rpcClientChannel.Consume(
		queueName,   // queue
		consumerTag, // consumer
		true,        // auto-ack
		true,        // exclusive
		false,       // no-local
		false,       // no-wait
		nil,         // args
	)
	s.cli = deliveries
	if err != nil {
		Log(fmt.Sprintf("Failed to register Rpc Callback consumer: %s", err), LogError)
		// Do not announce the client as ready when no consumer was registered.
		return
	}

	Log(fmt.Sprintf("Rpc Client Timeout: %ds", s.RpcTimeout), LogInfo)
	Log("Rpc Client Ready", LogInfo)
}
// rpcClient publishes one RPC request and then relays one response from the
// callback consumer to the caller waiting for it.
//
// appName selects the routing key "server.<appName>" on the s.SysName
// exchange, action is sent as the message Type, params is JSON-encoded into
// the message body and msgId is sent as the MessageId.
//
// After a successful publish the function reads s.cli until it sees a delivery
// whose CorrelationId has an entry in s.cliResMap, forwards the parsed body to
// that entry's channel and returns. Deliveries are matched by CorrelationId
// only, so the one forwarded here is not necessarily the reply to msgId.
// Deliveries without a pending entry are dropped. If the body is not valid
// JSON, a Json value with "rpc_error" set to "invalid response" is forwarded
// instead of a nil pointer.
//
// NOTE(review): s.cliResMap is read here with no locking visible in this
// function, while SendRpc adds and deletes entries from other goroutines —
// confirm whether access is synchronized elsewhere.
func (s *Server) rpcClient(appName, action string, params map[string]interface{}, msgId string) {
	query, err := json.Marshal(params)
	if err != nil {
		Log(fmt.Sprintf("Failed to encode Rpc Request params: %s", err), LogError)
		return
	}

	err = s.rpcClientChannel.Publish(
		s.SysName,         // exchange
		"server."+appName, // routing key
		false,             // mandatory
		false,             // immediate
		amqp.Publishing{
			AppId:     s.AppId,
			MessageId: msgId,
			ReplyTo:   s.AppName,
			Type:      action,
			Body:      query,
		})
	if err != nil {
		Log(fmt.Sprintf("Failed to publish Rpc Request: %s", err), LogError)
		// Nothing was sent, so there is no reply to wait for.
		return
	}
	if s.Debug {
		Log(fmt.Sprintf("RPC Request: (%s)%s->%s@%s %s", msgId, s.AppName, action, appName, query), LogDebug)
	}

	for d := range s.cli {
		// Look the channel up once: a second lookup after the entry has been
		// deleted would yield a nil channel, and a send on it blocks forever.
		waiter, pending := s.cliResMap[d.CorrelationId]
		if !pending {
			continue
		}

		resp, err := simplejson.NewJson(d.Body)
		if err != nil {
			Log(fmt.Sprintf("Failed to decode Rpc Response (%s): %s", d.CorrelationId, err), LogError)
			resp = simplejson.New()
			resp.Set("rpc_error", "invalid response")
		}
		if s.Debug {
			// A marshalling error here would only affect the debug log line.
			logData, _ := resp.MarshalJSON()
			Log(fmt.Sprintf("RPC Response: (%s)%s@%s->%s %s", d.CorrelationId, d.Type, d.ReplyTo, s.AppName, logData), LogDebug)
		}

		// Bound the hand-off so this goroutine cannot block forever when the
		// waiting caller has already given up.
		select {
		case waiter <- resp:
		case <-time.After(time.Second * s.RpcTimeout):
		}
		return
	}
}
// SendRpc sends an RPC request to appName and blocks until a response is
// delivered or s.RpcTimeout seconds have elapsed.
//
// action and params are passed through to rpcClient, which runs in its own
// goroutine. The request is tracked in s.cliResMap under a 20-character id
// from s.randomString; the entry is removed before SendRpc returns.
//
// The return value is the response delivered on the reply channel, or a Json
// value whose "rpc_error" key is "rpc client disabled" (when
// s.DisableRpcClient is set) or "timeout" (when no response arrived in time).
//
// NOTE(review): s.cliResMap is written here with no locking visible in this
// function, while rpcClient goroutines read it — confirm whether access is
// synchronized elsewhere.
func (s *Server) SendRpc(appName, action string, params map[string]interface{}) *simplejson.Json {
	if s.DisableRpcClient {
		Log("Rpc Client Disabled: DisableRpcClient set true", LogError)
		ret := simplejson.New()
		ret.Set("rpc_error", "rpc client disabled")
		return ret
	}

	msgId := s.randomString(20)
	// One slot of buffer lets the relaying goroutine hand over a response
	// without blocking even if this call has already timed out.
	reply := make(chan *simplejson.Json, 1)
	s.cliResMap[msgId] = reply
	defer delete(s.cliResMap, msgId)

	go s.rpcClient(appName, action, params, msgId)

	timer := time.NewTimer(time.Second * s.RpcTimeout)
	defer timer.Stop()

	select {
	case ret := <-reply:
		return ret
	case <-timer.C:
		ret := simplejson.New()
		ret.Set("rpc_error", "timeout")
		return ret
	}
}