-
Notifications
You must be signed in to change notification settings - Fork 88
rpcclient: manage subscriptions sequentially #3893
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
So if the idea of this PR is right, then an alternative (which looks simpler to me) is: anna@kiwi:~/Documents/GitProjects/nspcc-dev/neo-go$ git diff
diff --git a/pkg/rpcclient/wsclient.go b/pkg/rpcclient/wsclient.go
index 887e6aa59..2119ef978 100644
--- a/pkg/rpcclient/wsclient.go
+++ b/pkg/rpcclient/wsclient.go
@@ -71,8 +71,9 @@ type WSClient struct {
closeErrLock sync.RWMutex
closeErr error
- subscriptionsLock sync.RWMutex
- subscriptions map[string]notificationReceiver
+ subscriptionsOrderLock sync.Mutex
+ subscriptionsLock sync.RWMutex
+ subscriptions map[string]notificationReceiver
// receivers is a mapping from receiver channel to a set of corresponding subscription IDs.
// It must be accessed with subscriptionsLock taken. Its keys must be used to deliver
// notifications, if channel is not in the receivers list and corresponding subscription
@@ -783,6 +784,9 @@ func (c *WSClient) performSubscription(params []any, rcvr notificationReceiver)
return "", err
}
}
+
+ c.subscriptionsOrderLock.Lock()
+ defer c.subscriptionsOrderLock.Unlock()
if err := c.performRequest("subscribe", params, &resp); err != nil {
return "", err
}
@@ -972,13 +976,8 @@ func (c *WSClient) UnsubscribeAll() error {
// after WS RPC unsubscription request is completed. Until then the subscriber channel
// may still receive WS notifications.
func (c *WSClient) performUnsubscription(id string) error {
- c.subscriptionsLock.RLock()
- rcvrWas, ok := c.subscriptions[id]
- c.subscriptionsLock.RUnlock()
-
- if !ok {
- return errors.New("no subscription with this ID")
- }
+ c.subscriptionsOrderLock.Lock()
+ defer c.subscriptionsOrderLock.Unlock()
var resp bool
if err := c.performRequest("unsubscribe", []any{id}, &resp); err != nil {
@@ -996,14 +995,6 @@ func (c *WSClient) performUnsubscription(id string) error {
return errors.New("no subscription with this ID")
}
- cleanUpSubscriptions := true
- if rcvrWas.Receiver() != rcvr.Receiver() {
- // concurrent subscription has been done and been overwritten; this
- // is not this routine's subscription, cleanup only receivers map
- rcvr = rcvrWas
- cleanUpSubscriptions = false
- }
-
ch := rcvr.Receiver()
ids := c.receivers[ch]
for i, rcvrID := range ids {
@@ -1017,9 +1008,7 @@ func (c *WSClient) performUnsubscription(id string) error {
} else {
c.receivers[ch] = ids
}
- if cleanUpSubscriptions {
- delete(c.subscriptions, id)
- }
+ delete(c.subscriptions, id)
return nil
} |
Let's go with the simplest solution for now (aka mutex), we can get back to the wsReader one later if needed (it can handle concurrent sub/unsub requests more efficiently). |
50ea3c5
to
ab70af5
Compare
Ready for review. |
Linter is failing, but not sue to changes in this PR:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR introduces a sequential processing mechanism for WebSocket subscription and unsubscription requests to avoid server-side subscription ID conflicts.
- Added subscriptionsOrderLock in WSClient to manage ordered subscribe/unsubscribe processing.
- Wrapped performSubscription and performUnsubscription operations with the new mutex for sequential execution.
- Removed redundant receiver comparison logic in performUnsubscription.
Close #3093. Signed-off-by: Anna Shaleva <shaleva.ann@nspcc.ru>
Codecov ReportAll modified and coverable lines are covered by tests ✅
Additional details and impacted files@@ Coverage Diff @@
## master #3893 +/- ##
==========================================
+ Coverage 82.66% 82.67% +0.01%
==========================================
Files 342 342
Lines 48349 48347 -2
==========================================
+ Hits 39967 39973 +6
+ Misses 6732 6726 -6
+ Partials 1650 1648 -2 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
Close #3093.
@roman-khimov, it's the best option that I can offer based on #3093 (comment) since I still don't understand the source of the problem. The same effect can be reached with additional mutex without any
subscriptionDispatcher
routine, so probably I'm doing something wrong, need your review.Also, I'm still trying to reproduce the original problem locally with privnet.