Skip to content

Commit

Permalink
Fix sending of pending events
Browse files Browse the repository at this point in the history
  • Loading branch information
weiihann committed Feb 4, 2025
1 parent 7536b18 commit 448f8df
Show file tree
Hide file tree
Showing 7 changed files with 28 additions and 87 deletions.
7 changes: 7 additions & 0 deletions blockchain/event_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,13 @@ type FilteredEvent struct {
TransactionHash *felt.Felt
}

func (f *FilteredEvent) Equal(other *FilteredEvent) bool {
return f.BlockNumber == other.BlockNumber &&
f.BlockHash.Equal(other.BlockHash) &&
f.TransactionHash.Equal(other.TransactionHash) &&
f.Event.Equal(other.Event)
}

//nolint:gocyclo
func (e *EventFilter) Events(cToken *ContinuationToken, chunkSize uint64) ([]*FilteredEvent, *ContinuationToken, error) {
var matchedEvents []*FilteredEvent
Expand Down
6 changes: 6 additions & 0 deletions core/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,12 @@ type Event struct {
Keys []*felt.Felt
}

func (e *Event) Equal(other *Event) bool {
return slices.Equal(e.Data, other.Data) &&
e.From.Equal(other.From) &&
slices.Equal(e.Keys, other.Keys)
}

type L1ToL2Message struct {
From common.Address
Nonce *felt.Felt
Expand Down
4 changes: 2 additions & 2 deletions jsonrpc/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ func NewWebsocket(rpc *Server, shutdown <-chan struct{}, log utils.SimpleLogger)
}

// WithMaxConnections sets the maximum number of concurrent websocket connections
func (ws *Websocket) WithMaxConnections(max int) *Websocket {
ws.maxConns = max
func (ws *Websocket) WithMaxConnections(maxConns int) *Websocket {
ws.maxConns = maxConns
return ws
}

Expand Down
8 changes: 4 additions & 4 deletions jsonrpc/websocket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,27 +102,27 @@ func TestWebsocketConnectionLimit(t *testing.T) {
defer httpSrv.Close()

// First connection should succeed
conn1, resp1, err := websocket.Dial(context.Background(), httpSrv.URL, nil)
conn1, resp1, err := websocket.Dial(context.Background(), httpSrv.URL, nil) //nolint:bodyclose
require.NoError(t, err)
require.Equal(t, http.StatusSwitchingProtocols, resp1.StatusCode)
defer conn1.Close(websocket.StatusNormalClosure, "")

// Second connection should succeed
conn2, resp2, err := websocket.Dial(context.Background(), httpSrv.URL, nil)
conn2, resp2, err := websocket.Dial(context.Background(), httpSrv.URL, nil) //nolint:bodyclose
require.NoError(t, err)
require.Equal(t, http.StatusSwitchingProtocols, resp2.StatusCode)
defer conn2.Close(websocket.StatusNormalClosure, "")

// Third connection should fail with 503 Service Unavailable
_, resp3, err := websocket.Dial(context.Background(), httpSrv.URL, nil)
_, resp3, err := websocket.Dial(context.Background(), httpSrv.URL, nil) //nolint:bodyclose
require.Error(t, err)
require.Equal(t, http.StatusServiceUnavailable, resp3.StatusCode)

// Close one connection and try again - should succeed
require.NoError(t, conn1.Close(websocket.StatusNormalClosure, ""))
time.Sleep(10 * time.Millisecond) // Give the server time to clean up

conn4, resp4, err := websocket.Dial(context.Background(), httpSrv.URL, nil)
conn4, resp4, err := websocket.Dial(context.Background(), httpSrv.URL, nil) //nolint:bodyclose
require.NoError(t, err)
require.Equal(t, http.StatusSwitchingProtocols, resp4.StatusCode)
require.NoError(t, conn4.Close(websocket.StatusNormalClosure, ""))
Expand Down
36 changes: 9 additions & 27 deletions rpc/subscriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
"reflect"
"time"

"github.com/NethermindEth/juno/blockchain"
Expand Down Expand Up @@ -117,7 +115,7 @@ func (h *Handler) SubscribeEvents(ctx context.Context, fromAddr *felt.Felt, keys
var wg conc.WaitGroup
wg.Go(func() {
// Stores the transaction hash -> number of events
eventsPreviouslySent := make([]*blockchain.FilteredEvent, 0)
eventsPreviouslySent := make(map[blockchain.FilteredEvent]struct{})

for {
select {
Expand All @@ -137,34 +135,19 @@ func (h *Handler) SubscribeEvents(ctx context.Context, fromAddr *felt.Felt, keys
return
}

fmt.Println("size of slice before", len(eventsPreviouslySent))
for i, r := range b.Receipts {
for _, e := range r.Events {
fe := &blockchain.FilteredEvent{
fe := blockchain.FilteredEvent{
Event: e,
BlockNumber: header.Number,
BlockHash: header.Hash,
TransactionHash: b.Transactions[i].Hash(),
}

var deleteI int
var duplicateFound bool
for j, dupE := range eventsPreviouslySent {
if reflect.DeepEqual(fe, dupE) {
duplicateFound = true
deleteI = j
break
}
}

if duplicateFound {
eventsPreviouslySent = append(eventsPreviouslySent[:deleteI], eventsPreviouslySent[deleteI+1:]...)
}
delete(eventsPreviouslySent, fe)
}
}
fmt.Println("size of slice after", len(eventsPreviouslySent))
case pending := <-pendingSub.Recv():
fmt.Println("Found pending block", len(pending.Transactions))
h.processEvents(subscriptionCtx, w, id, pending.Number, pending.Number, fromAddr, keys, eventsPreviouslySent)
}
}
Expand Down Expand Up @@ -334,7 +317,7 @@ func (h *Handler) SubscribeTransactionStatus(ctx context.Context, txHash felt.Fe
}

func (h *Handler) processEvents(ctx context.Context, w jsonrpc.Conn, id, from, to uint64, fromAddr *felt.Felt,
keys [][]felt.Felt, eventsPreviouslySent []*blockchain.FilteredEvent,
keys [][]felt.Felt, eventsPreviouslySent map[blockchain.FilteredEvent]struct{},
) {
filter, err := h.bcReader.EventFilter(fromAddr, keys)
if err != nil {
Expand Down Expand Up @@ -377,19 +360,18 @@ func (h *Handler) processEvents(ctx context.Context, w jsonrpc.Conn, id, from, t
}

func sendEvents(ctx context.Context, w jsonrpc.Conn, events []*blockchain.FilteredEvent,
eventsPreviouslySent []*blockchain.FilteredEvent, id uint64,
eventsPreviouslySent map[blockchain.FilteredEvent]struct{}, id uint64,
) error {
eventsLoop:
for _, event := range events {
select {
case <-ctx.Done():
return ctx.Err()
default:
for _, prevEvent := range eventsPreviouslySent {
if reflect.DeepEqual(event, prevEvent) {
continue eventsLoop
if eventsPreviouslySent != nil {
if _, ok := eventsPreviouslySent[*event]; ok {
continue
}
eventsPreviouslySent = append(eventsPreviouslySent, event)
eventsPreviouslySent[*event] = struct{}{}
}

emittedEvent := &EmittedEvent{
Expand Down
53 changes: 0 additions & 53 deletions rpc/subscriptions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"io"
"net"
"net/http/httptest"
"reflect"
"testing"
"time"

Expand Down Expand Up @@ -1032,55 +1031,3 @@ func marshalSubEventsResp(e *EmittedEvent, id uint64) ([]byte, error) {
},
})
}

func TestEventEquality(t *testing.T) {
e1 := &Event{
From: new(felt.Felt).SetUint64(1),
Keys: []*felt.Felt{new(felt.Felt).SetUint64(2), new(felt.Felt).SetUint64(3)},
Data: []*felt.Felt{new(felt.Felt).SetUint64(4), new(felt.Felt).SetUint64(3)},
}

e2 := &Event{
From: new(felt.Felt).SetUint64(1),
Keys: []*felt.Felt{new(felt.Felt).SetUint64(2), new(felt.Felt).SetUint64(3)},
Data: []*felt.Felt{new(felt.Felt).SetUint64(4), new(felt.Felt).SetUint64(3)},
}

assert.True(t, reflect.DeepEqual(e1, e2))

e3 := &core.Event{
From: new(felt.Felt).SetUint64(1),
Keys: []*felt.Felt{new(felt.Felt).SetUint64(2), new(felt.Felt).SetUint64(3)},
Data: []*felt.Felt{new(felt.Felt).SetUint64(4), new(felt.Felt).SetUint64(3)},
}

e4 := &core.Event{
From: new(felt.Felt).SetUint64(1),
Keys: []*felt.Felt{new(felt.Felt).SetUint64(2), new(felt.Felt).SetUint64(3)},
Data: []*felt.Felt{new(felt.Felt).SetUint64(4), new(felt.Felt).SetUint64(3)},
}

assert.True(t, reflect.DeepEqual(e3, e4))

bn1 := uint64(10)
ee1 := EmittedEvent{
Event: e1,
BlockNumber: &bn1,
BlockHash: new(felt.Felt).SetBytes([]byte("b hash")),
TransactionHash: new(felt.Felt).SetBytes([]byte("tx hash")),
}

bn2 := uint64(10)
ee2 := EmittedEvent{
Event: e2,
BlockNumber: &bn2,
BlockHash: new(felt.Felt).SetBytes([]byte("b hash")),
TransactionHash: new(felt.Felt).SetBytes([]byte("tx hash")),
}

assert.True(t, reflect.DeepEqual(ee1, ee2))
// assert.True(t, *ee1.Event.Data == *ee2.Event.Data)
assert.True(t, *ee1.BlockNumber == *ee2.BlockNumber)
assert.True(t, *ee1.BlockHash == *ee2.BlockHash)
assert.True(t, *ee1.TransactionHash == *ee2.TransactionHash)
}
1 change: 0 additions & 1 deletion sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -630,7 +630,6 @@ func (s *Synchronizer) StorePending(p *Pending) error {
}
s.pending.Store(p)

// send the pending transactions to the feed
s.pendingFeed.Send(p.Block)

return nil
Expand Down

0 comments on commit 448f8df

Please sign in to comment.