diff --git a/cmd/ethstore-exporter/main.go b/cmd/ethstore-exporter/main.go index a1d6653526..a8188f178d 100644 --- a/cmd/ethstore-exporter/main.go +++ b/cmd/ethstore-exporter/main.go @@ -11,6 +11,7 @@ import ( "strconv" "strings" + ethstore "github.com/gobitfly/eth.store" _ "github.com/jackc/pgx/v5/stdlib" "github.com/sirupsen/logrus" ) @@ -25,6 +26,9 @@ func main() { versionFlag := flag.Bool("version", false, "Show version and exit") dayToReexport := flag.Int64("day", -1, "Day to reexport") daysToReexport := flag.String("days", "", "Days to reexport") + receiptsModeStr := flag.String("receipts-mode", "single", "single or batch") + concurrency := flag.Int("concurrency", 1, "concurrency level to use (1 for no concurrency)") + flag.Parse() if *versionFlag { @@ -82,6 +86,14 @@ func main() { endDayReexport = *dayToReexport } - exporter.StartEthStoreExporter(*bnAddress, *enAddress, *updateInterval, *errorInterval, *sleepInterval, startDayReexport, endDayReexport) + receiptsMode := ethstore.RECEIPTS_MODE_SINGLE + + if *receiptsModeStr == "batch" { + receiptsMode = ethstore.RECEIPTS_MODE_BATCH + } + + ethstore.SetDebugLevel(1000) + logrus.Infof("using receipts mode %s (%d)", *receiptsModeStr, receiptsMode) + exporter.StartEthStoreExporter(*bnAddress, *enAddress, *updateInterval, *errorInterval, *sleepInterval, startDayReexport, endDayReexport, *concurrency, receiptsMode) logrus.Println("exiting...") } diff --git a/exporter/ethstore.go b/exporter/ethstore.go index dfd59fe659..42173f3018 100644 --- a/exporter/ethstore.go +++ b/exporter/ethstore.go @@ -27,7 +27,7 @@ type EthStoreExporter struct { } // start exporting of eth.store into db -func StartEthStoreExporter(bnAddress string, enAddress string, updateInterval, errorInterval, sleepInterval time.Duration, startDayReexport, endDayReexport int64) { +func StartEthStoreExporter(bnAddress string, enAddress string, updateInterval, errorInterval, sleepInterval time.Duration, startDayReexport, endDayReexport int64, concurrency, receiptsMode int) { logger.Info("starting eth.store exporter") ese := &EthStoreExporter{ DB: db.WriterDb, @@ -51,7 +51,7 @@ func StartEthStoreExporter(bnAddress string, enAddress string, updateInterval, e // Reexport days if specified if startDayReexport != -1 && endDayReexport != -1 { for day := startDayReexport; day <= endDayReexport; day++ { - err := ese.reexportDay(strconv.FormatInt(day, 10)) + err := ese.reexportDay(strconv.FormatInt(day, 10), concurrency, receiptsMode) if err != nil { utils.LogError(err, fmt.Sprintf("error reexporting eth.store day %d in database", day), 0) return @@ -60,10 +60,10 @@ func StartEthStoreExporter(bnAddress string, enAddress string, updateInterval, e return } - ese.Run() + ese.Run(concurrency, receiptsMode) } -func (ese *EthStoreExporter) reexportDay(day string) error { +func (ese *EthStoreExporter) reexportDay(day string, concurrency, receiptsMode int) error { tx, err := ese.DB.Beginx() if err != nil { return err @@ -75,7 +75,7 @@ func (ese *EthStoreExporter) reexportDay(day string) error { return err } - ese.prepareExportDayTx(tx, day) + ese.prepareExportDayTx(tx, day, concurrency, receiptsMode) if err != nil { return err } @@ -83,14 +83,14 @@ func (ese *EthStoreExporter) reexportDay(day string) error { return tx.Commit() } -func (ese *EthStoreExporter) exportDay(day string) error { +func (ese *EthStoreExporter) exportDay(day string, concurrency, receiptsMode int) error { tx, err := ese.DB.Beginx() if err != nil { return err } defer tx.Rollback() - err = ese.prepareExportDayTx(tx, day) + err = ese.prepareExportDayTx(tx, day, concurrency, receiptsMode) if err != nil { return err } @@ -107,8 +107,8 @@ func (ese *EthStoreExporter) prepareClearDayTx(tx *sqlx.Tx, day string) error { return err } -func (ese *EthStoreExporter) prepareExportDayTx(tx *sqlx.Tx, day string) error { - ethStoreDay, validators, err := ese.getStoreDay(day) +func (ese *EthStoreExporter) prepareExportDayTx(tx *sqlx.Tx, day string, concurrency, receiptsMode int) error { + ethStoreDay, validators, err := ese.getStoreDay(day, concurrency, receiptsMode) if err != nil { return err } @@ -208,12 +208,12 @@ func (ese *EthStoreExporter) prepareExportDayTx(tx *sqlx.Tx, day string) error { return err } -func (ese *EthStoreExporter) getStoreDay(day string) (*ethstore.Day, map[uint64]*ethstore.Day, error) { +func (ese *EthStoreExporter) getStoreDay(day string, concurrency, receiptsMode int) (*ethstore.Day, map[uint64]*ethstore.Day, error) { logger.Infof("retrieving eth.store for day %v", day) - return ethstore.Calculate(context.Background(), ese.BNAddress, ese.ENAddress, day, 1, ethstore.RECEIPTS_MODE_SINGLE) + return ethstore.Calculate(context.Background(), ese.BNAddress, ese.ENAddress, day, concurrency, receiptsMode) } -func (ese *EthStoreExporter) Run() { +func (ese *EthStoreExporter) Run(concurrency, receiptsMode int) { t := time.NewTicker(ese.UpdateInverval) defer t.Stop() DBCHECK: @@ -282,7 +282,7 @@ DBCHECK: }) // export missing days for _, dayToExport := range daysToExportArray { - err = ese.exportDay(strconv.FormatUint(dayToExport, 10)) + err = ese.exportDay(strconv.FormatUint(dayToExport, 10), concurrency, receiptsMode) if err != nil { utils.LogError(err, fmt.Sprintf("error exporting eth.store day %d into database", dayToExport), 0) time.Sleep(ese.ErrorInterval)