Skip to content

Commit

Permalink
Merge pull request #1609 from statechannels/chain-old-events
Browse files Browse the repository at this point in the history
Check for missed events when `chainservice` initializes
  • Loading branch information
bitwiseguy authored Sep 1, 2023
2 parents e5c526a + 3874b64 commit f66d260
Show file tree
Hide file tree
Showing 7 changed files with 150 additions and 66 deletions.
38 changes: 11 additions & 27 deletions internal/chain/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,10 @@ import (
"os/exec"
"time"

b "github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
ethTypes "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/statechannels/go-nitro/node/engine/chainservice"
NitroAdjudicator "github.com/statechannels/go-nitro/node/engine/chainservice/adjudicator"
ConsensusApp "github.com/statechannels/go-nitro/node/engine/chainservice/consensusapp"
chainutils "github.com/statechannels/go-nitro/node/engine/chainservice/utils"
Expand All @@ -20,28 +19,13 @@ import (
)

type ChainOpts struct {
ChainUrl string
ChainAuthToken string
ChainPk string
NaAddress common.Address
VpaAddress common.Address
CaAddress common.Address
}

func InitializeEthChainService(chainOpts ChainOpts) (*chainservice.EthChainService, error) {
if chainOpts.ChainPk == "" {
return nil, fmt.Errorf("chainpk must be set")
}

fmt.Println("Initializing chain service and connecting to " + chainOpts.ChainUrl + "...")

return chainservice.NewEthChainService(
chainOpts.ChainUrl,
chainOpts.ChainAuthToken,
chainOpts.ChainPk,
chainOpts.NaAddress,
chainOpts.CaAddress,
chainOpts.VpaAddress)
ChainUrl string
ChainStartBlock uint64
ChainAuthToken string
ChainPk string
NaAddress common.Address
VpaAddress common.Address
CaAddress common.Address
}

func StartAnvil() (*exec.Cmd, error) {
Expand Down Expand Up @@ -87,17 +71,17 @@ type contractBackend interface {
}

// deployFunc is a function that deploys a contract and returns the contract address, backend, and transaction.
type deployFunc[T contractBackend] func(auth *b.TransactOpts, backend b.ContractBackend) (common.Address, *ethTypes.Transaction, *T, error)
type deployFunc[T contractBackend] func(auth *bind.TransactOpts, backend bind.ContractBackend) (common.Address, *ethTypes.Transaction, *T, error)

// deployContract deploys a contract and waits for the transaction to be mined.
func deployContract[T contractBackend](ctx context.Context, name string, ethClient *ethclient.Client, txSubmitter *b.TransactOpts, deploy deployFunc[T]) (types.Address, error) {
func deployContract[T contractBackend](ctx context.Context, name string, ethClient *ethclient.Client, txSubmitter *bind.TransactOpts, deploy deployFunc[T]) (types.Address, error) {
a, tx, _, err := deploy(txSubmitter, ethClient)
if err != nil {
return types.Address{}, err
}

fmt.Printf("Waiting for %s deployment confirmation\n", name)
_, err = b.WaitMined(ctx, ethClient, tx)
_, err = bind.WaitMined(ctx, ethClient, tx)
if err != nil {
return types.Address{}, err
}
Expand Down
12 changes: 11 additions & 1 deletion internal/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,18 @@ func InitializeNode(pkString string, chainOpts chain.ChainOpts,
slog.Info("Initializing message service on port " + fmt.Sprint(msgPort) + "...")
messageService := p2pms.NewMessageService("127.0.0.1", msgPort, *ourStore.GetAddress(), pk, bootPeers)

// Compare chainOpts.ChainStartBlock to lastBlockNum seen in store. The larger of the two
// gets passed as an argument when creating NewEthChainService
storeBlockNum, err := ourStore.GetLastBlockNumSeen()
if err != nil {
return nil, nil, nil, nil, err
}
if storeBlockNum > chainOpts.ChainStartBlock {
chainOpts.ChainStartBlock = storeBlockNum
}

slog.Info("Initializing chain service and connecting to " + chainOpts.ChainUrl + "...")
ourChain, err := chain.InitializeEthChainService(chainOpts)
ourChain, err := chainservice.NewEthChainService(chainOpts)
if err != nil {
return nil, nil, nil, nil, err
}
Expand Down
22 changes: 16 additions & 6 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ func main() {
CONNECTIVITY_CATEGORY = "Connectivity:"
USE_NATS = "usenats"
CHAIN_URL = "chainurl"
CHAIN_START_BLOCK = "chainstartblock"
CHAIN_AUTH_TOKEN = "chainauthtoken"
NA_ADDRESS = "naaddress"
VPA_ADDRESS = "vpaaddress"
Expand All @@ -46,6 +47,7 @@ func main() {
)
var pkString, chainUrl, chainAuthToken, naAddress, vpaAddress, caAddress, chainPk, durableStoreFolder, bootPeers string
var msgPort, rpcPort, guiPort int
var chainStartBlock uint64
var useNats, useDurableStore bool

// urfave default precedence for flag value sources (highest to lowest):
Expand Down Expand Up @@ -102,6 +104,13 @@ func main() {
Destination: &chainPk,
EnvVars: []string{"CHAIN_PK"},
}),
altsrc.NewUint64Flag(&cli.Uint64Flag{
Name: CHAIN_START_BLOCK,
Usage: "Specifies the block number to start looking for nitro adjudicator events.",
Value: 0,
Category: CONNECTIVITY_CATEGORY,
Destination: &chainStartBlock,
}),
altsrc.NewStringFlag(&cli.StringFlag{
Name: NA_ADDRESS,
Usage: "Specifies the address of the nitro adjudicator contract.",
Expand Down Expand Up @@ -163,12 +172,13 @@ func main() {
Before: altsrc.InitInputSourceWithContext(flags, altsrc.NewTomlSourceFromFlagFunc(CONFIG)),
Action: func(cCtx *cli.Context) error {
chainOpts := chain.ChainOpts{
ChainUrl: chainUrl,
ChainAuthToken: chainAuthToken,
ChainPk: chainPk,
NaAddress: common.HexToAddress(naAddress),
VpaAddress: common.HexToAddress(vpaAddress),
CaAddress: common.HexToAddress(caAddress),
ChainUrl: chainUrl,
ChainStartBlock: chainStartBlock,
ChainAuthToken: chainAuthToken,
ChainPk: chainPk,
NaAddress: common.HexToAddress(naAddress),
VpaAddress: common.HexToAddress(vpaAddress),
CaAddress: common.HexToAddress(caAddress),
}

var peerSlice []string
Expand Down
83 changes: 60 additions & 23 deletions node/engine/chainservice/eth_chainservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
ethTypes "github.com/ethereum/go-ethereum/core/types"

"github.com/statechannels/go-nitro/channel/state"
"github.com/statechannels/go-nitro/internal/chain"
"github.com/statechannels/go-nitro/internal/logging"
NitroAdjudicator "github.com/statechannels/go-nitro/node/engine/chainservice/adjudicator"
Token "github.com/statechannels/go-nitro/node/engine/chainservice/erc20"
Expand Down Expand Up @@ -86,31 +87,40 @@ const RESUB_INTERVAL = 15 * time.Second
const REQUIRED_BLOCK_CONFIRMATIONS = 2

// NewEthChainService is a convenient wrapper around newEthChainService, which provides a simpler API
func NewEthChainService(chainUrl, chainAuthToken, chainPk string, naAddress, caAddress, vpaAddress common.Address) (*EthChainService, error) {
if vpaAddress == caAddress {
return nil, fmt.Errorf("virtual payment app address and consensus app address cannot be the same: %s", vpaAddress.String())
func NewEthChainService(chainOpts chain.ChainOpts) (*EthChainService, error) {
if chainOpts.ChainPk == "" {
return nil, fmt.Errorf("chainpk must be set")
}
ethClient, txSigner, err := chainutils.ConnectToChain(context.Background(), chainUrl, chainAuthToken, common.Hex2Bytes(chainPk))
if chainOpts.VpaAddress == chainOpts.CaAddress {
return nil, fmt.Errorf("virtual payment app address and consensus app address cannot be the same: %s", chainOpts.VpaAddress.String())
}

ethClient, txSigner, err := chainutils.ConnectToChain(
context.Background(),
chainOpts.ChainUrl,
chainOpts.ChainAuthToken,
common.Hex2Bytes(chainOpts.ChainPk),
)
if err != nil {
panic(err)
}

na, err := NitroAdjudicator.NewNitroAdjudicator(naAddress, ethClient)
na, err := NitroAdjudicator.NewNitroAdjudicator(chainOpts.NaAddress, ethClient)
if err != nil {
panic(err)
}

return newEthChainService(ethClient, na, naAddress, caAddress, vpaAddress, txSigner)
return newEthChainService(ethClient, chainOpts.ChainStartBlock, na, chainOpts.NaAddress, chainOpts.CaAddress, chainOpts.VpaAddress, txSigner)
}

// newEthChainService constructs a chain service that submits transactions to a NitroAdjudicator
// and listens to events from an eventSource
func newEthChainService(chain ethChain, na *NitroAdjudicator.NitroAdjudicator,
func newEthChainService(chain ethChain, startBlock uint64, na *NitroAdjudicator.NitroAdjudicator,
naAddress, caAddress, vpaAddress common.Address, txSigner *bind.TransactOpts,
) (*EthChainService, error) {
ctx, cancelCtx := context.WithCancel(context.Background())

logger := slog.Default().With("tx-signer", txSigner.From.String())
logger := logging.LoggerWithAddress(slog.Default(), txSigner.From)

eventQueue := EventQueue{}
heap.Init(&eventQueue)
Expand All @@ -123,15 +133,46 @@ func newEthChainService(chain ethChain, na *NitroAdjudicator.NitroAdjudicator,
return nil, err
}

// TODO: Return error from chain service instead of panicking
// Prevent go routines from processing events before checkForMissedEvents completes
ecs.eventTracker.mu.Lock()

ecs.wg.Add(3)
go ecs.listenForEventLogs(errChan, eventSub, eventChan, eventQuery)
go ecs.listenForNewBlocks(errChan, newBlockSub, newBlockChan)
go ecs.listenForErrors(errChan)

// Search for any missed events emitted while this node was offline
err = ecs.checkForMissedEvents(startBlock)
if err != nil {
return nil, err
}
ecs.eventTracker.mu.Unlock()

return &ecs, nil
}

func (ecs *EthChainService) checkForMissedEvents(startBlock uint64) error {
ecs.logger.Info("checking for missed chainservice events", "startBlock", startBlock)
query := ethereum.FilterQuery{
FromBlock: big.NewInt(int64(startBlock)),
ToBlock: nil, // For the current block number
Addresses: []common.Address{ecs.naAddress},
Topics: [][]common.Hash{topicsToWatch},
}

events, err := ecs.chain.FilterLogs(ecs.ctx, query)
if err != nil {
ecs.logger.Error("failed to retrieve old chain logs: %v", err)
return err
}
ecs.logger.Info("found missed events", "numEvents", len(events))

for _, event := range events {
heap.Push(&ecs.eventTracker.events, event)
}
return nil
}

// listenForErrors listens for errors on the error channel and attempts to handle them if they occur.
// TODO: Currently "handle" is panicking
func (ecs *EthChainService) listenForErrors(errChan <-chan error) {
Expand Down Expand Up @@ -200,10 +241,10 @@ func (ecs *EthChainService) SendTransaction(tx protocols.ChainTransaction) error
}
return nil
case protocols.WithdrawAllTransaction:
state := tx.SignedState.State()
signedState := tx.SignedState.State()
signatures := tx.SignedState.Signatures()
nitroFixedPart := NitroAdjudicator.INitroTypesFixedPart(NitroAdjudicator.ConvertFixedPart(state.FixedPart()))
nitroVariablePart := NitroAdjudicator.ConvertVariablePart(state.VariablePart())
nitroFixedPart := NitroAdjudicator.INitroTypesFixedPart(NitroAdjudicator.ConvertFixedPart(signedState.FixedPart()))
nitroVariablePart := NitroAdjudicator.ConvertVariablePart(signedState.VariablePart())
nitroSignatures := []NitroAdjudicator.INitroTypesSignature{NitroAdjudicator.ConvertSignature(signatures[0]), NitroAdjudicator.ConvertSignature(signatures[1])}

candidate := NitroAdjudicator.INitroTypesSignedVariablePart{
Expand Down Expand Up @@ -317,22 +358,16 @@ out:
errorChan <- fmt.Errorf("subscribeFilterLogs failed on resubscribe: %w", err)
break out
}
ecs.logger.Log(context.Background(), logging.LevelTrace, "resubscribed to filtered event logs")
ecs.logger.Debug("resubscribed to filtered event logs")

case <-time.After(RESUB_INTERVAL):
// Due to https://github.com/ethereum/go-ethereum/issues/23845 we can't rely on a long running subscription.
// We unsub here and recreate the subscription in the next iteration of the select.
eventSub.Unsubscribe()

case chainEvent := <-eventChan:
for _, topic := range topicsToWatch {
if chainEvent.Topics[0] == topic {
ecs.logger.Debug("queueing new chainEvent", "block-num", chainEvent.BlockNumber)

ecs.updateEventTracker(errorChan, nil, &chainEvent)
}
}

ecs.logger.Debug("queueing new chainEvent", "block-num", chainEvent.BlockNumber)
ecs.updateEventTracker(errorChan, nil, &chainEvent)
}
}
}
Expand Down Expand Up @@ -360,11 +395,11 @@ out:
errorChan <- fmt.Errorf("subscribeNewHead failed on resubscribe: %w", err)
break out
}
ecs.logger.Log(context.Background(), logging.LevelTrace, "resubscribed to new blocks")
ecs.logger.Debug("resubscribed to new blocks")

case newBlock := <-newBlockChan:
newBlockNum := newBlock.Number.Uint64()
ecs.logger.Log(context.Background(), logging.LevelTrace, "detected new block", "block-num", newBlockNum)
ecs.logger.Debug("detected new block", "block-num", newBlockNum)
ecs.updateEventTracker(errorChan, &newBlockNum, nil)
}
}
Expand All @@ -380,6 +415,7 @@ func (ecs *EthChainService) updateEventTracker(errorChan chan<- error, blockNumb
}
if chainEvent != nil {
heap.Push(&ecs.eventTracker.events, *chainEvent)
ecs.logger.Debug("event added to queue", "updated-queue-length", ecs.eventTracker.events.Len())
}

eventsToDispatch := []ethTypes.Log{}
Expand All @@ -404,6 +440,7 @@ func (ecs *EthChainService) subscribeForLogs() (chan error, ethereum.Subscriptio
// Subscribe to Adjudicator events
eventQuery := ethereum.FilterQuery{
Addresses: []common.Address{ecs.naAddress},
Topics: [][]common.Hash{topicsToWatch},
}
eventChan := make(chan ethTypes.Log)
eventSub, err := ecs.chain.SubscribeFilterLogs(ecs.ctx, eventQuery, eventChan)
Expand Down
2 changes: 1 addition & 1 deletion node/engine/chainservice/simulated_backend_chainservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ type SimulatedBackendChainService struct {
func NewSimulatedBackendChainService(sim SimulatedChain, bindings Bindings,
txSigner *bind.TransactOpts,
) (ChainService, error) {
ethChainService, err := newEthChainService(sim,
ethChainService, err := newEthChainService(sim, 0,
bindings.Adjudicator.Contract,
bindings.Adjudicator.Address,
bindings.ConsensusApp.Address,
Expand Down
Loading

0 comments on commit f66d260

Please sign in to comment.