Skip to content

Commit

Permalink
lsps0: lsps0 server implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
JssDWt committed Aug 11, 2023
1 parent abef3b9 commit ffe7d28
Show file tree
Hide file tree
Showing 7 changed files with 502 additions and 0 deletions.
12 changes: 12 additions & 0 deletions lightning/custom_msg_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package lightning

type CustomMessage struct {
PeerId string
Type uint32
Data []byte
}

type CustomMsgClient interface {
Recv() (*CustomMessage, error)
Send(*CustomMessage) error
}
14 changes: 14 additions & 0 deletions lsps0/codes/code.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package codes

type Code int32

const (
OK Code = 0
Canceled Code = 1
Unknown Code = 2
ParseError Code = -32700
InvalidRequest Code = -32600
MethodNotFound Code = -32601
InvalidParams Code = -32602
InternalError Code = -32603
)
30 changes: 30 additions & 0 deletions lsps0/jsonrpc/jsonrpc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package jsonrpc

import "encoding/json"

var Version = "2.0"

type Request struct {
JsonRpc string `json:"jsonrpc"`
Method string `json:"method"`
Id string `json:"id"`
Params json.RawMessage `json:"params"`
}

type Response struct {
JsonRpc string `json:"jsonrpc"`
Id string `json:"id"`
Result json.RawMessage `json:"result"`
}

type Error struct {
JsonRpc string `json:"jsonrpc"`
Id *string `json:"id"`
Error ErrorBody `json:"error"`
}

type ErrorBody struct {
Code int32 `json:"code"`
Message string `json:"message"`
Data json.RawMessage `json:"data,omitempty"`
}
55 changes: 55 additions & 0 deletions lsps0/protocol_server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package lsps0

import "context"

type ProtocolServer interface {
ListProtocols(
ctx context.Context,
req *ListProtocolsRequest,
) (*ListProtocolsResponse, error)
}
type protocolServer struct {
protocols []uint32
}

type ListProtocolsRequest struct{}
type ListProtocolsResponse struct {
Protocols []uint32 `json:"protocols"`
}

func NewProtocolServer(supportedProtocols []uint32) ProtocolServer {
return &protocolServer{
protocols: supportedProtocols,
}
}

func (s *protocolServer) ListProtocols(
ctx context.Context,
req *ListProtocolsRequest,
) (*ListProtocolsResponse, error) {
return &ListProtocolsResponse{
Protocols: s.protocols,
}, nil
}

func RegisterProtocolServer(s ServiceRegistrar, p ProtocolServer) {
s.RegisterService(
&ServiceDesc{
ServiceName: "lsps0",
HandlerType: (*ProtocolServer)(nil),
Methods: []MethodDesc{
{
MethodName: "lsps0.listprotocols",
Handler: func(srv interface{}, ctx context.Context, dec func(interface{}) error) (interface{}, error) {
in := new(ListProtocolsRequest)
if err := dec(in); err != nil {
return nil, err
}
return srv.(*protocolServer).ListProtocols(ctx, in)
},
},
},
},
p,
)
}
267 changes: 267 additions & 0 deletions lsps0/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,267 @@
package lsps0

import (
"context"
"encoding/json"
"errors"
"log"
"sync"

"github.com/breez/lspd/lightning"
"github.com/breez/lspd/lsps0/codes"
"github.com/breez/lspd/lsps0/jsonrpc"
"github.com/breez/lspd/lsps0/status"
"golang.org/x/exp/slices"
)

var ErrAlreadyServing = errors.New("lsps0: already serving")
var ErrServerStopped = errors.New("lsps0: the server has been stopped")
var Lsps0MessageType uint32 = 37913
var BadMessageFormatError string = "bad message format"
var InternalError string = "internal error"

// ServiceDesc and is constructed from it for internal purposes.
type serviceInfo struct {
// Contains the implementation for the methods in this service.
serviceImpl interface{}
methods map[string]*MethodDesc
}

type methodInfo struct {
service *serviceInfo
method *MethodDesc
}

type Server struct {
mu sync.Mutex
serve bool
services map[string]*serviceInfo
methods map[string]*methodInfo
}

func NewServer() *Server {
return &Server{
services: make(map[string]*serviceInfo),
methods: make(map[string]*methodInfo),
}
}

func (s *Server) Serve(lis lightning.CustomMsgClient) error {
s.mu.Lock()
if s.serve {
return ErrAlreadyServing
}
s.serve = true
s.mu.Unlock()

defer func() {
s.mu.Lock()
s.serve = false
s.mu.Unlock()
}()

// TODO: Add a possibility to break this loop.
for {
msg, err := lis.Recv()
if err != nil {
log.Printf("lsps0 Serve(): Recv() err != nil: %v", err)
continue
// TODO: check whether this is a fatal error. delay until next recv.
}

// Ignore any message that is not an lsps0 message
if msg.Type != Lsps0MessageType {
continue
}

// Make sure there are no 0 bytes
if slices.Contains(msg.Data, 0x00) {
log.Printf("UNUSUAL: Got custom message containing 0 bytes from peer '%s'.", msg.PeerId)
go sendError(lis, msg, nil, status.New(codes.ParseError, BadMessageFormatError))
continue
}

req := new(jsonrpc.Request)
err = json.Unmarshal(msg.Data, req)
if err != nil {
log.Printf("UNUSUAL: Failed to unmarshal custom message from peer '%s': %v", msg.PeerId, err)
go sendError(lis, msg, nil, status.New(codes.ParseError, BadMessageFormatError))
continue
}

// TODO: make sure there is no additional data after parsing the object

if req == nil {
log.Printf("UNUSUAL: req == nil after unmarshal custom message from peer '%s': %v", msg.PeerId, err)
go sendError(lis, msg, nil, status.New(codes.ParseError, BadMessageFormatError))
continue
}

if req.JsonRpc != jsonrpc.Version {
log.Printf("UNUSUAL: jsonrpc version is '%s' in custom message from peer '%s': %v", req.JsonRpc, msg.PeerId, err)
go sendError(lis, msg, req, status.Newf(codes.InvalidRequest, "Expected jsonrpc %s, found %s", jsonrpc.Version, req.JsonRpc))
continue
}

m, ok := s.methods[req.Method]
if !ok {
log.Printf("UNUSUAL: peer '%s' requested method '%s', but it does not exist.", msg.PeerId, req.Method)
go sendError(lis, msg, req, status.New(codes.MethodNotFound, "method not found"))
continue
}

df := func(v interface{}) error {
if err := json.Unmarshal(req.Params, v); err != nil {
return status.Newf(codes.InvalidParams, "invalid params").Err()
}

return nil
}
r, err := m.method.Handler(m.service.serviceImpl, context.TODO(), df)
if err != nil {
s, ok := status.FromError(err)
if !ok {
log.Printf("Internal error when processing custom message '%s' from peer '%s': %v", string(msg.Data), msg.PeerId, err)
s = status.New(codes.InternalError, InternalError)
}

go sendError(lis, msg, req, s)
continue
}

go sendResponse(lis, msg, req, r)
}
}

func sendResponse(
lis lightning.CustomMsgClient,
in *lightning.CustomMessage,
req *jsonrpc.Request,
params interface{},
) {
rd, err := json.Marshal(params)
if err != nil {
log.Printf("Failed to mashal response params '%+v'", params)
sendError(lis, in, req, status.New(codes.InternalError, InternalError))
return
}

resp := &jsonrpc.Response{
JsonRpc: jsonrpc.Version,
Id: req.Id,
Result: rd,
}
res, err := json.Marshal(resp)
if err != nil {
log.Printf("Failed to mashal response '%+v'", resp)
sendError(lis, in, req, status.New(codes.InternalError, InternalError))
return
}

msg := &lightning.CustomMessage{
PeerId: in.PeerId,
Type: Lsps0MessageType,
Data: res,
}

err = lis.Send(msg)
if err != nil {
log.Printf("Failed to send message '%s' to peer '%s': %v", string(msg.Data), msg.PeerId, err)
// TODO: Retry?
return
}
}

func sendError(
lis lightning.CustomMsgClient,
in *lightning.CustomMessage,
req *jsonrpc.Request,
status *status.Status,
) {
var id *string
if req != nil {
id = &req.Id
}
resp := &jsonrpc.Error{
JsonRpc: jsonrpc.Version,
Id: id,
Error: jsonrpc.ErrorBody{
Code: int32(status.Code),
Message: status.Message,
Data: nil,
},
}

res, err := json.Marshal(resp)
if err != nil {
log.Printf("Failed to mashal response '%+v'", resp)
return
}

msg := &lightning.CustomMessage{
PeerId: in.PeerId,
Type: Lsps0MessageType,
Data: res,
}

err = lis.Send(msg)
if err != nil {
log.Printf("Failed to send message '%s' to peer '%s': %v", string(msg.Data), msg.PeerId, err)
return
}
}

func (s *Server) RegisterService(desc *ServiceDesc, impl interface{}) {
s.mu.Lock()
defer s.mu.Unlock()
log.Printf("RegisterService(%q)", desc.ServiceName)
if s.serve {
log.Fatalf("lsps0: Server.RegisterService after Server.Serve for %q", desc.ServiceName)
}
if _, ok := s.services[desc.ServiceName]; ok {
log.Fatalf("lsps0: Server.RegisterService found duplicate service registration for %q", desc.ServiceName)
}
info := &serviceInfo{
serviceImpl: impl,
methods: make(map[string]*MethodDesc),
}
for i := range desc.Methods {
d := &desc.Methods[i]
if _, ok := s.methods[d.MethodName]; ok {
log.Fatalf("lsps0: Server.RegisterService found duplicate method registration for %q", d.MethodName)
}
info.methods[d.MethodName] = d
s.methods[d.MethodName] = &methodInfo{
service: info,
method: d,
}
}
s.services[desc.ServiceName] = info
}

type ServiceDesc struct {
ServiceName string
// The pointer to the service interface. Used to check whether the user
// provided implementation satisfies the interface requirements.
HandlerType interface{}
Methods []MethodDesc
}

type MethodDesc struct {
MethodName string
Handler methodHandler
}

type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error) (interface{}, error)

// ServiceRegistrar wraps a single method that supports service registration. It
// enables users to pass concrete types other than grpc.Server to the service
// registration methods exported by the IDL generated code.
type ServiceRegistrar interface {
// RegisterService registers a service and its implementation to the
// concrete type implementing this interface. It may not be called
// once the server has started serving.
// desc describes the service and its methods and handlers. impl is the
// service implementation which is passed to the method handlers.
RegisterService(desc *ServiceDesc, impl interface{})
}
Loading

0 comments on commit ffe7d28

Please sign in to comment.