From 3b538c50d248368745760a70423dc23961f5cd50 Mon Sep 17 00:00:00 2001 From: Roman <96019015+RomanHarazha@users.noreply.github.com> Date: Wed, 24 May 2023 13:23:49 +0300 Subject: [PATCH] Fix/requests spam (#2) * add fixed indexer prototype * add fixed indexer runner * add a check if the order/match exists * fix order/match getting url * refactor indexer * fix ws subscription * fix update events unpacking * remove block range * Revert "remove block range" This reverts commit 420142f7297f126cfc7857ac36ceec4e9eb04307. * consider block range * make websocket usage optional * use override_last_block cfg field instead of defaultLastBlock const --- config.yaml | 4 +- internal/config/network.go | 18 +- internal/service/catch_up.go | 41 ----- internal/service/events_handlers.go | 106 ++++++++++++ internal/service/helpers.go | 129 +++++++++++++++ internal/service/indexer.go | 247 +++++++++++++++++++--------- internal/service/main.go | 31 ++-- internal/service/match_orders.go | 75 --------- internal/service/orders.go | 75 --------- internal/service/types.go | 39 +++++ 10 files changed, 470 insertions(+), 295 deletions(-) delete mode 100644 internal/service/catch_up.go create mode 100644 internal/service/events_handlers.go create mode 100644 internal/service/helpers.go delete mode 100644 internal/service/match_orders.go delete mode 100644 internal/service/orders.go create mode 100644 internal/service/types.go diff --git a/config.yaml b/config.yaml index e93787b..7a34e33 100644 --- a/config.yaml +++ b/config.yaml @@ -11,7 +11,9 @@ network: contract: "Swapica address" chain_id: 5 index_period: 30s # period of contract calls for fetching events, should be > average_block_time + use_websocket: true + ws: "wss://goerli.infura.io/ws/v3/" # required to subscribe to blocks + override_last_block: "8931015" # optional fields block_range: 3000 # max difference between start and end block on eth_getLogs call, e.g. for Fuji Ankr RPC it's 3000 - override_last_block: "8931015" # if set, updates the last block in DB and catches up from it; need to use on sync problems request_timeout: 3s diff --git a/internal/config/network.go b/internal/config/network.go index ce774cd..497f64f 100644 --- a/internal/config/network.go +++ b/internal/config/network.go @@ -14,11 +14,13 @@ import ( type Network struct { *gobind.Swapica + ContractAddress common.Address EthClient *ethclient.Client + WsClient *ethclient.Client ChainID int64 IndexPeriod time.Duration BlockRange uint64 - OverrideLastBlock *uint64 + OverrideLastBlock uint64 RequestTimeout time.Duration } @@ -31,10 +33,12 @@ func (c *config) Network() Network { RPC string `fig:"rpc,required"` Contract common.Address `fig:"contract,required"` ChainID int64 `fig:"chain_id,required"` + UseWs bool `fig:"use_websocket,required"` IndexPeriod time.Duration `fig:"index_period,required"` BlockRange uint64 `fig:"block_range"` - OverrideLastBlock *uint64 `fig:"override_last_block"` + OverrideLastBlock uint64 `fig:"override_last_block"` RequestTimeout time.Duration `fig:"request_timeout"` + WS string `fig:"ws,required"` } err := figure.Out(&cfg). @@ -61,9 +65,19 @@ func (c *config) Network() Network { cfg.RequestTimeout = defaultRequestTimeout } + var wsCli *ethclient.Client + if cfg.UseWs { + wsCli, err = ethclient.Dial(cfg.WS) + if err != nil { + panic(errors.Wrap(err, "failed to connect to RPC provider")) + } + } + return Network{ Swapica: s, + ContractAddress: cfg.Contract, EthClient: cli, + WsClient: wsCli, ChainID: cfg.ChainID, IndexPeriod: cfg.IndexPeriod, BlockRange: cfg.BlockRange, diff --git a/internal/service/catch_up.go b/internal/service/catch_up.go deleted file mode 100644 index c24022c..0000000 --- a/internal/service/catch_up.go +++ /dev/null @@ -1,41 +0,0 @@ -package service - -import ( - "context" - - "github.com/ethereum/go-ethereum/accounts/abi/bind" - "gitlab.com/distributed_lab/logan/v3/errors" -) - -func (r *indexer) catchUp(ctx context.Context, currentBlock uint64) error { - defer func() { r.updateLastBlock(ctx) }() - if currentBlock == r.lastBlock { - r.log.Debug("last block is up to date, no need for catch-up") - return nil - } - - r.log.Infof("catching up the network from the block number %d", r.lastBlock) - if r.blockRange == 0 { - var err error - opts := &bind.FilterOpts{Start: r.lastBlock + 1} - err = r.handleEvents(ctx, opts) - return errors.Wrap(err, "failed to handle events") - } - - // +1, because for eth_getLogs events with blockNumber == fromBlock or toBlock are included, so intersection is avoided - for start := r.lastBlock + 1; start <= currentBlock; start += r.blockRange + 1 { - end := start + r.blockRange - if end > currentBlock { - end = currentBlock - } - - opts := &bind.FilterOpts{Start: start, End: &end} - if err := r.handleEvents(ctx, opts); err != nil { - return errors.Wrap(err, "failed to handle events") - } - } - - r.lastBlock = currentBlock - r.lastBlockOutdated = true - return nil -} diff --git a/internal/service/events_handlers.go b/internal/service/events_handlers.go new file mode 100644 index 0000000..df11419 --- /dev/null +++ b/internal/service/events_handlers.go @@ -0,0 +1,106 @@ +package service + +import ( + "context" + "math/big" + "strconv" + + "github.com/Swapica/indexer-svc/internal/gobind" + "github.com/ethereum/go-ethereum/core/types" + "gitlab.com/distributed_lab/logan/v3" + "gitlab.com/distributed_lab/logan/v3/errors" +) + +func (r *indexer) handleOrderCreated(ctx context.Context, eventName string, log *types.Log) error { + var event gobind.SwapicaOrderCreated + + err := r.swapicaAbi.UnpackIntoInterface(&event, eventName, log.Data) + if err != nil { + return errors.Wrap(err, "failed to unpack event", logan.F{ + "event": eventName, + }) + } + + exists, err := r.orderExists(event.Order.OrderId.Int64()) + if err != nil { + return errors.Wrap(err, "failed to check if order exists") + } + if exists { + return nil + } + + if err = r.addOrder(ctx, event.Order, event.UseRelayer); err != nil { + return errors.Wrap(err, "failed to index order") + } + + return nil +} + +func (r *indexer) handleOrderUpdated(ctx context.Context, eventName string, log *types.Log) error { + var event gobind.SwapicaOrderUpdated + + err := r.swapicaAbi.UnpackIntoInterface(&event, eventName, log.Data) + if err != nil { + return errors.Wrap(err, "failed to unpack event", logan.F{ + "event": eventName, + }) + } + + id, err := strconv.ParseInt(log.Topics[1].String(), 0, 64) + if err != nil { + return errors.Wrap(err, "failed to parse order id from topic") + } + + if err = r.updateOrder(ctx, big.NewInt(id), event.Status); err != nil { + return errors.Wrap(err, "failed to index order") + } + + return nil +} + +func (r *indexer) handleMatchCreated(ctx context.Context, eventName string, log *types.Log) error { + var event gobind.SwapicaMatchCreated + + err := r.swapicaAbi.UnpackIntoInterface(&event, eventName, log.Data) + if err != nil { + return errors.Wrap(err, "failed to unpack event", logan.F{ + "event": eventName, + }) + } + + exists, err := r.matchExists(event.Match.MatchId.Int64()) + if err != nil { + return errors.Wrap(err, "failed to check if match exists") + } + if exists { + return nil + } + + if err = r.addMatch(ctx, event.Match, event.UseRelayer); err != nil { + return errors.Wrap(err, "failed to add match order") + } + + return nil +} + +func (r *indexer) handleMatchUpdated(ctx context.Context, eventName string, log *types.Log) error { + var event gobind.SwapicaMatchUpdated + + err := r.swapicaAbi.UnpackIntoInterface(&event, eventName, log.Data) + if err != nil { + return errors.Wrap(err, "failed to unpack event", logan.F{ + "event": eventName, + }) + } + + id, err := strconv.ParseInt(log.Topics[1].String(), 0, 64) + if err != nil { + return errors.Wrap(err, "failed to parse match id from topic") + } + + if err = r.updateMatch(ctx, big.NewInt(id), event.Status); err != nil { + return errors.Wrap(err, "failed to update match order") + } + + return nil +} diff --git a/internal/service/helpers.go b/internal/service/helpers.go new file mode 100644 index 0000000..58234ae --- /dev/null +++ b/internal/service/helpers.go @@ -0,0 +1,129 @@ +package service + +import ( + "context" + "github.com/Swapica/indexer-svc/internal/gobind" + "github.com/Swapica/indexer-svc/internal/service/requests" + "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/common" + "gitlab.com/distributed_lab/json-api-connector/cerrors" + "gitlab.com/distributed_lab/logan/v3/errors" + "math/big" + "net/http" + "net/url" + "strconv" +) + +var NotFound = errors.New("not found") + +func (r *indexer) filters() ethereum.FilterQuery { + topics := make([]common.Hash, 0, len(r.handlers)) + for eventName := range r.handlers { + event := r.swapicaAbi.Events[eventName] + + topics = append(topics, event.ID) + } + + filterQuery := ethereum.FilterQuery{ + Addresses: []common.Address{ + r.contractAddress, + }, + Topics: [][]common.Hash{ + topics, + }, + } + return filterQuery +} + +func (r *indexer) addOrder(ctx context.Context, o gobind.ISwapicaOrder, useRelayer bool) error { + log := r.log.WithField("order_id", o.OrderId.String()) + log.Debug("adding new order") + body := requests.NewAddOrder(o, r.chainID, useRelayer) + u, _ := url.Parse("/orders") + + err := r.collector.PostJSON(u, body, ctx, nil) + if isConflict(err) { + log.Warn("order already exists in collector DB, skipping it") + return nil + } + + return errors.Wrap(err, "failed to add order into collector service") +} + +func (r *indexer) updateOrder(ctx context.Context, id *big.Int, status gobind.ISwapicaOrderStatus) error { + r.log.WithField("order_id", id.String()).Debug("updating order status") + body := requests.NewUpdateOrder(id, status) + u, _ := url.Parse(strconv.FormatInt(r.chainID, 10) + "/orders") + err := r.collector.PatchJSON(u, body, ctx, nil) + return errors.Wrap(err, "failed to update order in collector service") +} + +func (r *indexer) orderExists(id int64) (bool, error) { + u, err := url.Parse("/orders/" + strconv.FormatInt(id, 10)) + if err != nil { + return false, errors.Wrap(err, "failed to parse url") + } + + var order Order + + err = r.collector.Get(u, &order) + if err != nil && err.Error() != NotFound.Error() { + return false, errors.Wrap(err, "failed to get order") + } + + return id == order.OrderID, nil +} + +func (r *indexer) addMatch(ctx context.Context, mo gobind.ISwapicaMatch, useRelayer bool) error { + log := r.log.WithField("match_id", mo.MatchId.String()) + log.Debug("adding new match order") + body := requests.NewAddMatch(mo, r.chainID, useRelayer) + u, _ := url.Parse("/match_orders") + + err := r.collector.PostJSON(u, body, ctx, nil) + if isConflict(err) { + log.Warn("match order already exists in collector DB, skipping it") + return nil + } + + return errors.Wrap(err, "failed to add match order into collector service") +} + +func (r *indexer) updateMatch(ctx context.Context, id *big.Int, state uint8) error { + r.log.WithField("match_id", id.String()).Debug("updating match state") + body := requests.NewUpdateMatch(id, state) + u, _ := url.Parse(strconv.FormatInt(r.chainID, 10) + "/match_orders") + err := r.collector.PatchJSON(u, body, ctx, nil) + return errors.Wrap(err, "failed to update match order in collector service") +} + +func (r *indexer) matchExists(id int64) (bool, error) { + u, err := url.Parse("/match_orders/" + strconv.FormatInt(id, 10)) + if err != nil { + return false, errors.Wrap(err, "failed to parse url") + } + + var match Match + + err = r.collector.Get(u, &match) + if err != nil && err.Error() != NotFound.Error() { + return false, errors.Wrap(err, "failed to get match") + } + + return id == match.MatchID, nil +} + +func (r *indexer) updateLastBlock(ctx context.Context, lastBlock uint64) error { + body := requests.NewUpdateBlock(lastBlock) + u, _ := url.Parse(strconv.FormatInt(r.chainID, 10) + "/block") + err := r.collector.PostJSON(u, body, ctx, nil) + if err != nil { + return errors.Wrap(err, "failed to save last block") + } + return nil +} + +func isConflict(err error) bool { + c, ok := err.(cerrors.Error) + return ok && c.Status() == http.StatusConflict +} diff --git a/internal/service/indexer.go b/internal/service/indexer.go index 7d2e6d6..5418ad5 100644 --- a/internal/service/indexer.go +++ b/internal/service/indexer.go @@ -2,18 +2,18 @@ package service import ( "context" - "net/http" - "net/url" - "strconv" + "math/big" + "strings" "time" "github.com/Swapica/indexer-svc/internal/config" "github.com/Swapica/indexer-svc/internal/gobind" - "github.com/Swapica/indexer-svc/internal/service/requests" - "github.com/ethereum/go-ethereum/accounts/abi/bind" + "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/accounts/abi" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethclient" jsonapi "gitlab.com/distributed_lab/json-api-connector" - "gitlab.com/distributed_lab/json-api-connector/cerrors" "gitlab.com/distributed_lab/logan/v3" "gitlab.com/distributed_lab/logan/v3/errors" ) @@ -23,126 +23,209 @@ type indexer struct { swapica *gobind.Swapica collector *jsonapi.Connector ethClient *ethclient.Client + wsClient *ethclient.Client chainID int64 blockRange uint64 lastBlock uint64 lastBlockOutdated bool requestTimeout time.Duration + handlers map[string]Handler + swapicaAbi abi.ABI + contractAddress common.Address + indexPeriod time.Duration } +type Handler func(ctx context.Context, eventName string, log *types.Log) error + func newIndexer(c config.Config, lastBlock uint64) indexer { - return indexer{ - log: c.Log(), - swapica: c.Network().Swapica, - collector: c.Collector(), - ethClient: c.Network().EthClient, - chainID: c.Network().ChainID, - blockRange: c.Network().BlockRange, - lastBlock: lastBlock, - requestTimeout: c.Network().RequestTimeout, + swapicaAbi, err := abi.JSON(strings.NewReader(gobind.SwapicaMetaData.ABI)) + if err != nil { + panic(errors.Wrap(err, "failed to get ABI")) + } + + indexerInstance := indexer{ + log: c.Log(), + swapica: c.Network().Swapica, + collector: c.Collector(), + ethClient: c.Network().EthClient, + wsClient: c.Network().WsClient, + chainID: c.Network().ChainID, + blockRange: c.Network().BlockRange, + lastBlock: lastBlock, + requestTimeout: c.Network().RequestTimeout, + swapicaAbi: swapicaAbi, + contractAddress: c.Network().ContractAddress, + indexPeriod: c.Network().IndexPeriod, } + + indexerInstance.handlers = map[string]Handler{ + "OrderCreated": indexerInstance.handleOrderCreated, + "OrderUpdated": indexerInstance.handleOrderUpdated, + "MatchCreated": indexerInstance.handleMatchCreated, + "MatchUpdated": indexerInstance.handleMatchUpdated, + } + + return indexerInstance } func (r *indexer) run(ctx context.Context) error { - var err error - currentBlock := r.lastBlock - opts := &bind.FilterOpts{Start: r.lastBlock + 1} + lastChainBlock, err := r.ethClient.BlockNumber(ctx) + if err != nil { + return errors.Wrap(err, "failed to get last block number") + } - defer func() { r.updateLastBlock(ctx) }() + newEvents := make(chan types.Log, 1024) + sub, err := r.wsClient.SubscribeFilterLogs(ctx, r.filters(), newEvents) + if err != nil { + return errors.Wrap(err, "failed to subscribe to logs") + } + defer sub.Unsubscribe() - // For Infura nodes it is often no need to limit block range, therefore it's better to save requests - if r.blockRange != 0 { - if currentBlock, err = r.getNetworkLatestBlock(ctx); err != nil { - return errors.Wrap(err, "failed to get the latest block from the network") + if err := r.handleUnprocessedEvents(ctx, lastChainBlock); err != nil { + return errors.Wrap(err, "failed to handle unprocessed events") + } + + if err := r.waitForEvents(ctx, sub, newEvents); err != nil { + return errors.Wrap(err, "failed to wait for unprocessed events") + } + + return nil +} + +func (r *indexer) runWithoutWs(ctx context.Context) error { + lastChainBlock, err := r.ethClient.BlockNumber(ctx) + if err != nil { + return errors.Wrap(err, "failed to get last block number") + } + + if err := r.handleUnprocessedEvents(ctx, lastChainBlock); err != nil { + return errors.Wrap(err, "failed to handle unprocessed events") + } + + ticker := time.NewTicker(r.indexPeriod) + filters := r.filters() + + for range ticker.C { + lastChainBlock, err = r.ethClient.BlockNumber(ctx) + if err != nil { + return errors.Wrap(err, "failed to get last block number") } - log := r.log.WithField("current_block", currentBlock) - if currentBlock == r.lastBlock { - log.Info("current block is equal to the saved one, index_period might be too short; skipping iteration") - return nil + filters.FromBlock = big.NewInt(int64(r.lastBlock) + 1) + filters.ToBlock = big.NewInt(int64(lastChainBlock)) + + logs, err := r.ethClient.FilterLogs(ctx, filters) + if err != nil { + return errors.Wrap(err, "failed to get filter logs") } - if currentBlock > r.lastBlock+r.blockRange+1 { - log.Info("block range is too wide, step-by-step catch-up required") - err = r.catchUp(ctx, currentBlock) - if err != nil { - return errors.Wrap(err, "failed to catch up the network") + for _, log := range logs { + if err := r.handleEvent(ctx, log); err != nil { + return errors.Wrap(err, "failed to handle event") } - return nil } - r.lastBlock = currentBlock - } - if err = r.handleEvents(ctx, opts); err != nil { - return errors.Wrap(err, "failed to handle events") + r.lastBlock = lastChainBlock } - r.lastBlockOutdated = r.lastBlockOutdated || r.lastBlock != currentBlock return nil } -func (r *indexer) handleEvents(ctx context.Context, opts *bind.FilterOpts) error { - toBlock := "latest" - if opts.End != nil { - toBlock = strconv.FormatUint(*opts.End, 10) - } - r.log.Debugf("filtering events with fromBlock=%d and toBlock=%s", opts.Start, toBlock) +func (r *indexer) handleUnprocessedEvents( + ctx context.Context, lastChainBlock uint64, +) error { + filters := r.filters() - child, cancel := context.WithTimeout(ctx, r.requestTimeout) - defer cancel() - opts.Context = child + if r.blockRange != 0 { + for start := r.lastBlock + 1; start <= lastChainBlock; start += r.blockRange + 1 { + end := start + r.blockRange + if end > lastChainBlock { + end = lastChainBlock + } + + filters.FromBlock = new(big.Int).SetUint64(start) + filters.ToBlock = new(big.Int).SetUint64(end) - if err := r.handleCreatedOrders(ctx, opts); err != nil { - return errors.Wrap(err, "failed to handle created orders") + logs, err := r.ethClient.FilterLogs(ctx, filters) + if err != nil { + return errors.Wrap(err, "failed to get filter logs") + } + + for _, log := range logs { + if err := r.handleEvent(ctx, log); err != nil { + return errors.Wrap(err, "failed to handle event") + } + } + } + r.lastBlock = lastChainBlock + return nil } - if err := r.handleCreatedMatches(ctx, opts); err != nil { - return errors.Wrap(err, "failed to handle created match orders") + filters.FromBlock = new(big.Int).SetUint64(r.lastBlock) + filters.ToBlock = new(big.Int).SetUint64(lastChainBlock + 1) + + logs, err := r.ethClient.FilterLogs(ctx, filters) + if err != nil { + return errors.Wrap(err, "failed to get filter logs") } - if err := r.handleUpdatedOrders(ctx, opts); err != nil { - return errors.Wrap(err, "failed to handle updated orders") + for _, log := range logs { + if err := r.handleEvent(ctx, log); err != nil { + return errors.Wrap(err, "failed to handle event") + } } + r.lastBlock = lastChainBlock + + return nil +} - err := r.handleUpdatedMatches(ctx, opts) - return errors.Wrap(err, "failed to handle updated match orders") +func (r *indexer) waitForEvents( + ctx context.Context, sub ethereum.Subscription, events <-chan types.Log, +) error { + for { + select { + case <-ctx.Done(): + return ctx.Err() + case err := <-sub.Err(): + return errors.Wrap(err, "log subscription failed") + case event := <-events: + if err := r.handleEvent(ctx, event); err != nil { + return errors.Wrap(err, "failed to handle event") + } + r.lastBlock = event.BlockNumber + } + } } -func (r *indexer) getNetworkLatestBlock(ctx context.Context) (uint64, error) { - child, cancel := context.WithTimeout(ctx, r.requestTimeout) - defer cancel() +func (r *indexer) handleEvent(ctx context.Context, log types.Log) error { + topic := log.Topics[0] // First topic must be a hashed signature of the event - n, err := r.ethClient.BlockNumber(child) + event, err := r.swapicaAbi.EventByID(topic) if err != nil { - return n, errors.Wrap(err, "failed to get eth_blockNumber") - } - if n < r.lastBlock { - return n, errors.Errorf("given saved_last_block=%d is greater than network_latest_block=%d", r.lastBlock, n) + return errors.Wrap(err, "failed to get event by topic", logan.F{ + "topic": topic.Hex(), + }) } - return n, nil -} + handler, ok := r.handlers[event.Name] + if !ok { + return errors.From(errors.New("no handler for such event name"), + logan.F{ + "event_name": event.Name, + }) + } -func (r *indexer) updateLastBlock(ctx context.Context) { - log := r.log.WithField("last_block", r.lastBlock) - if !r.lastBlockOutdated { - log.Debug("no updates of the last block") - return + if err := handler(ctx, event.Name, &log); err != nil { + return errors.Wrap(err, "handling of event failed", logan.F{ + "topic": topic.Hex(), + "event_name": event.Name, + }) } - body := requests.NewUpdateBlock(r.lastBlock) - u, _ := url.Parse(strconv.FormatInt(r.chainID, 10) + "/block") - err := r.collector.PostJSON(u, body, ctx, nil) - if err != nil { - log.WithError(err).Error("failed to save last block") - return + if err := r.updateLastBlock(ctx, log.BlockNumber); err != nil { + return errors.Wrap(err, "failed to update last block") } - r.lastBlockOutdated = false - log.Debug("successfully saved last block") -} -func isConflict(err error) bool { - c, ok := err.(cerrors.Error) - return ok && c.Status() == http.StatusConflict + return nil } diff --git a/internal/service/main.go b/internal/service/main.go index fd07a42..2e2ba64 100644 --- a/internal/service/main.go +++ b/internal/service/main.go @@ -20,9 +20,6 @@ type service struct { cfg config.Config } -const ethBlockTime = 13 * time.Second -const defaultLastBlock = 8483496 // Goerli's latest block of tx with no events at this moment - func (s *service) run() error { s.log.Info("Service started") @@ -31,20 +28,20 @@ func (s *service) run() error { return errors.Wrap(err, "failed to get last block") } - ctx := context.Background() runner := newIndexer(s.cfg, last) - period := s.cfg.Network().IndexPeriod - latest, err := runner.getNetworkLatestBlock(ctx) - if err != nil { - return errors.Wrap(err, "failed to get block for initial catch-up") - } - if err = runner.catchUp(ctx, latest); err != nil { - return errors.Wrap(err, "failed to perform network initial catch-up") + if s.cfg.Network().WsClient != nil { + running.WithBackOff( + context.Background(), s.log, "indexer", + runner.run, + s.cfg.Network().IndexPeriod, s.cfg.Network().IndexPeriod, 10*time.Minute) + } else { + running.WithBackOff( + context.Background(), s.log, "indexer", + runner.runWithoutWs, + s.cfg.Network().IndexPeriod, s.cfg.Network().IndexPeriod, 10*time.Minute) } - time.Sleep(period) // prevent log about short period - running.WithBackOff(ctx, s.log, "indexer", runner.run, period, ethBlockTime, 10*time.Minute) return nil } @@ -62,19 +59,15 @@ func Run(cfg config.Config) { } func (s *service) getLastBlock() (uint64, error) { - last := s.cfg.Network().OverrideLastBlock - if last != nil { - return *last, nil - } // No error can occur when parsing int64 + const_string path, _ := url.Parse(strconv.FormatInt(s.cfg.Network().ChainID, 10) + "/block") var resp resources.BlockResponse if err := s.cfg.Collector().Get(path, &resp); err != nil { if err, ok := err.(cerrors.Error); ok && err.Status() == http.StatusNotFound { - s.log.WithField("default_last_block", defaultLastBlock). + s.log.WithField("default_last_block", s.cfg.Network().OverrideLastBlock). Warn("last block should be set either in orders DB or in override_last_block config field, using default") - return defaultLastBlock, nil + return s.cfg.Network().OverrideLastBlock, nil } return 0, errors.Wrap(err, "failed to get last block from collector") } diff --git a/internal/service/match_orders.go b/internal/service/match_orders.go deleted file mode 100644 index dc3b277..0000000 --- a/internal/service/match_orders.go +++ /dev/null @@ -1,75 +0,0 @@ -package service - -import ( - "context" - "math/big" - "net/url" - "strconv" - - "github.com/Swapica/indexer-svc/internal/gobind" - "github.com/Swapica/indexer-svc/internal/service/requests" - "github.com/ethereum/go-ethereum/accounts/abi/bind" - "gitlab.com/distributed_lab/logan/v3/errors" -) - -func (r *indexer) handleCreatedMatches(ctx context.Context, opts *bind.FilterOpts) error { - it, err := r.swapica.FilterMatchCreated(opts) - if err != nil { - return errors.Wrap(err, "failed to filter MatchCreated events") - } - for it.Next() { - if err = r.addMatch(ctx, it.Event.Match, it.Event.UseRelayer); err != nil { - return errors.Wrap(err, "failed to add match order") - } - - if b := it.Event.Raw.BlockNumber + 1; b > r.lastBlock { - r.lastBlock = b - r.lastBlockOutdated = true - } - } - - return errors.Wrap(it.Error(), "error occurred while iterating over MatchCreated events") -} - -func (r *indexer) handleUpdatedMatches(ctx context.Context, opts *bind.FilterOpts) error { - it, err := r.swapica.FilterMatchUpdated(opts, nil) - if err != nil { - return errors.Wrap(err, "failed to filter MatchUpdated events") - } - - for it.Next() { - if err = r.updateMatch(ctx, it.Event.MatchId, it.Event.Status); err != nil { - return errors.Wrap(err, "failed to update match order") - } - - if b := it.Event.Raw.BlockNumber + 1; b > r.lastBlock { - r.lastBlock = b - r.lastBlockOutdated = true - } - } - - return errors.Wrap(it.Error(), "error occurred while iterating over MatchUpdated events") -} - -func (r *indexer) addMatch(ctx context.Context, mo gobind.ISwapicaMatch, useRelayer bool) error { - log := r.log.WithField("match_id", mo.MatchId.String()) - log.Debug("adding new match order") - body := requests.NewAddMatch(mo, r.chainID, useRelayer) - u, _ := url.Parse("/match_orders") - - err := r.collector.PostJSON(u, body, ctx, nil) - if isConflict(err) { - log.Warn("match order already exists in collector DB, skipping it") - return nil - } - - return errors.Wrap(err, "failed to add match order into collector service") -} - -func (r *indexer) updateMatch(ctx context.Context, id *big.Int, state uint8) error { - r.log.WithField("match_id", id.String()).Debug("updating match state") - body := requests.NewUpdateMatch(id, state) - u, _ := url.Parse(strconv.FormatInt(r.chainID, 10) + "/match_orders") - err := r.collector.PatchJSON(u, body, ctx, nil) - return errors.Wrap(err, "failed to update match order in collector service") -} diff --git a/internal/service/orders.go b/internal/service/orders.go deleted file mode 100644 index 5c3fca3..0000000 --- a/internal/service/orders.go +++ /dev/null @@ -1,75 +0,0 @@ -package service - -import ( - "context" - "math/big" - "net/url" - "strconv" - - "github.com/Swapica/indexer-svc/internal/gobind" - "github.com/Swapica/indexer-svc/internal/service/requests" - "github.com/ethereum/go-ethereum/accounts/abi/bind" - "gitlab.com/distributed_lab/logan/v3/errors" -) - -func (r *indexer) handleCreatedOrders(ctx context.Context, opts *bind.FilterOpts) error { - it, err := r.swapica.FilterOrderCreated(opts) - if err != nil { - return errors.Wrap(err, "failed to filter OrderCreated events") - } - for it.Next() { - if err = r.addOrder(ctx, it.Event.Order, it.Event.UseRelayer); err != nil { - return errors.Wrap(err, "failed to index order") - } - - if b := it.Event.Raw.BlockNumber + 1; b > r.lastBlock { - r.lastBlock = b - r.lastBlockOutdated = true - } - } - - return errors.Wrap(it.Error(), "error occurred while iterating over OrderCreated events") -} - -func (r *indexer) handleUpdatedOrders(ctx context.Context, opts *bind.FilterOpts) error { - it, err := r.swapica.FilterOrderUpdated(opts, nil) - if err != nil { - return errors.Wrap(err, "failed to filter OrderUpdated events") - } - - for it.Next() { - if err = r.updateOrder(ctx, it.Event.OrderId, it.Event.Status); err != nil { - return errors.Wrap(err, "failed to index order") - } - - if b := it.Event.Raw.BlockNumber + 1; b > r.lastBlock { - r.lastBlock = b - r.lastBlockOutdated = true - } - } - - return errors.Wrap(it.Error(), "error occurred while iterating over OrderUpdated events") -} - -func (r *indexer) addOrder(ctx context.Context, o gobind.ISwapicaOrder, useRelayer bool) error { - log := r.log.WithField("order_id", o.OrderId.String()) - log.Debug("adding new order") - body := requests.NewAddOrder(o, r.chainID, useRelayer) - u, _ := url.Parse("/orders") - - err := r.collector.PostJSON(u, body, ctx, nil) - if isConflict(err) { - log.Warn("order already exists in collector DB, skipping it") - return nil - } - - return errors.Wrap(err, "failed to add order into collector service") -} - -func (r *indexer) updateOrder(ctx context.Context, id *big.Int, status gobind.ISwapicaOrderStatus) error { - r.log.WithField("order_id", id.String()).Debug("updating order status") - body := requests.NewUpdateOrder(id, status) - u, _ := url.Parse(strconv.FormatInt(r.chainID, 10) + "/orders") - err := r.collector.PatchJSON(u, body, ctx, nil) - return errors.Wrap(err, "failed to update order in collector service") -} diff --git a/internal/service/types.go b/internal/service/types.go new file mode 100644 index 0000000..bc1c56b --- /dev/null +++ b/internal/service/types.go @@ -0,0 +1,39 @@ +package service + +import "database/sql" + +type Order struct { + // ID surrogate key is strongly preferred against PRIMARY KEY (OrderID, SrcChain) + ID int64 `structs:"-" db:"id"` + OrderID int64 `structs:"order_id" db:"order_id"` + SrcChain int64 `structs:"src_chain" db:"src_chain"` + Creator string `structs:"creator" db:"creator"` + SellToken int64 `structs:"sell_token" db:"sell_token"` + BuyToken int64 `structs:"buy_token" db:"buy_token"` + SellAmount string `structs:"sell_amount" db:"sell_amount"` + BuyAmount string `structs:"buy_amount" db:"buy_amount"` + DestChain int64 `structs:"dest_chain" db:"dest_chain"` + State uint8 `structs:"state" db:"state"` + UseRelayer bool `structs:"use_relayer" db:"use_relayer"` + + // ExecutedByMatch foreign key for match_orders(ID) + ExecutedByMatch sql.NullInt64 `structs:"executed_by_match,omitempty,omitnested" db:"executed_by_match"` + MatchID sql.NullInt64 `structs:"match_id,omitempty,omitnested" db:"match_id"` + MatchSwapica sql.NullString `structs:"match_swapica,omitempty,omitnested" db:"match_swapica"` +} + +type Match struct { + // ID surrogate key is strongly preferred against PRIMARY KEY (MatchID, SrcChain) + ID int64 `structs:"-" db:"id"` + MatchID int64 `structs:"match_id" db:"match_id"` + SrcChain int64 `structs:"src_chain" db:"src_chain"` + // OriginOrder foreign key for orders(ID) + OriginOrder int64 `structs:"origin_order" db:"origin_order"` + OrderID int64 `structs:"order_id" db:"order_id"` + OrderChain int64 `structs:"order_chain" db:"order_chain"` + Creator string `structs:"creator" db:"creator"` + SellToken int64 `structs:"sell_token" db:"sell_token"` + SellAmount string `structs:"sell_amount" db:"sell_amount"` + State uint8 `structs:"state" db:"state"` + UseRelayer bool `structs:"use_relayer" db:"use_relayer"` +}