diff --git a/lightning/custom_msg_client.go b/lightning/custom_msg_client.go new file mode 100644 index 00000000..c34ccfa3 --- /dev/null +++ b/lightning/custom_msg_client.go @@ -0,0 +1,12 @@ +package lightning + +type CustomMessage struct { + PeerId string + Type uint32 + Data []byte +} + +type CustomMsgClient interface { + Recv() (*CustomMessage, error) + Send(*CustomMessage) error +} diff --git a/lsps0/codes/code.go b/lsps0/codes/code.go new file mode 100644 index 00000000..46e0797f --- /dev/null +++ b/lsps0/codes/code.go @@ -0,0 +1,14 @@ +package codes + +type Code int32 + +const ( + OK Code = 0 + Canceled Code = 1 + Unknown Code = 2 + ParseError Code = -32700 + InvalidRequest Code = -32600 + MethodNotFound Code = -32601 + InvalidParams Code = -32602 + InternalError Code = -32603 +) diff --git a/lsps0/jsonrpc/jsonrpc.go b/lsps0/jsonrpc/jsonrpc.go new file mode 100644 index 00000000..0812cb42 --- /dev/null +++ b/lsps0/jsonrpc/jsonrpc.go @@ -0,0 +1,30 @@ +package jsonrpc + +import "encoding/json" + +var Version = "2.0" + +type Request struct { + JsonRpc string `json:"jsonrpc"` + Method string `json:"method"` + Id string `json:"id"` + Params json.RawMessage `json:"params"` +} + +type Response struct { + JsonRpc string `json:"jsonrpc"` + Id string `json:"id"` + Result json.RawMessage `json:"result"` +} + +type Error struct { + JsonRpc string `json:"jsonrpc"` + Id *string `json:"id"` + Error ErrorBody `json:"error"` +} + +type ErrorBody struct { + Code int32 `json:"code"` + Message string `json:"message"` + Data json.RawMessage `json:"data,omitempty"` +} diff --git a/lsps0/protocol_server.go b/lsps0/protocol_server.go new file mode 100644 index 00000000..f44d1b82 --- /dev/null +++ b/lsps0/protocol_server.go @@ -0,0 +1,55 @@ +package lsps0 + +import "context" + +type ProtocolServer interface { + ListProtocols( + ctx context.Context, + req *ListProtocolsRequest, + ) (*ListProtocolsResponse, error) +} +type protocolServer struct { + protocols []uint32 +} + +type ListProtocolsRequest struct{} +type ListProtocolsResponse struct { + Protocols []uint32 `json:"protocols"` +} + +func NewProtocolServer(supportedProtocols []uint32) ProtocolServer { + return &protocolServer{ + protocols: supportedProtocols, + } +} + +func (s *protocolServer) ListProtocols( + ctx context.Context, + req *ListProtocolsRequest, +) (*ListProtocolsResponse, error) { + return &ListProtocolsResponse{ + Protocols: s.protocols, + }, nil +} + +func RegisterProtocolServer(s ServiceRegistrar, p ProtocolServer) { + s.RegisterService( + &ServiceDesc{ + ServiceName: "lsps0", + HandlerType: (*ProtocolServer)(nil), + Methods: []MethodDesc{ + { + MethodName: "lsps0.listprotocols", + Handler: func(srv interface{}, ctx context.Context, dec func(interface{}) error) (interface{}, error) { + in := new(ListProtocolsRequest) + if err := dec(in); err != nil { + return nil, err + } + return srv.(*protocolServer).ListProtocols(ctx, in) + }, + }, + }, + }, + p, + ) +} diff --git a/lsps0/server.go b/lsps0/server.go new file mode 100644 index 00000000..55a8858a --- /dev/null +++ b/lsps0/server.go @@ -0,0 +1,267 @@ +package lsps0 + +import ( + "context" + "encoding/json" + "errors" + "log" + "sync" + + "github.com/breez/lspd/lightning" + "github.com/breez/lspd/lsps0/codes" + "github.com/breez/lspd/lsps0/jsonrpc" + "github.com/breez/lspd/lsps0/status" + "golang.org/x/exp/slices" +) + +var ErrAlreadyServing = errors.New("lsps0: already serving") +var ErrServerStopped = errors.New("lsps0: the server has been stopped") +var Lsps0MessageType uint32 = 37913 +var BadMessageFormatError string = "bad message format" +var InternalError string = "internal error" + +// ServiceDesc and is constructed from it for internal purposes. +type serviceInfo struct { + // Contains the implementation for the methods in this service. + serviceImpl interface{} + methods map[string]*MethodDesc +} + +type methodInfo struct { + service *serviceInfo + method *MethodDesc +} + +type Server struct { + mu sync.Mutex + serve bool + services map[string]*serviceInfo + methods map[string]*methodInfo +} + +func NewServer() *Server { + return &Server{ + services: make(map[string]*serviceInfo), + methods: make(map[string]*methodInfo), + } +} + +func (s *Server) Serve(lis lightning.CustomMsgClient) error { + s.mu.Lock() + if s.serve { + return ErrAlreadyServing + } + s.serve = true + s.mu.Unlock() + + defer func() { + s.mu.Lock() + s.serve = false + s.mu.Unlock() + }() + + // TODO: Add a possibility to break this loop. + for { + msg, err := lis.Recv() + if err != nil { + log.Printf("lsps0 Serve(): Recv() err != nil: %v", err) + continue + // TODO: check whether this is a fatal error. delay until next recv. + } + + // Ignore any message that is not an lsps0 message + if msg.Type != Lsps0MessageType { + continue + } + + // Make sure there are no 0 bytes + if slices.Contains(msg.Data, 0x00) { + log.Printf("UNUSUAL: Got custom message containing 0 bytes from peer '%s'.", msg.PeerId) + go sendError(lis, msg, nil, status.New(codes.ParseError, BadMessageFormatError)) + continue + } + + req := new(jsonrpc.Request) + err = json.Unmarshal(msg.Data, req) + if err != nil { + log.Printf("UNUSUAL: Failed to unmarshal custom message from peer '%s': %v", msg.PeerId, err) + go sendError(lis, msg, nil, status.New(codes.ParseError, BadMessageFormatError)) + continue + } + + // TODO: make sure there is no additional data after parsing the object + + if req == nil { + log.Printf("UNUSUAL: req == nil after unmarshal custom message from peer '%s': %v", msg.PeerId, err) + go sendError(lis, msg, nil, status.New(codes.ParseError, BadMessageFormatError)) + continue + } + + if req.JsonRpc != jsonrpc.Version { + log.Printf("UNUSUAL: jsonrpc version is '%s' in custom message from peer '%s': %v", req.JsonRpc, msg.PeerId, err) + go sendError(lis, msg, req, status.Newf(codes.InvalidRequest, "Expected jsonrpc %s, found %s", jsonrpc.Version, req.JsonRpc)) + continue + } + + m, ok := s.methods[req.Method] + if !ok { + log.Printf("UNUSUAL: peer '%s' requested method '%s', but it does not exist.", msg.PeerId, req.Method) + go sendError(lis, msg, req, status.New(codes.MethodNotFound, "method not found")) + continue + } + + df := func(v interface{}) error { + if err := json.Unmarshal(req.Params, v); err != nil { + return status.Newf(codes.InvalidParams, "invalid params").Err() + } + + return nil + } + r, err := m.method.Handler(m.service.serviceImpl, context.TODO(), df) + if err != nil { + s, ok := status.FromError(err) + if !ok { + log.Printf("Internal error when processing custom message '%s' from peer '%s': %v", string(msg.Data), msg.PeerId, err) + s = status.New(codes.InternalError, InternalError) + } + + go sendError(lis, msg, req, s) + continue + } + + go sendResponse(lis, msg, req, r) + } +} + +func sendResponse( + lis lightning.CustomMsgClient, + in *lightning.CustomMessage, + req *jsonrpc.Request, + params interface{}, +) { + rd, err := json.Marshal(params) + if err != nil { + log.Printf("Failed to mashal response params '%+v'", params) + sendError(lis, in, req, status.New(codes.InternalError, InternalError)) + return + } + + resp := &jsonrpc.Response{ + JsonRpc: jsonrpc.Version, + Id: req.Id, + Result: rd, + } + res, err := json.Marshal(resp) + if err != nil { + log.Printf("Failed to mashal response '%+v'", resp) + sendError(lis, in, req, status.New(codes.InternalError, InternalError)) + return + } + + msg := &lightning.CustomMessage{ + PeerId: in.PeerId, + Type: Lsps0MessageType, + Data: res, + } + + err = lis.Send(msg) + if err != nil { + log.Printf("Failed to send message '%s' to peer '%s': %v", string(msg.Data), msg.PeerId, err) + // TODO: Retry? + return + } +} + +func sendError( + lis lightning.CustomMsgClient, + in *lightning.CustomMessage, + req *jsonrpc.Request, + status *status.Status, +) { + var id *string + if req != nil { + id = &req.Id + } + resp := &jsonrpc.Error{ + JsonRpc: jsonrpc.Version, + Id: id, + Error: jsonrpc.ErrorBody{ + Code: int32(status.Code), + Message: status.Message, + Data: nil, + }, + } + + res, err := json.Marshal(resp) + if err != nil { + log.Printf("Failed to mashal response '%+v'", resp) + return + } + + msg := &lightning.CustomMessage{ + PeerId: in.PeerId, + Type: Lsps0MessageType, + Data: res, + } + + err = lis.Send(msg) + if err != nil { + log.Printf("Failed to send message '%s' to peer '%s': %v", string(msg.Data), msg.PeerId, err) + return + } +} + +func (s *Server) RegisterService(desc *ServiceDesc, impl interface{}) { + s.mu.Lock() + defer s.mu.Unlock() + log.Printf("RegisterService(%q)", desc.ServiceName) + if s.serve { + log.Fatalf("lsps0: Server.RegisterService after Server.Serve for %q", desc.ServiceName) + } + if _, ok := s.services[desc.ServiceName]; ok { + log.Fatalf("lsps0: Server.RegisterService found duplicate service registration for %q", desc.ServiceName) + } + info := &serviceInfo{ + serviceImpl: impl, + methods: make(map[string]*MethodDesc), + } + for i := range desc.Methods { + d := &desc.Methods[i] + if _, ok := s.methods[d.MethodName]; ok { + log.Fatalf("lsps0: Server.RegisterService found duplicate method registration for %q", d.MethodName) + } + info.methods[d.MethodName] = d + s.methods[d.MethodName] = &methodInfo{ + service: info, + method: d, + } + } + s.services[desc.ServiceName] = info +} + +type ServiceDesc struct { + ServiceName string + // The pointer to the service interface. Used to check whether the user + // provided implementation satisfies the interface requirements. + HandlerType interface{} + Methods []MethodDesc +} + +type MethodDesc struct { + MethodName string + Handler methodHandler +} + +type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error) (interface{}, error) + +// ServiceRegistrar wraps a single method that supports service registration. It +// enables users to pass concrete types other than grpc.Server to the service +// registration methods exported by the IDL generated code. +type ServiceRegistrar interface { + // RegisterService registers a service and its implementation to the + // concrete type implementing this interface. It may not be called + // once the server has started serving. + // desc describes the service and its methods and handlers. impl is the + // service implementation which is passed to the method handlers. + RegisterService(desc *ServiceDesc, impl interface{}) +} diff --git a/lsps0/setup_test.go b/lsps0/setup_test.go new file mode 100644 index 00000000..6a6717ba --- /dev/null +++ b/lsps0/setup_test.go @@ -0,0 +1,62 @@ +package lsps0 + +import ( + "log" + "testing" + + "github.com/breez/lspd/lightning" +) + +type MockLightningImpl struct { + lightning.CustomMsgClient + msg *lightning.CustomMessage + err error + resp chan *lightning.CustomMessage +} + +func newMock(msg *lightning.CustomMessage, err error) *MockLightningImpl { + return &MockLightningImpl{ + msg: msg, + err: err, + resp: make(chan *lightning.CustomMessage), + } +} + +func (m *MockLightningImpl) Recv() (*lightning.CustomMessage, error) { + if m.msg != nil { + return m.msg, nil + } + + return nil, m.err +} + +func (m *MockLightningImpl) Send(c *lightning.CustomMessage) error { + m.resp <- c + return nil +} + +func TestSetup(t *testing.T) { + srv := NewServer() + pSrv := &protocolServer{} + RegisterProtocolServer(srv, pSrv) + rawMsg := `{ + "method": "lsps0.listprotocols", + "jsonrpc": "2.0", + "id": "example#3cad6a54d302edba4c9ade2f7ffac098", + "params": {} + }` + mock := newMock( + &lightning.CustomMessage{ + PeerId: "AAA", + Type: 37913, + Data: []byte(rawMsg), + }, + nil, + ) + + go srv.Serve(mock) + + resp := <-mock.resp + log.Printf("%+v", resp) + log.Print(string(resp.Data)) +} diff --git a/lsps0/status/status.go b/lsps0/status/status.go new file mode 100644 index 00000000..19ac0515 --- /dev/null +++ b/lsps0/status/status.go @@ -0,0 +1,62 @@ +package status + +import ( + "fmt" + + "github.com/breez/lspd/lsps0/codes" +) + +type Status struct { + Code codes.Code + Message string +} + +func New(c codes.Code, msg string) *Status { + return &Status{Code: c, Message: msg} +} + +func Newf(c codes.Code, format string, a ...interface{}) *Status { + return New(c, fmt.Sprintf(format, a...)) +} + +func FromError(err error) (s *Status, ok bool) { + if err == nil { + return nil, true + } + if se, ok := err.(interface { + Status() *Status + }); ok { + return se.Status(), true + } + return New(codes.Unknown, err.Error()), false +} + +// Convert is a convenience function which removes the need to handle the +// boolean return value from FromError. +func Convert(err error) *Status { + s, _ := FromError(err) + return s +} + +func (s *Status) Err() error { + if s.Code == codes.OK { + return nil + } + return &Error{s: s} +} + +func (s *Status) String() string { + return fmt.Sprintf("lsps0 error: code = %d desc = %s", int32(s.Code), s.Message) +} + +type Error struct { + s *Status +} + +func (e *Error) Error() string { + return e.s.String() +} + +func (e *Error) Lsps0Status() *Status { + return e.s +}