Skip to content

Commit

Permalink
Resub (#197)
Browse files Browse the repository at this point in the history
* support SubscribeXXXWitReconn

* add change log

* update comments
  • Loading branch information
wangdayong228 authored Oct 10, 2022
1 parent 9439f95 commit 4cb1946
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 4 deletions.
4 changes: 4 additions & 0 deletions changeLog.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
Go-conflux-sdk Change Log
============

v1.4.3
------------
- Support `SubscribeNewHeadsWitReconn`, `SubscribeEpochsWithReconn`, `SubscribeLogsWithReconn`

v1.4.0
------------
- Remove offset & limit from LogFilter
Expand Down
21 changes: 21 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1203,6 +1203,27 @@ func (client *Client) SubscribeLogs(channel chan types.SubscriptionLog, filter t
return client.Subscribe(context.Background(), "cfx", channel, "logs", filter)
}

// SubscribeNewHeadsWitReconn subscribes all new block headers participating in the consensus.
// It will auto re-subscribe if lost connect.
func (client *Client) SubscribeNewHeadsWitReconn(channel chan types.BlockHeader) *rpc.ReconnClientSubscription {
return client.SubscribeWithReconn(context.Background(), "cfx", channel, "newHeads")
}

// SubscribeEpochsWithReconn subscribes consensus results: the total order of blocks, as expressed by a sequence of epochs. Currently subscriptionEpochType only support "latest_mined" and "latest_state"
// It will auto re-subscribe if lost connect.
func (client *Client) SubscribeEpochsWithReconn(channel chan types.WebsocketEpochResponse, subscriptionEpochType ...types.Epoch) *rpc.ReconnClientSubscription {
if len(subscriptionEpochType) > 0 {
return client.SubscribeWithReconn(context.Background(), "cfx", channel, "epochs", subscriptionEpochType[0].String())
}
return client.SubscribeWithReconn(context.Background(), "cfx", channel, "epochs")
}

// SubscribeLogs subscribes all logs matching a certain filter, in order.
// It will auto re-subscribe if lost connect.
func (client *Client) SubscribeLogsWithReconn(channel chan types.SubscriptionLog, filter types.LogFilter) *rpc.ReconnClientSubscription {
return client.SubscribeWithReconn(context.Background(), "cfx", channel, "logs", filter)
}

// === helper methods ===

// WaitForTransationBePacked returns transaction when it is packed
Expand Down
5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,12 @@ require (
github.com/graph-gophers/graphql-go v1.3.0 // indirect
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
github.com/mcuadros/go-defaults v1.2.0
github.com/openweb3/go-rpc-provider v0.2.2
github.com/openweb3/go-rpc-provider v0.3.0
github.com/openweb3/go-sdk-common v0.0.0-20220720074746-a7134e1d372c
github.com/pkg/errors v0.9.1
github.com/shopspring/decimal v1.3.1
github.com/smartystreets/goconvey v1.6.4
github.com/status-im/keycard-go v0.0.0-20190316090335-8537d3370df4
github.com/stretchr/testify v1.7.0
gopkg.in/urfave/cli.v1 v1.20.0
gotest.tools v2.2.0+incompatible
Expand All @@ -21,4 +22,4 @@ require (

// replace github.com/openweb3/go-sdk-common => ../go-sdk-common
// replace github.com/ethereum/go-ethereum => ../../ethereum/go-ethereum
// replace github.com/openweb3/go-rpc-provider v0.2.0 => ../go-rpc-provider
// replace github.com/openweb3/go-rpc-provider v0.2.2 => ../go-rpc-provider
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -351,8 +351,8 @@ github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFSt
github.com/opentracing/opentracing-go v1.0.3-0.20180606204148-bd9c31933947/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/opentracing/opentracing-go v1.1.0 h1:pWlfV3Bxv7k65HYwkikxat0+s3pV4bsqf19k25Ur8rU=
github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/openweb3/go-rpc-provider v0.2.2 h1:7p+CEnum8A4ZCzcCWhaQsf/leNAUkwI2n/LKesxGqqE=
github.com/openweb3/go-rpc-provider v0.2.2/go.mod h1:DYz40TbzhzyTA06UFqGIKSXp0uFot6ZKh4QarD//eZ0=
github.com/openweb3/go-rpc-provider v0.3.0 h1:8D22MVyUG/NrpspwonzOGOhLeWZhxjJdQd4VT5KdJy4=
github.com/openweb3/go-rpc-provider v0.3.0/go.mod h1:DYz40TbzhzyTA06UFqGIKSXp0uFot6ZKh4QarD//eZ0=
github.com/openweb3/go-sdk-common v0.0.0-20220720074746-a7134e1d372c h1:BrPXZpkTdmZe5bNjSSnxWqL44X9FcZ3xftLcYNkIJ68=
github.com/openweb3/go-sdk-common v0.0.0-20220720074746-a7134e1d372c/go.mod h1:0WCVKMiLiYEaHhpQWQ3rgLti/Fv/+JPRiB0sEoovwk8=
github.com/paulbellamy/ratecounter v0.2.0/go.mod h1:Hfx1hDpSGoqxkVVpBi/IlYD7kChlfo5C6hzIHwPqfFE=
Expand Down
42 changes: 42 additions & 0 deletions integration_test/resub_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package integrationtest

import (
"fmt"
"sync"
"testing"

sdk "github.com/Conflux-Chain/go-conflux-sdk"
"github.com/Conflux-Chain/go-conflux-sdk/types"
)

func _TestResubHeads(t *testing.T) {
client := sdk.MustNewClient("wss://test.confluxrpc.com/ws")

headc := make(chan types.BlockHeader)
sub := client.SubscribeNewHeadsWitReconn(headc)

retry := 0

var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case err := <-sub.Err():
fmt.Printf("sub error: %v\n", err)
retry++
if retry >= 20 {
sub.Unsubscribe()
return
}
case <-sub.ResubSuccess():
fmt.Println("sub success")
retry = 0
case h := <-headc:
fmt.Printf("received head %v\n", h)
}
}
}()
wg.Wait()
}

0 comments on commit 4cb1946

Please sign in to comment.