Skip to content

Commit

Permalink
Merge pull request #127 from mdehoog/revert-pending-subscription-filter
Browse files Browse the repository at this point in the history
Revert pending subscription filter
  • Loading branch information
protolambda authored Sep 4, 2023
2 parents ee5b962 + 20753a1 commit 0fb8208
Show file tree
Hide file tree
Showing 8 changed files with 8 additions and 39 deletions.
1 change: 0 additions & 1 deletion cmd/geth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,6 @@ var (
utils.RollupHistoricalRPCTimeoutFlag,
utils.RollupDisableTxPoolGossipFlag,
utils.RollupComputePendingBlock,
utils.RollupAllowPendingTxFilters,
configFileFlag,
}, utils.NetworkFlags, utils.DatabasePathFlags)

Expand Down
9 changes: 1 addition & 8 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -893,11 +893,6 @@ var (
Usage: "By default the pending block equals the latest block to save resources and not leak txs from the tx-pool, this flag enables computing of the pending block from the tx-pool instead.",
Category: flags.RollupCategory,
}
RollupAllowPendingTxFilters = &cli.BoolFlag{
Name: "rollup.allowpendingtxfilters",
Usage: "By default 'eth_subscribe' with 'NewPendingTransaction' and 'eth_newPendingTransactionFilter' are disabled to prevent leaking txs from the tx-pool.",
Category: flags.RollupCategory,
}

// Metrics flags
MetricsEnabledFlag = &cli.BoolFlag{
Expand Down Expand Up @@ -1844,7 +1839,6 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) {
}
cfg.RollupDisableTxPoolGossip = ctx.Bool(RollupDisableTxPoolGossipFlag.Name)
cfg.RollupDisableTxPoolAdmission = cfg.RollupSequencerHTTP != "" && !ctx.Bool(RollupEnableTxPoolAdmissionFlag.Name)
cfg.RollupAllowPendingTxFilters = ctx.Bool(RollupAllowPendingTxFilters.Name)
// Override any default configs for hard coded networks.
switch {
case ctx.Bool(MainnetFlag.Name):
Expand Down Expand Up @@ -2037,8 +2031,7 @@ func RegisterGraphQLService(stack *node.Node, backend ethapi.Backend, filterSyst
func RegisterFilterAPI(stack *node.Node, backend ethapi.Backend, ethcfg *ethconfig.Config) *filters.FilterSystem {
isLightClient := ethcfg.SyncMode == downloader.LightSync
filterSystem := filters.NewFilterSystem(backend, filters.Config{
LogCacheSize: ethcfg.FilterLogCacheSize,
AllowPendingTxs: ethcfg.RollupAllowPendingTxFilters,
LogCacheSize: ethcfg.FilterLogCacheSize,
})
stack.RegisterAPIs([]rpc.API{{
Namespace: "eth",
Expand Down
1 change: 0 additions & 1 deletion eth/ethconfig/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,6 @@ type Config struct {
RollupHistoricalRPCTimeout time.Duration
RollupDisableTxPoolGossip bool
RollupDisableTxPoolAdmission bool
RollupAllowPendingTxFilters bool
}

// CreateConsensusEngine creates a consensus engine for the given chain config.
Expand Down
14 changes: 2 additions & 12 deletions eth/filters/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@ import (
var (
errInvalidTopic = errors.New("invalid topic(s)")
errFilterNotFound = errors.New("filter not found")
// errPendingDisabled is returned from NewPendingTransaction* when access to the mempool is not allowed
errPendingDisabled = errors.New("pending tx filters are disabled")
)

// filter is a helper struct that holds meta information over the filter type
Expand Down Expand Up @@ -111,11 +109,7 @@ func (api *FilterAPI) timeoutLoop(timeout time.Duration) {
//
// It is part of the filter package because this filter can be used through the
// `eth_getFilterChanges` polling method that is also used for log filters.
func (api *FilterAPI) NewPendingTransactionFilter(fullTx *bool) (rpc.ID, error) {
if !api.sys.cfg.AllowPendingTxs {
return "", errPendingDisabled
}

func (api *FilterAPI) NewPendingTransactionFilter(fullTx *bool) rpc.ID {
var (
pendingTxs = make(chan []*types.Transaction)
pendingTxSub = api.events.SubscribePendingTxs(pendingTxs)
Expand Down Expand Up @@ -143,17 +137,13 @@ func (api *FilterAPI) NewPendingTransactionFilter(fullTx *bool) (rpc.ID, error)
}
}()

return pendingTxSub.ID, nil
return pendingTxSub.ID
}

// NewPendingTransactions creates a subscription that is triggered each time a
// transaction enters the transaction pool. If fullTx is true the full tx is
// sent to the client, otherwise the hash is sent.
func (api *FilterAPI) NewPendingTransactions(ctx context.Context, fullTx *bool) (*rpc.Subscription, error) {
if !api.sys.cfg.AllowPendingTxs {
return &rpc.Subscription{}, errPendingDisabled
}

notifier, supported := rpc.NotifierFromContext(ctx)
if !supported {
return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
Expand Down
2 changes: 0 additions & 2 deletions eth/filters/filter_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@ import (
type Config struct {
LogCacheSize int // maximum number of cached blocks (default: 32)
Timeout time.Duration // how long filters stay active (default: 5min)
// allow filtering or subscriptions to new pending txs:
AllowPendingTxs bool
}

func (cfg Config) withDefaults() Config {
Expand Down
16 changes: 3 additions & 13 deletions eth/filters/filter_system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,6 @@ func (b *testBackend) ServiceFilter(ctx context.Context, session *bloombits.Matc

func newTestFilterSystem(t testing.TB, db ethdb.Database, cfg Config) (*testBackend, *FilterSystem) {
backend := &testBackend{db: db}
cfg.AllowPendingTxs = true
sys := NewFilterSystem(backend, cfg)
return backend, sys
}
Expand Down Expand Up @@ -264,10 +263,7 @@ func TestPendingTxFilter(t *testing.T) {
hashes []common.Hash
)

fid0, err := api.NewPendingTransactionFilter(nil)
if err != nil {
t.Fatalf("Unable to create filter: %v", err)
}
fid0 := api.NewPendingTransactionFilter(nil)

time.Sleep(1 * time.Second)
backend.txFeed.Send(core.NewTxsEvent{Txs: transactions})
Expand Down Expand Up @@ -324,10 +320,7 @@ func TestPendingTxFilterFullTx(t *testing.T) {
)

fullTx := true
fid0, err := api.NewPendingTransactionFilter(&fullTx)
if err != nil {
t.Fatalf("Unable to create filter: %v", err)
}
fid0 := api.NewPendingTransactionFilter(&fullTx)

time.Sleep(1 * time.Second)
backend.txFeed.Send(core.NewTxsEvent{Txs: transactions})
Expand Down Expand Up @@ -922,10 +915,7 @@ func TestPendingTxFilterDeadlock(t *testing.T) {
// timeout either in 100ms or 200ms
fids := make([]rpc.ID, 20)
for i := 0; i < len(fids); i++ {
fid, err := api.NewPendingTransactionFilter(nil)
if err != nil {
t.Fatalf("Unable to create filter: %v", err)
}
fid := api.NewPendingTransactionFilter(nil)
fids[i] = fid
// Wait for at least one tx to arrive in filter
for {
Expand Down
2 changes: 1 addition & 1 deletion ethclient/gethclient/gethclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func newTestBackend(t *testing.T) (*node.Node, []*types.Block) {
if err != nil {
t.Fatalf("can't create new ethereum service: %v", err)
}
filterSystem := filters.NewFilterSystem(ethservice.APIBackend, filters.Config{AllowPendingTxs: true})
filterSystem := filters.NewFilterSystem(ethservice.APIBackend, filters.Config{})
n.RegisterAPIs([]rpc.API{{
Namespace: "eth",
Service: filters.NewFilterAPI(filterSystem, false),
Expand Down
2 changes: 1 addition & 1 deletion graphql/graphql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ func newGQLService(t *testing.T, stack *node.Node, gspec *core.Genesis, genBlock
t.Fatalf("could not create import blocks: %v", err)
}
// Set up handler
filterSystem := filters.NewFilterSystem(ethBackend.APIBackend, filters.Config{AllowPendingTxs: true})
filterSystem := filters.NewFilterSystem(ethBackend.APIBackend, filters.Config{})
handler, err := newHandler(stack, ethBackend.APIBackend, filterSystem, []string{}, []string{})
if err != nil {
t.Fatalf("could not create graphql service: %v", err)
Expand Down

0 comments on commit 0fb8208

Please sign in to comment.