@@ -3,7 +3,6 @@ package streaming
3
3
import (
4
4
"fmt"
5
5
"sync"
6
- "sync/atomic"
7
6
"time"
8
7
9
8
satypes "github.com/dydxprotocol/v4-chain/protocol/x/subaccounts/types"
@@ -56,8 +55,8 @@ type FullNodeStreamingManagerImpl struct {
56
55
type OrderbookSubscription struct {
57
56
subscriptionId uint32
58
57
59
- // Whether the subscription is initialized with snapshot .
60
- initialized * atomic. Bool
58
+ // Initialize the subscription with orderbook snapshots .
59
+ initialize * sync. Once
61
60
62
61
// Clob pair ids to subscribe to.
63
62
clobPairIds []uint32
@@ -76,10 +75,6 @@ type OrderbookSubscription struct {
76
75
nextSnapshotBlock uint32
77
76
}
78
77
79
- func (sub * OrderbookSubscription ) IsInitialized () bool {
80
- return sub .initialized .Load ()
81
- }
82
-
83
78
func NewFullNodeStreamingManager (
84
79
logger log.Logger ,
85
80
flushIntervalMs uint32 ,
@@ -164,7 +159,7 @@ func (sm *FullNodeStreamingManagerImpl) Subscribe(
164
159
}
165
160
subscription := & OrderbookSubscription {
166
161
subscriptionId : sm .nextSubscriptionId ,
167
- initialized : & atomic. Bool {}, // False by default.
162
+ initialize : & sync. Once {},
168
163
clobPairIds : clobPairIds ,
169
164
subaccountIds : sIds ,
170
165
messageSender : messageSender ,
@@ -516,23 +511,19 @@ func (sm *FullNodeStreamingManagerImpl) SendTakerOrderStatus(
516
511
)
517
512
}
518
513
519
- // SendFinalizedSubaccountUpdates groups subaccount updates by their subaccount ids and
514
+ // SendSubaccountUpdates groups subaccount updates by their subaccount ids and
520
515
// sends messages to the subscribers.
521
- func (sm * FullNodeStreamingManagerImpl ) SendFinalizedSubaccountUpdates (
516
+ func (sm * FullNodeStreamingManagerImpl ) SendSubaccountUpdates (
522
517
subaccountUpdates []satypes.StreamSubaccountUpdate ,
523
518
blockHeight uint32 ,
524
519
execMode sdk.ExecMode ,
525
520
) {
526
521
defer metrics .ModuleMeasureSince (
527
522
metrics .FullNodeGrpc ,
528
- metrics .GrpcSendFinalizedSubaccountUpdatesLatency ,
523
+ metrics .GrpcSendSubaccountUpdatesLatency ,
529
524
time .Now (),
530
525
)
531
526
532
- if execMode != sdk .ExecModeFinalize {
533
- panic ("SendFinalizedSubaccountUpdates should only be called in ExecModeFinalize" )
534
- }
535
-
536
527
// Group subaccount updates by subaccount id.
537
528
streamUpdates := make ([]clobtypes.StreamUpdate , 0 )
538
529
subaccountIds := make ([]* satypes.SubaccountId , 0 )
@@ -679,33 +670,9 @@ func (sm *FullNodeStreamingManagerImpl) FlushStreamUpdatesWithLock() {
679
670
sm .EmitMetrics ()
680
671
}
681
672
682
- func (sm * FullNodeStreamingManagerImpl ) GetSubaccountSnapshotsForInitStreams (
683
- getSubaccountSnapshot func (subaccountId satypes.SubaccountId ) * satypes.StreamSubaccountUpdate ,
684
- ) map [satypes.SubaccountId ]* satypes.StreamSubaccountUpdate {
685
- sm .Lock ()
686
- defer sm .Unlock ()
687
-
688
- ret := make (map [satypes.SubaccountId ]* satypes.StreamSubaccountUpdate )
689
- for _ , subscription := range sm .orderbookSubscriptions {
690
- // If the subscription has been initialized, no need to grab the subaccount snapshot.
691
- if alreadyInitialized := subscription .initialized .Load (); alreadyInitialized {
692
- continue
693
- }
694
-
695
- for _ , subaccountId := range subscription .subaccountIds {
696
- if _ , exists := ret [subaccountId ]; exists {
697
- continue
698
- }
699
-
700
- ret [subaccountId ] = getSubaccountSnapshot (subaccountId )
701
- }
702
- }
703
- return ret
704
- }
705
-
706
673
func (sm * FullNodeStreamingManagerImpl ) InitializeNewStreams (
707
674
getOrderbookSnapshot func (clobPairId clobtypes.ClobPairId ) * clobtypes.OffchainUpdates ,
708
- subaccountSnapshots map [ satypes.SubaccountId ] * satypes.StreamSubaccountUpdate ,
675
+ getSubaccountSnapshot func ( subaccountId satypes.SubaccountId ) * satypes.StreamSubaccountUpdate ,
709
676
blockHeight uint32 ,
710
677
execMode sdk.ExecMode ,
711
678
) {
@@ -719,40 +686,31 @@ func (sm *FullNodeStreamingManagerImpl) InitializeNewStreams(
719
686
updatesByClobPairId := make (map [uint32 ]* clobtypes.OffchainUpdates )
720
687
721
688
for subscriptionId , subscription := range sm .orderbookSubscriptions {
722
- if alreadyInitialized := subscription .initialized .Swap (true ); ! alreadyInitialized {
723
- allUpdates := clobtypes .NewOffchainUpdates ()
724
- for _ , clobPairId := range subscription .clobPairIds {
725
- if _ , ok := updatesByClobPairId [clobPairId ]; ! ok {
726
- updatesByClobPairId [clobPairId ] = getOrderbookSnapshot (clobtypes .ClobPairId (clobPairId ))
727
- }
728
- allUpdates .Append (updatesByClobPairId [clobPairId ])
729
- }
730
-
731
- saUpdates := []* satypes.StreamSubaccountUpdate {}
732
- for _ , subaccountId := range subscription .subaccountIds {
733
- // The subaccount snapshot may not exist due to the following race condition
734
- // 1. At beginning of PrepareCheckState we get snapshot for all subscribed subaccounts.
735
- // 2. A new subaccount is subscribed to by a new subscription.
736
- // 3. InitializeNewStreams is called.
737
- // Then the new subaccount would not be included in the snapshot.
738
- // We are okay with this behavior.
739
- if saUpdate , ok := subaccountSnapshots [subaccountId ]; ok {
740
- saUpdates = append (saUpdates , saUpdate )
741
- }
742
- }
743
-
744
- sm .SendCombinedSnapshot (allUpdates , saUpdates , subscriptionId , blockHeight , execMode )
745
-
746
- if sm .snapshotBlockInterval != 0 {
747
- subscription .nextSnapshotBlock = blockHeight + sm .snapshotBlockInterval
748
- }
749
- }
750
-
751
- // If the snapshot block interval is enabled and the next block is a snapshot block,
752
- // reset the `atomic.Bool` so snapshots are sent for the next block.
689
+ // If the snapshot block interval is enabled, reset the sync.Once in order to
690
+ // re-send snapshots out.
753
691
if sm .snapshotBlockInterval > 0 &&
754
- blockHeight + 1 == subscription .nextSnapshotBlock {
755
- subscription .initialized = & atomic. Bool {} // False by default.
692
+ blockHeight == subscription .nextSnapshotBlock {
693
+ subscription .initialize = & sync. Once {}
756
694
}
695
+
696
+ subscription .initialize .Do (
697
+ func () {
698
+ allUpdates := clobtypes .NewOffchainUpdates ()
699
+ for _ , clobPairId := range subscription .clobPairIds {
700
+ if _ , ok := updatesByClobPairId [clobPairId ]; ! ok {
701
+ updatesByClobPairId [clobPairId ] = getOrderbookSnapshot (clobtypes .ClobPairId (clobPairId ))
702
+ }
703
+ allUpdates .Append (updatesByClobPairId [clobPairId ])
704
+ }
705
+ saUpdates := []* satypes.StreamSubaccountUpdate {}
706
+ for _ , subaccountId := range subscription .subaccountIds {
707
+ saUpdates = append (saUpdates , getSubaccountSnapshot (subaccountId ))
708
+ }
709
+ sm .SendCombinedSnapshot (allUpdates , saUpdates , subscriptionId , blockHeight , execMode )
710
+ if sm .snapshotBlockInterval != 0 {
711
+ subscription .nextSnapshotBlock = blockHeight + sm .snapshotBlockInterval
712
+ }
713
+ },
714
+ )
757
715
}
758
716
}
0 commit comments