Skip to content

Commit

Permalink
fix: Silence web3 event listener cancellation errors (#456)
Browse files Browse the repository at this point in the history
* chore: Return nil error when context cancelled

The program is over, no need to say anything.

* fix: Report only non-nil errors

If the error is nil, the channel was closed.

* chore: Reorder web3Events.Start params

Context is always the first param in Go.

* chore: Reorder event channel collection Start params

Context is always the first param in Go.

* chore: Update with go mod tidy

---------

Co-authored-by: James Walker <walkah@walkah.net>
  • Loading branch information
bgins and walkah authored Nov 28, 2024
1 parent 86a3bc6 commit d679ea3
Show file tree
Hide file tree
Showing 15 changed files with 60 additions and 67 deletions.
15 changes: 8 additions & 7 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
github.com/davecgh/go-spew v1.1.1
github.com/ethereum/go-ethereum v1.13.4
github.com/fatih/color v1.15.0
github.com/go-chi/httprate v0.14.1
github.com/go-git/go-git/v5 v5.10.0
github.com/google/uuid v1.6.0
github.com/gorilla/mux v1.8.1
Expand All @@ -23,14 +24,19 @@ require (
github.com/multiformats/go-multiaddr v0.13.0
github.com/pkg/errors v0.9.1
github.com/rs/zerolog v1.31.0
github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible
github.com/shirou/gopsutil/v4 v4.24.10
github.com/spf13/cobra v1.8.0
github.com/stretchr/testify v1.9.0
github.com/theckman/yacspin v0.13.12
go.opentelemetry.io/contrib/instrumentation/github.com/gorilla/mux/otelmux v0.55.0
go.opentelemetry.io/otel v1.32.0
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.32.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.28.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.28.0
go.opentelemetry.io/otel/metric v1.32.0
go.opentelemetry.io/otel/sdk v1.32.0
go.opentelemetry.io/otel/sdk/metric v1.32.0
go.opentelemetry.io/otel/trace v1.32.0
golang.org/x/crypto v0.28.0
gorgonia.org/cu v0.9.7-0.20240623234718-3cd40db700e9
Expand Down Expand Up @@ -69,7 +75,6 @@ require (
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/ghodss/yaml v1.0.0 // indirect
github.com/go-chi/httprate v0.14.1 // indirect
github.com/go-git/gcfg v1.5.1-0.20230307220236-3a3c6141e376 // indirect
github.com/go-git/go-billy/v5 v5.5.0 // indirect
github.com/go-logr/logr v1.4.2 // indirect
Expand Down Expand Up @@ -125,6 +130,7 @@ require (
github.com/libp2p/go-libp2p-routing-helpers v0.7.4 // indirect
github.com/libp2p/go-msgio v0.3.0 // indirect
github.com/libp2p/go-netroute v0.2.1 // indirect
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
Expand Down Expand Up @@ -152,6 +158,7 @@ require (
github.com/pjbgf/sha1cd v0.3.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/polydawn/refmt v0.89.0 // indirect
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect
github.com/prometheus/client_golang v1.19.1 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.55.0 // indirect
Expand All @@ -160,8 +167,6 @@ require (
github.com/rs/cors v1.10.1 // indirect
github.com/samber/lo v1.46.0 // indirect
github.com/sergi/go-diff v1.3.1 // indirect
github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible // indirect
github.com/shirou/gopsutil/v4 v4.24.10 // indirect
github.com/skeema/knownhosts v1.2.0 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/spf13/afero v1.10.0 // indirect
Expand All @@ -180,10 +185,6 @@ require (
github.com/xanzy/ssh-agent v0.3.3 // indirect
github.com/yusufpapurcu/wmi v1.2.4 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.32.0 // indirect
go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.32.0 // indirect
go.opentelemetry.io/otel/metric v1.32.0 // indirect
go.opentelemetry.io/otel/sdk/metric v1.32.0 // indirect
go.opentelemetry.io/proto/otlp v1.3.1 // indirect
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
Expand Down
42 changes: 8 additions & 34 deletions go.sum

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pkg/jobcreator/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func (controller *JobCreatorController) Start(ctx context.Context, cm *system.Cl
errorChan <- err
return errorChan
}
err = controller.web3Events.Start(controller.web3SDK, ctx, cm)
err = controller.web3Events.Start(ctx, cm, controller.web3SDK)
if err != nil {
errorChan <- err
return errorChan
Expand Down
2 changes: 1 addition & 1 deletion pkg/jobcreator/onchain_jobcreator.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (jobCreator *OnChainJobCreator) Start(ctx context.Context, cm *system.Clean
return errorChan
}

err = jobCreator.web3Events.Start(jobCreator.web3SDK, ctx, cm)
err = jobCreator.web3Events.Start(ctx, cm, jobCreator.web3SDK)
if err != nil {
errorChan <- err
return errorChan
Expand Down
2 changes: 1 addition & 1 deletion pkg/mediator/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func (controller *MediatorController) Start(ctx context.Context, cm *system.Clea
return errorChan
}
// activate the web3 event listeners
err = controller.web3Events.Start(controller.web3SDK, ctx, cm)
err = controller.web3Events.Start(ctx, cm, controller.web3SDK)
if err != nil {
errorChan <- err
return errorChan
Expand Down
2 changes: 1 addition & 1 deletion pkg/resourceprovider/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func (controller *ResourceProviderController) Start(ctx context.Context, cm *sys
errorChan <- err
return errorChan
}
err = controller.web3Events.Start(controller.web3SDK, ctx, cm)
err = controller.web3Events.Start(ctx, cm, controller.web3SDK)
if err != nil {
errorChan <- err
return errorChan
Expand Down
2 changes: 1 addition & 1 deletion pkg/solver/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (controller *SolverController) Start(ctx context.Context, cm *system.Cleanu

// activate the web3 event listeners
log.Debug().Msgf("controller.web3Events.Start")
err = controller.web3Events.Start(controller.web3SDK, ctx, cm)
err = controller.web3Events.Start(ctx, cm, controller.web3SDK)
if err != nil {
errorChan <- err
return errorChan
Expand Down
4 changes: 2 additions & 2 deletions pkg/web3/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,15 @@ func NewEventChannels() *EventChannels {
}

func (eventChannels *EventChannels) Start(
sdk *Web3SDK,
ctx context.Context,
cm *system.CleanupManager,
sdk *Web3SDK,
) error {
for _, collection := range eventChannels.collections {
c := collection
go func() {
for {
err := c.Start(sdk, ctx, cm)
err := c.Start(ctx, cm, sdk)
if err != nil {
log.Error().Msgf("error starting listeners: %s reconnect in 2 seconds", err.Error())
}
Expand Down
9 changes: 6 additions & 3 deletions pkg/web3/events_jobcreator.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ func NewJobCreatorEventChannels() *JobCreatorEventChannels {
}

func (s *JobCreatorEventChannels) Start(
sdk *Web3SDK,
ctx context.Context,
cm *system.CleanupManager,
sdk *Web3SDK,
) error {
blockNumber, err := sdk.getBlockNumber()
if err != nil {
Expand Down Expand Up @@ -54,7 +54,7 @@ func (s *JobCreatorEventChannels) Start(
for {
select {
case <-ctx.Done():
return fmt.Errorf("cancel by context")
return nil
case event := <-s.jobAddedChan:
log.Debug().
Str("storage->event", "JobAdded").
Expand All @@ -63,7 +63,10 @@ func (s *JobCreatorEventChannels) Start(
go handler(*event)
}
case err := <-jobAddedSub.Err():
return fmt.Errorf("cancel by job JobAdded event subscribe error %w", err)
if err != nil {
return fmt.Errorf("cancel by job JobAdded event subscribe error %w", err)
}
return nil
}
}
}
Expand Down
9 changes: 6 additions & 3 deletions pkg/web3/events_mediation.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ func NewMediationEventChannels() *MediationEventChannels {
}

func (m *MediationEventChannels) Start(
sdk *Web3SDK,
ctx context.Context,
cm *system.CleanupManager,
sdk *Web3SDK,
) error {
blockNumber, err := sdk.getBlockNumber()
if err != nil {
Expand Down Expand Up @@ -54,7 +54,7 @@ func (m *MediationEventChannels) Start(
for {
select {
case <-ctx.Done():
return fmt.Errorf("cancel by context")
return nil
case event := <-m.mediationRequestedChan:
log.Debug().
Str("mediation->event", "MediationRequested").
Expand All @@ -63,7 +63,10 @@ func (m *MediationEventChannels) Start(
go handler(*event)
}
case err := <-mediationRequestedSub.Err():
return fmt.Errorf("cancel by mediation MediationRequested event subscribe error %w", err)
if err != nil {
return fmt.Errorf("cancel by mediation MediationRequested event subscribe error %w", err)
}
return nil
}
}
}
Expand Down
9 changes: 6 additions & 3 deletions pkg/web3/events_payments.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ func NewPaymentEventChannels() *PaymentEventChannels {
}

func (p *PaymentEventChannels) Start(
sdk *Web3SDK,
ctx context.Context,
cm *system.CleanupManager,
sdk *Web3SDK,
) error {
blockNumber, err := sdk.getBlockNumber()
if err != nil {
Expand Down Expand Up @@ -54,7 +54,7 @@ func (p *PaymentEventChannels) Start(
for {
select {
case <-ctx.Done():
return fmt.Errorf("cancel by context")
return nil
case event := <-p.paymentChan:
log.Debug().
Str("payments->event", "Payment").
Expand All @@ -63,7 +63,10 @@ func (p *PaymentEventChannels) Start(
go handler(*event)
}
case err := <-paymentSub.Err():
return fmt.Errorf("cancel by mediation MediationRequested event subscribe error %w", err)
if err != nil {
return fmt.Errorf("cancel by mediation MediationRequested event subscribe error %w", err)
}
return nil
}
}
}
Expand Down
9 changes: 6 additions & 3 deletions pkg/web3/events_pow.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ func NewPowEventChannels() *PowEventChannels {
}

func (s *PowEventChannels) Start(
sdk *Web3SDK,
ctx context.Context,
cm *system.CleanupManager,
sdk *Web3SDK,
) error {
blockNumber, err := sdk.getBlockNumber()
if err != nil {
Expand Down Expand Up @@ -55,7 +55,7 @@ func (s *PowEventChannels) Start(
for {
select {
case <-ctx.Done():
return fmt.Errorf("cancel by context")
return nil
case event := <-s.newPowRoundChan:
log.Debug().
Str("pow->event", "PowNewPowRound").
Expand All @@ -65,7 +65,10 @@ func (s *PowEventChannels) Start(
go handler(*event)
}
case err := <-newPowRoundSub.Err():
return fmt.Errorf("cancel by pow newPowRound event subscribe error %w", err)
if err != nil {
return fmt.Errorf("cancel by pow newPowRound event subscribe error %w", err)
}
return nil
}
}
}
Expand Down
9 changes: 6 additions & 3 deletions pkg/web3/events_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ func NewStorageEventChannels() *StorageEventChannels {
}

func (s *StorageEventChannels) Start(
sdk *Web3SDK,
ctx context.Context,
cm *system.CleanupManager,
sdk *Web3SDK,
) error {
blockNumber, err := sdk.getBlockNumber()
if err != nil {
Expand Down Expand Up @@ -54,7 +54,7 @@ func (s *StorageEventChannels) Start(
for {
select {
case <-ctx.Done():
return fmt.Errorf("cancel by context")
return nil
case event := <-s.dealStateChangeChan:
log.Debug().
Str("storage->event", "DealStateChange").
Expand All @@ -63,7 +63,10 @@ func (s *StorageEventChannels) Start(
go handler(*event)
}
case err := <-dealStateChangeSub.Err():
return fmt.Errorf("cancel by storage DealStateChange event subscribe error %w", err)
if err != nil {
return fmt.Errorf("cancel by storage DealStateChange event subscribe error %w", err)
}
return nil
}
}
}
Expand Down
9 changes: 6 additions & 3 deletions pkg/web3/events_token.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ func NewTokenEventChannels() *TokenEventChannels {
}

func (t *TokenEventChannels) Start(
sdk *Web3SDK,
ctx context.Context,
cm *system.CleanupManager,
sdk *Web3SDK,
) error {
blockNumber, err := sdk.getBlockNumber()
if err != nil {
Expand Down Expand Up @@ -57,7 +57,7 @@ func (t *TokenEventChannels) Start(
for {
select {
case <-ctx.Done():
return fmt.Errorf("cancel by context")
return nil
case event := <-t.transferChan:
log.Debug().
Str("token->event", "Transfer").
Expand All @@ -66,7 +66,10 @@ func (t *TokenEventChannels) Start(
go handler(*event)
}
case err := <-transferSub.Err():
return fmt.Errorf("cancel by token Transfer event subscribe error %w", err)
if err != nil {
return fmt.Errorf("cancel by token Transfer event subscribe error %w", err)
}
return nil
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/web3/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ type Web3Options struct {

type EventChannelCollection interface {
Start(
sdk *Web3SDK,
ctx context.Context,
cm *system.CleanupManager,
sdk *Web3SDK,
) error
}

0 comments on commit d679ea3

Please sign in to comment.