Skip to content

Commit

Permalink
Merge pull request #2848 from gobitfly/NOBIDS/improve_ethstore_perfor…
Browse files Browse the repository at this point in the history
…mance

(NOBIDS) improve ethstore export performance
  • Loading branch information
recy21 committed Mar 12, 2024
2 parents 4e67631 + 58323b6 commit 0794269
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 14 deletions.
15 changes: 14 additions & 1 deletion cmd/ethstore-exporter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"strconv"
"strings"

ethstore "github.com/gobitfly/eth.store"
_ "github.com/jackc/pgx/v5/stdlib"
"github.com/sirupsen/logrus"
)
Expand All @@ -25,6 +26,10 @@ func main() {
versionFlag := flag.Bool("version", false, "Show version and exit")
dayToReexport := flag.Int64("day", -1, "Day to reexport")
daysToReexport := flag.String("days", "", "Days to reexport")
receiptsModeStr := flag.String("receipts-mode", "single", "single or batch")
concurrency := flag.Int("concurrency", 1, "concurrency level to use (1 for no concurrency)")
debugLevel := flag.Uint64("debug-level", 0, "debug level to use for eth.store calculation output")

flag.Parse()

if *versionFlag {
Expand Down Expand Up @@ -82,6 +87,14 @@ func main() {
endDayReexport = *dayToReexport
}

exporter.StartEthStoreExporter(*bnAddress, *enAddress, *updateInterval, *errorInterval, *sleepInterval, startDayReexport, endDayReexport)
receiptsMode := ethstore.RECEIPTS_MODE_SINGLE

if *receiptsModeStr == "batch" {
receiptsMode = ethstore.RECEIPTS_MODE_BATCH
}

ethstore.SetDebugLevel(*debugLevel)
logrus.Infof("using receipts mode %s (%d)", *receiptsModeStr, receiptsMode)
exporter.StartEthStoreExporter(*bnAddress, *enAddress, *updateInterval, *errorInterval, *sleepInterval, startDayReexport, endDayReexport, *concurrency, receiptsMode)
logrus.Println("exiting...")
}
26 changes: 13 additions & 13 deletions exporter/ethstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type EthStoreExporter struct {
}

// start exporting of eth.store into db
func StartEthStoreExporter(bnAddress string, enAddress string, updateInterval, errorInterval, sleepInterval time.Duration, startDayReexport, endDayReexport int64) {
func StartEthStoreExporter(bnAddress string, enAddress string, updateInterval, errorInterval, sleepInterval time.Duration, startDayReexport, endDayReexport int64, concurrency, receiptsMode int) {
logger.Info("starting eth.store exporter")
ese := &EthStoreExporter{
DB: db.WriterDb,
Expand All @@ -51,7 +51,7 @@ func StartEthStoreExporter(bnAddress string, enAddress string, updateInterval, e
// Reexport days if specified
if startDayReexport != -1 && endDayReexport != -1 {
for day := startDayReexport; day <= endDayReexport; day++ {
err := ese.reexportDay(strconv.FormatInt(day, 10))
err := ese.reexportDay(strconv.FormatInt(day, 10), concurrency, receiptsMode)
if err != nil {
utils.LogError(err, fmt.Sprintf("error reexporting eth.store day %d in database", day), 0)
return
Expand All @@ -60,10 +60,10 @@ func StartEthStoreExporter(bnAddress string, enAddress string, updateInterval, e
return
}

ese.Run()
ese.Run(concurrency, receiptsMode)
}

func (ese *EthStoreExporter) reexportDay(day string) error {
func (ese *EthStoreExporter) reexportDay(day string, concurrency, receiptsMode int) error {
tx, err := ese.DB.Beginx()
if err != nil {
return err
Expand All @@ -75,22 +75,22 @@ func (ese *EthStoreExporter) reexportDay(day string) error {
return err
}

ese.prepareExportDayTx(tx, day)
ese.prepareExportDayTx(tx, day, concurrency, receiptsMode)
if err != nil {
return err
}

return tx.Commit()
}

func (ese *EthStoreExporter) exportDay(day string) error {
func (ese *EthStoreExporter) exportDay(day string, concurrency, receiptsMode int) error {
tx, err := ese.DB.Beginx()
if err != nil {
return err
}
defer tx.Rollback()

err = ese.prepareExportDayTx(tx, day)
err = ese.prepareExportDayTx(tx, day, concurrency, receiptsMode)
if err != nil {
return err
}
Expand All @@ -107,8 +107,8 @@ func (ese *EthStoreExporter) prepareClearDayTx(tx *sqlx.Tx, day string) error {
return err
}

func (ese *EthStoreExporter) prepareExportDayTx(tx *sqlx.Tx, day string) error {
ethStoreDay, validators, err := ese.getStoreDay(day)
func (ese *EthStoreExporter) prepareExportDayTx(tx *sqlx.Tx, day string, concurrency, receiptsMode int) error {
ethStoreDay, validators, err := ese.getStoreDay(day, concurrency, receiptsMode)
if err != nil {
return err
}
Expand Down Expand Up @@ -208,12 +208,12 @@ func (ese *EthStoreExporter) prepareExportDayTx(tx *sqlx.Tx, day string) error {
return err
}

func (ese *EthStoreExporter) getStoreDay(day string) (*ethstore.Day, map[uint64]*ethstore.Day, error) {
func (ese *EthStoreExporter) getStoreDay(day string, concurrency, receiptsMode int) (*ethstore.Day, map[uint64]*ethstore.Day, error) {
logger.Infof("retrieving eth.store for day %v", day)
return ethstore.Calculate(context.Background(), ese.BNAddress, ese.ENAddress, day, 1, ethstore.RECEIPTS_MODE_SINGLE)
return ethstore.Calculate(context.Background(), ese.BNAddress, ese.ENAddress, day, concurrency, receiptsMode)
}

func (ese *EthStoreExporter) Run() {
func (ese *EthStoreExporter) Run(concurrency, receiptsMode int) {
t := time.NewTicker(ese.UpdateInverval)
defer t.Stop()
DBCHECK:
Expand Down Expand Up @@ -282,7 +282,7 @@ DBCHECK:
})
// export missing days
for _, dayToExport := range daysToExportArray {
err = ese.exportDay(strconv.FormatUint(dayToExport, 10))
err = ese.exportDay(strconv.FormatUint(dayToExport, 10), concurrency, receiptsMode)
if err != nil {
utils.LogError(err, fmt.Sprintf("error exporting eth.store day %d into database", dayToExport), 0)
time.Sleep(ese.ErrorInterval)
Expand Down

0 comments on commit 0794269

Please sign in to comment.