Skip to content

Commit

Permalink
Fix/requests spam (#2)
Browse files Browse the repository at this point in the history
* add fixed indexer prototype

* add fixed indexer runner

* add a check if the order/match exists

* fix order/match getting url

* refactor indexer

* fix ws subscription

* fix update events unpacking

* remove block range

* Revert "remove block range"

This reverts commit 420142f.

* consider block range

* make websocket usage optional

* use override_last_block cfg field instead of defaultLastBlock const
  • Loading branch information
freigeistig authored May 24, 2023
1 parent 1ad7458 commit 3b538c5
Show file tree
Hide file tree
Showing 10 changed files with 470 additions and 295 deletions.
4 changes: 3 additions & 1 deletion config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ network:
contract: "Swapica address"
chain_id: 5
index_period: 30s # period of contract calls for fetching events, should be > average_block_time
use_websocket: true
ws: "wss://goerli.infura.io/ws/v3/" # required to subscribe to blocks
override_last_block: "8931015"
# optional fields
block_range: 3000 # max difference between start and end block on eth_getLogs call, e.g. for Fuji Ankr RPC it's 3000
override_last_block: "8931015" # if set, updates the last block in DB and catches up from it; need to use on sync problems
request_timeout: 3s
18 changes: 16 additions & 2 deletions internal/config/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@ import (

type Network struct {
*gobind.Swapica
ContractAddress common.Address
EthClient *ethclient.Client
WsClient *ethclient.Client
ChainID int64
IndexPeriod time.Duration
BlockRange uint64
OverrideLastBlock *uint64
OverrideLastBlock uint64
RequestTimeout time.Duration
}

Expand All @@ -31,10 +33,12 @@ func (c *config) Network() Network {
RPC string `fig:"rpc,required"`
Contract common.Address `fig:"contract,required"`
ChainID int64 `fig:"chain_id,required"`
UseWs bool `fig:"use_websocket,required"`
IndexPeriod time.Duration `fig:"index_period,required"`
BlockRange uint64 `fig:"block_range"`
OverrideLastBlock *uint64 `fig:"override_last_block"`
OverrideLastBlock uint64 `fig:"override_last_block"`
RequestTimeout time.Duration `fig:"request_timeout"`
WS string `fig:"ws,required"`
}

err := figure.Out(&cfg).
Expand All @@ -61,9 +65,19 @@ func (c *config) Network() Network {
cfg.RequestTimeout = defaultRequestTimeout
}

var wsCli *ethclient.Client
if cfg.UseWs {
wsCli, err = ethclient.Dial(cfg.WS)
if err != nil {
panic(errors.Wrap(err, "failed to connect to RPC provider"))
}
}

return Network{
Swapica: s,
ContractAddress: cfg.Contract,
EthClient: cli,
WsClient: wsCli,
ChainID: cfg.ChainID,
IndexPeriod: cfg.IndexPeriod,
BlockRange: cfg.BlockRange,
Expand Down
41 changes: 0 additions & 41 deletions internal/service/catch_up.go

This file was deleted.

106 changes: 106 additions & 0 deletions internal/service/events_handlers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package service

import (
"context"
"math/big"
"strconv"

"github.com/Swapica/indexer-svc/internal/gobind"
"github.com/ethereum/go-ethereum/core/types"
"gitlab.com/distributed_lab/logan/v3"
"gitlab.com/distributed_lab/logan/v3/errors"
)

func (r *indexer) handleOrderCreated(ctx context.Context, eventName string, log *types.Log) error {
var event gobind.SwapicaOrderCreated

err := r.swapicaAbi.UnpackIntoInterface(&event, eventName, log.Data)
if err != nil {
return errors.Wrap(err, "failed to unpack event", logan.F{
"event": eventName,
})
}

exists, err := r.orderExists(event.Order.OrderId.Int64())
if err != nil {
return errors.Wrap(err, "failed to check if order exists")
}
if exists {
return nil
}

if err = r.addOrder(ctx, event.Order, event.UseRelayer); err != nil {
return errors.Wrap(err, "failed to index order")
}

return nil
}

func (r *indexer) handleOrderUpdated(ctx context.Context, eventName string, log *types.Log) error {
var event gobind.SwapicaOrderUpdated

err := r.swapicaAbi.UnpackIntoInterface(&event, eventName, log.Data)
if err != nil {
return errors.Wrap(err, "failed to unpack event", logan.F{
"event": eventName,
})
}

id, err := strconv.ParseInt(log.Topics[1].String(), 0, 64)
if err != nil {
return errors.Wrap(err, "failed to parse order id from topic")
}

if err = r.updateOrder(ctx, big.NewInt(id), event.Status); err != nil {
return errors.Wrap(err, "failed to index order")
}

return nil
}

func (r *indexer) handleMatchCreated(ctx context.Context, eventName string, log *types.Log) error {
var event gobind.SwapicaMatchCreated

err := r.swapicaAbi.UnpackIntoInterface(&event, eventName, log.Data)
if err != nil {
return errors.Wrap(err, "failed to unpack event", logan.F{
"event": eventName,
})
}

exists, err := r.matchExists(event.Match.MatchId.Int64())
if err != nil {
return errors.Wrap(err, "failed to check if match exists")
}
if exists {
return nil
}

if err = r.addMatch(ctx, event.Match, event.UseRelayer); err != nil {
return errors.Wrap(err, "failed to add match order")
}

return nil
}

func (r *indexer) handleMatchUpdated(ctx context.Context, eventName string, log *types.Log) error {
var event gobind.SwapicaMatchUpdated

err := r.swapicaAbi.UnpackIntoInterface(&event, eventName, log.Data)
if err != nil {
return errors.Wrap(err, "failed to unpack event", logan.F{
"event": eventName,
})
}

id, err := strconv.ParseInt(log.Topics[1].String(), 0, 64)
if err != nil {
return errors.Wrap(err, "failed to parse match id from topic")
}

if err = r.updateMatch(ctx, big.NewInt(id), event.Status); err != nil {
return errors.Wrap(err, "failed to update match order")
}

return nil
}
129 changes: 129 additions & 0 deletions internal/service/helpers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package service

import (
"context"
"github.com/Swapica/indexer-svc/internal/gobind"
"github.com/Swapica/indexer-svc/internal/service/requests"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"gitlab.com/distributed_lab/json-api-connector/cerrors"
"gitlab.com/distributed_lab/logan/v3/errors"
"math/big"
"net/http"
"net/url"
"strconv"
)

var NotFound = errors.New("not found")

func (r *indexer) filters() ethereum.FilterQuery {
topics := make([]common.Hash, 0, len(r.handlers))
for eventName := range r.handlers {
event := r.swapicaAbi.Events[eventName]

topics = append(topics, event.ID)
}

filterQuery := ethereum.FilterQuery{
Addresses: []common.Address{
r.contractAddress,
},
Topics: [][]common.Hash{
topics,
},
}
return filterQuery
}

func (r *indexer) addOrder(ctx context.Context, o gobind.ISwapicaOrder, useRelayer bool) error {
log := r.log.WithField("order_id", o.OrderId.String())
log.Debug("adding new order")
body := requests.NewAddOrder(o, r.chainID, useRelayer)
u, _ := url.Parse("/orders")

err := r.collector.PostJSON(u, body, ctx, nil)
if isConflict(err) {
log.Warn("order already exists in collector DB, skipping it")
return nil
}

return errors.Wrap(err, "failed to add order into collector service")
}

func (r *indexer) updateOrder(ctx context.Context, id *big.Int, status gobind.ISwapicaOrderStatus) error {
r.log.WithField("order_id", id.String()).Debug("updating order status")
body := requests.NewUpdateOrder(id, status)
u, _ := url.Parse(strconv.FormatInt(r.chainID, 10) + "/orders")
err := r.collector.PatchJSON(u, body, ctx, nil)
return errors.Wrap(err, "failed to update order in collector service")
}

func (r *indexer) orderExists(id int64) (bool, error) {
u, err := url.Parse("/orders/" + strconv.FormatInt(id, 10))
if err != nil {
return false, errors.Wrap(err, "failed to parse url")
}

var order Order

err = r.collector.Get(u, &order)
if err != nil && err.Error() != NotFound.Error() {
return false, errors.Wrap(err, "failed to get order")
}

return id == order.OrderID, nil
}

func (r *indexer) addMatch(ctx context.Context, mo gobind.ISwapicaMatch, useRelayer bool) error {
log := r.log.WithField("match_id", mo.MatchId.String())
log.Debug("adding new match order")
body := requests.NewAddMatch(mo, r.chainID, useRelayer)
u, _ := url.Parse("/match_orders")

err := r.collector.PostJSON(u, body, ctx, nil)
if isConflict(err) {
log.Warn("match order already exists in collector DB, skipping it")
return nil
}

return errors.Wrap(err, "failed to add match order into collector service")
}

func (r *indexer) updateMatch(ctx context.Context, id *big.Int, state uint8) error {
r.log.WithField("match_id", id.String()).Debug("updating match state")
body := requests.NewUpdateMatch(id, state)
u, _ := url.Parse(strconv.FormatInt(r.chainID, 10) + "/match_orders")
err := r.collector.PatchJSON(u, body, ctx, nil)
return errors.Wrap(err, "failed to update match order in collector service")
}

func (r *indexer) matchExists(id int64) (bool, error) {
u, err := url.Parse("/match_orders/" + strconv.FormatInt(id, 10))
if err != nil {
return false, errors.Wrap(err, "failed to parse url")
}

var match Match

err = r.collector.Get(u, &match)
if err != nil && err.Error() != NotFound.Error() {
return false, errors.Wrap(err, "failed to get match")
}

return id == match.MatchID, nil
}

func (r *indexer) updateLastBlock(ctx context.Context, lastBlock uint64) error {
body := requests.NewUpdateBlock(lastBlock)
u, _ := url.Parse(strconv.FormatInt(r.chainID, 10) + "/block")
err := r.collector.PostJSON(u, body, ctx, nil)
if err != nil {
return errors.Wrap(err, "failed to save last block")
}
return nil
}

func isConflict(err error) bool {
c, ok := err.(cerrors.Error)
return ok && c.Status() == http.StatusConflict
}
Loading

0 comments on commit 3b538c5

Please sign in to comment.