From c0bb3a8b7aa41b65f2f1dbf2a713e27b20823aba Mon Sep 17 00:00:00 2001 From: philhofer Date: Sat, 25 Jul 2015 10:47:50 -0700 Subject: [PATCH 1/6] wip --- client.go | 12 +++ client_test.go | 39 +++++++- command.go | 52 +++++++++- server.go | 24 +++-- service.go | 143 +++++++++++++++++++++++++++ service_gen.go | 236 ++++++++++++++++++++++++++++++++++++++++++++ service_gen_test.go | 185 ++++++++++++++++++++++++++++++++++ setup_test.go | 4 +- 8 files changed, 679 insertions(+), 16 deletions(-) create mode 100644 service.go create mode 100644 service_gen.go create mode 100644 service_gen_test.go diff --git a/client.go b/client.go index a7c9c48..71f4784 100644 --- a/client.go +++ b/client.go @@ -87,6 +87,8 @@ func NewClient(c net.Conn, timeout time.Duration) (*Client, error) { cl.Close() return nil, fmt.Errorf("synapse: ping failed: %s", err) } + // sync links asap + go cl.syncLinks() return cl, nil } @@ -94,6 +96,7 @@ func NewClient(c net.Conn, timeout time.Duration) (*Client, error) { // Client is a client to // a single synapse server. type Client struct { + svc string conn net.Conn // connection wlock sync.Mutex // write lock csn uint64 // sequence number; atomic @@ -104,6 +107,10 @@ type Client struct { pending wMap // map seq number to waiting handler } +func (c *Client) Service() string { + return c.svc +} + // used to transfer control // flow to blocking goroutines type waiter struct { @@ -455,3 +462,8 @@ func (c *Client) sendCommand(cmd command, msg []byte) error { func (c *Client) ping() error { return c.sendCommand(cmdPing, nil) } + +// sync known service addresses +func (c *Client) syncLinks() { + c.sendCommand(cmdListLinks, svclistbytes()) +} diff --git a/client_test.go b/client_test.go index 2bceec8..e0acdb7 100644 --- a/client_test.go +++ b/client_test.go @@ -15,6 +15,36 @@ func isCode(err error, c Status) bool { return false } +func TestClientServiceName(t *testing.T) { + if tcpClient.Service() != "test-endpoint" { + t.Errorf("expected service endpoint to be %q, but got %q", "test-endpoint", tcpClient.Service()) + } + if unxClient.Service() != "test-endpoint" { + t.Errorf("expected service endpoint to be %q, but got %q", "test-endpoint", unxClient.Service()) + } +} + +func TestNearest(t *testing.T) { + svc := Nearest("test-endpoint") + if svc == nil { + t.Error("expected Nearest(test-endpoint) to return something") + } else { + c, err := svc.Connect(5 * time.Millisecond) + if err != nil { + t.Errorf("couldn't connect to tcp service: %s", err) + } + c.Close() + } + + all := Services("test-endpoint") + if len(all) != 2 { + for _, s := range all { + t.Logf("service: %#v", *s) + } + t.Errorf("expected Services(test-endpoint) to return 2 elements; found %d", len(all)) + } +} + // open up a client and server; make // some concurrent requests func TestClient(t *testing.T) { @@ -81,7 +111,7 @@ func BenchmarkTCPEcho(b *testing.B) { time.Sleep(1 * time.Millisecond) }() - go Serve(l, EchoHandler{}) + go Serve(l, "bench-endpoint", EchoHandler{}) cl, err := Dial("tcp", "localhost:7000", 50*time.Millisecond) if err != nil { b.Fatal(err) @@ -116,8 +146,9 @@ func BenchmarkUnixNoop(b *testing.B) { l.Close() time.Sleep(1 * time.Millisecond) }() - go Serve(l, NopHandler{}) - cl, err := Dial("unix", "bench", 50*time.Millisecond) + + go Serve(l, "bench-endpoint", NopHandler{}) + cl, err := Dial("unix", "bench", 1*time.Millisecond) if err != nil { b.Fatal(err) } @@ -140,7 +171,7 @@ func BenchmarkUnixNoop(b *testing.B) { func BenchmarkPipeNoop(b *testing.B) { srv, cln := net.Pipe() - go ServeConn(srv, NopHandler{}) + go ServeConn(srv, "pipe", NopHandler{}) defer srv.Close() diff --git a/command.go b/command.go index aa108b7..0d74503 100644 --- a/command.go +++ b/command.go @@ -41,7 +41,8 @@ type command uint8 // cmdDirectory is a map of all the commands // to their respective actions var cmdDirectory = [_maxcommand]action{ - cmdPing: ping{}, + cmdPing: ping{}, + cmdListLinks: links{}, } // an action is the consequence @@ -72,6 +73,8 @@ const ( // command cmdPing + cmdListLinks + // a command >= _maxcommand // is invalid _maxcommand @@ -80,6 +83,49 @@ const ( // ping is a no-op on both sides type ping struct{} -func (p ping) Client(cl *Client, res []byte) {} +func (p ping) Client(cl *Client, res []byte) { + name := string(res) + r := cl.conn.RemoteAddr() + s := Service{ + name: name, + net: r.Network(), + addr: r.String(), + } + cache(&s) + cl.svc = name +} + +func (p ping) Server(ch *connHandler, body []byte) ([]byte, error) { + return ch.svcname, nil +} + +type links struct{} + +func (l links) Client(cl *Client, res []byte) { + var sl serviceList + _, err := sl.UnmarshalMsg(res) + if err != nil { + return + } + cachelist(sl) +} -func (p ping) Server(ch *connHandler, body []byte) ([]byte, error) { return nil, nil } +func (l links) Server(ch *connHandler, body []byte) ([]byte, error) { + var sl serviceList + _, err := sl.UnmarshalMsg(body) + if err != nil { + return nil, err + } + svcCache.Lock() + body, _ = svcCache.tab.MarshalMsg(body[:0]) + for _, sv := range sl { + // servers are responsibles + // for incrementing the distance + // counter when they receive an + // endpoint. + sv.distance++ + svcCache.tab[sv.name] = addSvc(svcCache.tab[sv.name], sv) + } + svcCache.Unlock() + return body, nil +} diff --git a/server.go b/server.go index 5e8b80f..9d39af7 100644 --- a/server.go +++ b/server.go @@ -31,13 +31,21 @@ const ( // Serve starts a Server on 'l' that serves // the supplied handler. It blocks until the // listener closes. -func Serve(l net.Listener, h Handler) error { +func Serve(l net.Listener, service string, h Handler) error { + a := l.Addr() + s := Service{ + name: service, + net: a.Network(), + addr: a.String(), + } + cache(&s) for { c, err := l.Accept() if err != nil { + uncache(&s) return err } - go ServeConn(c, h) + go ServeConn(c, service, h) } } @@ -47,7 +55,7 @@ func Serve(l net.Listener, h Handler) error { // Server must be provided. If the certificate is signed by a // certificate authority, the certFile should be the concatenation of // the server's certificate followed by the CA's certificate. -func ListenAndServeTLS(network, laddr string, certFile, keyFile string, h Handler) error { +func ListenAndServeTLS(network, laddr, service, certFile, keyFile string, h Handler) error { cert, err := tls.LoadX509KeyPair(certFile, keyFile) if err != nil { return err @@ -59,7 +67,7 @@ func ListenAndServeTLS(network, laddr string, certFile, keyFile string, h Handle if err != nil { return err } - return Serve(l, h) + return Serve(l, service, h) } // ListenAndServe opens up a network listener @@ -67,19 +75,20 @@ func ListenAndServeTLS(network, laddr string, certFile, keyFile string, h Handle // and begins serving with the provided handler. // ListenAndServe blocks until there is a fatal // listener error. -func ListenAndServe(network string, laddr string, h Handler) error { +func ListenAndServe(network, laddr, service string, h Handler) error { l, err := net.Listen(network, laddr) if err != nil { return err } - return Serve(l, h) + return Serve(l, service, h) } // ServeConn serves an individual network // connection. It blocks until the connection // is closed or it encounters a fatal error. -func ServeConn(c net.Conn, h Handler) { +func ServeConn(c net.Conn, service string, h Handler) { ch := connHandler{ + svcname: []byte(service), conn: c, h: h, remote: c.RemoteAddr(), @@ -120,6 +129,7 @@ func putFrame(bts []byte, seq uint64, ft fType, sz int) { // connections and multiplexes requests // to connWrappers type connHandler struct { + svcname []byte h Handler conn net.Conn remote net.Addr diff --git a/service.go b/service.go new file mode 100644 index 0000000..05c241e --- /dev/null +++ b/service.go @@ -0,0 +1,143 @@ +package synapse + +import ( + "net" + "sort" + "sync" + "time" +) + +//go:generate msgp -unexported -io=false + +// Nearest finds the "nearest" service +// with the given name. +func Nearest(service string) (s *Service) { + svcCache.Lock() + l := svcCache.tab[service] + if len(l) > 0 { + s = l[0] + } + svcCache.Unlock() + return +} + +// Services lists all of the known +// services endpoints for a given +// service name. +func Services(name string) []*Service { + svcCache.Lock() + l := svcCache.tab[name] + if len(l) == 0 { + svcCache.Unlock() + return nil + } + dup := make([]*Service, len(l)) + copy(dup, l) + svcCache.Unlock() + return dup +} + +// Service represents a unique address +// associated with a service. +type Service struct { + name, net, addr string + distance int32 +} + +func (s *Service) eqaddr(g *Service) bool { + return s.net == g.net && s.addr == g.addr +} + +func (s *Service) String() string { + return s.name + "@" + s.net + ":" + s.addr +} + +func (s *Service) Addr() (net, addr string) { + net, addr = s.net, s.addr + return +} + +func (s *Service) Connect(timeout time.Duration) (*Client, error) { + conn, err := net.Dial(s.net, s.addr) + if err != nil { + uncache(s) + return nil, err + } + return NewClient(conn, timeout) +} + +type serviceList []*Service + +func (s serviceList) Len() int { return len(s) } +func (s serviceList) Less(i, j int) bool { return s[i].distance < s[j].distance } +func (s serviceList) Swap(i, j int) { s[i], s[j] = s[j], s[i] } + +func addSvc(list []*Service, s *Service) []*Service { + if len(list) == 0 { + return []*Service{s} + } + for _, sv := range list { + if sv.eqaddr(s) { + if s.distance < sv.distance { + sv.distance = s.distance + } + return list + } + } + out := append(list, s) + sort.Sort(serviceList(out)) + return out +} + +func removeSvc(list []*Service, s *Service) []*Service { + ll := len(list) + if ll == 0 { + return nil + } + for i, sv := range list { + if sv.eqaddr(s) { + list[i], list[ll-1], list = list[ll-1], nil, list[:ll-1] + sort.Sort(serviceList(list)) + return list + } + } + return list +} + +type serviceTable map[string]serviceList + +var svcCache struct { + sync.Mutex + tab serviceTable +} + +func init() { + svcCache.tab = make(serviceTable) +} + +func cache(s *Service) { + svcCache.Lock() + svcCache.tab[s.name] = addSvc(svcCache.tab[s.name], s) + svcCache.Unlock() +} + +func cachelist(l serviceList) { + svcCache.Lock() + for _, sv := range l { + svcCache.tab[sv.name] = addSvc(svcCache.tab[sv.name], sv) + } + svcCache.Unlock() +} + +func uncache(s *Service) { + svcCache.Lock() + svcCache.tab[s.name] = removeSvc(svcCache.tab[s.name], s) + svcCache.Unlock() +} + +func svclistbytes() []byte { + svcCache.Lock() + data, _ := svcCache.tab.MarshalMsg(nil) + svcCache.Unlock() + return data +} diff --git a/service_gen.go b/service_gen.go new file mode 100644 index 0000000..9db0f8f --- /dev/null +++ b/service_gen.go @@ -0,0 +1,236 @@ +package synapse + +// NOTE: THIS FILE WAS PRODUCED BY THE +// MSGP CODE GENERATION TOOL (github.com/tinylib/msgp) +// DO NOT EDIT + +import ( + "github.com/tinylib/msgp/msgp" +) + +// MarshalMsg implements msgp.Marshaler +func (z *Service) MarshalMsg(b []byte) (o []byte, err error) { + o = msgp.Require(b, z.Msgsize()) + // map header, size 4 + // string "name" + o = append(o, 0x84, 0xa4, 0x6e, 0x61, 0x6d, 0x65) + o = msgp.AppendString(o, z.name) + // string "net" + o = append(o, 0xa3, 0x6e, 0x65, 0x74) + o = msgp.AppendString(o, z.net) + // string "addr" + o = append(o, 0xa4, 0x61, 0x64, 0x64, 0x72) + o = msgp.AppendString(o, z.addr) + // string "distance" + o = append(o, 0xa8, 0x64, 0x69, 0x73, 0x74, 0x61, 0x6e, 0x63, 0x65) + o = msgp.AppendInt32(o, z.distance) + return +} + +// UnmarshalMsg implements msgp.Unmarshaler +func (z *Service) UnmarshalMsg(bts []byte) (o []byte, err error) { + var field []byte + _ = field + var isz uint32 + isz, bts, err = msgp.ReadMapHeaderBytes(bts) + if err != nil { + return + } + for isz > 0 { + isz-- + field, bts, err = msgp.ReadMapKeyZC(bts) + if err != nil { + return + } + switch msgp.UnsafeString(field) { + case "name": + z.name, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + return + } + case "net": + z.net, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + return + } + case "addr": + z.addr, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + return + } + case "distance": + z.distance, bts, err = msgp.ReadInt32Bytes(bts) + if err != nil { + return + } + default: + bts, err = msgp.Skip(bts) + if err != nil { + return + } + } + } + o = bts + return +} + +func (z *Service) Msgsize() (s int) { + s = 1 + 5 + msgp.StringPrefixSize + len(z.name) + 4 + msgp.StringPrefixSize + len(z.net) + 5 + msgp.StringPrefixSize + len(z.addr) + 9 + msgp.Int32Size + return +} + +// MarshalMsg implements msgp.Marshaler +func (z serviceList) MarshalMsg(b []byte) (o []byte, err error) { + o = msgp.Require(b, z.Msgsize()) + o = msgp.AppendArrayHeader(o, uint32(len(z))) + for xvk := range z { + if z[xvk] == nil { + o = msgp.AppendNil(o) + } else { + o, err = z[xvk].MarshalMsg(o) + if err != nil { + return + } + } + } + return +} + +// UnmarshalMsg implements msgp.Unmarshaler +func (z *serviceList) UnmarshalMsg(bts []byte) (o []byte, err error) { + var xsz uint32 + xsz, bts, err = msgp.ReadArrayHeaderBytes(bts) + if err != nil { + return + } + if cap((*z)) >= int(xsz) { + (*z) = (*z)[:xsz] + } else { + (*z) = make(serviceList, xsz) + } + for bzg := range *z { + if msgp.IsNil(bts) { + bts, err = msgp.ReadNilBytes(bts) + if err != nil { + return + } + (*z)[bzg] = nil + } else { + if (*z)[bzg] == nil { + (*z)[bzg] = new(Service) + } + bts, err = (*z)[bzg].UnmarshalMsg(bts) + if err != nil { + return + } + } + } + o = bts + return +} + +func (z serviceList) Msgsize() (s int) { + s = msgp.ArrayHeaderSize + for bai := range z { + if z[bai] == nil { + s += msgp.NilSize + } else { + s += z[bai].Msgsize() + } + } + return +} + +// MarshalMsg implements msgp.Marshaler +func (z serviceTable) MarshalMsg(b []byte) (o []byte, err error) { + o = msgp.Require(b, z.Msgsize()) + o = msgp.AppendMapHeader(o, uint32(len(z))) + for cmr, ajw := range z { + o = msgp.AppendString(o, cmr) + o = msgp.AppendArrayHeader(o, uint32(len(ajw))) + for wht := range ajw { + if ajw[wht] == nil { + o = msgp.AppendNil(o) + } else { + o, err = ajw[wht].MarshalMsg(o) + if err != nil { + return + } + } + } + } + return +} + +// UnmarshalMsg implements msgp.Unmarshaler +func (z *serviceTable) UnmarshalMsg(bts []byte) (o []byte, err error) { + var msz uint32 + msz, bts, err = msgp.ReadMapHeaderBytes(bts) + if err != nil { + return + } + if (*z) == nil && msz > 0 { + (*z) = make(serviceTable, msz) + } else if len((*z)) > 0 { + for key, _ := range *z { + delete((*z), key) + } + } + for msz > 0 { + var hct string + var cua serviceList + msz-- + hct, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + return + } + var xsz uint32 + xsz, bts, err = msgp.ReadArrayHeaderBytes(bts) + if err != nil { + return + } + if cap(cua) >= int(xsz) { + cua = cua[:xsz] + } else { + cua = make(serviceList, xsz) + } + for xhx := range cua { + if msgp.IsNil(bts) { + bts, err = msgp.ReadNilBytes(bts) + if err != nil { + return + } + cua[xhx] = nil + } else { + if cua[xhx] == nil { + cua[xhx] = new(Service) + } + bts, err = cua[xhx].UnmarshalMsg(bts) + if err != nil { + return + } + } + } + (*z)[hct] = cua + } + o = bts + return +} + +func (z serviceTable) Msgsize() (s int) { + s = msgp.MapHeaderSize + if z != nil { + for lqf, daf := range z { + _ = daf + s += msgp.StringPrefixSize + len(lqf) + msgp.ArrayHeaderSize + for pks := range daf { + if daf[pks] == nil { + s += msgp.NilSize + } else { + s += daf[pks].Msgsize() + } + } + } + } + return +} diff --git a/service_gen_test.go b/service_gen_test.go new file mode 100644 index 0000000..ea5eb54 --- /dev/null +++ b/service_gen_test.go @@ -0,0 +1,185 @@ +package synapse + +// NOTE: THIS FILE WAS PRODUCED BY THE +// MSGP CODE GENERATION TOOL (github.com/tinylib/msgp) +// DO NOT EDIT + +import ( + "testing" + + "github.com/tinylib/msgp/msgp" +) + +func TestMarshalUnmarshalService(t *testing.T) { + v := Service{} + bts, err := v.MarshalMsg(nil) + if err != nil { + t.Fatal(err) + } + left, err := v.UnmarshalMsg(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left) + } + + left, err = msgp.Skip(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after Skip(): %q", len(left), left) + } +} + +func BenchmarkMarshalMsgService(b *testing.B) { + v := Service{} + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.MarshalMsg(nil) + } +} + +func BenchmarkAppendMsgService(b *testing.B) { + v := Service{} + bts := make([]byte, 0, v.Msgsize()) + bts, _ = v.MarshalMsg(bts[0:0]) + b.SetBytes(int64(len(bts))) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + bts, _ = v.MarshalMsg(bts[0:0]) + } +} + +func BenchmarkUnmarshalService(b *testing.B) { + v := Service{} + bts, _ := v.MarshalMsg(nil) + b.ReportAllocs() + b.SetBytes(int64(len(bts))) + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := v.UnmarshalMsg(bts) + if err != nil { + b.Fatal(err) + } + } +} + +func TestMarshalUnmarshalserviceList(t *testing.T) { + v := serviceList{} + bts, err := v.MarshalMsg(nil) + if err != nil { + t.Fatal(err) + } + left, err := v.UnmarshalMsg(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left) + } + + left, err = msgp.Skip(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after Skip(): %q", len(left), left) + } +} + +func BenchmarkMarshalMsgserviceList(b *testing.B) { + v := serviceList{} + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.MarshalMsg(nil) + } +} + +func BenchmarkAppendMsgserviceList(b *testing.B) { + v := serviceList{} + bts := make([]byte, 0, v.Msgsize()) + bts, _ = v.MarshalMsg(bts[0:0]) + b.SetBytes(int64(len(bts))) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + bts, _ = v.MarshalMsg(bts[0:0]) + } +} + +func BenchmarkUnmarshalserviceList(b *testing.B) { + v := serviceList{} + bts, _ := v.MarshalMsg(nil) + b.ReportAllocs() + b.SetBytes(int64(len(bts))) + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := v.UnmarshalMsg(bts) + if err != nil { + b.Fatal(err) + } + } +} + +func TestMarshalUnmarshalserviceTable(t *testing.T) { + v := serviceTable{} + bts, err := v.MarshalMsg(nil) + if err != nil { + t.Fatal(err) + } + left, err := v.UnmarshalMsg(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left) + } + + left, err = msgp.Skip(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after Skip(): %q", len(left), left) + } +} + +func BenchmarkMarshalMsgserviceTable(b *testing.B) { + v := serviceTable{} + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.MarshalMsg(nil) + } +} + +func BenchmarkAppendMsgserviceTable(b *testing.B) { + v := serviceTable{} + bts := make([]byte, 0, v.Msgsize()) + bts, _ = v.MarshalMsg(bts[0:0]) + b.SetBytes(int64(len(bts))) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + bts, _ = v.MarshalMsg(bts[0:0]) + } +} + +func BenchmarkUnmarshalserviceTable(b *testing.B) { + v := serviceTable{} + bts, _ := v.MarshalMsg(nil) + b.ReportAllocs() + b.SetBytes(int64(len(bts))) + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := v.UnmarshalMsg(bts) + if err != nil { + b.Fatal(err) + } + } +} diff --git a/setup_test.go b/setup_test.go index ef72d21..f9111af 100644 --- a/setup_test.go +++ b/setup_test.go @@ -97,14 +97,14 @@ func TestMain(m *testing.M) { panic(err) } - go Serve(l, rt) + go Serve(l, "test-endpoint", rt) ul, err := net.Listen("unix", "synapse") if err != nil { panic(err) } - go Serve(ul, rt) + go Serve(ul, "test-endpoint", rt) tcpClient, err = Dial("tcp", ":7070", 5*time.Millisecond) if err != nil { From 1d836f24561bb002882635bdfad99c5ebeab34fb Mon Sep 17 00:00:00 2001 From: philhofer Date: Sun, 26 Jul 2015 19:27:57 -0700 Subject: [PATCH 2/6] passing tests --- client.go | 8 +--- client_test.go | 20 +++++----- command.go | 101 +++++++++++++++++++++++++++++-------------------- server.go | 50 +++++++++++++++++++++--- service.go | 71 ++++++++++++++++++++++------------ service_gen.go | 24 ++++++++---- 6 files changed, 178 insertions(+), 96 deletions(-) diff --git a/client.go b/client.go index 71f4784..fc6ad2e 100644 --- a/client.go +++ b/client.go @@ -441,17 +441,11 @@ func (c *Client) sendCommand(cmd command, msg []byte) error { ret := command(w.in[0]) if ret == cmdInvalid || ret >= _maxcommand { - waiters.push(w) - return errInvalidCmd - } - - act := cmdDirectory[ret] - if act == nil { waiters.push(w) return errUnknownCmd } - act.Client(c, w.in[1:]) + cmdDirectory[ret].done(c, w.in[1:]) waiters.push(w) return nil } diff --git a/client_test.go b/client_test.go index e0acdb7..dc05f7e 100644 --- a/client_test.go +++ b/client_test.go @@ -28,20 +28,20 @@ func TestNearest(t *testing.T) { svc := Nearest("test-endpoint") if svc == nil { t.Error("expected Nearest(test-endpoint) to return something") - } else { - c, err := svc.Connect(5 * time.Millisecond) - if err != nil { - t.Errorf("couldn't connect to tcp service: %s", err) - } - c.Close() } all := Services("test-endpoint") - if len(all) != 2 { - for _, s := range all { - t.Logf("service: %#v", *s) + for _, s := range all { + t.Logf("found service: %v", s) + if s.Name() != "test-endpoint" { + t.Errorf("expected name %q -- got %q", "test-endpoint", s.Name()) + } + if s.HostID() != hostid { + t.Errorf("expected host id %d; got %d", hostid, s.HostID()) } - t.Errorf("expected Services(test-endpoint) to return 2 elements; found %d", len(all)) + } + if len(all) < 2 { + t.Errorf("expected at least 2 services; found %d", len(all)) } } diff --git a/command.go b/command.go index 0d74503..cf606fd 100644 --- a/command.go +++ b/command.go @@ -38,28 +38,32 @@ const ( // to the other type command uint8 -// cmdDirectory is a map of all the commands -// to their respective actions -var cmdDirectory = [_maxcommand]action{ - cmdPing: ping{}, - cmdListLinks: links{}, +// Global directory of command handlers. +// +// If you need to implement a new command, +// add it here. +var cmdDirectory = [_maxcommand]cmdact{ + cmdInvalid: {badhandle, recvbad}, + cmdPing: {pinghandle, recvping}, + cmdListLinks: {sendlinks, recvlinks}, } -// an action is the consequence -// of a command - commands are -// mapped to actions -type action interface { - // Client is the action carried out on the client side - // when it receives a command response from a server - Client(c *Client, msg []byte) - - // Sever is the action carried out on the server side. It - // should return the reponse message (if any), and any - // error encountered. Errors will result in cmdInvalid - // sent to the client. - Server(ch *connHandler, msg []byte) (res []byte, err error) +type cmdact struct { + // handle is called by the server when + // receiving a particular command + handle func(ch *connHandler, msg []byte) ([]byte, error) + + // done is called as the client-side finalizer + done func(c *Client, msg []byte) } +// no-op handlers +func badhandle(ch *connHandler, msg []byte) ([]byte, error) { + return []byte{byte(cmdInvalid)}, nil +} + +func recvbad(c *Client, msg []byte) {} + // list of commands const ( // cmdInvalid is used @@ -80,50 +84,65 @@ const ( _maxcommand ) -// ping is a no-op on both sides -type ping struct{} - -func (p ping) Client(cl *Client, res []byte) { - name := string(res) - r := cl.conn.RemoteAddr() - s := Service{ - name: name, - net: r.Network(), - addr: r.String(), +// client-side ping finalizer +func recvping(cl *Client, res []byte) { + var s Service + _, err := s.UnmarshalMsg(res) + if err != nil { + return } + r := cl.conn.RemoteAddr() + s.net = r.Network() + s.addr = r.String() cache(&s) - cl.svc = name + cl.svc = s.name } -func (p ping) Server(ch *connHandler, body []byte) ([]byte, error) { - return ch.svcname, nil +// server-side ping handler +func pinghandle(ch *connHandler, body []byte) ([]byte, error) { + s := Service{ + name: string(ch.svcname), + host: hostid, + } + return s.MarshalMsg(nil) } -type links struct{} - -func (l links) Client(cl *Client, res []byte) { +func recvlinks(cl *Client, res []byte) { var sl serviceList _, err := sl.UnmarshalMsg(res) if err != nil { return } - cachelist(sl) + svcCache.Lock() + for _, sv := range sl { + if sv.host == hostid || !isRoutable(sv) { + continue + } + svcCache.tab[sv.name] = addSvc(svcCache.tab[sv.name], sv) + } + svcCache.Unlock() } -func (l links) Server(ch *connHandler, body []byte) ([]byte, error) { +func sendlinks(ch *connHandler, body []byte) ([]byte, error) { var sl serviceList _, err := sl.UnmarshalMsg(body) if err != nil { return nil, err } + if ch.route != routeOSLocal { + // for each non-os-local + // service, increment the + // hop counter + for _, sv := range sl { + sv.dist++ + } + } svcCache.Lock() body, _ = svcCache.tab.MarshalMsg(body[:0]) for _, sv := range sl { - // servers are responsibles - // for incrementing the distance - // counter when they receive an - // endpoint. - sv.distance++ + if sv.host == hostid { + continue + } svcCache.tab[sv.name] = addSvc(svcCache.tab[sv.name], sv) } svcCache.Unlock() diff --git a/server.go b/server.go index 9d39af7..8a30801 100644 --- a/server.go +++ b/server.go @@ -37,6 +37,7 @@ func Serve(l net.Listener, service string, h Handler) error { name: service, net: a.Network(), addr: a.String(), + host: hostid, } cache(&s) for { @@ -88,11 +89,12 @@ func ListenAndServe(network, laddr, service string, h Handler) error { // is closed or it encounters a fatal error. func ServeConn(c net.Conn, service string, h Handler) { ch := connHandler{ - svcname: []byte(service), + svcname: service, conn: c, h: h, remote: c.RemoteAddr(), writing: make(chan *connWrapper, 32), + route: getRoute(c), } go ch.writeLoop() ch.connLoop() // returns on connection close @@ -125,16 +127,54 @@ func putFrame(bts []byte, seq uint64, ft fType, sz int) { bts[10] = byte(usz) } +// route is the network pathway +// assocaited with a particular +// conneciton (global, link-local, +// loopback, etc.) +type route uint16 + +const ( + routeUnknown route = iota + routeGlobal // globally routable + routeLinkLocal // link-local + routeOSLocal // same machine (unix socket / loopback) +) + +func getRoute(c net.Conn) route { + raddr := c.RemoteAddr() + nwk := raddr.Network() + switch nwk { + case "unix": + return routeOSLocal + case "tcp", "tcp4", "tcp6": + a, err := net.ResolveIPAddr(nwk, raddr.String()) + if err != nil { + return routeUnknown + } + if a.IP.IsLoopback() { + return routeOSLocal + } else if a.IP.IsLinkLocalUnicast() { + return routeLinkLocal + } else if a.IP.IsGlobalUnicast() { + return routeGlobal + } + fallthrough + default: + return routeUnknown + } +} + // connHandler handles network // connections and multiplexes requests // to connWrappers type connHandler struct { - svcname []byte + svcname string h Handler conn net.Conn remote net.Addr wg sync.WaitGroup // outstanding handlers writing chan *connWrapper // write queue + route route // from whence? } func (c *connHandler) writeLoop() error { @@ -323,15 +363,13 @@ func handleCmd(c *connHandler, seq uint64, cmd command, body []byte) { if cmd == cmdInvalid || cmd >= _maxcommand { return } - - act := cmdDirectory[cmd] resbyte := byte(cmd) var res []byte var err error - if act == nil { + if cmd == cmdInvalid || cmd >= _maxcommand { resbyte = byte(cmdInvalid) } else { - res, err = act.Server(c, body) + res, err = cmdDirectory[cmd].handle(c, body) if err != nil { resbyte = byte(cmdInvalid) } diff --git a/service.go b/service.go index 05c241e..30573cb 100644 --- a/service.go +++ b/service.go @@ -1,10 +1,12 @@ package synapse import ( + "crypto/rand" + "encoding/binary" + "fmt" "net" "sort" "sync" - "time" ) //go:generate msgp -unexported -io=false @@ -40,16 +42,23 @@ func Services(name string) []*Service { // Service represents a unique address // associated with a service. type Service struct { - name, net, addr string - distance int32 + host uint64 // host id + name, net, addr string // address + dist int32 // distance } +func (s *Service) HostID() uint64 { return s.host } + +func (s *Service) Name() string { return s.name } + func (s *Service) eqaddr(g *Service) bool { return s.net == g.net && s.addr == g.addr } +// String returns {Name}#{HostID}@{net}:{addr} +// e.g. echo#9081234973@tcp:localhost:7000 func (s *Service) String() string { - return s.name + "@" + s.net + ":" + s.addr + return fmt.Sprintf("%s#%d@%s:%s", s.name, s.host, s.net, s.addr) } func (s *Service) Addr() (net, addr string) { @@ -57,19 +66,10 @@ func (s *Service) Addr() (net, addr string) { return } -func (s *Service) Connect(timeout time.Duration) (*Client, error) { - conn, err := net.Dial(s.net, s.addr) - if err != nil { - uncache(s) - return nil, err - } - return NewClient(conn, timeout) -} - type serviceList []*Service func (s serviceList) Len() int { return len(s) } -func (s serviceList) Less(i, j int) bool { return s[i].distance < s[j].distance } +func (s serviceList) Less(i, j int) bool { return s[i].dist < s[j].dist } func (s serviceList) Swap(i, j int) { s[i], s[j] = s[j], s[i] } func addSvc(list []*Service, s *Service) []*Service { @@ -78,8 +78,8 @@ func addSvc(list []*Service, s *Service) []*Service { } for _, sv := range list { if sv.eqaddr(s) { - if s.distance < sv.distance { - sv.distance = s.distance + if s.dist < sv.dist { + sv.dist = s.dist } return list } @@ -106,26 +106,49 @@ func removeSvc(list []*Service, s *Service) []*Service { type serviceTable map[string]serviceList +// svcCache stores all of the +// known services to this hostid. +// var svcCache struct { sync.Mutex tab serviceTable } +// host id +var hostid uint64 + +func HostID() uint64 { return hostid } + func init() { svcCache.tab = make(serviceTable) + + var buf [8]byte + rand.Read(buf[:]) + hostid = binary.LittleEndian.Uint64(buf[:]) } -func cache(s *Service) { - svcCache.Lock() - svcCache.tab[s.name] = addSvc(svcCache.tab[s.name], s) - svcCache.Unlock() +func isRoutable(s *Service) bool { + // unix sockets, etc. can + // be connected to on the + // same machine + if s.dist == 0 { + return true + } + + switch s.net { + case "tcp", "tcp6", "tcp4": + a, err := net.ResolveTCPAddr(s.net, s.addr) + + // TODO: link-local addresses + return err == nil && a.IP.IsGlobalUnicast() + default: + return false + } } -func cachelist(l serviceList) { +func cache(s *Service) { svcCache.Lock() - for _, sv := range l { - svcCache.tab[sv.name] = addSvc(svcCache.tab[sv.name], sv) - } + svcCache.tab[s.name] = addSvc(svcCache.tab[s.name], s) svcCache.Unlock() } diff --git a/service_gen.go b/service_gen.go index 9db0f8f..497909f 100644 --- a/service_gen.go +++ b/service_gen.go @@ -11,9 +11,12 @@ import ( // MarshalMsg implements msgp.Marshaler func (z *Service) MarshalMsg(b []byte) (o []byte, err error) { o = msgp.Require(b, z.Msgsize()) - // map header, size 4 + // map header, size 5 + // string "host" + o = append(o, 0x85, 0xa4, 0x68, 0x6f, 0x73, 0x74) + o = msgp.AppendUint64(o, z.host) // string "name" - o = append(o, 0x84, 0xa4, 0x6e, 0x61, 0x6d, 0x65) + o = append(o, 0xa4, 0x6e, 0x61, 0x6d, 0x65) o = msgp.AppendString(o, z.name) // string "net" o = append(o, 0xa3, 0x6e, 0x65, 0x74) @@ -21,9 +24,9 @@ func (z *Service) MarshalMsg(b []byte) (o []byte, err error) { // string "addr" o = append(o, 0xa4, 0x61, 0x64, 0x64, 0x72) o = msgp.AppendString(o, z.addr) - // string "distance" - o = append(o, 0xa8, 0x64, 0x69, 0x73, 0x74, 0x61, 0x6e, 0x63, 0x65) - o = msgp.AppendInt32(o, z.distance) + // string "dist" + o = append(o, 0xa4, 0x64, 0x69, 0x73, 0x74) + o = msgp.AppendInt32(o, z.dist) return } @@ -43,6 +46,11 @@ func (z *Service) UnmarshalMsg(bts []byte) (o []byte, err error) { return } switch msgp.UnsafeString(field) { + case "host": + z.host, bts, err = msgp.ReadUint64Bytes(bts) + if err != nil { + return + } case "name": z.name, bts, err = msgp.ReadStringBytes(bts) if err != nil { @@ -58,8 +66,8 @@ func (z *Service) UnmarshalMsg(bts []byte) (o []byte, err error) { if err != nil { return } - case "distance": - z.distance, bts, err = msgp.ReadInt32Bytes(bts) + case "dist": + z.dist, bts, err = msgp.ReadInt32Bytes(bts) if err != nil { return } @@ -75,7 +83,7 @@ func (z *Service) UnmarshalMsg(bts []byte) (o []byte, err error) { } func (z *Service) Msgsize() (s int) { - s = 1 + 5 + msgp.StringPrefixSize + len(z.name) + 4 + msgp.StringPrefixSize + len(z.net) + 5 + msgp.StringPrefixSize + len(z.addr) + 9 + msgp.Int32Size + s = 1 + 5 + msgp.Uint64Size + 5 + msgp.StringPrefixSize + len(z.name) + 4 + msgp.StringPrefixSize + len(z.net) + 5 + msgp.StringPrefixSize + len(z.addr) + 5 + msgp.Int32Size return } From 5b3111f85b6abe08b43626d07a153172878346d4 Mon Sep 17 00:00:00 2001 From: philhofer Date: Sun, 26 Jul 2015 19:54:44 -0700 Subject: [PATCH 3/6] prefer unix sockets to tcp loopback --- client_test.go | 3 +++ command.go | 2 ++ server.go | 11 ++++------- service.go | 18 +++++++++++++++--- 4 files changed, 24 insertions(+), 10 deletions(-) diff --git a/client_test.go b/client_test.go index dc05f7e..c4ce2ce 100644 --- a/client_test.go +++ b/client_test.go @@ -29,6 +29,9 @@ func TestNearest(t *testing.T) { if svc == nil { t.Error("expected Nearest(test-endpoint) to return something") } + if nwk, _ := svc.Addr(); nwk != "unix" { + t.Error("expected nearest endpoint to be a unix socket") + } all := Services("test-endpoint") for _, s := range all { diff --git a/command.go b/command.go index cf606fd..cde868f 100644 --- a/command.go +++ b/command.go @@ -77,6 +77,8 @@ const ( // command cmdPing + // sync service addresses + // between client and server cmdListLinks // a command >= _maxcommand diff --git a/server.go b/server.go index 8a30801..dbe9381 100644 --- a/server.go +++ b/server.go @@ -134,10 +134,10 @@ func putFrame(bts []byte, seq uint64, ft fType, sz int) { type route uint16 const ( - routeUnknown route = iota - routeGlobal // globally routable - routeLinkLocal // link-local - routeOSLocal // same machine (unix socket / loopback) + routeUnknown route = iota // no idea + routeGlobal // globally routable + routeLinkLocal // link-local + routeOSLocal // same machine (unix socket / loopback) ) func getRoute(c net.Conn) route { @@ -360,9 +360,6 @@ func (c *connHandler) handleReq(cw *connWrapper) { } func handleCmd(c *connHandler, seq uint64, cmd command, body []byte) { - if cmd == cmdInvalid || cmd >= _maxcommand { - return - } resbyte := byte(cmd) var res []byte var err error diff --git a/service.go b/service.go index 30573cb..2df1936 100644 --- a/service.go +++ b/service.go @@ -68,9 +68,21 @@ func (s *Service) Addr() (net, addr string) { type serviceList []*Service -func (s serviceList) Len() int { return len(s) } -func (s serviceList) Less(i, j int) bool { return s[i].dist < s[j].dist } -func (s serviceList) Swap(i, j int) { s[i], s[j] = s[j], s[i] } +func (s serviceList) Len() int { return len(s) } +func (s serviceList) Less(i, j int) bool { + + // for os-local addresses, prefer unix sockets + // over loopback tcp + if s[i].dist == 0 && s[i].dist == s[j].dist { + if s[i].net == "unix" && s[j].net != "unix" { + return true + } + } + + return s[i].dist < s[j].dist +} + +func (s serviceList) Swap(i, j int) { s[i], s[j] = s[j], s[i] } func addSvc(list []*Service, s *Service) []*Service { if len(list) == 0 { From 627c94826d2bd4fd999508f1ff1e7a1f992c0a81 Mon Sep 17 00:00:00 2001 From: philhofer Date: Sun, 26 Jul 2015 22:47:45 -0700 Subject: [PATCH 4/6] write a primitive multi-host test --- command.go | 39 ++++++++++---------- svc_test/sstc/main.go | 82 +++++++++++++++++++++++++++++++++++++++++++ svc_test/ssts/main.go | 73 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 176 insertions(+), 18 deletions(-) create mode 100644 svc_test/sstc/main.go create mode 100644 svc_test/ssts/main.go diff --git a/command.go b/command.go index cde868f..fe0defa 100644 --- a/command.go +++ b/command.go @@ -110,42 +110,45 @@ func pinghandle(ch *connHandler, body []byte) ([]byte, error) { } func recvlinks(cl *Client, res []byte) { - var sl serviceList + sl := serviceTable{} _, err := sl.UnmarshalMsg(res) if err != nil { return } svcCache.Lock() - for _, sv := range sl { - if sv.host == hostid || !isRoutable(sv) { - continue + for name, sv := range sl { + list := svcCache.tab[name] + for _, s := range sv { + if s.host == hostid || !isRoutable(s) { + continue + } + list = addSvc(list, s) } - svcCache.tab[sv.name] = addSvc(svcCache.tab[sv.name], sv) + svcCache.tab[name] = list } svcCache.Unlock() } func sendlinks(ch *connHandler, body []byte) ([]byte, error) { - var sl serviceList + sl := serviceTable{} _, err := sl.UnmarshalMsg(body) if err != nil { return nil, err } - if ch.route != routeOSLocal { - // for each non-os-local - // service, increment the - // hop counter - for _, sv := range sl { - sv.dist++ - } - } svcCache.Lock() body, _ = svcCache.tab.MarshalMsg(body[:0]) - for _, sv := range sl { - if sv.host == hostid { - continue + for name, sv := range sl { + list := svcCache.tab[name] + for _, s := range sv { + if s.host == hostid { + continue + } + if ch.route != routeOSLocal { + s.dist++ + } + list = addSvc(list, s) } - svcCache.tab[sv.name] = addSvc(svcCache.tab[sv.name], sv) + svcCache.tab[name] = list } svcCache.Unlock() return body, nil diff --git a/svc_test/sstc/main.go b/svc_test/sstc/main.go new file mode 100644 index 0000000..c04fedd --- /dev/null +++ b/svc_test/sstc/main.go @@ -0,0 +1,82 @@ +package main + +import ( + "flag" + "fmt" + "os" + "time" + + "github.com/tinylib/synapse" +) + +var ( + port = flag.String("port", ":7000", "tcp port to dial") +) + +func fatalln(str string) { + fmt.Println(str) + os.Exit(1) +} + +func perror(str string, err error) { + fmt.Println(str, err) + os.Exit(1) +} + +func dumpservices(srv string) { + fmt.Print("Known addresses for service", srv) + sv := synapse.Services(srv) + if len(sv) == 0 { + fmt.Print(": None.\n") + return + } + fmt.Print("\n") + for _, s := range sv { + fmt.Println("\t", s) + } +} + +func main() { + cl, err := synapse.Dial("tcp", *port, 25*time.Millisecond) + if err != nil { + perror("dial failure:", err) + } + fmt.Println("Connected to service", cl.Service()) + + err = cl.Call("echo", synapse.String("hello!"), nil) + if err != nil { + perror("call error:", err) + } + + // wait for service lists to synchronize + time.Sleep(50 * time.Millisecond) + + dumpservices(cl.Service()) + + ss := synapse.Nearest(cl.Service()) + if ss == nil { + fatalln("Nearest() returned nil") + } + + nwk, sock := ss.Addr() + if nwk != "unix" { + fatalln("Nearest(service).Addr() didn't return a unix socket") + } + + fmt.Println("Found socket", sock) + var cl2 *synapse.Client + + cl2, err = synapse.Dial(nwk, sock, 25*time.Millisecond) + if err != nil { + perror("couldn't dial socket:", err) + } + err = cl2.Close() + if err != nil { + perror("close error:", err) + } + + err = cl.Close() + if err != nil { + perror("close error:", err) + } +} diff --git a/svc_test/ssts/main.go b/svc_test/ssts/main.go new file mode 100644 index 0000000..10c4de9 --- /dev/null +++ b/svc_test/ssts/main.go @@ -0,0 +1,73 @@ +package main + +import ( + "flag" + "fmt" + "net" + "os" + "os/signal" + "strings" + + "github.com/tinylib/msgp/msgp" + "github.com/tinylib/synapse" +) + +var ( + sock = flag.String("sock", "ssvc_sock", "unix socket to listen on") + port = flag.String("port", ":7000", "tcp port to dial") +) + +func fatalln(str string) { + fmt.Println(str) + os.Exit(1) +} + +func perror(str string, err error) { + fmt.Println(str, err) + os.Exit(1) +} + +type echoHandler struct{} + +func (e echoHandler) ServeCall(req synapse.Request, res synapse.ResponseWriter) { + var r msgp.Raw + if err := req.Decode(&r); err != nil { + perror("error on Request.Decode():", err) + } + if err := res.Send(r); err != nil { + perror("error on ResponseWriter.Send():", err) + } + + sv := synapse.Services("service-test") + for _, s := range sv { + fmt.Println("Server: known service:", s) + } +} + +func main() { + go func() { + err := synapse.ListenAndServe("tcp", *port, "service-test", echoHandler{}) + if err != nil { + perror("listen tcp error:", err) + } + }() + ul, err := net.Listen("unix", *sock) + if err != nil { + perror("listen unix error:", err) + } + + // cleanup unix socket + // on kill so we don't + // leave it lying around + go func() { + in := make(chan os.Signal, 1) + signal.Notify(in, os.Interrupt) + <-in + ul.Close() + }() + + err = synapse.Serve(ul, "service-test", echoHandler{}) + if err != nil && !strings.Contains(err.Error(), "closed") { + perror("serve unix error:", err) + } +} From 797fd208b5039e0537f6477469c7a7e2d274a63b Mon Sep 17 00:00:00 2001 From: philhofer Date: Mon, 27 Jul 2015 16:25:17 -0700 Subject: [PATCH 5/6] add service test script --- .travis.yml | 8 +++++++- svc-test.bash | 9 +++++++++ svc_test/sstc/main.go | 23 ++++++++++++----------- svc_test/ssts/main.go | 16 +++++++++------- 4 files changed, 37 insertions(+), 19 deletions(-) create mode 100755 svc-test.bash diff --git a/.travis.yml b/.travis.yml index aa1b91f..df6c8b5 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,4 +1,10 @@ language: go go: - - 1.4 \ No newline at end of file + - 1.4 + - tip + +script: + - go install ./... + - go test -v + - ./svc-test.bash diff --git a/svc-test.bash b/svc-test.bash new file mode 100755 index 0000000..fc47324 --- /dev/null +++ b/svc-test.bash @@ -0,0 +1,9 @@ +#! /bin/bash + +ssts & +SSTS_PID=$! +sleep 0.1 +sstc +RET=$? +eval kill -s SIGINT $SSTS_PID || echo "Couldn't kill server." +exit $RET diff --git a/svc_test/sstc/main.go b/svc_test/sstc/main.go index c04fedd..da11194 100644 --- a/svc_test/sstc/main.go +++ b/svc_test/sstc/main.go @@ -10,7 +10,7 @@ import ( ) var ( - port = flag.String("port", ":7000", "tcp port to dial") + port = flag.String("port", ":7005", "tcp port to dial") ) func fatalln(str string) { @@ -24,7 +24,7 @@ func perror(str string, err error) { } func dumpservices(srv string) { - fmt.Print("Known addresses for service", srv) + fmt.Print("Known addresses for service ", srv) sv := synapse.Services(srv) if len(sv) == 0 { fmt.Print(": None.\n") @@ -39,13 +39,13 @@ func dumpservices(srv string) { func main() { cl, err := synapse.Dial("tcp", *port, 25*time.Millisecond) if err != nil { - perror("dial failure:", err) + perror("client: dial failure:", err) } - fmt.Println("Connected to service", cl.Service()) + fmt.Println("client: connected to service", cl.Service()) err = cl.Call("echo", synapse.String("hello!"), nil) if err != nil { - perror("call error:", err) + perror("client: call error:", err) } // wait for service lists to synchronize @@ -55,28 +55,29 @@ func main() { ss := synapse.Nearest(cl.Service()) if ss == nil { - fatalln("Nearest() returned nil") + fatalln("client: Nearest() returned nil") } nwk, sock := ss.Addr() if nwk != "unix" { - fatalln("Nearest(service).Addr() didn't return a unix socket") + fatalln("client: Nearest(service).Addr() didn't return a unix socket") } - fmt.Println("Found socket", sock) + fmt.Println("client: found socket", sock) var cl2 *synapse.Client cl2, err = synapse.Dial(nwk, sock, 25*time.Millisecond) if err != nil { - perror("couldn't dial socket:", err) + perror("client: couldn't dial socket:", err) } err = cl2.Close() if err != nil { - perror("close error:", err) + perror("client: close error:", err) } err = cl.Close() if err != nil { - perror("close error:", err) + perror("client: close error:", err) } + fmt.Println("client: exiting successfully") } diff --git a/svc_test/ssts/main.go b/svc_test/ssts/main.go index 10c4de9..a2bc115 100644 --- a/svc_test/ssts/main.go +++ b/svc_test/ssts/main.go @@ -14,7 +14,7 @@ import ( var ( sock = flag.String("sock", "ssvc_sock", "unix socket to listen on") - port = flag.String("port", ":7000", "tcp port to dial") + port = flag.String("port", ":7005", "tcp port to dial") ) func fatalln(str string) { @@ -32,15 +32,15 @@ type echoHandler struct{} func (e echoHandler) ServeCall(req synapse.Request, res synapse.ResponseWriter) { var r msgp.Raw if err := req.Decode(&r); err != nil { - perror("error on Request.Decode():", err) + perror("server: error on Request.Decode():", err) } if err := res.Send(r); err != nil { - perror("error on ResponseWriter.Send():", err) + perror("server: error on ResponseWriter.Send():", err) } sv := synapse.Services("service-test") for _, s := range sv { - fmt.Println("Server: known service:", s) + fmt.Println("serrver: known service:", s) } } @@ -48,12 +48,12 @@ func main() { go func() { err := synapse.ListenAndServe("tcp", *port, "service-test", echoHandler{}) if err != nil { - perror("listen tcp error:", err) + perror("server: listen tcp error:", err) } }() ul, err := net.Listen("unix", *sock) if err != nil { - perror("listen unix error:", err) + perror("server: listen unix error:", err) } // cleanup unix socket @@ -64,10 +64,12 @@ func main() { signal.Notify(in, os.Interrupt) <-in ul.Close() + fmt.Println("server: exiting successfully") + os.Exit(0) }() err = synapse.Serve(ul, "service-test", echoHandler{}) if err != nil && !strings.Contains(err.Error(), "closed") { - perror("serve unix error:", err) + perror("server: serve unix error:", err) } } From 3a474dc359aab1511647107521afff2dc33f6bb4 Mon Sep 17 00:00:00 2001 From: philhofer Date: Mon, 27 Jul 2015 18:06:43 -0700 Subject: [PATCH 6/6] optionally log errors (used in testing hook to catch non-fatal errors) --- client.go | 28 +++++++++++++++++++++++++++- client_test.go | 5 +++++ command.go | 4 +++- server.go | 7 +++---- service.go | 7 +++++-- setup_test.go | 25 +++++++++++++++++++++++++ svc-test.bash | 3 ++- svc_test/sstc/main.go | 11 ++++++++++- svc_test/ssts/main.go | 5 +++++ 9 files changed, 85 insertions(+), 10 deletions(-) diff --git a/client.go b/client.go index fc6ad2e..6f585f6 100644 --- a/client.go +++ b/client.go @@ -4,6 +4,7 @@ import ( "crypto/tls" "errors" "fmt" + "log" "net" "sync" "sync/atomic" @@ -41,6 +42,27 @@ var ( ErrTooLarge = errors.New("synapse: message body too large") ) +// ErrorLogger is the logger used +// by the package to log protocol +// errors. (In general, protocol-level +// errors are not returned directly +// to the client.) It can be set during +// initialization. If it is left +// as nil, nothing will be logged. +var ErrorLogger *log.Logger + +func errorln(s string) { + if ErrorLogger != nil { + ErrorLogger.Println(s) + } +} + +func errorf(s string, args ...interface{}) { + if ErrorLogger != nil { + ErrorLogger.Printf(s, args...) + } +} + // Dial creates a new client by dialing // the provided network and remote address. // The provided timeout is used as the timeout @@ -200,6 +222,7 @@ func (c *Client) readLoop() { // they are routed to waiters // precisely the same way if frame != fCMD && frame != fRES { + errorf("server at addr %s sent a bad frame", c.conn.RemoteAddr()) // ignore if !c.do(bwr.Skip(sz)) { return @@ -459,5 +482,8 @@ func (c *Client) ping() error { // sync known service addresses func (c *Client) syncLinks() { - c.sendCommand(cmdListLinks, svclistbytes()) + err := c.sendCommand(cmdListLinks, svclistbytes()) + if err != nil { + errorf("error synchronizing links: %s", err) + } } diff --git a/client_test.go b/client_test.go index c4ce2ce..1a6519d 100644 --- a/client_test.go +++ b/client_test.go @@ -16,6 +16,7 @@ func isCode(err error, c Status) bool { } func TestClientServiceName(t *testing.T) { + setTestLog(t) if tcpClient.Service() != "test-endpoint" { t.Errorf("expected service endpoint to be %q, but got %q", "test-endpoint", tcpClient.Service()) } @@ -25,6 +26,7 @@ func TestClientServiceName(t *testing.T) { } func TestNearest(t *testing.T) { + setTestLog(t) svc := Nearest("test-endpoint") if svc == nil { t.Error("expected Nearest(test-endpoint) to return something") @@ -51,6 +53,7 @@ func TestNearest(t *testing.T) { // open up a client and server; make // some concurrent requests func TestClient(t *testing.T) { + setTestLog(t) const concurrent = 5 wg := new(sync.WaitGroup) @@ -83,6 +86,7 @@ func TestClient(t *testing.T) { // the output of the debug handler // is only visible if '-v' is set func TestDebugClient(t *testing.T) { + setTestLog(t) instr := String("here's a message body!") var outstr String err := tcpClient.Call(DebugEcho, &instr, &outstr) @@ -97,6 +101,7 @@ func TestDebugClient(t *testing.T) { // test that 'nil' is a safe // argument to requests and responses func TestNop(t *testing.T) { + setTestLog(t) err := tcpClient.Call(Nop, nil, nil) if err != nil { t.Fatal(err) diff --git a/command.go b/command.go index fe0defa..c0b3fd1 100644 --- a/command.go +++ b/command.go @@ -88,12 +88,13 @@ const ( // client-side ping finalizer func recvping(cl *Client, res []byte) { + r := cl.conn.RemoteAddr() var s Service _, err := s.UnmarshalMsg(res) if err != nil { + errorf("server at addr %s sent a malformed ping response", r) return } - r := cl.conn.RemoteAddr() s.net = r.Network() s.addr = r.String() cache(&s) @@ -113,6 +114,7 @@ func recvlinks(cl *Client, res []byte) { sl := serviceTable{} _, err := sl.UnmarshalMsg(res) if err != nil { + errorf("server at addr %s sent malformed links: %s", cl.conn.RemoteAddr(), err) return } svcCache.Lock() diff --git a/server.go b/server.go index dbe9381..12ceaab 100644 --- a/server.go +++ b/server.go @@ -94,7 +94,7 @@ func ServeConn(c net.Conn, service string, h Handler) { h: h, remote: c.RemoteAddr(), writing: make(chan *connWrapper, 32), - route: getRoute(c), + route: getRoute(c.RemoteAddr()), } go ch.writeLoop() ch.connLoop() // returns on connection close @@ -140,14 +140,13 @@ const ( routeOSLocal // same machine (unix socket / loopback) ) -func getRoute(c net.Conn) route { - raddr := c.RemoteAddr() +func getRoute(raddr net.Addr) route { nwk := raddr.Network() switch nwk { case "unix": return routeOSLocal case "tcp", "tcp4", "tcp6": - a, err := net.ResolveIPAddr(nwk, raddr.String()) + a, err := net.ResolveTCPAddr(nwk, raddr.String()) if err != nil { return routeUnknown } diff --git a/service.go b/service.go index 2df1936..49d9ee5 100644 --- a/service.go +++ b/service.go @@ -150,9 +150,12 @@ func isRoutable(s *Service) bool { switch s.net { case "tcp", "tcp6", "tcp4": a, err := net.ResolveTCPAddr(s.net, s.addr) - + if err != nil { + errorf("couldn't resolve tcp addr %s: %s", s.addr, err) + return false + } // TODO: link-local addresses - return err == nil && a.IP.IsGlobalUnicast() + return a.IP.IsGlobalUnicast() default: return false } diff --git a/setup_test.go b/setup_test.go index f9111af..c3efde9 100644 --- a/setup_test.go +++ b/setup_test.go @@ -6,6 +6,7 @@ import ( "log" "net" "os" + "sync" "testing" "time" @@ -30,8 +31,32 @@ var ( rt *RouteTable ct testing.T + + loglock sync.Mutex ) +type faillog struct { + t *testing.T +} + +func (f *faillog) Write(b []byte) (int, error) { + loglock.Lock() + f.t.Error("error logged") + i, err := os.Stderr.Write(b) + loglock.Unlock() + return i, err +} + +func fail(t *testing.T) io.Writer { + return &faillog{t} +} + +func setTestLog(t *testing.T) { + loglock.Lock() + ErrorLogger = log.New(fail(t), "synapse-error-log: ", log.LstdFlags) + loglock.Unlock() +} + type testData []byte func (s *testData) MarshalMsg(b []byte) ([]byte, error) { diff --git a/svc-test.bash b/svc-test.bash index fc47324..9aaf08d 100755 --- a/svc-test.bash +++ b/svc-test.bash @@ -1,8 +1,9 @@ #! /bin/bash - +set -e ssts & SSTS_PID=$! sleep 0.1 +set +e sstc RET=$? eval kill -s SIGINT $SSTS_PID || echo "Couldn't kill server." diff --git a/svc_test/sstc/main.go b/svc_test/sstc/main.go index da11194..f90af9a 100644 --- a/svc_test/sstc/main.go +++ b/svc_test/sstc/main.go @@ -3,6 +3,7 @@ package main import ( "flag" "fmt" + "log" "os" "time" @@ -36,6 +37,10 @@ func dumpservices(srv string) { } } +func init() { + synapse.ErrorLogger = log.New(os.Stderr, "syn-client-log: ", log.LstdFlags) +} + func main() { cl, err := synapse.Dial("tcp", *port, 25*time.Millisecond) if err != nil { @@ -43,7 +48,7 @@ func main() { } fmt.Println("client: connected to service", cl.Service()) - err = cl.Call("echo", synapse.String("hello!"), nil) + err = cl.Call(0, synapse.String("hello!"), nil) if err != nil { perror("client: call error:", err) } @@ -70,6 +75,10 @@ func main() { if err != nil { perror("client: couldn't dial socket:", err) } + // give the second client time + // to complete another round of + // list synchronization. + time.Sleep(50 * time.Millisecond) err = cl2.Close() if err != nil { perror("client: close error:", err) diff --git a/svc_test/ssts/main.go b/svc_test/ssts/main.go index a2bc115..a5b35d7 100644 --- a/svc_test/ssts/main.go +++ b/svc_test/ssts/main.go @@ -3,6 +3,7 @@ package main import ( "flag" "fmt" + "log" "net" "os" "os/signal" @@ -44,6 +45,10 @@ func (e echoHandler) ServeCall(req synapse.Request, res synapse.ResponseWriter) } } +func init() { + synapse.ErrorLogger = log.New(os.Stderr, "syn-server-log: ", log.LstdFlags) +} + func main() { go func() { err := synapse.ListenAndServe("tcp", *port, "service-test", echoHandler{})