Skip to content

Commit

Permalink
Add client.GetTransactionsUntil
Browse files Browse the repository at this point in the history
  • Loading branch information
swift1337 committed Sep 20, 2024
1 parent 7cd1c06 commit e0d806f
Show file tree
Hide file tree
Showing 4 changed files with 328 additions and 18 deletions.
90 changes: 76 additions & 14 deletions zetaclient/chains/ton/liteapi/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,21 @@ type Client struct {
*liteapi.Client
}

const pageSize = 200

// GetFirstTransaction scrolls through the transactions of the given account to find the first one.
// Note that it will fail in case of old transactions. Ideally, use archival node.
// Also returns the number of scrolled transactions for this account i.e. total transactions
// Note that it might fail w/o using an archival node. Also returns the number of
// scrolled transactions for this account i.e. total transactions
func (c *Client) GetFirstTransaction(ctx context.Context, acc ton.AccountID) (*ton.Transaction, int, error) {
const pageSize = 100

state, err := c.GetAccountState(ctx, acc)
lt, hash, err := c.getLastTranHash(ctx, acc)
if err != nil {
return nil, 0, errors.Wrap(err, "unable to get account state")
}

if state.Account.Status() != tlb.AccountActive {
return nil, 0, errors.New("account is not active")
return nil, 0, err
}

var tx *ton.Transaction

// logical time and hash of the last transaction
lt, hash, scrolled := state.LastTransLt, state.LastTransHash, 0
var (
tx *ton.Transaction
scrolled int
)

for {
hashBits := ton.Bits256(hash)
Expand Down Expand Up @@ -65,6 +61,72 @@ func (c *Client) GetFirstTransaction(ctx context.Context, acc ton.AccountID) (*t
return tx, scrolled, nil
}

// GetTransactionsUntil returns all transactions in a range of (from,to] where from is "lt && hash".
// - oldestLT && oldestHash tx is EXCLUDED from the result.
// - ordered by DESC
func (c *Client) GetTransactionsUntil(
ctx context.Context,
acc ton.AccountID,
oldestLT uint64,
oldestHash ton.Bits256,
) ([]ton.Transaction, error) {
lt, hash, err := c.getLastTranHash(ctx, acc)
if err != nil {
return nil, err
}

var result []ton.Transaction

for {
hashBits := ton.Bits256(hash)

txs, err := c.GetTransactions(ctx, pageSize, acc, lt, hashBits)
if err != nil {
return nil, errors.Wrapf(err, "unable to get transactions [lt %d, hash %s]", lt, hashBits.Hex())
}

if len(txs) == 0 {
break
}

for i := range txs {
found := txs[i].Lt == oldestLT && txs[i].Hash() == tlb.Bits256(oldestHash)
if !found {
continue
}

// early exit
result = append(result, txs[:i]...)

return result, nil
}

// otherwise, append all page results
result = append(result, txs...)

// prepare pagination params for the next page
oldestIndex := len(txs) - 1

lt, hash = txs[oldestIndex].PrevTransLt, txs[oldestIndex].PrevTransHash
}

return result, nil
}

// getLastTranHash returns logical time and hash of the last transaction
func (c *Client) getLastTranHash(ctx context.Context, acc ton.AccountID) (uint64, tlb.Bits256, error) {
state, err := c.GetAccountState(ctx, acc)
if err != nil {
return 0, tlb.Bits256{}, errors.Wrap(err, "unable to get account state")
}

if state.Account.Status() != tlb.AccountActive {
return 0, tlb.Bits256{}, errors.New("account is not active")
}

return state.LastTransLt, state.LastTransHash, nil
}

func TransactionHashToString(lt uint64, hash ton.Bits256) string {
return fmt.Sprintf("%d:%s", lt, hash.Hex())
}
Expand Down
71 changes: 71 additions & 0 deletions zetaclient/chains/ton/liteapi/client_live_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package liteapi

import (
"context"
"encoding/json"
"fmt"
"net/http"
"testing"
"time"
Expand All @@ -10,6 +12,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/tonkeeper/tongo/config"
"github.com/tonkeeper/tongo/liteapi"
"github.com/tonkeeper/tongo/tlb"
"github.com/tonkeeper/tongo/ton"
"github.com/zeta-chain/node/zetaclient/common"
)
Expand Down Expand Up @@ -52,6 +55,42 @@ func TestClient(t *testing.T) {

t.Logf("Time taken %s; transactions scanned: %d", finish.String(), scrolled)
})

t.Run("GetTransactionsUntil", func(t *testing.T) {
// ARRANGE
// Given sample account id (a dev wallet)
// https://tonviewer.com/UQCVlMcZ7EyV9maDsvscoLCd5KQfb7CHukyNJluWpMzlD0vr?section=transactions
accountID, err := ton.ParseAccountID("UQCVlMcZ7EyV9maDsvscoLCd5KQfb7CHukyNJluWpMzlD0vr")
require.NoError(t, err)

const getUntilLT = uint64(48645164000001)
const getUntilHash = `2e107215e634bbc3492bdf4b1466d59432623295072f59ab526d15737caa9531`

// as of 2024-09-20
const expectedTX = 3

var hash ton.Bits256
require.NoError(t, hash.FromHex(getUntilHash))

start := time.Now()

// ACT
// https://tonviewer.com/UQCVlMcZ7EyV9maDsvscoLCd5KQfb7CHukyNJluWpMzlD0vr?section=transactions
txs, err := client.GetTransactionsUntil(ctx, accountID, getUntilLT, hash)

finish := time.Since(start)

// ASSERT
require.NoError(t, err)

t.Logf("Time taken %s; transactions fetched: %d", finish.String(), len(txs))
for _, tx := range txs {
printTx(t, tx)
}

mustContainTX(t, txs, "a6672a0e80193c1f705ef1cf45a5883441b8252523b1d08f7656c80e400c74a8")
assert.GreaterOrEqual(t, len(txs), expectedTX)
})
}

func mustCreateClient(t *testing.T) *liteapi.Client {
Expand Down Expand Up @@ -80,3 +119,35 @@ func mustFetchConfig(t *testing.T) config.GlobalConfigurationFile {

return *conf
}

func mustContainTX(t *testing.T, txs []ton.Transaction, hash string) {
var h ton.Bits256
require.NoError(t, h.FromHex(hash))

for _, tx := range txs {
if tx.Hash() == tlb.Bits256(h) {
return
}
}

t.Fatalf("transaction %q not found", hash)
}

func printTx(t *testing.T, tx ton.Transaction) {
b, err := json.MarshalIndent(simplifyTx(tx), "", " ")
require.NoError(t, err)

t.Logf("TX %s", string(b))
}

func simplifyTx(tx ton.Transaction) map[string]any {
return map[string]any{
"block": fmt.Sprintf("shard: %d, seqno: %d", tx.BlockID.Shard, tx.BlockID.Seqno),
"hash": tx.Hash().Hex(),
"logicalTime": tx.Lt,
"unixTime": time.Unix(int64(tx.Transaction.Now), 0).UTC().String(),
"outMessagesCount": tx.OutMsgCnt,
// "inMessageInfo": tx.Msgs.InMsg.Value.Value.Info.IntMsgInfo,
// "outMessages": tx.Msgs.OutMsgs,
}
}
162 changes: 162 additions & 0 deletions zetaclient/chains/ton/observer/inbound.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
package observer

import (
"context"
"fmt"
"slices"

"cosmossdk.io/errors"
"github.com/rs/zerolog"
"github.com/tonkeeper/tongo/ton"

"github.com/zeta-chain/node/pkg/ticker"
"github.com/zeta-chain/node/zetaclient/chains/ton/liteapi"
zctx "github.com/zeta-chain/node/zetaclient/context"
)

const (
// MaxTransactionsPerTick is the maximum number of transactions to process on a ticker
MaxTransactionsPerTick = 100
)

func (ob *Observer) watchInbound(ctx context.Context) error {
app, err := zctx.FromContext(ctx)
if err != nil {
return err
}

var (
chainID = ob.Chain().ChainId
initialInterval = ticker.SecondsFromUint64(ob.ChainParams().InboundTicker)
sampledLogger = ob.Logger().Inbound.Sample(&zerolog.BasicSampler{N: 10})
)

ob.Logger().Inbound.Info().Msgf("WatchInbound started for chain %d", chainID)

task := func(ctx context.Context, t *ticker.Ticker) error {
if !app.IsInboundObservationEnabled() {
sampledLogger.Info().Msgf("WatchInbound: inbound observation is disabled for chain %d", chainID)
return nil
}

if err := ob.observeInbound(ctx, app); err != nil {
ob.Logger().Inbound.Err(err).Msg("WatchInbound: observeInbound error")
}

newInterval := ticker.SecondsFromUint64(ob.ChainParams().InboundTicker)
t.SetInterval(newInterval)

return nil
}

return ticker.Run(
ctx,
initialInterval,
task,
ticker.WithStopChan(ob.StopChannel()),
ticker.WithLogger(ob.Logger().Inbound, "WatchInbound"),
)
}

// Flow:
// - [x] Ensure last scanned transaction is set
// - [x] Get all transaction between [lastScannedTx; now]
// - [ ] Filter only valid and inbound transactions
// - [ ] For each transaction (ordered by *ASC*)
// - [ ] Construct crosschain cosmos message
// - [ ] Vote
// - [ ] Save last scanned tx
func (ob *Observer) observeInbound(ctx context.Context, _ *zctx.AppContext) error {
if err := ob.ensureLastScannedTX(ctx); err != nil {
return errors.Wrap(err, "unable to ensure last scanned tx")
}

lt, hashBits, err := liteapi.TransactionHashFromString(ob.LastTxScanned())
if err != nil {
return errors.Wrapf(err, "unable to parse last scanned tx %q", ob.LastTxScanned())
}

txs, err := ob.client.GetTransactionsUntil(ctx, ob.gatewayID, lt, hashBits)
if err != nil {
return errors.Wrap(err, "unable to get transactions")
}

// Process from oldest to latest (ASC)
slices.Reverse(txs)

switch {
case len(txs) == 0:
// noop
return nil
case len(txs) > MaxTransactionsPerTick:
ob.Logger().Inbound.Info().
Msgf("ObserveInbound: got %d transactions. Taking first %d", len(txs), MaxTransactionsPerTick)

txs = txs[:MaxTransactionsPerTick]
default:
ob.Logger().Inbound.Info().Msgf("ObserveInbound: got %d transactions", len(txs))
}

// todo deploy sample GW to testnet
// todo send some TON and test

// todo FilterInboundEvent

for _, tx := range txs {
fmt.Println("TON TX", tx)
}

/*
// loop signature from oldest to latest to filter inbound events
for i := len(signatures) - 1; i >= 0; i-- {
sig := signatures[i]
sigString := sig.Signature.String()
// process successfully signature only
if sig.Err == nil {
txResult, err := ob.solClient.GetTransaction(ctx, sig.Signature, &rpc.GetTransactionOpts{})
if err != nil {
// we have to re-scan this signature on next ticker
return errors.Wrapf(err, "error GetTransaction for chain %d sig %s", chainID, sigString)
}
// filter inbound events and vote
err = ob.FilterInboundEventsAndVote(ctx, txResult)
if err != nil {
// we have to re-scan this signature on next ticker
return errors.Wrapf(err, "error FilterInboundEventAndVote for chain %d sig %s", chainID, sigString)
}
}
// signature scanned; save last scanned signature to both memory and db, ignore db error
if err := ob.SaveLastTxScanned(sigString, sig.Slot); err != nil {
ob.Logger().
Inbound.Error().
Err(err).
Msgf("ObserveInbound: error saving last sig %s for chain %d", sigString, chainID)
}
ob.Logger().
Inbound.Info().
Msgf("ObserveInbound: last scanned sig is %s for chain %d in slot %d", sigString, chainID, sig.Slot)
*/

return nil
}

func (ob *Observer) ensureLastScannedTX(ctx context.Context) error {
// noop
if ob.LastTxScanned() != "" {
return nil
}

tx, _, err := ob.client.GetFirstTransaction(ctx, ob.gatewayID)
if err != nil {
return err
}

txHash := liteapi.TransactionHashToString(tx.Lt, ton.Bits256(tx.Hash()))

ob.WithLastTxScanned(txHash)

return ob.WriteLastTxScannedToDB(txHash)
}
Loading

0 comments on commit e0d806f

Please sign in to comment.