diff --git a/cmd/root.go b/cmd/root.go
index 8381c2f..cabd919 100644
--- a/cmd/root.go
+++ b/cmd/root.go
@@ -55,6 +55,7 @@ func init() {
     rootCmd.PersistentFlags().Int("poller-from-block", 0, "From which block to start polling")
     rootCmd.PersistentFlags().Bool("poller-force-from-block", false, "Force the poller to start from the block specified in `poller-from-block`")
     rootCmd.PersistentFlags().Int("poller-until-block", 0, "Until which block to poll")
+    rootCmd.PersistentFlags().Int("poller-parallel-pollers", 5, "Maximum number of parallel pollers")
     rootCmd.PersistentFlags().Bool("committer-enabled", true, "Toggle committer")
     rootCmd.PersistentFlags().Int("committer-blocks-per-commit", 10, "How many blocks to commit each interval")
     rootCmd.PersistentFlags().Int("committer-interval", 1000, "How often to commit blocks in milliseconds")
@@ -76,6 +77,8 @@ func init() {
     rootCmd.PersistentFlags().String("storage-orchestrator-clickhouse-host", "", "Clickhouse host for orchestrator storage")
     rootCmd.PersistentFlags().String("storage-orchestrator-clickhouse-username", "", "Clickhouse username for orchestrator storage")
     rootCmd.PersistentFlags().String("storage-orchestrator-clickhouse-password", "", "Clickhouse password for orchestrator storage")
+    rootCmd.PersistentFlags().Bool("storage-orchestrator-clickhouse-asyncInsert", false, "Clickhouse async insert for orchestrator storage")
+    rootCmd.PersistentFlags().Int("storage-orchestrator-clickhouse-maxRowsPerInsert", 100000, "Clickhouse max rows per insert for orchestrator storage")
     rootCmd.PersistentFlags().Int("storage-orchestrator-memory-maxItems", 0, "Max items for orchestrator memory storage")
     rootCmd.PersistentFlags().Int("storage-orchestrator-redis-poolSize", 0, "Redis pool size for orchestrator storage")
     rootCmd.PersistentFlags().String("storage-orchestrator-redis-addr", "", "Redis address for orchestrator storage")
@@ -85,8 +88,12 @@ func init() {
     rootCmd.PersistentFlags().String("storage-main-clickhouse-host", "", "Clickhouse host for main storage")
     rootCmd.PersistentFlags().String("storage-main-clickhouse-username", "", "Clickhouse username for main storage")
     rootCmd.PersistentFlags().String("storage-main-clickhouse-password", "", "Clickhouse password for main storage")
+    rootCmd.PersistentFlags().Bool("storage-main-clickhouse-asyncInsert", false, "Clickhouse async insert for main storage")
+    rootCmd.PersistentFlags().Int("storage-main-clickhouse-maxRowsPerInsert", 100000, "Clickhouse max rows per insert for main storage")
     rootCmd.PersistentFlags().String("storage-staging-clickhouse-username", "", "Clickhouse username for staging storage")
     rootCmd.PersistentFlags().String("storage-staging-clickhouse-password", "", "Clickhouse password for staging storage")
+    rootCmd.PersistentFlags().Bool("storage-staging-clickhouse-asyncInsert", false, "Clickhouse async insert for staging storage")
+    rootCmd.PersistentFlags().Int("storage-staging-clickhouse-maxRowsPerInsert", 100000, "Clickhouse max rows per insert for staging storage")
     rootCmd.PersistentFlags().String("api-host", "localhost:3000", "API host")
     viper.BindPFlag("rpc.url", rootCmd.PersistentFlags().Lookup("rpc-url"))
     viper.BindPFlag("rpc.blocks.blocksPerRequest", rootCmd.PersistentFlags().Lookup("rpc-blocks-blocksPerRequest"))
@@ -107,6 +114,7 @@ func init() {
     viper.BindPFlag("poller.fromBlock", rootCmd.PersistentFlags().Lookup("poller-from-block"))
     viper.BindPFlag("poller.forceFromBlock", rootCmd.PersistentFlags().Lookup("poller-force-from-block"))
     viper.BindPFlag("poller.untilBlock", rootCmd.PersistentFlags().Lookup("poller-until-block"))
+    viper.BindPFlag("poller.parallelPollers", rootCmd.PersistentFlags().Lookup("poller-parallel-pollers"))
     viper.BindPFlag("committer.enabled", rootCmd.PersistentFlags().Lookup("committer-enabled"))
     viper.BindPFlag("committer.blocksPerCommit", rootCmd.PersistentFlags().Lookup("committer-blocks-per-commit"))
     viper.BindPFlag("committer.interval", rootCmd.PersistentFlags().Lookup("committer-interval"))
@@ -122,18 +130,24 @@ func init() {
     viper.BindPFlag("storage.staging.clickhouse.database", rootCmd.PersistentFlags().Lookup("storage-staging-clickhouse-database"))
     viper.BindPFlag("storage.staging.clickhouse.host", rootCmd.PersistentFlags().Lookup("storage-staging-clickhouse-host"))
     viper.BindPFlag("storage.staging.clickhouse.port", rootCmd.PersistentFlags().Lookup("storage-staging-clickhouse-port"))
+    viper.BindPFlag("storage.staging.clickhouse.username", rootCmd.PersistentFlags().Lookup("storage-staging-clickhouse-username"))
+    viper.BindPFlag("storage.staging.clickhouse.password", rootCmd.PersistentFlags().Lookup("storage-staging-clickhouse-password"))
+    viper.BindPFlag("storage.staging.clickhouse.asyncInsert", rootCmd.PersistentFlags().Lookup("storage-staging-clickhouse-asyncInsert"))
+    viper.BindPFlag("storage.staging.clickhouse.maxRowsPerInsert", rootCmd.PersistentFlags().Lookup("storage-staging-clickhouse-maxRowsPerInsert"))
     viper.BindPFlag("storage.main.clickhouse.database", rootCmd.PersistentFlags().Lookup("storage-main-clickhouse-database"))
     viper.BindPFlag("storage.main.clickhouse.host", rootCmd.PersistentFlags().Lookup("storage-main-clickhouse-host"))
     viper.BindPFlag("storage.main.clickhouse.port", rootCmd.PersistentFlags().Lookup("storage-main-clickhouse-port"))
     viper.BindPFlag("storage.main.clickhouse.username", rootCmd.PersistentFlags().Lookup("storage-main-clickhouse-username"))
     viper.BindPFlag("storage.main.clickhouse.password", rootCmd.PersistentFlags().Lookup("storage-main-clickhouse-password"))
-    viper.BindPFlag("storage.staging.clickhouse.username", rootCmd.PersistentFlags().Lookup("storage-main-clickhouse-username"))
-    viper.BindPFlag("storage.staging.clickhouse.password", rootCmd.PersistentFlags().Lookup("storage-main-clickhouse-password"))
+    viper.BindPFlag("storage.main.clickhouse.asyncInsert", rootCmd.PersistentFlags().Lookup("storage-main-clickhouse-asyncInsert"))
+    viper.BindPFlag("storage.main.clickhouse.maxRowsPerInsert", rootCmd.PersistentFlags().Lookup("storage-main-clickhouse-maxRowsPerInsert"))
     viper.BindPFlag("storage.orchestrator.clickhouse.database", rootCmd.PersistentFlags().Lookup("storage-orchestrator-clickhouse-database"))
     viper.BindPFlag("storage.orchestrator.clickhouse.host", rootCmd.PersistentFlags().Lookup("storage-orchestrator-clickhouse-host"))
     viper.BindPFlag("storage.orchestrator.clickhouse.port", rootCmd.PersistentFlags().Lookup("storage-orchestrator-clickhouse-port"))
     viper.BindPFlag("storage.orchestrator.clickhouse.username", rootCmd.PersistentFlags().Lookup("storage-orchestrator-clickhouse-username"))
     viper.BindPFlag("storage.orchestrator.clickhouse.password", rootCmd.PersistentFlags().Lookup("storage-orchestrator-clickhouse-password"))
+    viper.BindPFlag("storage.orchestrator.clickhouse.asyncInsert", rootCmd.PersistentFlags().Lookup("storage-orchestrator-clickhouse-asyncInsert"))
+    viper.BindPFlag("storage.orchestrator.clickhouse.maxRowsPerInsert", rootCmd.PersistentFlags().Lookup("storage-orchestrator-clickhouse-maxRowsPerInsert"))
     viper.BindPFlag("storage.orchestrator.memory.maxItems", rootCmd.PersistentFlags().Lookup("storage-orchestrator-memory-maxItems"))
     viper.BindPFlag("storage.orchestrator.redis.poolSize", rootCmd.PersistentFlags().Lookup("storage-orchestrator-redis-poolSize"))
     viper.BindPFlag("storage.orchestrator.redis.addr", rootCmd.PersistentFlags().Lookup("storage-orchestrator-redis-addr"))
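Besides adding the new `poller-parallel-pollers` and ClickHouse `asyncInsert`/`maxRowsPerInsert` flags, this hunk also fixes the staging ClickHouse username/password keys, which were previously bound to the *main* storage flags. All of the new flags follow the existing cobra-plus-viper pattern: define a dashed CLI flag, then bind it to a dotted config key. A minimal, self-contained sketch of that pattern (the command wiring below is illustrative, not this repository's actual root command):

```go
// Hypothetical sketch of the cobra/viper binding pattern used in cmd/root.go.
package main

import (
	"fmt"

	"github.com/spf13/cobra"
	"github.com/spf13/viper"
)

func main() {
	cmd := &cobra.Command{
		Use: "indexer",
		Run: func(cmd *cobra.Command, args []string) {
			// viper resolves the value from, in order of precedence:
			// explicit Set, CLI flag, env var, config file, then the flag default.
			fmt.Println("parallel pollers:", viper.GetInt("poller.parallelPollers"))
		},
	}
	cmd.PersistentFlags().Int("poller-parallel-pollers", 5, "Maximum number of parallel pollers")
	// Bind the dashed CLI flag to the dotted config key so config files can set it too.
	viper.BindPFlag("poller.parallelPollers", cmd.PersistentFlags().Lookup("poller-parallel-pollers"))
	if err := cmd.Execute(); err != nil {
		fmt.Println(err)
	}
}
```

Because the flag carries the default (5 here), `viper.GetInt` returns that default unless a config file, environment variable, or CLI argument overrides it.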
diff --git a/configs/config.go b/configs/config.go
index a45c80e..669fcbb 100644
--- a/configs/config.go
+++ b/configs/config.go
@@ -14,12 +14,13 @@ type LogConfig struct {
 }
 
 type PollerConfig struct {
-    Enabled        bool `mapstructure:"enabled"`
-    Interval       int  `mapstructure:"interval"`
-    BlocksPerPoll  int  `mapstructure:"blocksPerPoll"`
-    FromBlock      int  `mapstructure:"fromBlock"`
-    ForceFromBlock bool `mapstructure:"forceFromBlock"`
-    UntilBlock     int  `mapstructure:"untilBlock"`
+    Enabled         bool `mapstructure:"enabled"`
+    Interval        int  `mapstructure:"interval"`
+    BlocksPerPoll   int  `mapstructure:"blocksPerPoll"`
+    FromBlock       int  `mapstructure:"fromBlock"`
+    ForceFromBlock  bool `mapstructure:"forceFromBlock"`
+    UntilBlock      int  `mapstructure:"untilBlock"`
+    ParallelPollers int  `mapstructure:"parallelPollers"`
 }
 
 type CommitterConfig struct {
@@ -63,12 +64,14 @@ type StorageConnectionConfig struct {
 }
 
 type ClickhouseConfig struct {
-    Host       string `mapstructure:"host"`
-    Port       int    `mapstructure:"port"`
-    Username   string `mapstructure:"username"`
-    Password   string `mapstructure:"password"`
-    Database   string `mapstructure:"database"`
-    DisableTLS bool   `mapstructure:"disableTLS"`
+    Host             string `mapstructure:"host"`
+    Port             int    `mapstructure:"port"`
+    Username         string `mapstructure:"username"`
+    Password         string `mapstructure:"password"`
+    Database         string `mapstructure:"database"`
+    DisableTLS       bool   `mapstructure:"disableTLS"`
+    AsyncInsert      bool   `mapstructure:"asyncInsert"`
+    MaxRowsPerInsert int    `mapstructure:"maxRowsPerInsert"`
 }
 
 type MemoryConfig struct {
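The `mapstructure` tags above are what let viper decode the dotted keys bound in cmd/root.go into these structs. A hedged sketch of that decoding for the new ClickHouse fields (the YAML snippet and the trimmed-down struct are invented for illustration, not taken from this repository's config loader):

```go
// Illustrative only: decode a nested config key into a struct via mapstructure tags.
package main

import (
	"bytes"
	"fmt"

	"github.com/spf13/viper"
)

type ClickhouseConfig struct {
	Host             string `mapstructure:"host"`
	Port             int    `mapstructure:"port"`
	AsyncInsert      bool   `mapstructure:"asyncInsert"`
	MaxRowsPerInsert int    `mapstructure:"maxRowsPerInsert"`
}

func main() {
	yamlCfg := []byte(`
storage:
  main:
    clickhouse:
      host: localhost
      port: 9440
      asyncInsert: true
      maxRowsPerInsert: 50000
`)
	viper.SetConfigType("yaml")
	if err := viper.ReadConfig(bytes.NewBuffer(yamlCfg)); err != nil {
		panic(err)
	}
	var cfg ClickhouseConfig
	if err := viper.UnmarshalKey("storage.main.clickhouse", &cfg); err != nil {
		panic(err)
	}
	// {Host:localhost Port:9440 AsyncInsert:true MaxRowsPerInsert:50000}
	fmt.Printf("%+v\n", cfg)
}
```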
diff --git a/internal/orchestrator/poller.go b/internal/orchestrator/poller.go
index fef0861..72819e9 100644
--- a/internal/orchestrator/poller.go
+++ b/internal/orchestrator/poller.go
@@ -25,6 +25,7 @@ type Poller struct {
     storage         storage.IStorage
     lastPolledBlock *big.Int
     pollUntilBlock  *big.Int
+    parallelPollers int
 }
 
 type BlockNumberWithError struct {
@@ -62,6 +63,7 @@ func NewPoller(rpc rpc.IRPCClient, storage storage.IStorage) *Poller {
         storage:         storage,
         lastPolledBlock: lastPolledBlock,
         pollUntilBlock:  untilBlock,
+        parallelPollers: config.Cfg.Poller.ParallelPollers,
     }
 }
 
@@ -69,12 +71,10 @@ func (p *Poller) Start() {
     interval := time.Duration(p.triggerIntervalMs) * time.Millisecond
     ticker := time.NewTicker(interval)
 
-    // TODO: make this configurable?
-    const numWorkers = 5
-    tasks := make(chan struct{}, numWorkers)
+    tasks := make(chan struct{}, p.parallelPollers)
 
     var blockRangeMutex sync.Mutex
-    for i := 0; i < numWorkers; i++ {
+    for i := 0; i < p.parallelPollers; i++ {
         go func() {
             for range tasks {
                 blockRangeMutex.Lock()
@@ -155,7 +155,7 @@ func (p *Poller) getEndBlockForRange(startBlock *big.Int, latestBlock *big.Int)
         endBlock = latestBlock
     }
     if p.reachedPollLimit(endBlock) {
-        log.Debug().Msgf("End block %s is greater than poll until block %s, setting to poll until block", endBlock, p.pollUntilBlock)
+        log.Debug().Msgf("End block %s is greater than or equal to poll until block %s, setting range end to poll until block", endBlock, p.pollUntilBlock)
         endBlock = p.pollUntilBlock
     }
     return endBlock
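The poller change replaces the hard-coded `numWorkers = 5` with the configurable `parallelPollers` while keeping the same bounded-worker shape: a buffered channel acts as the task queue and a fixed number of goroutines drain it. A stripped-down, illustrative sketch of that pattern (the task payload and printed output are invented; the real poller claims the next block range under a mutex inside each worker):

```go
// Minimal bounded-worker-pool sketch, assuming parallelPollers comes from config.
package main

import (
	"fmt"
	"sync"
)

func main() {
	parallelPollers := 5
	tasks := make(chan int, parallelPollers)

	var wg sync.WaitGroup
	for i := 0; i < parallelPollers; i++ {
		wg.Add(1)
		go func(worker int) {
			defer wg.Done()
			// Each worker loops until the task channel is closed.
			for task := range tasks {
				fmt.Printf("worker %d handling task %d\n", worker, task)
			}
		}(i)
	}

	for t := 0; t < 20; t++ {
		tasks <- t
	}
	close(tasks)
	wg.Wait()
}
```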
diff --git a/internal/storage/clickhouse.go b/internal/storage/clickhouse.go
index 6f5f187..23a5ff4 100644
--- a/internal/storage/clickhouse.go
+++ b/internal/storage/clickhouse.go
@@ -24,12 +24,17 @@ type ClickHouseConnector struct {
     cfg  *config.ClickhouseConfig
 }
 
+var DEFAULT_MAX_ROWS_PER_INSERT = 100000
+
 func NewClickHouseConnector(cfg *config.ClickhouseConfig) (*ClickHouseConnector, error) {
     conn, err := connectDB(cfg)
     // Question: Should we add the table setup here?
     if err != nil {
         return nil, err
     }
+    if cfg.MaxRowsPerInsert == 0 {
+        cfg.MaxRowsPerInsert = DEFAULT_MAX_ROWS_PER_INSERT
+    }
     return &ClickHouseConnector{
         conn: conn,
         cfg:  cfg,
@@ -55,6 +60,15 @@ func connectDB(cfg *config.ClickhouseConfig) (clickhouse.Conn, error) {
             Username: cfg.Username,
             Password: cfg.Password,
         },
+        Settings: func() clickhouse.Settings {
+            if cfg.AsyncInsert {
+                return clickhouse.Settings{
+                    "async_insert":          "1",
+                    "wait_for_async_insert": "1",
+                }
+            }
+            return clickhouse.Settings{}
+        }(),
     })
     if err != nil {
         return nil, err
@@ -71,40 +85,51 @@ func (c *ClickHouseConnector) insertBlocks(blocks *[]common.Block) error {
         gas_used, withdrawals_root, base_fee_per_gas
         )
     `
-    batch, err := c.conn.PrepareBatch(context.Background(), query)
-    if err != nil {
-        return err
-    }
-    for _, block := range *blocks {
-        err := batch.Append(
-            block.ChainId,
-            block.Number,
-            block.Timestamp,
-            block.Hash,
-            block.ParentHash,
-            block.Sha3Uncles,
-            block.Nonce,
-            block.MixHash,
-            block.Miner,
-            block.StateRoot,
-            block.TransactionsRoot,
-            block.ReceiptsRoot,
-            block.Size,
-            block.LogsBloom,
-            block.ExtraData,
-            block.Difficulty,
-            block.TotalDifficulty,
-            block.TransactionCount,
-            block.GasLimit,
-            block.GasUsed,
-            block.WithdrawalsRoot,
-            block.BaseFeePerGas,
-        )
+    for i := 0; i < len(*blocks); i += c.cfg.MaxRowsPerInsert {
+        end := i + c.cfg.MaxRowsPerInsert
+        if end > len(*blocks) {
+            end = len(*blocks)
+        }
+
+        batch, err := c.conn.PrepareBatch(context.Background(), query)
         if err != nil {
             return err
         }
+
+        for _, block := range (*blocks)[i:end] {
+            err := batch.Append(
+                block.ChainId,
+                block.Number,
+                block.Timestamp,
+                block.Hash,
+                block.ParentHash,
+                block.Sha3Uncles,
+                block.Nonce,
+                block.MixHash,
+                block.Miner,
+                block.StateRoot,
+                block.TransactionsRoot,
+                block.ReceiptsRoot,
+                block.Size,
+                block.LogsBloom,
+                block.ExtraData,
+                block.Difficulty,
+                block.TotalDifficulty,
+                block.TransactionCount,
+                block.GasLimit,
+                block.GasUsed,
+                block.WithdrawalsRoot,
+                block.BaseFeePerGas,
+            )
+            if err != nil {
+                return err
+            }
+        }
+        if err := batch.Send(); err != nil {
+            return err
+        }
     }
-    return batch.Send()
+    return nil
 }
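All of the insert helpers (blocks above, and transactions, logs, and traces in the hunks that follow) now share the same chunking shape: walk the slice in steps of `MaxRowsPerInsert`, prepare a batch per chunk, append the chunk's rows, and send before moving on, so a very large poll no longer turns into one enormous ClickHouse batch. The core of that loop extracted into a generic sketch (the `insertChunk` callback stands in for the PrepareBatch/Append/Send sequence and is an assumption, not repository code):

```go
// Illustrative chunked-insert helper; insertChunk is a hypothetical stand-in.
package main

import "fmt"

func insertInChunks(rows []int, maxRowsPerInsert int, insertChunk func([]int) error) error {
	for i := 0; i < len(rows); i += maxRowsPerInsert {
		end := i + maxRowsPerInsert
		if end > len(rows) {
			end = len(rows) // the final chunk may be smaller
		}
		if err := insertChunk(rows[i:end]); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	rows := make([]int, 25)
	_ = insertInChunks(rows, 10, func(chunk []int) error {
		fmt.Println("sending batch of", len(chunk))
		return nil
	})
	// Prints batches of 10, 10, and 5.
}
```

Note that the chunk size defaults to `DEFAULT_MAX_ROWS_PER_INSERT` (100000) in `NewClickHouseConnector` when the config leaves `maxRowsPerInsert` at zero, which also guards against a zero step in these loops.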
@@ -115,47 +140,60 @@ func (c *ClickHouseConnector) insertTransactions(txs *[]common.Transaction) error {
         transaction_type, r, s, v, access_list, contract_address, gas_used, cumulative_gas_used, effective_gas_price, blob_gas_used, blob_gas_price, logs_bloom, status
         )
     `
-    batch, err := c.conn.PrepareBatch(context.Background(), query)
-    if err != nil {
-        return err
-    }
-    for _, tx := range *txs {
-        err := batch.Append(
-            tx.ChainId,
-            tx.Hash,
-            tx.Nonce,
-            tx.BlockHash,
-            tx.BlockNumber,
-            tx.BlockTimestamp,
-            tx.TransactionIndex,
-            tx.FromAddress,
-            tx.ToAddress,
-            tx.Value,
-            tx.Gas,
-            tx.GasPrice,
-            tx.Data,
-            tx.FunctionSelector,
-            tx.MaxFeePerGas,
-            tx.MaxPriorityFeePerGas,
-            tx.TransactionType,
-            tx.R,
-            tx.S,
-            tx.V,
-            tx.AccessListJson,
-            tx.ContractAddress,
-            tx.GasUsed,
-            tx.CumulativeGasUsed,
-            tx.EffectiveGasPrice,
-            tx.BlobGasUsed,
-            tx.BlobGasPrice,
-            tx.LogsBloom,
-            tx.Status,
-        )
+    for i := 0; i < len(*txs); i += c.cfg.MaxRowsPerInsert {
+        end := i + c.cfg.MaxRowsPerInsert
+        if end > len(*txs) {
+            end = len(*txs)
+        }
+
+        batch, err := c.conn.PrepareBatch(context.Background(), query)
         if err != nil {
             return err
         }
+
+        for _, tx := range (*txs)[i:end] {
+            err := batch.Append(
+                tx.ChainId,
+                tx.Hash,
+                tx.Nonce,
+                tx.BlockHash,
+                tx.BlockNumber,
+                tx.BlockTimestamp,
+                tx.TransactionIndex,
+                tx.FromAddress,
+                tx.ToAddress,
+                tx.Value,
+                tx.Gas,
+                tx.GasPrice,
+                tx.Data,
+                tx.FunctionSelector,
+                tx.MaxFeePerGas,
+                tx.MaxPriorityFeePerGas,
+                tx.TransactionType,
+                tx.R,
+                tx.S,
+                tx.V,
+                tx.AccessListJson,
+                tx.ContractAddress,
+                tx.GasUsed,
+                tx.CumulativeGasUsed,
+                tx.EffectiveGasPrice,
+                tx.BlobGasUsed,
+                tx.BlobGasPrice,
+                tx.LogsBloom,
+                tx.Status,
+            )
+            if err != nil {
+                return err
+            }
+        }
+
+        if err := batch.Send(); err != nil {
+            return err
+        }
     }
-    return batch.Send()
+
+    return nil
 }
 
@@ -165,51 +203,64 @@ func (c *ClickHouseConnector) insertLogs(logs *[]common.Log) error {
         log_index, address, data, topic_0, topic_1, topic_2, topic_3
         )
     `
-    batch, err := c.conn.PrepareBatch(context.Background(), query)
-    if err != nil {
-        return err
-    }
-    for _, log := range *logs {
-        err := batch.Append(
-            log.ChainId,
-            log.BlockNumber,
-            log.BlockHash,
-            log.BlockTimestamp,
-            log.TransactionHash,
-            log.TransactionIndex,
-            log.LogIndex,
-            log.Address,
-            log.Data,
-            func() string {
-                if len(log.Topics) > 0 {
-                    return log.Topics[0]
-                }
-                return ""
-            }(),
-            func() string {
-                if len(log.Topics) > 1 {
-                    return log.Topics[1]
-                }
-                return ""
-            }(),
-            func() string {
-                if len(log.Topics) > 2 {
-                    return log.Topics[2]
-                }
-                return ""
-            }(),
-            func() string {
-                if len(log.Topics) > 3 {
-                    return log.Topics[3]
-                }
-                return ""
-            }(),
-        )
+    for i := 0; i < len(*logs); i += c.cfg.MaxRowsPerInsert {
+        end := i + c.cfg.MaxRowsPerInsert
+        if end > len(*logs) {
+            end = len(*logs)
+        }
+
+        batch, err := c.conn.PrepareBatch(context.Background(), query)
         if err != nil {
             return err
         }
+
+        for _, log := range (*logs)[i:end] {
+            err := batch.Append(
+                log.ChainId,
+                log.BlockNumber,
+                log.BlockHash,
+                log.BlockTimestamp,
+                log.TransactionHash,
+                log.TransactionIndex,
+                log.LogIndex,
+                log.Address,
+                log.Data,
+                func() string {
+                    if len(log.Topics) > 0 {
+                        return log.Topics[0]
+                    }
+                    return ""
+                }(),
+                func() string {
+                    if len(log.Topics) > 1 {
+                        return log.Topics[1]
+                    }
+                    return ""
+                }(),
+                func() string {
+                    if len(log.Topics) > 2 {
+                        return log.Topics[2]
+                    }
+                    return ""
+                }(),
+                func() string {
+                    if len(log.Topics) > 3 {
+                        return log.Topics[3]
+                    }
+                    return ""
+                }(),
+            )
+            if err != nil {
+                return err
+            }
+        }
+
+        if err := batch.Send(); err != nil {
+            return err
+        }
     }
-    return batch.Send()
+
+    return nil
 }
 
 func (c *ClickHouseConnector) StoreBlockFailures(failures []common.BlockFailure) error {
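The four inline closures in `insertLogs` map the variable-length `log.Topics` slice onto the fixed `topic_0`..`topic_3` columns, returning an empty string when a topic is absent. The same idea written as a small helper, shown only as a sketch (the helper does not exist in the repository):

```go
// Illustrative helper for safely indexing a log's topics.
package main

import "fmt"

// topicAt returns topics[i] when present and "" otherwise, so logs with fewer
// than four topics still fill the fixed topic_0..topic_3 columns.
func topicAt(topics []string, i int) string {
	if i < len(topics) {
		return topics[i]
	}
	return ""
}

func main() {
	topics := []string{"0xddf252ad...", "0xsender..."}
	for i := 0; i < 4; i++ {
		fmt.Printf("topic_%d = %q\n", i, topicAt(topics, i))
	}
}
```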
@@ -631,7 +682,7 @@ func (c *ClickHouseConnector) InsertStagingData(data []common.BlockData) error {
 }
 
 func (c *ClickHouseConnector) GetStagingData(qf QueryFilter) (*[]common.BlockData, error) {
-    query := fmt.Sprintf("SELECT data FROM %s.block_data FINAL WHERE block_number IN (%s) AND is_deleted = 0",
+    query := fmt.Sprintf("SELECT data FROM %s.block_data WHERE block_number IN (%s) AND is_deleted = 0",
         c.cfg.Database, getBlockNumbersStringArray(qf.BlockNumbers))
 
     if qf.ChainId.Sign() != 0 {
@@ -699,39 +750,52 @@ func (c *ClickHouseConnector) insertTraces(traces *[]common.Trace) error {
         gas, gas_used, input, output, value, author, reward_type, refund_address
         )
     `
-    batch, err := c.conn.PrepareBatch(context.Background(), query)
-    if err != nil {
-        return err
-    }
-    for _, trace := range *traces {
-        err = batch.Append(
-            trace.ChainID,
-            trace.BlockNumber,
-            trace.BlockHash,
-            trace.BlockTimestamp,
-            trace.TransactionHash,
-            trace.TransactionIndex,
-            trace.Subtraces,
-            trace.TraceAddress,
-            trace.TraceType,
-            trace.CallType,
-            trace.Error,
-            trace.FromAddress,
-            trace.ToAddress,
-            trace.Gas.Uint64(),
-            trace.GasUsed.Uint64(),
-            trace.Input,
-            trace.Output,
-            trace.Value,
-            trace.Author,
-            trace.RewardType,
-            trace.RefundAddress,
-        )
+    for i := 0; i < len(*traces); i += c.cfg.MaxRowsPerInsert {
+        end := i + c.cfg.MaxRowsPerInsert
+        if end > len(*traces) {
+            end = len(*traces)
+        }
+
+        batch, err := c.conn.PrepareBatch(context.Background(), query)
         if err != nil {
             return err
         }
+
+        for _, trace := range (*traces)[i:end] {
+            err = batch.Append(
+                trace.ChainID,
+                trace.BlockNumber,
+                trace.BlockHash,
+                trace.BlockTimestamp,
+                trace.TransactionHash,
+                trace.TransactionIndex,
+                trace.Subtraces,
+                trace.TraceAddress,
+                trace.TraceType,
+                trace.CallType,
+                trace.Error,
+                trace.FromAddress,
+                trace.ToAddress,
+                trace.Gas.Uint64(),
+                trace.GasUsed.Uint64(),
+                trace.Input,
+                trace.Output,
+                trace.Value,
+                trace.Author,
+                trace.RewardType,
+                trace.RefundAddress,
+            )
+            if err != nil {
+                return err
+            }
+        }
+
+        if err := batch.Send(); err != nil {
+            return err
+        }
     }
-    return batch.Send()
+
+    return nil
 }
 
 func (c *ClickHouseConnector) GetTraces(qf QueryFilter) (traces []common.Trace, err error) {
@@ -919,7 +983,7 @@ func (c *ClickHouseConnector) InsertBlockData(data *[]common.BlockData) error {
         defer wg.Done()
         if err := c.insertBlocks(&blocks); err != nil {
             saveErrMutex.Lock()
-            saveErr = fmt.Errorf("error deleting blocks: %v", err)
+            saveErr = fmt.Errorf("error inserting blocks: %v", err)
             saveErrMutex.Unlock()
         }
     }()
@@ -931,7 +995,7 @@ func (c *ClickHouseConnector) InsertBlockData(data *[]common.BlockData) error {
         defer wg.Done()
         if err := c.insertLogs(&logs); err != nil {
             saveErrMutex.Lock()
-            saveErr = fmt.Errorf("error deleting logs: %v", err)
+            saveErr = fmt.Errorf("error inserting logs: %v", err)
             saveErrMutex.Unlock()
         }
     }()
@@ -943,7 +1007,7 @@ func (c *ClickHouseConnector) InsertBlockData(data *[]common.BlockData) error {
         defer wg.Done()
         if err := c.insertTransactions(&transactions); err != nil {
             saveErrMutex.Lock()
-            saveErr = fmt.Errorf("error deleting transactions: %v", err)
+            saveErr = fmt.Errorf("error inserting transactions: %v", err)
             saveErrMutex.Unlock()
         }
     }()
@@ -955,7 +1019,7 @@ func (c *ClickHouseConnector) InsertBlockData(data *[]common.BlockData) error {
         defer wg.Done()
         if err := c.insertTraces(&traces); err != nil {
             saveErrMutex.Lock()
-            saveErr = fmt.Errorf("error deleting traces: %v", err)
+            saveErr = fmt.Errorf("error inserting traces: %v", err)
             saveErrMutex.Unlock()
         }
     }()
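Two smaller notes on the hunks above. Dropping `FINAL` from `GetStagingData` trades read-time deduplication for cheaper queries: with a ReplacingMergeTree, staging rows that have not yet been merged can surface more than once until a background merge runs. And `InsertBlockData` keeps its existing WaitGroup-plus-mutex bookkeeping for running the four insert helpers concurrently and recording the first error; the hunks only correct the error messages from "deleting" to "inserting". For comparison, the same fan-out/first-error behavior can be written with `golang.org/x/sync/errgroup`; this is an alternative sketch, not what the diff does:

```go
// Illustrative errgroup equivalent of the WaitGroup + saveErrMutex pattern.
package main

import (
	"fmt"

	"golang.org/x/sync/errgroup"
)

func insertBlocks() error       { return nil }
func insertLogs() error         { return nil }
func insertTransactions() error { return nil }
func insertTraces() error       { return fmt.Errorf("boom") }

func main() {
	var g errgroup.Group
	g.Go(insertBlocks)
	g.Go(insertLogs)
	g.Go(insertTransactions)
	g.Go(insertTraces)
	// Wait returns the first non-nil error from the group, mirroring the
	// saveErr/saveErrMutex bookkeeping in the hand-rolled version.
	if err := g.Wait(); err != nil {
		fmt.Println("insert failed:", err)
	}
}
```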
diff --git a/internal/tools/clickhouse_create_blocks_table.sql b/internal/tools/clickhouse_create_blocks_table.sql
index efcc731..7a03300 100644
--- a/internal/tools/clickhouse_create_blocks_table.sql
+++ b/internal/tools/clickhouse_create_blocks_table.sql
@@ -27,4 +27,5 @@ CREATE TABLE blocks (
     INDEX idx_hash hash TYPE bloom_filter GRANULARITY 1,
 ) ENGINE = ReplacingMergeTree(insert_timestamp, is_deleted)
 ORDER BY (chain_id, number)
+PARTITION BY chain_id
 SETTINGS allow_experimental_replacing_merge_with_cleanup = 1;
\ No newline at end of file
diff --git a/internal/tools/clickhouse_create_logs_table.sql b/internal/tools/clickhouse_create_logs_table.sql
index 4ece815..81cbd6e 100644
--- a/internal/tools/clickhouse_create_logs_table.sql
+++ b/internal/tools/clickhouse_create_logs_table.sql
@@ -24,4 +24,5 @@ CREATE TABLE logs (
     INDEX idx_topic3 topic_3 TYPE bloom_filter GRANULARITY 1,
 ) ENGINE = ReplacingMergeTree(insert_timestamp, is_deleted)
 ORDER BY (chain_id, block_number, transaction_hash, log_index)
+PARTITION BY chain_id
 SETTINGS allow_experimental_replacing_merge_with_cleanup = 1;
\ No newline at end of file
diff --git a/internal/tools/clickhouse_create_staging_table.sql b/internal/tools/clickhouse_create_staging_table.sql
index 2ab6ec2..fdcfe0c 100644
--- a/internal/tools/clickhouse_create_staging_table.sql
+++ b/internal/tools/clickhouse_create_staging_table.sql
@@ -7,4 +7,5 @@ CREATE TABLE block_data (
     INDEX idx_block_number block_number TYPE minmax GRANULARITY 1,
 ) ENGINE = ReplacingMergeTree(insert_timestamp, is_deleted)
 ORDER BY (chain_id, block_number)
+PARTITION BY chain_id
 SETTINGS allow_experimental_replacing_merge_with_cleanup = 1;
\ No newline at end of file
diff --git a/internal/tools/clickhouse_create_traces_table.sql b/internal/tools/clickhouse_create_traces_table.sql
index 1d59696..ea0fce9 100644
--- a/internal/tools/clickhouse_create_traces_table.sql
+++ b/internal/tools/clickhouse_create_traces_table.sql
@@ -30,4 +30,5 @@ CREATE TABLE traces (
     INDEX idx_type type TYPE bloom_filter GRANULARITY 1,
 ) ENGINE = ReplacingMergeTree(insert_timestamp, is_deleted)
 ORDER BY (chain_id, block_number, transaction_hash, trace_address)
+PARTITION BY chain_id
 SETTINGS allow_experimental_replacing_merge_with_cleanup = 1;
\ No newline at end of file
diff --git a/internal/tools/clickhouse_create_transactions_table.sql b/internal/tools/clickhouse_create_transactions_table.sql
index 7487316..eaaaadf 100644
--- a/internal/tools/clickhouse_create_transactions_table.sql
+++ b/internal/tools/clickhouse_create_transactions_table.sql
@@ -37,4 +37,6 @@ CREATE TABLE transactions (
     INDEX idx_to_address to_address TYPE bloom_filter GRANULARITY 1,
     INDEX idx_function_selector function_selector TYPE bloom_filter GRANULARITY 1,
 ) ENGINE = ReplacingMergeTree(insert_timestamp, is_deleted)
-ORDER BY (chain_id, block_number, hash) SETTINGS allow_experimental_replacing_merge_with_cleanup = 1;
\ No newline at end of file
+ORDER BY (chain_id, block_number, hash)
+PARTITION BY chain_id
+SETTINGS allow_experimental_replacing_merge_with_cleanup = 1;
\ No newline at end of file
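Adding `PARTITION BY chain_id` to each table gives every chain its own parts, so queries that filter on `chain_id` (as the connector's query filters already do) can prune all other chains' partitions, and per-chain data can be dropped or re-merged in isolation. A hedged clickhouse-go sketch of such a chain-scoped read against the `blocks` table defined above (address, credentials, the chain id, and the query itself are illustrative only):

```go
// Illustrative chain-scoped query; the WHERE clause on the partition key lets
// ClickHouse skip every other chain's parts.
package main

import (
	"context"
	"fmt"

	"github.com/ClickHouse/clickhouse-go/v2"
)

func main() {
	conn, err := clickhouse.Open(&clickhouse.Options{Addr: []string{"localhost:9000"}})
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	var blockCount uint64
	row := conn.QueryRow(context.Background(),
		"SELECT count() FROM blocks WHERE chain_id = ?", 1)
	if err := row.Scan(&blockCount); err != nil {
		panic(err)
	}
	fmt.Println("blocks stored for chain 1:", blockCount)
}
```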