Skip to content

Commit

Permalink
cln_plugin: rename htlc related fields/functions
Browse files Browse the repository at this point in the history
  • Loading branch information
JssDWt committed Aug 10, 2023
1 parent 72c2b47 commit 0df936d
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 34 deletions.
12 changes: 7 additions & 5 deletions cln_plugin/cln_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,13 +126,13 @@ func (c *ClnPlugin) listenRequests() error {
}

// Listens to responses to htlc_accepted requests from the grpc server.
func (c *ClnPlugin) listenServer() {
func (c *ClnPlugin) htlcListenServer() {
for {
select {
case <-c.done:
return
default:
id, result := c.server.Receive()
id, result := c.server.ReceiveHtlcResolution()

// The server may return nil if it is stopped.
if result == nil {
Expand Down Expand Up @@ -227,6 +227,8 @@ func (c *ClnPlugin) processRequest(request *Request) {
})
case "setchannelacceptscript":
c.handleSetChannelAcceptScript(request)
case "custommsg":
c.handleCustomMsg(request)
default:
c.sendError(
request.Id,
Expand Down Expand Up @@ -404,8 +406,8 @@ func (c *ClnPlugin) handleInit(request *Request) {
return
}

// Listen for responses from the grpc server.
go c.listenServer()
// Listen for htlc responses from the grpc server.
go c.htlcListenServer()

// Let cln know the plugin is initialized.
c.sendToCln(&Response{
Expand Down Expand Up @@ -436,7 +438,7 @@ func (c *ClnPlugin) handleHtlcAccepted(request *Request) {
return
}

c.server.Send(idToString(request.Id), &htlc)
c.server.SendHtlcAccepted(idToString(request.Id), &htlc)
}

func (c *ClnPlugin) handleSetChannelAcceptScript(request *Request) {
Expand Down
58 changes: 29 additions & 29 deletions cln_plugin/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,14 @@ type server struct {
subscriberTimeout time.Duration
grpcServer *grpc.Server
mtx sync.Mutex
stream proto.ClnPlugin_HtlcStreamServer
newSubscriber chan struct{}
started chan struct{}
done chan struct{}
completed chan struct{}
startError chan error
sendQueue chan *htlcAcceptedMsg
recvQueue chan *htlcResultMsg
htlcnewSubscriber chan struct{}
htlcStream proto.ClnPlugin_HtlcStreamServer
htlcSendQueue chan *htlcAcceptedMsg
htlcRecvQueue chan *htlcResultMsg
}

// Creates a new grpc server
Expand All @@ -48,13 +48,13 @@ func NewServer(listenAddress string, subscriberTimeout time.Duration) *server {
listenAddress: listenAddress,
subscriberTimeout: subscriberTimeout,
// The send queue exists to buffer messages until a subscriber is active.
sendQueue: make(chan *htlcAcceptedMsg, 10000),
htlcSendQueue: make(chan *htlcAcceptedMsg, 10000),
// The receive queue exists mainly to allow returning timeouts to the
// cln plugin. If there is no subscriber active within the subscriber
// timeout period these results can be put directly on the receive queue.
recvQueue: make(chan *htlcResultMsg, 10000),
started: make(chan struct{}),
startError: make(chan error, 1),
htlcRecvQueue: make(chan *htlcResultMsg, 10000),
started: make(chan struct{}),
startError: make(chan error, 1),
}
}

Expand All @@ -78,7 +78,7 @@ func (s *server) Start() error {

s.done = make(chan struct{})
s.completed = make(chan struct{})
s.newSubscriber = make(chan struct{})
s.htlcnewSubscriber = make(chan struct{})
s.grpcServer = grpc.NewServer(
grpc.KeepaliveParams(keepalive.ServerParameters{
Time: time.Duration(1) * time.Second,
Expand Down Expand Up @@ -132,7 +132,7 @@ func (s *server) Stop() {
// from or to the subscriber, the subscription is closed.
func (s *server) HtlcStream(stream proto.ClnPlugin_HtlcStreamServer) error {
s.mtx.Lock()
if s.stream == nil {
if s.htlcStream == nil {
log.Printf("Got a new HTLC stream subscription request.")
} else {
s.mtx.Unlock()
Expand All @@ -141,28 +141,28 @@ func (s *server) HtlcStream(stream proto.ClnPlugin_HtlcStreamServer) error {
return fmt.Errorf("already subscribed")
}

s.stream = stream
s.htlcStream = stream

// Notify listeners that a new subscriber is active. Replace the chan with
// a new one immediately in case this subscriber is dropped later.
close(s.newSubscriber)
s.newSubscriber = make(chan struct{})
close(s.htlcnewSubscriber)
s.htlcnewSubscriber = make(chan struct{})
s.mtx.Unlock()

<-stream.Context().Done()
log.Printf("HtlcStream context is done. Return: %v", stream.Context().Err())

// Remove the subscriber.
s.mtx.Lock()
s.stream = nil
s.htlcStream = nil
s.mtx.Unlock()

return stream.Context().Err()
}

// Enqueues a htlc_accepted message for send to the grpc client.
func (s *server) Send(id string, h *HtlcAccepted) {
s.sendQueue <- &htlcAcceptedMsg{
func (s *server) SendHtlcAccepted(id string, h *HtlcAccepted) {
s.htlcSendQueue <- &htlcAcceptedMsg{
id: id,
htlc: h,
timeout: time.Now().Add(s.subscriberTimeout),
Expand All @@ -173,11 +173,11 @@ func (s *server) Send(id string, h *HtlcAccepted) {
// and message. Blocks until a message is available. Returns a nil message if
// the server is done. This function effectively waits until a subscriber is
// active and has sent a message.
func (s *server) Receive() (string, interface{}) {
func (s *server) ReceiveHtlcResolution() (string, interface{}) {
select {
case <-s.done:
return "", nil
case msg := <-s.recvQueue:
case msg := <-s.htlcRecvQueue:
return msg.id, msg.result
}
}
Expand All @@ -191,7 +191,7 @@ func (s *server) listenHtlcRequests() {
case <-s.done:
log.Printf("listenHtlcRequests received done. Stop listening.")
return
case msg := <-s.sendQueue:
case msg := <-s.htlcSendQueue:
s.handleHtlcAccepted(msg)
}
}
Expand All @@ -202,8 +202,8 @@ func (s *server) listenHtlcRequests() {
func (s *server) handleHtlcAccepted(msg *htlcAcceptedMsg) {
for {
s.mtx.Lock()
stream := s.stream
ns := s.newSubscriber
stream := s.htlcStream
ns := s.htlcnewSubscriber
s.mtx.Unlock()

// If there is no active subscription, wait until there is a new
Expand All @@ -228,7 +228,7 @@ func (s *server) handleHtlcAccepted(msg *htlcAcceptedMsg) {
// If the subscriber timeout expires while holding the htlc
// we short circuit the htlc by sending the default result
// (continue) to cln.
s.recvQueue <- &htlcResultMsg{
s.htlcRecvQueue <- &htlcResultMsg{
id: msg.id,
result: s.defaultResult(),
}
Expand Down Expand Up @@ -283,10 +283,10 @@ func (s *server) listenHtlcResponses() {
log.Printf("listenHtlcResponses received done. Stopping listening.")
return
default:
resp := s.recv()
s.recvQueue <- &htlcResultMsg{
resp := s.recvHtlcResolution()
s.htlcRecvQueue <- &htlcResultMsg{
id: resp.Correlationid,
result: s.mapResult(resp.Outcome),
result: s.mapHtlcResult(resp.Outcome),
}
}
}
Expand All @@ -295,14 +295,14 @@ func (s *server) listenHtlcResponses() {
// Helper function that blocks until a message from a grpc client is received
// or the server stops. Either returns a received message, or nil if the server
// has stopped.
func (s *server) recv() *proto.HtlcResolution {
func (s *server) recvHtlcResolution() *proto.HtlcResolution {
for {
// make a copy of the used fields, to make sure state updates don't
// surprise us. The newSubscriber chan is swapped whenever a new
// subscriber arrives.
s.mtx.Lock()
stream := s.stream
ns := s.newSubscriber
stream := s.htlcStream
ns := s.htlcnewSubscriber
s.mtx.Unlock()

if stream == nil {
Expand Down Expand Up @@ -336,7 +336,7 @@ func (s *server) recv() *proto.HtlcResolution {

// Maps a grpc result to the corresponding result for cln. The cln message
// is a raw json message, so it's easiest to use a map directly.
func (s *server) mapResult(outcome interface{}) interface{} {
func (s *server) mapHtlcResult(outcome interface{}) interface{} {
// result: continue
cont, ok := outcome.(*proto.HtlcResolution_Continue)
if ok {
Expand Down

0 comments on commit 0df936d

Please sign in to comment.