diff --git a/accounts/abi/bind/backends/simulated.go b/accounts/abi/bind/backends/simulated.go
index c930e3c7657c..204fe1634768 100644
--- a/accounts/abi/bind/backends/simulated.go
+++ b/accounts/abi/bind/backends/simulated.go
@@ -415,18 +415,24 @@ func (b *SimulatedBackend) SendTransaction(ctx context.Context, tx *types.Transa
 //
 // TODO(karalabe): Deprecate when the subscription one can return past data too.
 func (b *SimulatedBackend) FilterLogs(ctx context.Context, query XDPoSChain.FilterQuery) ([]types.Log, error) {
-	// Initialize unset filter boundaried to run from genesis to chain head
-	from := int64(0)
-	if query.FromBlock != nil {
-		from = query.FromBlock.Int64()
-	}
-	to := int64(-1)
-	if query.ToBlock != nil {
-		to = query.ToBlock.Int64()
+	var filter *filters.Filter
+	if query.BlockHash != nil {
+		// Block filter requested, construct a single-shot filter
+		filter = filters.NewBlockFilter(&filterBackend{b.database, b.blockchain}, *query.BlockHash, query.Addresses, query.Topics)
+	} else {
+		// Initialize unset filter boundaries to run from genesis to chain head
+		from := int64(0)
+		if query.FromBlock != nil {
+			from = query.FromBlock.Int64()
+		}
+		to := int64(-1)
+		if query.ToBlock != nil {
+			to = query.ToBlock.Int64()
+		}
+		// Construct the range filter
+		filter = filters.NewRangeFilter(&filterBackend{b.database, b.blockchain}, from, to, query.Addresses, query.Topics)
 	}
-	// Construct and execute the filter
-	filter := filters.New(&filterBackend{b.database, b.blockchain}, from, to, query.Addresses, query.Topics)
-
+	// Run the filter and return all the logs
 	logs, err := filter.Logs(ctx)
 	if err != nil {
 		return nil, err
@@ -527,8 +533,8 @@ func (fb *filterBackend) HeaderByNumber(ctx context.Context, block rpc.BlockNumb
 	return fb.bc.GetHeaderByNumber(uint64(block.Int64())), nil
 }
 
-func (fb *filterBackend) HeaderByHash(ctx context.Context, blockHash common.Hash) (*types.Header, error) {
-	return fb.bc.GetHeaderByHash(blockHash), nil
+func (fb *filterBackend) HeaderByHash(ctx context.Context, hash common.Hash) (*types.Header, error) {
+	return fb.bc.GetHeaderByHash(hash), nil
 }
 
 func (fb *filterBackend) GetReceipts(ctx context.Context, hash common.Hash) (types.Receipts, error) {
diff --git a/eth/filters/api.go b/eth/filters/api.go
index ac1330a18663..ed36c3a6476a 100644
--- a/eth/filters/api.go
+++ b/eth/filters/api.go
@@ -332,39 +332,24 @@ func (api *PublicFilterAPI) GetLogs(ctx context.Context, crit FilterCriteria) ([
 		return nil, errExceedMaxTopics
 	}
 
-	var (
-		fromBlock int64
-		toBlock   int64
-	)
-
+	var filter *Filter
 	if crit.BlockHash != nil {
-		// look up block number from block hash
-		header, err := api.backend.HeaderByHash(ctx, *crit.BlockHash)
-		if err != nil {
-			return nil, err
-		}
-		if header == nil {
-			return nil, errors.New("unknown block")
-		}
-		fromBlock = int64(header.Number.Int64())
-		toBlock = fromBlock
+		// Block filter requested, construct a single-shot filter
+		filter = NewBlockFilter(api.backend, *crit.BlockHash, crit.Addresses, crit.Topics)
 	} else {
 		// Convert the RPC block numbers into internal representations
-		if crit.FromBlock == nil {
-			fromBlock = int64(rpc.LatestBlockNumber)
-		} else {
-			fromBlock = crit.FromBlock.Int64()
+		begin := rpc.LatestBlockNumber.Int64()
+		if crit.FromBlock != nil {
+			begin = crit.FromBlock.Int64()
 		}
-		if crit.ToBlock == nil {
-			toBlock = int64(rpc.LatestBlockNumber)
-		} else {
-			toBlock = crit.ToBlock.Int64()
+		end := rpc.LatestBlockNumber.Int64()
+		if crit.ToBlock != nil {
+			end = crit.ToBlock.Int64()
 		}
+		// Construct the range filter
+		filter = NewRangeFilter(api.backend, begin, end, crit.Addresses, crit.Topics)
 	}
-
-	// Create and run the filter to get all the logs
-	filter := New(api.backend, fromBlock, toBlock, crit.Addresses, crit.Topics)
-
+	// Run the filter and return all the logs
 	logs, err := filter.Logs(ctx)
 	if err != nil {
 		return nil, err
@@ -402,17 +387,24 @@ func (api *PublicFilterAPI) GetFilterLogs(ctx context.Context, id rpc.ID) ([]*ty
 		return nil, fmt.Errorf("filter not found")
 	}
 
-	begin := rpc.LatestBlockNumber.Int64()
-	if f.crit.FromBlock != nil {
-		begin = f.crit.FromBlock.Int64()
-	}
-	end := rpc.LatestBlockNumber.Int64()
-	if f.crit.ToBlock != nil {
-		end = f.crit.ToBlock.Int64()
+	var filter *Filter
+	if f.crit.BlockHash != nil {
+		// Block filter requested, construct a single-shot filter
+		filter = NewBlockFilter(api.backend, *f.crit.BlockHash, f.crit.Addresses, f.crit.Topics)
+	} else {
+		// Convert the RPC block numbers into internal representations
+		begin := rpc.LatestBlockNumber.Int64()
+		if f.crit.FromBlock != nil {
+			begin = f.crit.FromBlock.Int64()
+		}
+		end := rpc.LatestBlockNumber.Int64()
+		if f.crit.ToBlock != nil {
+			end = f.crit.ToBlock.Int64()
+		}
+		// Construct the range filter
+		filter = NewRangeFilter(api.backend, begin, end, f.crit.Addresses, f.crit.Topics)
 	}
-	// Create and run the filter to get all the logs
-	filter := New(api.backend, begin, end, f.crit.Addresses, f.crit.Topics)
-
+	// Run the filter and return all the logs
 	logs, err := filter.Logs(ctx)
 	if err != nil {
 		return nil, err
@@ -587,7 +579,7 @@ func decodeAddress(s string) (common.Address, error) {
 	}
 	b, err := hexutil.Decode(s)
 	if err == nil && len(b) != common.AddressLength {
-		err = fmt.Errorf("hex has invalid length %d after decoding", len(b))
+		err = fmt.Errorf("hex has invalid length %d after decoding; expected %d for address", len(b), common.AddressLength)
 	}
 	return common.BytesToAddress(b), err
 }
@@ -595,7 +587,7 @@ func decodeTopic(s string) (common.Hash, error) {
 	b, err := hexutil.Decode(s)
 	if err == nil && len(b) != common.HashLength {
-		err = fmt.Errorf("hex has invalid length %d after decoding", len(b))
+		err = fmt.Errorf("hex has invalid length %d after decoding; expected %d for topic", len(b), common.HashLength)
 	}
 	return common.BytesToHash(b), err
 }
diff --git a/eth/filters/bench_test.go b/eth/filters/bench_test.go
index 799344a53c83..98635b9de7b9 100644
--- a/eth/filters/bench_test.go
+++ b/eth/filters/bench_test.go
@@ -20,7 +20,6 @@ import (
 	"bytes"
 	"context"
 	"fmt"
-	"github.com/XinFinOrg/XDPoSChain/core/rawdb"
 	"testing"
 	"time"
 
@@ -28,6 +27,7 @@ import (
 	"github.com/XinFinOrg/XDPoSChain/common/bitutil"
 	"github.com/XinFinOrg/XDPoSChain/core"
 	"github.com/XinFinOrg/XDPoSChain/core/bloombits"
+	"github.com/XinFinOrg/XDPoSChain/core/rawdb"
 	"github.com/XinFinOrg/XDPoSChain/core/types"
 	"github.com/XinFinOrg/XDPoSChain/ethdb"
 	"github.com/XinFinOrg/XDPoSChain/event"
@@ -66,7 +66,7 @@ const benchFilterCnt = 2000
 
 func benchmarkBloomBits(b *testing.B, sectionSize uint64) {
 	benchDataDir := node.DefaultDataDir() + "/geth/chaindata"
-	fmt.Println("Running bloombits benchmark section size:", sectionSize)
+	b.Log("Running bloombits benchmark section size:", sectionSize)
 
 	db, err := rawdb.NewLevelDBDatabase(benchDataDir, 128, 1024, "")
 	if err != nil {
@@ -78,7 +78,7 @@ func benchmarkBloomBits(b *testing.B, sectionSize uint64) {
 	}
 
 	clearBloomBits(db)
-	fmt.Println("Generating bloombits data...")
+	b.Log("Generating bloombits data...")
 	headNum := core.GetBlockNumber(db, head)
 	if headNum < sectionSize+512 {
 		b.Fatalf("not enough blocks for running a benchmark")
@@ -113,16 +113,16 @@ func benchmarkBloomBits(b *testing.B, sectionSize uint64) {
 			core.WriteBloomBits(db, uint(i), sectionIdx, sectionHead, comp)
 		}
 		//if sectionIdx%50 == 0 {
-		//	fmt.Println(" section", sectionIdx, "/", cnt)
+		//	b.Log(" section", sectionIdx, "/", cnt)
 		//}
 	}
 
 	d := time.Since(start)
-	fmt.Println("Finished generating bloombits data")
-	fmt.Println(" ", d, "total ", d/time.Duration(cnt*sectionSize), "per block")
-	fmt.Println(" data size:", dataSize, " compressed size:", compSize, " compression ratio:", float64(compSize)/float64(dataSize))
+	b.Log("Finished generating bloombits data")
+	b.Log(" ", d, "total ", d/time.Duration(cnt*sectionSize), "per block")
+	b.Log(" data size:", dataSize, " compressed size:", compSize, " compression ratio:", float64(compSize)/float64(dataSize))
 
-	fmt.Println("Running filter benchmarks...")
+	b.Log("Running filter benchmarks...")
 	start = time.Now()
 	mux := new(event.TypeMux)
 	var backend *testBackend
@@ -136,14 +136,14 @@ func benchmarkBloomBits(b *testing.B, sectionSize uint64) {
 		var addr common.Address
 		addr[0] = byte(i)
 		addr[1] = byte(i / 256)
-		filter := New(backend, 0, int64(cnt*sectionSize-1), []common.Address{addr}, nil)
+		filter := NewRangeFilter(backend, 0, int64(cnt*sectionSize-1), []common.Address{addr}, nil)
 		if _, err := filter.Logs(context.Background()); err != nil {
 			b.Error("filter.Find error:", err)
 		}
 	}
 	d = time.Since(start)
-	fmt.Println("Finished running filter benchmarks")
-	fmt.Println(" ", d, "total ", d/time.Duration(benchFilterCnt), "per address", d*time.Duration(1000000)/time.Duration(benchFilterCnt*cnt*sectionSize), "per million blocks")
+	b.Log("Finished running filter benchmarks")
+	b.Log(" ", d, "total ", d/time.Duration(benchFilterCnt), "per address", d*time.Duration(1000000)/time.Duration(benchFilterCnt*cnt*sectionSize), "per million blocks")
 
 	db.Close()
 }
@@ -175,7 +175,7 @@ func clearBloomBits(db ethdb.Database) {
 
 func BenchmarkNoBloomBits(b *testing.B) {
 	benchDataDir := node.DefaultDataDir() + "/geth/chaindata"
-	fmt.Println("Running benchmark without bloombits")
+	b.Log("Running benchmark without bloombits")
 	db, err := rawdb.NewLevelDBDatabase(benchDataDir, 128, 1024, "")
 	if err != nil {
 		b.Fatalf("error opening database at %v: %v", benchDataDir, err)
@@ -188,14 +188,14 @@ func BenchmarkNoBloomBits(b *testing.B) {
 
 	clearBloomBits(db)
 
-	fmt.Println("Running filter benchmarks...")
+	b.Log("Running filter benchmarks...")
 	start := time.Now()
 	mux := new(event.TypeMux)
 	backend := &testBackend{mux, db, 0, new(event.Feed), new(event.Feed), new(event.Feed), new(event.Feed)}
-	filter := New(backend, 0, int64(headNum), []common.Address{{}}, nil)
+	filter := NewRangeFilter(backend, 0, int64(headNum), []common.Address{{}}, nil)
 	filter.Logs(context.Background())
 	d := time.Since(start)
-	fmt.Println("Finished running filter benchmarks")
-	fmt.Println(" ", d, "total ", d*time.Duration(1000000)/time.Duration(headNum+1), "per million blocks")
+	b.Log("Finished running filter benchmarks")
+	b.Log(" ", d, "total ", d*time.Duration(1000000)/time.Duration(headNum+1), "per million blocks")
 	db.Close()
 }
diff --git a/eth/filters/filter.go b/eth/filters/filter.go
index 8780a4997d18..dcd872fc4c48 100644
--- a/eth/filters/filter.go
+++ b/eth/filters/filter.go
@@ -18,6 +18,7 @@ package filters
 
 import (
 	"context"
+	"errors"
 	"math/big"
 
 	"github.com/XinFinOrg/XDPoSChain/common"
@@ -50,17 +51,19 @@ type Backend interface {
 type Filter struct {
 	backend Backend
 
-	db         ethdb.Database
-	begin, end int64
-	addresses  []common.Address
-	topics     [][]common.Hash
+	db        ethdb.Database
+	addresses []common.Address
+	topics    [][]common.Hash
+
+	block      common.Hash // Block hash if filtering a single block
+	begin, end int64       // Range interval if filtering multiple blocks
 
 	matcher *bloombits.Matcher
 }
 
-// New creates a new filter which uses a bloom filter on blocks to figure out whether
-// a particular block is interesting or not.
-func New(backend Backend, begin, end int64, addresses []common.Address, topics [][]common.Hash) *Filter {
+// NewRangeFilter creates a new filter which uses a bloom filter on blocks to
+// figure out whether a particular block is interesting or not.
+func NewRangeFilter(backend Backend, begin, end int64, addresses []common.Address, topics [][]common.Hash) *Filter {
 	// Flatten the address and topic filter clauses into a single bloombits filter
 	// system. Since the bloombits are not positional, nil topics are permitted,
 	// which get flattened into a nil byte slice.
@@ -79,23 +82,52 @@ func New(backend Backend, begin, end int64, addresses []common.Address, topics [
 		}
 		filters = append(filters, filter)
 	}
-	// Assemble and return the filter
 	size, _ := backend.BloomStatus()
 
+	// Create a generic filter and convert it into a range filter
+	filter := newFilter(backend, addresses, topics)
+
+	filter.matcher = bloombits.NewMatcher(size, filters)
+	filter.begin = begin
+	filter.end = end
+
+	return filter
+}
+
+// NewBlockFilter creates a new filter which directly inspects the contents of
+// a block to figure out whether it is interesting or not.
+func NewBlockFilter(backend Backend, block common.Hash, addresses []common.Address, topics [][]common.Hash) *Filter {
+	// Create a generic filter and convert it into a block filter
+	filter := newFilter(backend, addresses, topics)
+	filter.block = block
+	return filter
+}
+
+// newFilter creates a generic filter that can either filter based on a block hash,
+// or based on range queries. The search criteria needs to be explicitly set.
+func newFilter(backend Backend, addresses []common.Address, topics [][]common.Hash) *Filter {
 	return &Filter{
 		backend:   backend,
-		begin:     begin,
-		end:       end,
 		addresses: addresses,
 		topics:    topics,
 		db:        backend.ChainDb(),
-		matcher:   bloombits.NewMatcher(size, filters),
 	}
 }
 
 // Logs searches the blockchain for matching log entries, returning all from the
 // first block that contains matches, updating the start of the filter accordingly.
 func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) {
+	// If we're doing singleton block filtering, execute and return
+	if f.block != (common.Hash{}) {
+		header, err := f.backend.HeaderByHash(ctx, f.block)
+		if err != nil {
+			return nil, err
+		}
+		if header == nil {
+			return nil, errors.New("unknown block")
+		}
+		return f.blockLogs(ctx, header)
+	}
 	// Figure out the limits of the filter range
 	header, _ := f.backend.HeaderByNumber(ctx, rpc.LatestBlockNumber)
 	if header == nil {
@@ -188,13 +220,23 @@ func (f *Filter) unindexedLogs(ctx context.Context, end uint64) ([]*types.Log, e
 		if header == nil || err != nil {
 			return logs, err
 		}
-		if bloomFilter(header.Bloom, f.addresses, f.topics) {
-			found, err := f.checkMatches(ctx, header)
-			if err != nil {
-				return logs, err
-			}
-			logs = append(logs, found...)
+		found, err := f.blockLogs(ctx, header)
+		if err != nil {
+			return logs, err
+		}
+		logs = append(logs, found...)
+	}
+	return logs, nil
+}
+
+// blockLogs returns the logs matching the filter criteria within a single block.
+func (f *Filter) blockLogs(ctx context.Context, header *types.Header) (logs []*types.Log, err error) {
+	if bloomFilter(header.Bloom, f.addresses, f.topics) {
+		found, err := f.checkMatches(ctx, header)
+		if err != nil {
+			return logs, err
 		}
+		logs = append(logs, found...)
 	}
 	return logs, nil
 }
@@ -259,9 +301,9 @@ Logs:
 		if len(topics) > len(log.Topics) {
 			continue Logs
 		}
-		for i, topics := range topics {
-			match := len(topics) == 0 // empty rule set == wildcard
-			for _, topic := range topics {
+		for i, sub := range topics {
+			match := len(sub) == 0 // empty rule set == wildcard
+			for _, topic := range sub {
 				if log.Topics[i] == topic {
 					match = true
 					break
diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go
index b3ee0c72fefd..6dafd9610421 100644
--- a/eth/filters/filter_system.go
+++ b/eth/filters/filter_system.go
@@ -30,6 +30,7 @@ import (
 	"github.com/XinFinOrg/XDPoSChain/core"
 	"github.com/XinFinOrg/XDPoSChain/core/types"
 	"github.com/XinFinOrg/XDPoSChain/event"
+	"github.com/XinFinOrg/XDPoSChain/log"
 	"github.com/XinFinOrg/XDPoSChain/rpc"
 )
 
@@ -91,8 +92,21 @@ type EventSystem struct {
 	backend   Backend
 	lightMode bool
 	lastHead  *types.Header
-	install   chan *subscription // install filter for event notification
-	uninstall chan *subscription // remove filter for event notification
+
+	// Subscriptions
+	txSub         event.Subscription         // Subscription for new transaction event
+	logsSub       event.Subscription         // Subscription for new log event
+	rmLogsSub     event.Subscription         // Subscription for removed log event
+	chainSub      event.Subscription         // Subscription for new chain event
+	pendingLogSub *event.TypeMuxSubscription // Subscription for pending log event
+
+	// Channels
+	install   chan *subscription         // install filter for event notification
+	uninstall chan *subscription         // remove filter for event notification
+	txCh      chan core.TxPreEvent       // Channel to receive new transaction event
+	logsCh    chan []*types.Log          // Channel to receive new log event
+	rmLogsCh  chan core.RemovedLogsEvent // Channel to receive removed log event
+	chainCh   chan core.ChainEvent       // Channel to receive new chain event
 }
 
 // NewEventSystem creates a new manager that listens for event on the given mux,
@@ -108,6 +122,24 @@ func NewEventSystem(mux *event.TypeMux, backend Backend, lightMode bool) *EventS
 		lightMode: lightMode,
 		install:   make(chan *subscription),
 		uninstall: make(chan *subscription),
+		txCh:      make(chan core.TxPreEvent, txChanSize),
+		logsCh:    make(chan []*types.Log, logsChanSize),
+		rmLogsCh:  make(chan core.RemovedLogsEvent, rmLogsChanSize),
+		chainCh:   make(chan core.ChainEvent, chainEvChanSize),
+	}
+
+	// Subscribe events
+	m.txSub = m.backend.SubscribeTxPreEvent(m.txCh)
+	m.logsSub = m.backend.SubscribeLogsEvent(m.logsCh)
+	m.rmLogsSub = m.backend.SubscribeRemovedLogsEvent(m.rmLogsCh)
+	m.chainSub = m.backend.SubscribeChainEvent(m.chainCh)
+	// TODO(rjl493456442): use feed to subscribe pending log event
+	m.pendingLogSub = m.mux.Subscribe(core.PendingLogsEvent{})
+
+	// Make sure none of the subscriptions are empty
+	if m.txSub == nil || m.logsSub == nil || m.rmLogsSub == nil || m.chainSub == nil ||
+		m.pendingLogSub.Closed() {
+		log.Crit("Subscribe for event system failed")
 	}
 
 	go m.eventLoop()
@@ -306,8 +338,7 @@ func (es *EventSystem) broadcast(filters filterIndex, ev interface{}) {
 			}
 		}
 	case *event.TypeMuxEvent:
-		switch muxe := e.Data.(type) {
-		case core.PendingLogsEvent:
+		if muxe, ok := e.Data.(core.PendingLogsEvent); ok {
 			for _, f := range filters[PendingLogsSubscription] {
 				if e.Time.After(f.created) {
 					if matchedLogs := filterLogs(muxe.Logs, nil, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
@@ -411,50 +442,35 @@ func (es *EventSystem) lightFilterLogs(header *types.Header, addresses []common.
 
 // eventLoop (un)installs filters and processes mux events.
 func (es *EventSystem) eventLoop() {
-	var (
-		index = make(filterIndex)
-		sub   = es.mux.Subscribe(core.PendingLogsEvent{})
-		// Subscribe TxPreEvent form txpool
-		txCh  = make(chan core.TxPreEvent, txChanSize)
-		txSub = es.backend.SubscribeTxPreEvent(txCh)
-		// Subscribe RemovedLogsEvent
-		rmLogsCh  = make(chan core.RemovedLogsEvent, rmLogsChanSize)
-		rmLogsSub = es.backend.SubscribeRemovedLogsEvent(rmLogsCh)
-		// Subscribe []*types.Log
-		logsCh  = make(chan []*types.Log, logsChanSize)
-		logsSub = es.backend.SubscribeLogsEvent(logsCh)
-		// Subscribe ChainEvent
-		chainEvCh  = make(chan core.ChainEvent, chainEvChanSize)
-		chainEvSub = es.backend.SubscribeChainEvent(chainEvCh)
-	)
-
-	// Unsubscribe all events
-	defer sub.Unsubscribe()
-	defer txSub.Unsubscribe()
-	defer rmLogsSub.Unsubscribe()
-	defer logsSub.Unsubscribe()
-	defer chainEvSub.Unsubscribe()
-
+	// Ensure all subscriptions get cleaned up
+	defer func() {
+		es.pendingLogSub.Unsubscribe()
+		es.txSub.Unsubscribe()
+		es.logsSub.Unsubscribe()
+		es.rmLogsSub.Unsubscribe()
+		es.chainSub.Unsubscribe()
+	}()
+
+	index := make(filterIndex)
 	for i := UnknownSubscription; i < LastIndexSubscription; i++ {
 		index[i] = make(map[rpc.ID]*subscription)
 	}
 
 	for {
 		select {
-		case ev, active := <-sub.Chan():
-			if !active { // system stopped
-				return
-			}
-			es.broadcast(index, ev)
-
-		// Handle subscribed events
-		case ev := <-txCh:
+		case ev := <-es.txCh:
 			es.broadcast(index, ev)
-		case ev := <-rmLogsCh:
+		case ev := <-es.logsCh:
 			es.broadcast(index, ev)
-		case ev := <-logsCh:
+		case ev := <-es.rmLogsCh:
 			es.broadcast(index, ev)
-		case ev := <-chainEvCh:
+		case ev := <-es.chainCh:
+			es.broadcast(index, ev)
+		case ev, active := <-es.pendingLogSub.Chan():
+			if !active { // system stopped
+				return
+			}
 			es.broadcast(index, ev)
 
 		case f := <-es.install:
@@ -466,6 +482,7 @@ func (es *EventSystem) eventLoop() {
 				index[f.typ][f.id] = f
 			}
 			close(f.installed)
+
 		case f := <-es.uninstall:
 			if f.typ == MinedAndPendingLogsSubscription {
 				// the type are logs and pending logs subscriptions
@@ -477,13 +494,13 @@ func (es *EventSystem) eventLoop() {
 			close(f.err)
 
 		// System stopped
-		case <-txSub.Err():
+		case <-es.txSub.Err():
 			return
-		case <-rmLogsSub.Err():
+		case <-es.logsSub.Err():
 			return
-		case <-logsSub.Err():
+		case <-es.rmLogsSub.Err():
 			return
-		case <-chainEvSub.Err():
+		case <-es.chainSub.Err():
 			return
 		}
 	}
diff --git a/eth/filters/filter_test.go b/eth/filters/filter_test.go
index b75012dcb015..1b74d21df7e0 100644
--- a/eth/filters/filter_test.go
+++ b/eth/filters/filter_test.go
@@ -97,7 +97,7 @@ func BenchmarkFilters(b *testing.B) {
 	}
 	b.ResetTimer()
 
-	filter := New(backend, 0, -1, []common.Address{addr1, addr2, addr3, addr4}, nil)
+	filter := NewRangeFilter(backend, 0, -1, []common.Address{addr1, addr2, addr3, addr4}, nil)
 
 	for i := 0; i < b.N; i++ {
 		logs, _ := filter.Logs(context.Background())
@@ -186,14 +186,14 @@ func TestFilters(t *testing.T) {
 		}
 	}
 
-	filter := New(backend, 0, -1, []common.Address{addr}, [][]common.Hash{{hash1, hash2, hash3, hash4}})
+	filter := NewRangeFilter(backend, 0, -1, []common.Address{addr}, [][]common.Hash{{hash1, hash2, hash3, hash4}})
 
 	logs, _ := filter.Logs(context.Background())
 	if len(logs) != 4 {
 		t.Error("expected 4 log, got", len(logs))
 	}
 
-	filter = New(backend, 900, 999, []common.Address{addr}, [][]common.Hash{{hash3}})
+	filter = NewRangeFilter(backend, 900, 999, []common.Address{addr}, [][]common.Hash{{hash3}})
 	logs, _ = filter.Logs(context.Background())
 	if len(logs) != 1 {
 		t.Error("expected 1 log, got", len(logs))
@@ -202,7 +202,7 @@ func TestFilters(t *testing.T) {
 		t.Errorf("expected log[0].Topics[0] to be %x, got %x", hash3, logs[0].Topics[0])
 	}
 
-	filter = New(backend, 990, -1, []common.Address{addr}, [][]common.Hash{{hash3}})
+	filter = NewRangeFilter(backend, 990, -1, []common.Address{addr}, [][]common.Hash{{hash3}})
 	logs, _ = filter.Logs(context.Background())
 	if len(logs) != 1 {
 		t.Error("expected 1 log, got", len(logs))
@@ -211,7 +211,7 @@ func TestFilters(t *testing.T) {
 		t.Errorf("expected log[0].Topics[0] to be %x, got %x", hash3, logs[0].Topics[0])
 	}
 
-	filter = New(backend, 1, 10, nil, [][]common.Hash{{hash1, hash2}})
+	filter = NewRangeFilter(backend, 1, 10, nil, [][]common.Hash{{hash1, hash2}})
 
 	logs, _ = filter.Logs(context.Background())
 	if len(logs) != 2 {
@@ -219,7 +219,7 @@ func TestFilters(t *testing.T) {
 	}
 
 	failHash := common.BytesToHash([]byte("fail"))
-	filter = New(backend, 0, -1, nil, [][]common.Hash{{failHash}})
+	filter = NewRangeFilter(backend, 0, -1, nil, [][]common.Hash{{failHash}})
 
 	logs, _ = filter.Logs(context.Background())
 	if len(logs) != 0 {
@@ -227,14 +227,14 @@ func TestFilters(t *testing.T) {
 	}
 
 	failAddr := common.BytesToAddress([]byte("failmenow"))
-	filter = New(backend, 0, -1, []common.Address{failAddr}, nil)
+	filter = NewRangeFilter(backend, 0, -1, []common.Address{failAddr}, nil)
 
 	logs, _ = filter.Logs(context.Background())
 	if len(logs) != 0 {
 		t.Error("expected 0 log, got", len(logs))
 	}
 
-	filter = New(backend, 0, -1, nil, [][]common.Hash{{failHash}, {hash1}})
+	filter = NewRangeFilter(backend, 0, -1, nil, [][]common.Hash{{failHash}, {hash1}})
 
 	logs, _ = filter.Logs(context.Background())
 	if len(logs) != 0 {
diff --git a/event/event.go b/event/event.go
index 20d20d1f57aa..423278731483 100644
--- a/event/event.go
+++ b/event/event.go
@@ -180,6 +180,12 @@ func (s *TypeMuxSubscription) Unsubscribe() {
 	s.closewait()
 }
 
+func (s *TypeMuxSubscription) Closed() bool {
+	s.closeMu.Lock()
+	defer s.closeMu.Unlock()
+	return s.closed
+}
+
 func (s *TypeMuxSubscription) closewait() {
 	s.closeMu.Lock()
 	defer s.closeMu.Unlock()
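Reviewer note: below is a minimal usage sketch of the two constructors this patch introduces. The helper (getLogs) and its standalone package are illustrative only and not part of the patch; it assumes some filters.Backend implementation is available to the caller and simply mirrors the dispatch now performed by PublicFilterAPI.GetLogs and SimulatedBackend.FilterLogs.

package example

import (
	"context"

	"github.com/XinFinOrg/XDPoSChain/common"
	"github.com/XinFinOrg/XDPoSChain/core/types"
	"github.com/XinFinOrg/XDPoSChain/eth/filters"
)

// getLogs picks a single-shot block filter when a block hash is given and a
// range filter otherwise, mirroring the updated GetLogs/GetFilterLogs dispatch.
// The backend argument is any filters.Backend implementation (assumed to be
// provided by the caller, e.g. the filterBackend used by SimulatedBackend).
func getLogs(ctx context.Context, backend filters.Backend, blockHash *common.Hash,
	from, to int64, addresses []common.Address, topics [][]common.Hash) ([]*types.Log, error) {

	var filter *filters.Filter
	if blockHash != nil {
		// Inspects exactly one block; Logs returns an "unknown block" error for a bad hash.
		filter = filters.NewBlockFilter(backend, *blockHash, addresses, topics)
	} else {
		// Walks the [from, to] range, using the bloombits index where available.
		filter = filters.NewRangeFilter(backend, from, to, addresses, topics)
	}
	return filter.Logs(ctx)
}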