-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsend_recv.go
119 lines (100 loc) · 2.02 KB
/
send_recv.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package netchan
import (
"net"
"net/http"
"net/rpc"
"sync"
)
// RecvServer is the receiving half of the transport. It is registered as a
// net/rpc service (see Put) and keeps one buffered channel of payloads per
// channel name. The zero value is ready to use.
type RecvServer struct {
	mu sync.Mutex             // guards m
	m  map[string]chan []byte // channel name -> queued payloads; allocated lazily
}

// Request is the RPC argument of RecvServer.Put.
type Request struct {
	Name string // name of the destination channel
	Data []byte // payload to queue on that channel
}

// getOrCreateCh returns the channel registered under name, creating it with a
// buffer of 100 payloads on first use. It is safe for concurrent use.
func (s *RecvServer) getOrCreateCh(name string) chan []byte {
	s.mu.Lock()
	defer s.mu.Unlock()

	if s.m == nil {
		// A RecvServer built as a plain literal has a nil map, and assigning
		// into a nil map panics; allocate it on first use instead.
		s.m = make(map[string]chan []byte)
	}

	ch := s.m[name]
	if ch == nil {
		ch = make(chan []byte, 100)
		s.m[name] = ch
	}
	return ch
}

// Put is the RPC method invoked by remote senders: it queues req.Data on the
// channel named req.Name. The reply argument is unused; it is present only
// because net/rpc methods must take one. Put blocks while that channel's
// buffer is full and always returns nil.
func (s *RecvServer) Put(req Request, _ *int) error {
	s.getOrCreateCh(req.Name) <- req.Data
	return nil
}
// SendRecver is the transport abstraction of this package: Send pushes a
// payload to a named channel on a remote peer and Recv pulls the next payload
// from a named local channel. SendRecv is the implementation in this file.
type SendRecver interface {
// Send delivers body to the channel called name on the peer reachable at
// address over network, returning any error encountered.
Send(network, address, name string, body []byte) error
// Recv returns the next payload queued under name.
// NOTE(review): SendRecv.Recv blocks until a payload arrives; confirm that
// other implementations are expected to do the same.
Recv(name string) []byte
}
// SendRecv bundles a local RecvServer for incoming payloads with a cache of
// RPC clients for outgoing ones.
type SendRecv struct {
// network is not read or written anywhere in this file.
network string
// server holds the per-name channels that Recv reads from.
server *RecvServer
// mu guards clients.
mu sync.Mutex
// clients caches one RPC client per remote address.
clients map[string]*rpc.Client
}
// NewSendRecv returns a SendRecv whose receive server and client cache are
// both initialised and empty.
func NewSendRecv() *SendRecv {
	recvSrv := new(RecvServer)
	recvSrv.m = map[string]chan []byte{}

	sr := new(SendRecv)
	sr.server = recvSrv
	sr.clients = map[string]*rpc.Client{}
	return sr
}
// ListenAndServe registers the receive server with a fresh RPC server, mounts
// that RPC server on both rpc.DefaultRPCPath and rpc.DefaultDebugPath of a
// private HTTP mux, and serves the mux on the given network and address.
//
// It returns early if registration or listening fails; otherwise it returns
// whatever http.Serve returns.
func (s *SendRecv) ListenAndServe(network, address string) error {
	srv := rpc.NewServer()
	if err := srv.Register(s.server); err != nil {
		return err
	}

	handler := http.NewServeMux()
	for _, path := range []string{rpc.DefaultRPCPath, rpc.DefaultDebugPath} {
		handler.Handle(path, srv)
	}

	ln, err := net.Listen(network, address)
	if err != nil {
		return err
	}
	return http.Serve(ln, handler)
}
// Recv blocks until a payload has been queued under name and returns it.
// The channel for name is created on demand, so Recv may be called before
// any sender has used that name.
func (s *SendRecv) Recv(name string) []byte {
	payload := <-s.server.getOrCreateCh(name)
	return payload
}
// Send delivers body to the channel called name on the peer listening at
// address, by invoking that peer's RecvServer.Put over RPC.
//
// One RPC client per address is dialled on first use and then reused. If a
// call fails for any reason other than an error reported by the remote side
// (rpc.ServerError), the client is presumed broken: it is dropped from the
// cache and closed, so the next Send re-dials instead of failing forever on a
// dead connection.
//
// Send returns the dial error or the error from the RPC call, if any.
//
// NOTE(review): the cache is keyed by address only, so the same address used
// with two different networks shares one client.
func (s *SendRecv) Send(network, address, name string, body []byte) error {
	client, err := s.clientFor(network, address)
	if err != nil {
		return err
	}

	req := Request{Name: name, Data: body}
	err = client.Call("RecvServer.Put", req, nil)
	if err == nil {
		return nil
	}
	if _, remote := err.(rpc.ServerError); !remote {
		// Transport-level failure: the connection cannot be trusted any more.
		s.dropClient(address, client)
	}
	return err
}

// clientFor returns the cached RPC client for address, dialling a new one over
// network when none is cached.
func (s *SendRecv) clientFor(network, address string) (*rpc.Client, error) {
	s.mu.Lock()
	cached := s.clients[address]
	s.mu.Unlock()
	if cached != nil {
		return cached, nil
	}

	// Dial without holding the lock so one slow peer does not stall sends to
	// every other address.
	dialed, err := rpc.DialHTTP(network, address)
	if err != nil {
		return nil, err
	}

	s.mu.Lock()
	if s.clients == nil {
		// Tolerate a SendRecv that was not built by NewSendRecv.
		s.clients = make(map[string]*rpc.Client)
	}
	winner := s.clients[address]
	if winner == nil {
		s.clients[address] = dialed
	}
	s.mu.Unlock()

	if winner == nil {
		return dialed, nil
	}

	// Another goroutine cached a client while this one was dialling. Use the
	// cached client and discard ours; a failure to close the redundant
	// connection is deliberately ignored because it must not abort the send.
	_ = dialed.Close()
	return winner, nil
}

// dropClient removes client from the cache, if it is still the one cached for
// address, and closes it.
func (s *SendRecv) dropClient(address string, client *rpc.Client) {
	s.mu.Lock()
	if s.clients[address] == client {
		delete(s.clients, address)
	}
	s.mu.Unlock()

	// The client has already failed, and Close reports rpc.ErrShutdown when it
	// was closed before; neither outcome is actionable here.
	_ = client.Close()
}