Skip to content

Commit

Permalink
clickhouse and poller settings
Browse files Browse the repository at this point in the history
  • Loading branch information
iuwqyir committed Oct 23, 2024
1 parent b0a4ef9 commit 4ebbc5a
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 32 deletions.
18 changes: 16 additions & 2 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ func init() {
rootCmd.PersistentFlags().Int("poller-from-block", 0, "From which block to start polling")
rootCmd.PersistentFlags().Bool("poller-force-from-block", false, "Force the poller to start from the block specified in `poller-from-block`")
rootCmd.PersistentFlags().Int("poller-until-block", 0, "Until which block to poll")
rootCmd.PersistentFlags().Int("poller-parallel-pollers", 5, "Maximum number of parallel pollers")
rootCmd.PersistentFlags().Bool("committer-enabled", true, "Toggle committer")
rootCmd.PersistentFlags().Int("committer-blocks-per-commit", 10, "How many blocks to commit each interval")
rootCmd.PersistentFlags().Int("committer-interval", 1000, "How often to commit blocks in milliseconds")
Expand All @@ -76,6 +77,8 @@ func init() {
rootCmd.PersistentFlags().String("storage-orchestrator-clickhouse-host", "", "Clickhouse host for orchestrator storage")
rootCmd.PersistentFlags().String("storage-orchestrator-clickhouse-username", "", "Clickhouse username for orchestrator storage")
rootCmd.PersistentFlags().String("storage-orchestrator-clickhouse-password", "", "Clickhouse password for orchestrator storage")
rootCmd.PersistentFlags().Bool("storage-orchestrator-clickhouse-asyncInsert", false, "Clickhouse async insert for orchestrator storage")
rootCmd.PersistentFlags().Int("storage-orchestrator-clickhouse-maxRowsPerInsert", 100000, "Clickhouse max rows per insert for orchestrator storage")
rootCmd.PersistentFlags().Int("storage-orchestrator-memory-maxItems", 0, "Max items for orchestrator memory storage")
rootCmd.PersistentFlags().Int("storage-orchestrator-redis-poolSize", 0, "Redis pool size for orchestrator storage")
rootCmd.PersistentFlags().String("storage-orchestrator-redis-addr", "", "Redis address for orchestrator storage")
Expand All @@ -85,8 +88,12 @@ func init() {
rootCmd.PersistentFlags().String("storage-main-clickhouse-host", "", "Clickhouse host for main storage")
rootCmd.PersistentFlags().String("storage-main-clickhouse-username", "", "Clickhouse username for main storage")
rootCmd.PersistentFlags().String("storage-main-clickhouse-password", "", "Clickhouse password for main storage")
rootCmd.PersistentFlags().Bool("storage-main-clickhouse-asyncInsert", false, "Clickhouse async insert for main storage")
rootCmd.PersistentFlags().Int("storage-main-clickhouse-maxRowsPerInsert", 100000, "Clickhouse max rows per insert for main storage")
rootCmd.PersistentFlags().String("storage-staging-clickhouse-username", "", "Clickhouse username for staging storage")
rootCmd.PersistentFlags().String("storage-staging-clickhouse-password", "", "Clickhouse password for staging storage")
rootCmd.PersistentFlags().Bool("storage-staging-clickhouse-asyncInsert", false, "Clickhouse async insert for staging storage")
rootCmd.PersistentFlags().Int("storage-staging-clickhouse-maxRowsPerInsert", 100000, "Clickhouse max rows per insert for staging storage")
rootCmd.PersistentFlags().String("api-host", "localhost:3000", "API host")
viper.BindPFlag("rpc.url", rootCmd.PersistentFlags().Lookup("rpc-url"))
viper.BindPFlag("rpc.blocks.blocksPerRequest", rootCmd.PersistentFlags().Lookup("rpc-blocks-blocksPerRequest"))
Expand All @@ -107,6 +114,7 @@ func init() {
viper.BindPFlag("poller.fromBlock", rootCmd.PersistentFlags().Lookup("poller-from-block"))
viper.BindPFlag("poller.forceFromBlock", rootCmd.PersistentFlags().Lookup("poller-force-from-block"))
viper.BindPFlag("poller.untilBlock", rootCmd.PersistentFlags().Lookup("poller-until-block"))
viper.BindPFlag("poller.parallelPollers", rootCmd.PersistentFlags().Lookup("poller-parallel-pollers"))
viper.BindPFlag("committer.enabled", rootCmd.PersistentFlags().Lookup("committer-enabled"))
viper.BindPFlag("committer.blocksPerCommit", rootCmd.PersistentFlags().Lookup("committer-blocks-per-commit"))
viper.BindPFlag("committer.interval", rootCmd.PersistentFlags().Lookup("committer-interval"))
Expand All @@ -122,18 +130,24 @@ func init() {
viper.BindPFlag("storage.staging.clickhouse.database", rootCmd.PersistentFlags().Lookup("storage-staging-clickhouse-database"))
viper.BindPFlag("storage.staging.clickhouse.host", rootCmd.PersistentFlags().Lookup("storage-staging-clickhouse-host"))
viper.BindPFlag("storage.staging.clickhouse.port", rootCmd.PersistentFlags().Lookup("storage-staging-clickhouse-port"))
viper.BindPFlag("storage.staging.clickhouse.username", rootCmd.PersistentFlags().Lookup("storage-staging-clickhouse-username"))
viper.BindPFlag("storage.staging.clickhouse.password", rootCmd.PersistentFlags().Lookup("storage-staging-clickhouse-password"))
viper.BindPFlag("storage.staging.clickhouse.asyncInsert", rootCmd.PersistentFlags().Lookup("storage-staging-clickhouse-asyncInsert"))
viper.BindPFlag("storage.staging.clickhouse.maxRowsPerInsert", rootCmd.PersistentFlags().Lookup("storage-staging-clickhouse-maxRowsPerInsert"))
viper.BindPFlag("storage.main.clickhouse.database", rootCmd.PersistentFlags().Lookup("storage-main-clickhouse-database"))
viper.BindPFlag("storage.main.clickhouse.host", rootCmd.PersistentFlags().Lookup("storage-main-clickhouse-host"))
viper.BindPFlag("storage.main.clickhouse.port", rootCmd.PersistentFlags().Lookup("storage-main-clickhouse-port"))
viper.BindPFlag("storage.main.clickhouse.username", rootCmd.PersistentFlags().Lookup("storage-main-clickhouse-username"))
viper.BindPFlag("storage.main.clickhouse.password", rootCmd.PersistentFlags().Lookup("storage-main-clickhouse-password"))
viper.BindPFlag("storage.staging.clickhouse.username", rootCmd.PersistentFlags().Lookup("storage-main-clickhouse-username"))
viper.BindPFlag("storage.staging.clickhouse.password", rootCmd.PersistentFlags().Lookup("storage-main-clickhouse-password"))
viper.BindPFlag("storage.main.clickhouse.asyncInsert", rootCmd.PersistentFlags().Lookup("storage-main-clickhouse-asyncInsert"))
viper.BindPFlag("storage.main.clickhouse.maxRowsPerInsert", rootCmd.PersistentFlags().Lookup("storage-main-clickhouse-maxRowsPerInsert"))
viper.BindPFlag("storage.orchestrator.clickhouse.database", rootCmd.PersistentFlags().Lookup("storage-orchestrator-clickhouse-database"))
viper.BindPFlag("storage.orchestrator.clickhouse.host", rootCmd.PersistentFlags().Lookup("storage-orchestrator-clickhouse-host"))
viper.BindPFlag("storage.orchestrator.clickhouse.port", rootCmd.PersistentFlags().Lookup("storage-orchestrator-clickhouse-port"))
viper.BindPFlag("storage.orchestrator.clickhouse.username", rootCmd.PersistentFlags().Lookup("storage-orchestrator-clickhouse-username"))
viper.BindPFlag("storage.orchestrator.clickhouse.password", rootCmd.PersistentFlags().Lookup("storage-orchestrator-clickhouse-password"))
viper.BindPFlag("storage.orchestrator.clickhouse.asyncInsert", rootCmd.PersistentFlags().Lookup("storage-orchestrator-clickhouse-asyncInsert"))
viper.BindPFlag("storage.orchestrator.clickhouse.maxRowsPerInsert", rootCmd.PersistentFlags().Lookup("storage-orchestrator-clickhouse-maxRowsPerInsert"))
viper.BindPFlag("storage.orchestrator.memory.maxItems", rootCmd.PersistentFlags().Lookup("storage-orchestrator-memory-maxItems"))
viper.BindPFlag("storage.orchestrator.redis.poolSize", rootCmd.PersistentFlags().Lookup("storage-orchestrator-redis-poolSize"))
viper.BindPFlag("storage.orchestrator.redis.addr", rootCmd.PersistentFlags().Lookup("storage-orchestrator-redis-addr"))
Expand Down
27 changes: 15 additions & 12 deletions configs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@ type LogConfig struct {
}

type PollerConfig struct {
Enabled bool `mapstructure:"enabled"`
Interval int `mapstructure:"interval"`
BlocksPerPoll int `mapstructure:"blocksPerPoll"`
FromBlock int `mapstructure:"fromBlock"`
ForceFromBlock bool `mapstructure:"forceFromBlock"`
UntilBlock int `mapstructure:"untilBlock"`
Enabled bool `mapstructure:"enabled"`
Interval int `mapstructure:"interval"`
BlocksPerPoll int `mapstructure:"blocksPerPoll"`
FromBlock int `mapstructure:"fromBlock"`
ForceFromBlock bool `mapstructure:"forceFromBlock"`
UntilBlock int `mapstructure:"untilBlock"`
ParallelPollers int `mapstructure:"parallelPollers"`
}

type CommitterConfig struct {
Expand Down Expand Up @@ -63,12 +64,14 @@ type StorageConnectionConfig struct {
}

type ClickhouseConfig struct {
Host string `mapstructure:"host"`
Port int `mapstructure:"port"`
Username string `mapstructure:"username"`
Password string `mapstructure:"password"`
Database string `mapstructure:"database"`
DisableTLS bool `mapstructure:"disableTLS"`
Host string `mapstructure:"host"`
Port int `mapstructure:"port"`
Username string `mapstructure:"username"`
Password string `mapstructure:"password"`
Database string `mapstructure:"database"`
DisableTLS bool `mapstructure:"disableTLS"`
AsyncInsert bool `mapstructure:"asyncInsert"`
MaxRowsPerInsert int `mapstructure:"maxRowsPerInsert"`
}

type MemoryConfig struct {
Expand Down
10 changes: 5 additions & 5 deletions internal/orchestrator/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type Poller struct {
storage storage.IStorage
lastPolledBlock *big.Int
pollUntilBlock *big.Int
parallelPollers int
}

type BlockNumberWithError struct {
Expand Down Expand Up @@ -62,19 +63,18 @@ func NewPoller(rpc rpc.IRPCClient, storage storage.IStorage) *Poller {
storage: storage,
lastPolledBlock: lastPolledBlock,
pollUntilBlock: untilBlock,
parallelPollers: config.Cfg.Poller.ParallelPollers,
}
}

func (p *Poller) Start() {
interval := time.Duration(p.triggerIntervalMs) * time.Millisecond
ticker := time.NewTicker(interval)

// TODO: make this configurable?
const numWorkers = 5
tasks := make(chan struct{}, numWorkers)
tasks := make(chan struct{}, p.parallelPollers)
var blockRangeMutex sync.Mutex

for i := 0; i < numWorkers; i++ {
for i := 0; i < p.parallelPollers; i++ {
go func() {
for range tasks {
blockRangeMutex.Lock()
Expand Down Expand Up @@ -155,7 +155,7 @@ func (p *Poller) getEndBlockForRange(startBlock *big.Int, latestBlock *big.Int)
endBlock = latestBlock
}
if p.reachedPollLimit(endBlock) {
log.Debug().Msgf("End block %s is greater than poll until block %s, setting to poll until block", endBlock, p.pollUntilBlock)
log.Debug().Msgf("End block %s is greater than or equal to poll until block %s, setting range end to poll until block", endBlock, p.pollUntilBlock)
endBlock = p.pollUntilBlock
}
return endBlock
Expand Down
90 changes: 77 additions & 13 deletions internal/storage/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,17 @@ type ClickHouseConnector struct {
cfg *config.ClickhouseConfig
}

var DEFAULT_MAX_ROWS_PER_INSERT = 100000

func NewClickHouseConnector(cfg *config.ClickhouseConfig) (*ClickHouseConnector, error) {
conn, err := connectDB(cfg)
// Question: Should we add the table setup here?
if err != nil {
return nil, err
}
if cfg.MaxRowsPerInsert == 0 {
cfg.MaxRowsPerInsert = DEFAULT_MAX_ROWS_PER_INSERT
}
return &ClickHouseConnector{
conn: conn,
cfg: cfg,
Expand All @@ -55,6 +60,15 @@ func connectDB(cfg *config.ClickhouseConfig) (clickhouse.Conn, error) {
Username: cfg.Username,
Password: cfg.Password,
},
Settings: func() clickhouse.Settings {
if cfg.AsyncInsert {
return clickhouse.Settings{
"async_insert": "1",
"wait_for_async_insert": "1",
}
}
return clickhouse.Settings{}
}(),
})
if err != nil {
return nil, err
Expand All @@ -71,11 +85,18 @@ func (c *ClickHouseConnector) insertBlocks(blocks *[]common.Block) error {
gas_used, withdrawals_root, base_fee_per_gas
)
`
for i := 0; i < len(*blocks); i += c.cfg.MaxRowsPerInsert {
end := i + c.cfg.MaxRowsPerInsert
if end > len(*blocks) {
end = len(*blocks)
}

batch, err := c.conn.PrepareBatch(context.Background(), query)
if err != nil {
return err
}
for _, block := range *blocks {

for _, block := range (*blocks)[i:end] {
err := batch.Append(
block.ChainId,
block.Number,
Expand Down Expand Up @@ -104,7 +125,11 @@ func (c *ClickHouseConnector) insertBlocks(blocks *[]common.Block) error {
return err
}
}
return batch.Send()
if err := batch.Send(); err != nil {
return err
}
}
return nil
}

func (c *ClickHouseConnector) insertTransactions(txs *[]common.Transaction) error {
Expand All @@ -115,11 +140,18 @@ func (c *ClickHouseConnector) insertTransactions(txs *[]common.Transaction) erro
transaction_type, r, s, v, access_list, contract_address, gas_used, cumulative_gas_used, effective_gas_price, blob_gas_used, blob_gas_price, logs_bloom, status
)
`
for i := 0; i < len(*txs); i += c.cfg.MaxRowsPerInsert {
end := i + c.cfg.MaxRowsPerInsert
if end > len(*txs) {
end = len(*txs)
}

batch, err := c.conn.PrepareBatch(context.Background(), query)
if err != nil {
return err
}
for _, tx := range *txs {

for _, tx := range (*txs)[i:end] {
err := batch.Append(
tx.ChainId,
tx.Hash,
Expand Down Expand Up @@ -155,7 +187,13 @@ func (c *ClickHouseConnector) insertTransactions(txs *[]common.Transaction) erro
return err
}
}
return batch.Send()

if err := batch.Send(); err != nil {
return err
}
}

return nil
}

func (c *ClickHouseConnector) insertLogs(logs *[]common.Log) error {
Expand All @@ -165,11 +203,18 @@ func (c *ClickHouseConnector) insertLogs(logs *[]common.Log) error {
log_index, address, data, topic_0, topic_1, topic_2, topic_3
)
`
for i := 0; i < len(*logs); i += c.cfg.MaxRowsPerInsert {
end := i + c.cfg.MaxRowsPerInsert
if end > len(*logs) {
end = len(*logs)
}

batch, err := c.conn.PrepareBatch(context.Background(), query)
if err != nil {
return err
}
for _, log := range *logs {

for _, log := range (*logs)[i:end] {
err := batch.Append(
log.ChainId,
log.BlockNumber,
Expand Down Expand Up @@ -209,7 +254,13 @@ func (c *ClickHouseConnector) insertLogs(logs *[]common.Log) error {
return err
}
}
return batch.Send()

if err := batch.Send(); err != nil {
return err
}
}

return nil
}

func (c *ClickHouseConnector) StoreBlockFailures(failures []common.BlockFailure) error {
Expand Down Expand Up @@ -631,7 +682,7 @@ func (c *ClickHouseConnector) InsertStagingData(data []common.BlockData) error {
}

func (c *ClickHouseConnector) GetStagingData(qf QueryFilter) (*[]common.BlockData, error) {
query := fmt.Sprintf("SELECT data FROM %s.block_data FINAL WHERE block_number IN (%s) AND is_deleted = 0",
query := fmt.Sprintf("SELECT data FROM %s.block_data WHERE block_number IN (%s) AND is_deleted = 0",
c.cfg.Database, getBlockNumbersStringArray(qf.BlockNumbers))

if qf.ChainId.Sign() != 0 {
Expand Down Expand Up @@ -699,11 +750,18 @@ func (c *ClickHouseConnector) insertTraces(traces *[]common.Trace) error {
gas, gas_used, input, output, value, author, reward_type, refund_address
)
`
for i := 0; i < len(*traces); i += c.cfg.MaxRowsPerInsert {
end := i + c.cfg.MaxRowsPerInsert
if end > len(*traces) {
end = len(*traces)
}

batch, err := c.conn.PrepareBatch(context.Background(), query)
if err != nil {
return err
}
for _, trace := range *traces {

for _, trace := range (*traces)[i:end] {
err = batch.Append(
trace.ChainID,
trace.BlockNumber,
Expand Down Expand Up @@ -731,7 +789,13 @@ func (c *ClickHouseConnector) insertTraces(traces *[]common.Trace) error {
return err
}
}
return batch.Send()

if err := batch.Send(); err != nil {
return err
}
}

return nil
}

func (c *ClickHouseConnector) GetTraces(qf QueryFilter) (traces []common.Trace, err error) {
Expand Down Expand Up @@ -919,7 +983,7 @@ func (c *ClickHouseConnector) InsertBlockData(data *[]common.BlockData) error {
defer wg.Done()
if err := c.insertBlocks(&blocks); err != nil {
saveErrMutex.Lock()
saveErr = fmt.Errorf("error deleting blocks: %v", err)
saveErr = fmt.Errorf("error inserting blocks: %v", err)
saveErrMutex.Unlock()
}
}()
Expand All @@ -931,7 +995,7 @@ func (c *ClickHouseConnector) InsertBlockData(data *[]common.BlockData) error {
defer wg.Done()
if err := c.insertLogs(&logs); err != nil {
saveErrMutex.Lock()
saveErr = fmt.Errorf("error deleting logs: %v", err)
saveErr = fmt.Errorf("error inserting logs: %v", err)
saveErrMutex.Unlock()
}
}()
Expand All @@ -943,7 +1007,7 @@ func (c *ClickHouseConnector) InsertBlockData(data *[]common.BlockData) error {
defer wg.Done()
if err := c.insertTransactions(&transactions); err != nil {
saveErrMutex.Lock()
saveErr = fmt.Errorf("error deleting transactions: %v", err)
saveErr = fmt.Errorf("error inserting transactions: %v", err)
saveErrMutex.Unlock()
}
}()
Expand All @@ -955,7 +1019,7 @@ func (c *ClickHouseConnector) InsertBlockData(data *[]common.BlockData) error {
defer wg.Done()
if err := c.insertTraces(&traces); err != nil {
saveErrMutex.Lock()
saveErr = fmt.Errorf("error deleting traces: %v", err)
saveErr = fmt.Errorf("error inserting traces: %v", err)
saveErrMutex.Unlock()
}
}()
Expand Down

0 comments on commit 4ebbc5a

Please sign in to comment.