diff --git a/blockchain/event_filter.go b/blockchain/event_filter.go index 09ea67437e..6dea500478 100644 --- a/blockchain/event_filter.go +++ b/blockchain/event_filter.go @@ -109,6 +109,13 @@ type FilteredEvent struct { TransactionHash *felt.Felt } +func (f *FilteredEvent) Equal(other *FilteredEvent) bool { + return f.BlockNumber == other.BlockNumber && + f.BlockHash.Equal(other.BlockHash) && + f.TransactionHash.Equal(other.TransactionHash) && + f.Event.Equal(other.Event) +} + //nolint:gocyclo func (e *EventFilter) Events(cToken *ContinuationToken, chunkSize uint64) ([]*FilteredEvent, *ContinuationToken, error) { var matchedEvents []*FilteredEvent diff --git a/core/transaction.go b/core/transaction.go index 38d2fe67cf..fd1fbe23b3 100644 --- a/core/transaction.go +++ b/core/transaction.go @@ -80,6 +80,12 @@ type Event struct { Keys []*felt.Felt } +func (e *Event) Equal(other *Event) bool { + return slices.Equal(e.Data, other.Data) && + e.From.Equal(other.From) && + slices.Equal(e.Keys, other.Keys) +} + type L1ToL2Message struct { From common.Address Nonce *felt.Felt diff --git a/jsonrpc/websocket.go b/jsonrpc/websocket.go index 972c0accac..d3b222454f 100644 --- a/jsonrpc/websocket.go +++ b/jsonrpc/websocket.go @@ -44,8 +44,8 @@ func NewWebsocket(rpc *Server, shutdown <-chan struct{}, log utils.SimpleLogger) } // WithMaxConnections sets the maximum number of concurrent websocket connections -func (ws *Websocket) WithMaxConnections(max int) *Websocket { - ws.maxConns = max +func (ws *Websocket) WithMaxConnections(maxConns int) *Websocket { + ws.maxConns = maxConns return ws } diff --git a/jsonrpc/websocket_test.go b/jsonrpc/websocket_test.go index 27e299759e..aaad4707d6 100644 --- a/jsonrpc/websocket_test.go +++ b/jsonrpc/websocket_test.go @@ -102,19 +102,19 @@ func TestWebsocketConnectionLimit(t *testing.T) { defer httpSrv.Close() // First connection should succeed - conn1, resp1, err := websocket.Dial(context.Background(), httpSrv.URL, nil) + conn1, resp1, err := websocket.Dial(context.Background(), httpSrv.URL, nil) //nolint:bodyclose require.NoError(t, err) require.Equal(t, http.StatusSwitchingProtocols, resp1.StatusCode) defer conn1.Close(websocket.StatusNormalClosure, "") // Second connection should succeed - conn2, resp2, err := websocket.Dial(context.Background(), httpSrv.URL, nil) + conn2, resp2, err := websocket.Dial(context.Background(), httpSrv.URL, nil) //nolint:bodyclose require.NoError(t, err) require.Equal(t, http.StatusSwitchingProtocols, resp2.StatusCode) defer conn2.Close(websocket.StatusNormalClosure, "") // Third connection should fail with 503 Service Unavailable - _, resp3, err := websocket.Dial(context.Background(), httpSrv.URL, nil) + _, resp3, err := websocket.Dial(context.Background(), httpSrv.URL, nil) //nolint:bodyclose require.Error(t, err) require.Equal(t, http.StatusServiceUnavailable, resp3.StatusCode) @@ -122,7 +122,7 @@ func TestWebsocketConnectionLimit(t *testing.T) { require.NoError(t, conn1.Close(websocket.StatusNormalClosure, "")) time.Sleep(10 * time.Millisecond) // Give the server time to clean up - conn4, resp4, err := websocket.Dial(context.Background(), httpSrv.URL, nil) + conn4, resp4, err := websocket.Dial(context.Background(), httpSrv.URL, nil) //nolint:bodyclose require.NoError(t, err) require.Equal(t, http.StatusSwitchingProtocols, resp4.StatusCode) require.NoError(t, conn4.Close(websocket.StatusNormalClosure, "")) diff --git a/rpc/subscriptions.go b/rpc/subscriptions.go index ca7f40746a..6f7e44cb9b 100644 --- a/rpc/subscriptions.go +++ b/rpc/subscriptions.go @@ -4,8 +4,6 @@ import ( "context" "encoding/json" "errors" - "fmt" - "reflect" "time" "github.com/NethermindEth/juno/blockchain" @@ -117,7 +115,7 @@ func (h *Handler) SubscribeEvents(ctx context.Context, fromAddr *felt.Felt, keys var wg conc.WaitGroup wg.Go(func() { // Stores the transaction hash -> number of events - eventsPreviouslySent := make([]*blockchain.FilteredEvent, 0) + eventsPreviouslySent := make(map[blockchain.FilteredEvent]struct{}) for { select { @@ -137,34 +135,19 @@ func (h *Handler) SubscribeEvents(ctx context.Context, fromAddr *felt.Felt, keys return } - fmt.Println("size of slice before", len(eventsPreviouslySent)) for i, r := range b.Receipts { for _, e := range r.Events { - fe := &blockchain.FilteredEvent{ + fe := blockchain.FilteredEvent{ Event: e, BlockNumber: header.Number, BlockHash: header.Hash, TransactionHash: b.Transactions[i].Hash(), } - var deleteI int - var duplicateFound bool - for j, dupE := range eventsPreviouslySent { - if reflect.DeepEqual(fe, dupE) { - duplicateFound = true - deleteI = j - break - } - } - - if duplicateFound { - eventsPreviouslySent = append(eventsPreviouslySent[:deleteI], eventsPreviouslySent[deleteI+1:]...) - } + delete(eventsPreviouslySent, fe) } } - fmt.Println("size of slice after", len(eventsPreviouslySent)) case pending := <-pendingSub.Recv(): - fmt.Println("Found pending block", len(pending.Transactions)) h.processEvents(subscriptionCtx, w, id, pending.Number, pending.Number, fromAddr, keys, eventsPreviouslySent) } } @@ -334,7 +317,7 @@ func (h *Handler) SubscribeTransactionStatus(ctx context.Context, txHash felt.Fe } func (h *Handler) processEvents(ctx context.Context, w jsonrpc.Conn, id, from, to uint64, fromAddr *felt.Felt, - keys [][]felt.Felt, eventsPreviouslySent []*blockchain.FilteredEvent, + keys [][]felt.Felt, eventsPreviouslySent map[blockchain.FilteredEvent]struct{}, ) { filter, err := h.bcReader.EventFilter(fromAddr, keys) if err != nil { @@ -377,19 +360,18 @@ func (h *Handler) processEvents(ctx context.Context, w jsonrpc.Conn, id, from, t } func sendEvents(ctx context.Context, w jsonrpc.Conn, events []*blockchain.FilteredEvent, - eventsPreviouslySent []*blockchain.FilteredEvent, id uint64, + eventsPreviouslySent map[blockchain.FilteredEvent]struct{}, id uint64, ) error { -eventsLoop: for _, event := range events { select { case <-ctx.Done(): return ctx.Err() default: - for _, prevEvent := range eventsPreviouslySent { - if reflect.DeepEqual(event, prevEvent) { - continue eventsLoop + if eventsPreviouslySent != nil { + if _, ok := eventsPreviouslySent[*event]; ok { + continue } - eventsPreviouslySent = append(eventsPreviouslySent, event) + eventsPreviouslySent[*event] = struct{}{} } emittedEvent := &EmittedEvent{ diff --git a/rpc/subscriptions_test.go b/rpc/subscriptions_test.go index 031939ad58..aa6acd7fd8 100644 --- a/rpc/subscriptions_test.go +++ b/rpc/subscriptions_test.go @@ -7,7 +7,6 @@ import ( "io" "net" "net/http/httptest" - "reflect" "testing" "time" @@ -1032,55 +1031,3 @@ func marshalSubEventsResp(e *EmittedEvent, id uint64) ([]byte, error) { }, }) } - -func TestEventEquality(t *testing.T) { - e1 := &Event{ - From: new(felt.Felt).SetUint64(1), - Keys: []*felt.Felt{new(felt.Felt).SetUint64(2), new(felt.Felt).SetUint64(3)}, - Data: []*felt.Felt{new(felt.Felt).SetUint64(4), new(felt.Felt).SetUint64(3)}, - } - - e2 := &Event{ - From: new(felt.Felt).SetUint64(1), - Keys: []*felt.Felt{new(felt.Felt).SetUint64(2), new(felt.Felt).SetUint64(3)}, - Data: []*felt.Felt{new(felt.Felt).SetUint64(4), new(felt.Felt).SetUint64(3)}, - } - - assert.True(t, reflect.DeepEqual(e1, e2)) - - e3 := &core.Event{ - From: new(felt.Felt).SetUint64(1), - Keys: []*felt.Felt{new(felt.Felt).SetUint64(2), new(felt.Felt).SetUint64(3)}, - Data: []*felt.Felt{new(felt.Felt).SetUint64(4), new(felt.Felt).SetUint64(3)}, - } - - e4 := &core.Event{ - From: new(felt.Felt).SetUint64(1), - Keys: []*felt.Felt{new(felt.Felt).SetUint64(2), new(felt.Felt).SetUint64(3)}, - Data: []*felt.Felt{new(felt.Felt).SetUint64(4), new(felt.Felt).SetUint64(3)}, - } - - assert.True(t, reflect.DeepEqual(e3, e4)) - - bn1 := uint64(10) - ee1 := EmittedEvent{ - Event: e1, - BlockNumber: &bn1, - BlockHash: new(felt.Felt).SetBytes([]byte("b hash")), - TransactionHash: new(felt.Felt).SetBytes([]byte("tx hash")), - } - - bn2 := uint64(10) - ee2 := EmittedEvent{ - Event: e2, - BlockNumber: &bn2, - BlockHash: new(felt.Felt).SetBytes([]byte("b hash")), - TransactionHash: new(felt.Felt).SetBytes([]byte("tx hash")), - } - - assert.True(t, reflect.DeepEqual(ee1, ee2)) - // assert.True(t, *ee1.Event.Data == *ee2.Event.Data) - assert.True(t, *ee1.BlockNumber == *ee2.BlockNumber) - assert.True(t, *ee1.BlockHash == *ee2.BlockHash) - assert.True(t, *ee1.TransactionHash == *ee2.TransactionHash) -} diff --git a/sync/sync.go b/sync/sync.go index 5f799a381a..9ec490d002 100644 --- a/sync/sync.go +++ b/sync/sync.go @@ -630,7 +630,6 @@ func (s *Synchronizer) StorePending(p *Pending) error { } s.pending.Store(p) - // send the pending transactions to the feed s.pendingFeed.Send(p.Block) return nil