From 1c3700e7b4d46fc048643532ea8464c0268398e6 Mon Sep 17 00:00:00 2001 From: Charitha Bandi <45089429+charithabandi@users.noreply.github.com> Date: Wed, 22 Jan 2025 16:22:19 -0600 Subject: [PATCH] restrict empty block production to 1 empty block per emptyBlockTimeout (#1251) - Introduces emptyBlockTimeout config to control the frequency with which the leader can propose empty blocks if no transactions are available. setting it to 0 disables it. - Resolution expiry is now tracked using a timestamp rather than the number of blocks - event store now tracks in-memory the events without resolutions - network performance improvements especially with blocksync - enter into catchup mode only if no valid messages have been received for a duration of emptyBlocktimeout + blkPropTimeout. --- app/migration/proposal_status.go | 5 +- app/node/build.go | 9 +- app/params/status.go | 2 +- app/setup/genesis.go | 7 +- app/shared/bind/bind.go | 4 +- app/validator/join-status.go | 2 +- app/validator/join-status_test.go | 11 +- app/validator/list-join-requests.go | 4 +- app/validator/list-join-requests_test.go | 38 +++--- config/config.go | 85 +++++------- config/config_test.go | 7 +- core/types/chain/types.go | 2 +- core/types/params.go | 34 +++-- core/types/params_test.go | 3 +- core/types/time.go | 21 +++ core/types/types.go | 5 +- extensions/resolutions/resolutions.go | 22 ++-- node/block_processor/interfaces.go | 6 + node/block_processor/processor.go | 23 +++- node/block_processor/transactions.go | 8 ++ node/block_processor/transactions_test.go | 7 + node/consensus/block.go | 6 + node/consensus/blocksync.go | 150 ++++++++++++---------- node/consensus/engine.go | 139 ++++++++++++++------ node/consensus/engine_test.go | 88 +++++++++++-- node/consensus/follower.go | 7 +- node/consensus/interfaces.go | 2 + node/consensus/leader.go | 63 ++++++--- node/consensus/updates_test.go | 17 ++- node/mempool/mempool.go | 6 + node/meta/meta_test.go | 3 +- node/migrations/migrator.go | 11 +- node/node_live_test.go | 9 ++ node/node_test.go | 2 +- node/services/jsonrpc/adminsvc/service.go | 4 +- node/statesync_test.go | 2 +- node/tx.go | 6 +- node/txapp/routes.go | 11 +- node/txapp/txapp.go | 3 +- node/types/interfaces.go | 8 +- node/voting/events.go | 27 +++- node/voting/sql.go | 5 +- node/voting/vote_test.go | 21 ++- node/voting/voting.go | 19 +-- test/integration/kwild_test.go | 3 +- 45 files changed, 614 insertions(+), 303 deletions(-) create mode 100644 core/types/time.go diff --git a/app/migration/proposal_status.go b/app/migration/proposal_status.go index 92920b843..90154cad2 100644 --- a/app/migration/proposal_status.go +++ b/app/migration/proposal_status.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "math" + "time" "github.com/spf13/cobra" @@ -53,7 +54,7 @@ func proposalStatusCmd() *cobra.Command { type MigrationStatus struct { ProposalID *types.UUID - ExpiresAt int64 `json:"expires_at"` // ExpiresAt is the block height at which the migration proposal expires + ExpiresAt time.Time `json:"expires_at"` // ExpiresAt is the block height at which the migration proposal expires Board []*types.AccountID `json:"board"` // Board is the list of validators who are eligible to vote on the migration proposal Approved []bool `json:"approved"` // Approved is the list of bools indicating if the corresponding validator approved the migration proposal } @@ -74,7 +75,7 @@ func (m *MigrationStatus) MarshalText() ([]byte, error) { var msg bytes.Buffer msg.WriteString("Migration Status:\n") msg.WriteString(fmt.Sprintf("\tProposal ID: 
%s\n", m.ProposalID.String())) - msg.WriteString(fmt.Sprintf("\tExpires At: %d\n", m.ExpiresAt)) + msg.WriteString(fmt.Sprintf("\tExpires At: %s\n", m.ExpiresAt.String())) msg.WriteString(fmt.Sprintf("\tApprovals Received: %d (needed %d)\n", approved, needed)) for i := range m.Board { diff --git a/app/node/build.go b/app/node/build.go index 82102254a..8bc430978 100644 --- a/app/node/build.go +++ b/app/node/build.go @@ -172,10 +172,10 @@ func buildDB(ctx context.Context, d *coreDependencies, closers *closeFuncs) *pg. // adjustExpiration = true // } - // err = migrations.CleanupResolutionsAfterMigration(d.ctx, db, adjustExpiration, startHeight) - // if err != nil { - // failBuild(err, "failed to cleanup resolutions after snapshot restore") - // } + err = migrations.CleanupResolutionsAfterMigration(d.ctx, db) + if err != nil { + failBuild(err, "failed to cleanup resolutions after snapshot restore") + } if err = db.EnsureFullReplicaIdentityDatasets(d.ctx); err != nil { failBuild(err, "failed enable full replica identity on user datasets") @@ -404,6 +404,7 @@ func buildConsensusEngine(_ context.Context, d *coreDependencies, db *pg.DB, Mempool: mempool, Logger: d.logger.New("CONS"), ProposeTimeout: time.Duration(d.cfg.Consensus.ProposeTimeout), + EmptyBlockTimeout: time.Duration(d.cfg.Consensus.EmptyBlockTimeout), BlockProposalInterval: time.Duration(d.cfg.Consensus.BlockProposalInterval), BlockAnnInterval: time.Duration(d.cfg.Consensus.BlockAnnInterval), BroadcastTxTimeout: time.Duration(d.cfg.RPC.BroadcastTxTimeout), diff --git a/app/params/status.go b/app/params/status.go index 5c0199f7b..a6fbb0788 100644 --- a/app/params/status.go +++ b/app/params/status.go @@ -97,7 +97,7 @@ func (rs MsgResolutionStatus) MarshalText() ([]byte, error) { var buf bytes.Buffer fmt.Fprintf(&buf, "%sID: %s\n", rs.indent, rs.ResolutionID) fmt.Fprintf(&buf, "%sType: %s\n", rs.indent, rs.Type) - fmt.Fprintf(&buf, "%sExpiresAt: %d\n", rs.indent, rs.ExpiresAt) + fmt.Fprintf(&buf, "%sExpiresAt: %s\n", rs.indent, rs.ExpiresAt) fmt.Fprintf(&buf, "%sBoard: %s\n", rs.indent, rs.Board) fmt.Fprintf(&buf, "%sApprovals: %v\n", rs.indent, rs.Approved) return buf.Bytes(), nil diff --git a/app/setup/genesis.go b/app/setup/genesis.go index 2cbbc15c6..4be2dc1c7 100644 --- a/app/setup/genesis.go +++ b/app/setup/genesis.go @@ -7,6 +7,7 @@ import ( "os" "strconv" "strings" + "time" "github.com/kwilteam/kwil-db/app/shared/display" "github.com/kwilteam/kwil-db/config" @@ -39,7 +40,7 @@ type genesisFlagConfig struct { leader string dbOwner string maxBlockSize int64 - joinExpiry int64 + joinExpiry time.Duration maxVotesPerTx int64 genesisState string } @@ -109,7 +110,7 @@ func bindGenesisFlags(cmd *cobra.Command, cfg *genesisFlagConfig) { cmd.Flags().StringVar(&cfg.leader, "leader", "", "public key of the block proposer") cmd.Flags().StringVar(&cfg.dbOwner, "db-owner", "", "owner of the database") cmd.Flags().Int64Var(&cfg.maxBlockSize, "max-block-size", 0, "maximum block size") - cmd.Flags().Int64Var(&cfg.joinExpiry, "join-expiry", 0, "Number of blocks before a join proposal expires") + cmd.Flags().DurationVar(&cfg.joinExpiry, "join-expiry", 0, "Number of blocks before a join proposal expires") cmd.Flags().Int64Var(&cfg.maxVotesPerTx, "max-votes-per-tx", 0, "Maximum votes per transaction") cmd.Flags().StringVar(&cfg.genesisState, "genesis-snapshot", "", "path to genesis state snapshot file") } @@ -226,7 +227,7 @@ func mergeGenesisFlags(conf *config.GenesisConfig, cmd *cobra.Command, flagCfg * } if cmd.Flags().Changed("join-expiry") { - 
conf.JoinExpiry = flagCfg.joinExpiry + conf.JoinExpiry = types.Duration(flagCfg.joinExpiry) } if cmd.Flags().Changed("max-votes-per-tx") { diff --git a/app/shared/bind/bind.go b/app/shared/bind/bind.go index e631ca6ab..b8dadf7a8 100644 --- a/app/shared/bind/bind.go +++ b/app/shared/bind/bind.go @@ -13,8 +13,8 @@ import ( "github.com/spf13/cobra" "github.com/spf13/pflag" - "github.com/kwilteam/kwil-db/config" "github.com/kwilteam/kwil-db/core/log" + ktypes "github.com/kwilteam/kwil-db/core/types" "github.com/kwilteam/kwil-db/node/pg" "github.com/kwilteam/kwil-db/node/types" ) @@ -107,7 +107,7 @@ func SetFlagsFromStructTags(fs *pflag.FlagSet, cfg interface{}, nameTag, descTag case time.Duration: fs.Duration(flagName, vt, desc) return - case config.Duration: + case ktypes.Duration: fs.Duration(flagName, time.Duration(vt), desc) return case types.HexBytes: diff --git a/app/validator/join-status.go b/app/validator/join-status.go index 7ce10dfa9..aa6b8939e 100644 --- a/app/validator/join-status.go +++ b/app/validator/join-status.go @@ -101,7 +101,7 @@ func (r *respValJoinStatus) MarshalText() ([]byte, error) { var msg bytes.Buffer msg.WriteString(fmt.Sprintf("Candidate: %s\n", r.Data.Candidate.String())) msg.WriteString(fmt.Sprintf("Requested Power: %d\n", r.Data.Power)) - msg.WriteString(fmt.Sprintf("Expiration Height: %d\n", r.Data.ExpiresAt)) + msg.WriteString(fmt.Sprintf("Expiration Timestamp: %s\n", r.Data.ExpiresAt.String())) msg.WriteString(fmt.Sprintf("%d Approvals Received (%d needed):\n", approved, needed)) diff --git a/app/validator/join-status_test.go b/app/validator/join-status_test.go index df80e0775..cf25e59aa 100644 --- a/app/validator/join-status_test.go +++ b/app/validator/join-status_test.go @@ -3,6 +3,7 @@ package validator import ( "encoding/json" "testing" + "time" "github.com/kwilteam/kwil-db/core/crypto" "github.com/kwilteam/kwil-db/core/types" @@ -59,6 +60,8 @@ func Test_respValJoinStatus_MarshalJSON(t *testing.T) { } func Test_respValJoinStatus_MarshalText(t *testing.T) { + now := time.Now() + nowStr := now.String() tests := []struct { name string response respValJoinStatus @@ -70,7 +73,7 @@ func Test_respValJoinStatus_MarshalText(t *testing.T) { Data: &types.JoinRequest{ Candidate: &types.AccountID{Identifier: []byte{0x12, 0x34}, KeyType: crypto.KeyTypeSecp256k1}, Power: 1000, - ExpiresAt: 5000, + ExpiresAt: now, Board: []*types.AccountID{ {Identifier: []byte{0xAB, 0xCD}, KeyType: crypto.KeyTypeSecp256k1}, {Identifier: []byte{0xEF, 0x12}, KeyType: crypto.KeyTypeSecp256k1}, @@ -79,7 +82,7 @@ func Test_respValJoinStatus_MarshalText(t *testing.T) { Approved: []bool{true, true, true}, }, }, - want: "Candidate: AccountID{identifier = 1234, keyType = secp256k1}\nRequested Power: 1000\nExpiration Height: 5000\n3 Approvals Received (2 needed):\nValidator AccountID{identifier = abcd, keyType = secp256k1}, approved\nValidator AccountID{identifier = ef12, keyType = secp256k1}, approved\nValidator AccountID{identifier = 5678, keyType = secp256k1}, approved\n", + want: "Candidate: AccountID{identifier = 1234, keyType = secp256k1}\nRequested Power: 1000\nExpiration Timestamp: " + nowStr + "\n3 Approvals Received (2 needed):\nValidator AccountID{identifier = abcd, keyType = secp256k1}, approved\nValidator AccountID{identifier = ef12, keyType = secp256k1}, approved\nValidator AccountID{identifier = 5678, keyType = secp256k1}, approved\n", }, { name: "mixed approvals", @@ -87,7 +90,7 @@ func Test_respValJoinStatus_MarshalText(t *testing.T) { Data: &types.JoinRequest{ Candidate: 
&types.AccountID{Identifier: []byte{0xFF}, KeyType: crypto.KeyTypeSecp256k1}, Power: 500, - ExpiresAt: 1000, + ExpiresAt: now, Board: []*types.AccountID{ {Identifier: []byte{0x11, 0x22}, KeyType: crypto.KeyTypeSecp256k1}, {Identifier: []byte{0x33, 0x44}, KeyType: crypto.KeyTypeSecp256k1}, @@ -95,7 +98,7 @@ func Test_respValJoinStatus_MarshalText(t *testing.T) { Approved: []bool{true, false}, }, }, - want: "Candidate: AccountID{identifier = ff, keyType = secp256k1}\nRequested Power: 500\nExpiration Height: 1000\n1 Approvals Received (2 needed):\nValidator AccountID{identifier = 1122, keyType = secp256k1}, approved\nValidator AccountID{identifier = 3344, keyType = secp256k1}, not approved\n", + want: "Candidate: AccountID{identifier = ff, keyType = secp256k1}\nRequested Power: 500\nExpiration Timestamp: " + nowStr + "\n1 Approvals Received (2 needed):\nValidator AccountID{identifier = 1122, keyType = secp256k1}, approved\nValidator AccountID{identifier = 3344, keyType = secp256k1}, not approved\n", }, } diff --git a/app/validator/list-join-requests.go b/app/validator/list-join-requests.go index 1314ef71a..733422239 100644 --- a/app/validator/list-join-requests.go +++ b/app/validator/list-join-requests.go @@ -77,7 +77,7 @@ func (r *respJoinList) MarshalText() ([]byte, error) { msg.WriteString(fmt.Sprintf("Pending join requests (%d %s needed):\n", needed, approvalTerm)) msg.WriteString(" Candidate | Power | Approvals | Expiration\n") msg.WriteString("------------------------------------------------------------------+-------+-----------+------------") - //ref spacing: 22cbbb666c26b2c1f42502df72c32de4d521138a1a2c96121d417a2f341a759c | 1 | 100 | 100 + //ref spacing: 22cbbb666c26b2c1f42502df72c32de4d521138a1a2c96121d417a2f341a759c | 1 | 100 | 2025-01-21 11:01:49-0600 CST for _, j := range r.Joins { approvals := 0 for _, a := range j.Approved { @@ -86,7 +86,7 @@ func (r *respJoinList) MarshalText() ([]byte, error) { } } - msg.WriteString(fmt.Sprintf("\n %s | % 5d | % 9d | %d", j.Candidate.String(), j.Power, approvals, j.ExpiresAt)) + msg.WriteString(fmt.Sprintf("\n %s | % 5d | % 9d | %s", j.Candidate.String(), j.Power, approvals, j.ExpiresAt.String())) } diff --git a/app/validator/list-join-requests_test.go b/app/validator/list-join-requests_test.go index f49063be3..276d42ded 100644 --- a/app/validator/list-join-requests_test.go +++ b/app/validator/list-join-requests_test.go @@ -3,6 +3,7 @@ package validator import ( "encoding/json" "testing" + "time" "github.com/kwilteam/kwil-db/core/crypto" "github.com/kwilteam/kwil-db/core/types" @@ -24,24 +25,22 @@ func Test_respJoinList_MarshalJSON(t *testing.T) { Identifier: []byte{0x12, 0x34}, KeyType: crypto.KeyTypeEd25519, }, - Power: 100, - ExpiresAt: 900, - Board: []*types.AccountID{{Identifier: []byte{0xAB, 0xCD}, KeyType: crypto.KeyTypeSecp256k1}}, - Approved: []bool{true}, + Power: 100, + Board: []*types.AccountID{{Identifier: []byte{0xAB, 0xCD}, KeyType: crypto.KeyTypeSecp256k1}}, + Approved: []bool{true}, }, { Candidate: &types.AccountID{ Identifier: []byte{0x56, 0x78}, KeyType: crypto.KeyTypeSecp256k1, }, - Power: 200, - ExpiresAt: 1000, - Board: []*types.AccountID{{Identifier: []byte{0xEF, 0x12}, KeyType: crypto.KeyTypeSecp256k1}}, - Approved: []bool{false}, + Power: 200, + Board: []*types.AccountID{{Identifier: []byte{0xEF, 0x12}, KeyType: crypto.KeyTypeSecp256k1}}, + Approved: []bool{false}, }, }, }, - want: 
`[{"candidate":{"identifier":"1234","key_type":1},"power":100,"expires_at":900,"board":[{"identifier":"abcd","key_type":0}],"approved":[true]},{"candidate":{"identifier":"5678","key_type":0},"power":200,"expires_at":1000,"board":[{"identifier":"ef12","key_type":0}],"approved":[false]}]`, + want: `[{"candidate":{"identifier":"1234","key_type":1},"power":100,"expires_at": "0001-01-01T00:00:00Z","board":[{"identifier":"abcd","key_type":0}],"approved":[true]},{"candidate":{"identifier":"5678","key_type":0},"power":200,"expires_at":"0001-01-01T00:00:00Z","board":[{"identifier":"ef12","key_type":0}],"approved":[false]}]`, }, { name: "empty joins", @@ -59,14 +58,13 @@ func Test_respJoinList_MarshalJSON(t *testing.T) { Identifier: []byte{0x12, 0x34}, KeyType: crypto.KeyTypeEd25519, }, - Power: 150, - ExpiresAt: 1200, - Board: []*types.AccountID{{Identifier: []byte{0xAB, 0xCD}, KeyType: crypto.KeyTypeSecp256k1}}, - Approved: []bool{true, false}, + Power: 150, + Board: []*types.AccountID{{Identifier: []byte{0xAB, 0xCD}, KeyType: crypto.KeyTypeSecp256k1}}, + Approved: []bool{true, false}, }, }, }, - want: `[{"candidate":{"identifier":"1234","key_type":1},"power":150,"expires_at":1200,"board":[{"identifier":"abcd","key_type":0}],"approved":[true,false]}]`, + want: `[{"candidate":{"identifier":"1234","key_type":1},"power":150,"expires_at": "0001-01-01T00:00:00Z","board":[{"identifier":"abcd","key_type":0}],"approved":[true,false]}]`, }, } @@ -80,6 +78,8 @@ func Test_respJoinList_MarshalJSON(t *testing.T) { } func Test_respJoinList_MarshalText(t *testing.T) { + now := time.Now() + nowStr := now.String() tests := []struct { name string response respJoinList @@ -102,13 +102,13 @@ func Test_respJoinList_MarshalText(t *testing.T) { KeyType: crypto.KeyTypeEd25519, }, Power: 100, - ExpiresAt: 1000, + ExpiresAt: now, Board: []*types.AccountID{{Identifier: []byte{0xAB}}}, Approved: []bool{true}, }, }, }, - want: "Pending join requests (1 approval needed):\n Candidate | Power | Approvals | Expiration\n------------------------------------------------------------------+-------+-----------+------------\n AccountID{identifier = 1234, keyType = ed25519} | 100 | 1 | 1000", + want: "Pending join requests (1 approval needed):\n Candidate | Power | Approvals | Expiration\n------------------------------------------------------------------+-------+-----------+------------\n AccountID{identifier = 1234, keyType = ed25519} | 100 | 1 | " + nowStr, }, { name: "multiple approvals needed", @@ -120,7 +120,7 @@ func Test_respJoinList_MarshalText(t *testing.T) { KeyType: crypto.KeyTypeEd25519, }, Power: 100, - ExpiresAt: 1000, + ExpiresAt: now, Board: []*types.AccountID{ {Identifier: []byte{0xAB}}, {Identifier: []byte{0xCD}}, @@ -133,7 +133,7 @@ func Test_respJoinList_MarshalText(t *testing.T) { Identifier: []byte{0x56, 0x78}, }, Power: 200, - ExpiresAt: 2000, + ExpiresAt: now, Board: []*types.AccountID{ {Identifier: []byte{0xAB}}, {Identifier: []byte{0xCD}}, @@ -143,7 +143,7 @@ func Test_respJoinList_MarshalText(t *testing.T) { }, }, }, - want: "Pending join requests (2 approvals needed):\n Candidate | Power | Approvals | Expiration\n------------------------------------------------------------------+-------+-----------+------------\n AccountID{identifier = 1234, keyType = ed25519} | 100 | 2 | 1000\n AccountID{identifier = 5678, keyType = secp256k1} | 200 | 0 | 2000", + want: "Pending join requests (2 approvals needed):\n Candidate | Power | Approvals | 
Expiration\n------------------------------------------------------------------+-------+-----------+------------\n AccountID{identifier = 1234, keyType = ed25519} | 100 | 2 | " + nowStr + "\n AccountID{identifier = 5678, keyType = secp256k1} | 200 | 0 | " + nowStr, }, } diff --git a/config/config.go b/config/config.go index a35ee96a5..80203fa7c 100644 --- a/config/config.go +++ b/config/config.go @@ -28,24 +28,6 @@ const ( AdminCertName = "admin.cert" ) -// Duration is a wrapper around time.Duration that implements text -// (un)marshalling for the go-toml package to work with Go duration strings -// instead of integers. -type Duration time.Duration - -func (d Duration) MarshalText() ([]byte, error) { - return []byte(time.Duration(d).String()), nil -} - -func (d *Duration) UnmarshalText(text []byte) error { - duration, err := time.ParseDuration(string(text)) - if err != nil { - return err - } - *d = Duration(duration) - return nil -} - type GenesisAlloc struct { ID KeyHexBytes `json:"id"` KeyType string `json:"key_type"` @@ -239,7 +221,7 @@ func DefaultGenesisConfig() *GenesisConfig { Leader: types.PublicKey{}, DBOwner: "", MaxBlockSize: 6 * 1024 * 1024, - JoinExpiry: 14400, + JoinExpiry: types.Duration(86400 * time.Second), DisabledGasCosts: true, MaxVotesPerTx: 200, MigrationStatus: types.NoActiveMigration, @@ -259,9 +241,10 @@ func DefaultConfig() *Config { BootNodes: []string{}, }, Consensus: ConsensusConfig{ - ProposeTimeout: Duration(1000 * time.Millisecond), - BlockProposalInterval: Duration(1 * time.Second), - BlockAnnInterval: Duration(3 * time.Second), + ProposeTimeout: types.Duration(1000 * time.Millisecond), + EmptyBlockTimeout: types.Duration(1 * time.Minute), + BlockProposalInterval: types.Duration(1 * time.Second), + BlockAnnInterval: types.Duration(3 * time.Second), }, DB: DBConfig{ Host: "127.0.0.1", @@ -269,16 +252,16 @@ func DefaultConfig() *Config { User: "kwild", Pass: "", DBName: "kwild", - ReadTxTimeout: Duration(45 * time.Second), + ReadTxTimeout: types.Duration(45 * time.Second), MaxConns: 60, }, RPC: RPCConfig{ ListenAddress: "0.0.0.0:8484", - BroadcastTxTimeout: Duration(15 * time.Second), - Timeout: Duration(20 * time.Second), + BroadcastTxTimeout: types.Duration(15 * time.Second), + Timeout: types.Duration(20 * time.Second), MaxReqSize: 6_000_000, Private: false, - ChallengeExpiry: Duration(30 * time.Second), + ChallengeExpiry: types.Duration(30 * time.Second), ChallengeRateLimit: 10, }, Admin: AdminConfig{ @@ -296,7 +279,7 @@ func DefaultConfig() *Config { }, StateSync: StateSyncConfig{ Enable: false, - DiscoveryTimeout: Duration(30 * time.Second), + DiscoveryTimeout: types.Duration(30 * time.Second), MaxRetries: 3, }, Extensions: make(map[string]map[string]string), @@ -346,37 +329,39 @@ type DBConfig struct { // However, this is less error prone, and prevents passing settings that // would alter the functionality of the connection. An advanced option could // be added to supplement the conn string if that seems useful. 
- Host string `toml:"host" comment:"postgres host name (IP or UNIX socket path)"` - Port string `toml:"port" comment:"postgres TCP port (leave empty for UNIX socket)"` - User string `toml:"user" comment:"postgres role/user name"` - Pass string `toml:"pass" comment:"postgres password if required for the user and host"` - DBName string `toml:"dbname" comment:"postgres database name"` - ReadTxTimeout Duration `toml:"read_timeout" comment:"timeout on read transactions from user RPC calls and queries"` - MaxConns uint32 `toml:"max_connections" comment:"maximum number of DB connections to permit"` + Host string `toml:"host" comment:"postgres host name (IP or UNIX socket path)"` + Port string `toml:"port" comment:"postgres TCP port (leave empty for UNIX socket)"` + User string `toml:"user" comment:"postgres role/user name"` + Pass string `toml:"pass" comment:"postgres password if required for the user and host"` + DBName string `toml:"dbname" comment:"postgres database name"` + ReadTxTimeout types.Duration `toml:"read_timeout" comment:"timeout on read transactions from user RPC calls and queries"` + MaxConns uint32 `toml:"max_connections" comment:"maximum number of DB connections to permit"` } type ConsensusConfig struct { - ProposeTimeout Duration `toml:"propose_timeout" comment:"timeout for proposing a block (applies to leader)"` - // reannounce intervals + ProposeTimeout types.Duration `toml:"propose_timeout" comment:"minimum duration to wait before proposing a block with transactions (applies to leader). This value can't be zero, if set to 0, default of 1 sec will be used for block production."` + + EmptyBlockTimeout types.Duration `toml:"empty_block_timeout" comment:"timeout for proposing an empty block. If set to 0, disables empty blocks, leader will wait indefinitely until transactions are available to produce a block."` // BlockProposalInterval is the interval between block proposal reannouncements by the leader. - // This impacts the time it takes for an out-of-sync validator to receive the current block proposal, + // This affects the time it takes for an out-of-sync validator to receive the current block proposal, // thereby impacting the block times. Default is 1 second. - BlockProposalInterval Duration `toml:"block_proposal_interval" comment:"interval between block proposal reannouncements by the leader"` - // BlockAnnInterval is the frequency with which the block commit messages are reannouncements by the leader, - // and votes reannounced by validators. Default is 3 second. This impacts the time it takes for an + BlockProposalInterval types.Duration `toml:"block_proposal_interval" comment:"interval between block proposal reannouncements by the leader"` + + // BlockAnnInterval is the frequency with which the block commit messages are reannounced by the leader, + // and votes reannounced by validators. Default is 3 seconds. This affects the time it takes for // out-of-sync nodes to catch up with the latest block. 
- BlockAnnInterval Duration `toml:"block_ann_interval" comment:"interval between block commit reannouncements by the leader, and votes reannouncements by validators"` + BlockAnnInterval types.Duration `toml:"block_ann_interval" comment:"interval between block commit reannouncements by the leader, and votes reannouncements by validators"` } type RPCConfig struct { - ListenAddress string `toml:"listen" comment:"address in host:port format on which the RPC server will listen"` - BroadcastTxTimeout Duration `toml:"broadcast_tx_timeout" comment:"duration to wait for a tx to be committed when transactions are authored with --sync flag"` - Timeout Duration `toml:"timeout" comment:"user request duration limit after which it is cancelled"` - MaxReqSize int `toml:"max_req_size" comment:"largest permissible user request size"` - Private bool `toml:"private" comment:"enable private mode that requires challenge authentication for each call"` - ChallengeExpiry Duration `toml:"challenge_expiry" comment:"lifetime of a server-generated challenge"` - ChallengeRateLimit float64 `toml:"challenge_rate_limit" comment:"maximum number of challenges per second that a user can request"` + ListenAddress string `toml:"listen" comment:"address in host:port format on which the RPC server will listen"` + BroadcastTxTimeout types.Duration `toml:"broadcast_tx_timeout" comment:"duration to wait for a tx to be committed when transactions are authored with --sync flag"` + Timeout types.Duration `toml:"timeout" comment:"user request duration limit after which it is cancelled"` + MaxReqSize int `toml:"max_req_size" comment:"largest permissible user request size"` + Private bool `toml:"private" comment:"enable private mode that requires challenge authentication for each call"` + ChallengeExpiry types.Duration `toml:"challenge_expiry" comment:"lifetime of a server-generated challenge"` + ChallengeRateLimit float64 `toml:"challenge_rate_limit" comment:"maximum number of challenges per second that a user can request"` } type AdminConfig struct { @@ -398,8 +383,8 @@ type StateSyncConfig struct { Enable bool `toml:"enable" comment:"enable using statesync rather than blocksync"` TrustedProviders []string `toml:"trusted_providers" comment:"trusted snapshot providers in node ID format (see bootnodes)"` - DiscoveryTimeout Duration `toml:"discovery_time" comment:"how long to discover snapshots before selecting one to use"` - MaxRetries uint64 `toml:"max_retries" comment:"how many times to try after failing to apply a snapshot before switching to blocksync"` + DiscoveryTimeout types.Duration `toml:"discovery_time" comment:"how long to discover snapshots before selecting one to use"` + MaxRetries uint64 `toml:"max_retries" comment:"how many times to try after failing to apply a snapshot before switching to blocksync"` } type MigrationConfig struct { diff --git a/config/config_test.go b/config/config_test.go index 8670ed495..b2d5d8037 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -12,6 +12,7 @@ import ( "github.com/kwilteam/kwil-db/core/crypto" "github.com/kwilteam/kwil-db/core/log" + "github.com/kwilteam/kwil-db/core/types" ) // TestMarshalDuration ensures that a time.Duration can be marshaled and @@ -19,10 +20,10 @@ import ( // case for some reason **cough specs cough**. 
func TestMarshalDuration(t *testing.T) { type td struct { - Duration Duration `koanf:"duration" toml:"duration"` + Duration types.Duration `koanf:"duration" toml:"duration"` } tt := td{ - Duration: Duration(10 * time.Second), + Duration: types.Duration(10 * time.Second), } bts, err := gotoml.Marshal(tt) if err != nil { @@ -63,7 +64,7 @@ func TestConfigSaveAndLoad(t *testing.T) { User: "kwild", Pass: "kwild", DBName: "kwild", - ReadTxTimeout: Duration(45 * time.Second), + ReadTxTimeout: types.Duration(45 * time.Second), MaxConns: 10, }, }, diff --git a/core/types/chain/types.go b/core/types/chain/types.go index 43c8fcdb4..498267027 100644 --- a/core/types/chain/types.go +++ b/core/types/chain/types.go @@ -64,7 +64,7 @@ type Genesis struct { MaxBlockSize int64 `json:"max_block_size"` // JoinExpiry is the number of blocks after which the validators // join request expires if not approved. - JoinExpiry int64 `json:"join_expiry"` + JoinExpiry types.Duration `json:"join_expiry"` // DisabledGasCosts dictates whether gas costs are disabled. DisabledGasCosts bool `json:"disabled_gas_costs"` // MaxVotesPerTx is the maximum number of votes that can be included in a diff --git a/core/types/params.go b/core/types/params.go index 7303131f0..49fc637e8 100644 --- a/core/types/params.go +++ b/core/types/params.go @@ -99,13 +99,15 @@ type NetworkParameters struct { // MaxBlockSize is the maximum size of a block in bytes. MaxBlockSize int64 `json:"max_block_size"` - // JoinExpiry is the number of blocks after which the validators - // join request expires if not approved. - JoinExpiry int64 `json:"join_expiry"` - // DisabledGasCosts dictates whether gas costs are disabled. + + // JoinExpiry is the time duration (in seconds) after which a resolution is + // considered expired since its creation. + JoinExpiry Duration `json:"join_expiry"` + + // DisabledGasCosts indicates whether gas costs are disabled. DisabledGasCosts bool `json:"disabled_gas_costs"` - // MaxVotesPerTx is the maximum number of votes that can be included in a - // single transaction. + + // MaxVotesPerTx is the maximum number of votes allowed in a single transaction. 
MaxVotesPerTx int64 `json:"max_votes_per_tx"` MigrationStatus MigrationStatus `json:"migration_status"` @@ -206,7 +208,7 @@ func MergeUpdates(np *NetworkParameters, updates ParamUpdates) (err error) { case ParamNameMaxBlockSize: np.MaxBlockSize = update.(int64) case ParamNameJoinExpiry: - np.JoinExpiry = update.(int64) + np.JoinExpiry = update.(Duration) case ParamNameDisabledGasCosts: np.DisabledGasCosts = update.(bool) case ParamNameMaxVotesPerTx: @@ -339,7 +341,15 @@ func (pu ParamUpdates) MarshalBinary() ([]byte, error) { } else { return nil, fmt.Errorf("invalid type for %s", key) } - case ParamNameMaxBlockSize, ParamNameJoinExpiry, ParamNameMaxVotesPerTx: + case ParamNameJoinExpiry: + if val, ok := value.(Duration); ok { + if err := binary.Write(buf, binary.LittleEndian, val); err != nil { + return nil, err + } + } else { + return nil, fmt.Errorf("invalid type for %s", key) + } + case ParamNameMaxBlockSize, ParamNameMaxVotesPerTx: if val, ok := value.(int64); ok { if err := binary.Write(buf, binary.LittleEndian, val); err != nil { return nil, err @@ -427,7 +437,13 @@ func (pu *ParamUpdates) UnmarshalBinary(data []byte) error { return err } updates[paramName] = string(val) - case ParamNameMaxBlockSize, ParamNameJoinExpiry, ParamNameMaxVotesPerTx: + case ParamNameJoinExpiry: + var expiry Duration + if err := binary.Read(buf, binary.LittleEndian, &expiry); err != nil { + return err + } + updates[paramName] = expiry + case ParamNameMaxBlockSize, ParamNameMaxVotesPerTx: var val int64 if err := binary.Read(buf, binary.LittleEndian, &val); err != nil { return err diff --git a/core/types/params_test.go b/core/types/params_test.go index 2a61b3bfc..ad8aa0a46 100644 --- a/core/types/params_test.go +++ b/core/types/params_test.go @@ -3,6 +3,7 @@ package types import ( "reflect" "testing" + "time" "github.com/stretchr/testify/require" @@ -105,7 +106,7 @@ func TestParamUpdatesMarshalBinary(t *testing.T) { ParamNameLeader: newPubKey(), ParamNameDBOwner: "test_owner", ParamNameMaxBlockSize: int64(1000), - ParamNameJoinExpiry: int64(3600), + ParamNameJoinExpiry: Duration(10 * time.Second), ParamNameDisabledGasCosts: true, ParamNameMaxVotesPerTx: int64(10), ParamNameMigrationStatus: MigrationStatus("pending"), diff --git a/core/types/time.go b/core/types/time.go new file mode 100644 index 000000000..efc227a05 --- /dev/null +++ b/core/types/time.go @@ -0,0 +1,21 @@ +package types + +import "time" + +// Duration is a wrapper around time.Duration that implements text +// (un)marshalling for the go-toml package to work with Go duration strings +// instead of integers. 
+type Duration time.Duration + +func (d Duration) MarshalText() ([]byte, error) { + return []byte(time.Duration(d).String()), nil +} + +func (d *Duration) UnmarshalText(text []byte) error { + duration, err := time.ParseDuration(string(text)) + if err != nil { + return err + } + *d = Duration(duration) + return nil +} diff --git a/core/types/types.go b/core/types/types.go index 552406caf..bc15a3cb1 100644 --- a/core/types/types.go +++ b/core/types/types.go @@ -8,6 +8,7 @@ import ( "errors" "fmt" "math/big" + "time" "github.com/kwilteam/kwil-db/core/crypto" "github.com/kwilteam/kwil-db/core/crypto/auth" @@ -110,7 +111,7 @@ type ChainInfo struct { type JoinRequest struct { Candidate *AccountID `json:"candidate"` // pubkey of the candidate validator Power int64 `json:"power"` // the requested power - ExpiresAt int64 `json:"expires_at"` // the block height at which the join request expires + ExpiresAt time.Time `json:"expires_at"` // the timestamp at which the join request expires Board []*AccountID `json:"board"` // slice of pubkeys of all the eligible voting validators Approved []bool `json:"approved"` // slice of bools indicating if the corresponding validator approved } @@ -253,7 +254,7 @@ func (e *VotableEvent) UnmarshalBinary(b []byte) error { type PendingResolution struct { Type string `json:"type"` ResolutionID *UUID `json:"resolution_id"` // Resolution ID - ExpiresAt int64 `json:"expires_at"` // ExpiresAt is the block height at which the resolution expires + ExpiresAt time.Time `json:"expires_at"` // ExpiresAt is the timestamp at which the resolution expires Board []*AccountID `json:"board"` // Board is the list of validators who are eligible to vote on the resolution Approved []bool `json:"approved"` // Approved is the list of bools indicating if the corresponding validator approved the resolution } diff --git a/extensions/resolutions/resolutions.go b/extensions/resolutions/resolutions.go index fcf21d359..c43eb0321 100644 --- a/extensions/resolutions/resolutions.go +++ b/extensions/resolutions/resolutions.go @@ -6,6 +6,7 @@ import ( "fmt" "math/big" "strings" + "time" "github.com/kwilteam/kwil-db/common" "github.com/kwilteam/kwil-db/core/types" @@ -102,14 +103,11 @@ type ResolutionConfig struct { // number must be a fraction between 0 and 1. If this field is // nil, it will default to 2/3. ConfirmationThreshold *big.Rat - // ExpirationPeriod is the amount of blocks that the resolution - // will be valid for before it expires. It is applied additively - // to the current block height when the resolution is proposed; - // if the current block height is 10 and the expiration height is - // 5, the resolution will expire at block 15. If this field is - // <1, it will default to 14400, which is approximately 1 day - // assuming 6 second blocks. - ExpirationPeriod int64 + // ExpirationPeriod is the duration for which the resolution will + // be valid for before it expires. This is applied additively to the + // block timestamp in the header when the resolution is created. + // If not set, the resolution expiry is defaulted to 86400 secs (1 day) + ExpirationPeriod time.Duration // ResolveFunc is a function that is called once a resolution has // received a required number of votes, as defined by the // ConfirmationThreshold. It is given a readwrite database @@ -133,10 +131,10 @@ type Resolution struct { // Type is the type of the resolution. It is used to determine // the logic for the resolution. 
Type string - // ExpirationHeight is the block height at which the resolution - // is set to expire, if it has not received the required number - // of votes. - ExpirationHeight int64 + // Expiration is the timestamp at which the resolution is set to + // Expire. + Expiration time.Time + // Expiration int64 // ApprovedPower is the total power of the voters that have // approved the resolution. ApprovedPower int64 diff --git a/node/block_processor/interfaces.go b/node/block_processor/interfaces.go index fbafacc4a..60ae5d7f5 100644 --- a/node/block_processor/interfaces.go +++ b/node/block_processor/interfaces.go @@ -77,6 +77,12 @@ type EventStore interface { // MarkBroadcasted marks list of events as broadcasted. MarkBroadcasted(ctx context.Context, ids []*types.UUID) error + + // HasEvents return true if there are any events to be broadcasted + HasEvents() bool + + // records the events for which the resolutions have been created. + UpdateStats(deleteCnt int64) } var ( diff --git a/node/block_processor/processor.go b/node/block_processor/processor.go index 4d059d00f..df2d3a87b 100644 --- a/node/block_processor/processor.go +++ b/node/block_processor/processor.go @@ -413,6 +413,8 @@ func (bp *BlockProcessor) ExecuteBlock(ctx context.Context, req *ktypes.BlockExe inMigration := bp.chainCtx.NetworkParameters.MigrationStatus == ktypes.MigrationInProgress haltNetwork := bp.chainCtx.NetworkParameters.MigrationStatus == ktypes.MigrationCompleted + isLeader := bp.signer.PubKey().Equals(req.Proposer) + blockCtx := &common.BlockContext{ Height: req.Height, Timestamp: req.Block.Header.Timestamp.Unix(), @@ -472,6 +474,16 @@ func (bp *BlockProcessor) ExecuteBlock(ctx context.Context, req *ktypes.BlockExe } txResults[i] = txResult + + if isLeader && tx.Body.PayloadType == ktypes.PayloadTypeValidatorVoteBodies { + body := &ktypes.ValidatorVoteBodies{} + if err := body.UnmarshalBinary(tx.Body.Payload); err != nil { + return nil, fmt.Errorf("failed to unmarshal validator votebody tx") + } + + numEvents := int64(len(body.Events)) + bp.events.UpdateStats(numEvents) + } } } @@ -554,7 +566,6 @@ func (bp *BlockProcessor) ExecuteBlock(ctx context.Context, req *ktypes.BlockExe if err != nil { return nil, fmt.Errorf("failed to compute the consensus updates hash: %w", err) } - bp.log.Info("Consensus updates", "hash", paramUpdatesHash, "updates", bp.chainCtx.NetworkUpdates) nextHash := bp.nextAppHash(stateHashes{ prevApp: bp.appHash, @@ -577,8 +588,10 @@ func (bp *BlockProcessor) ExecuteBlock(ctx context.Context, req *ktypes.BlockExe } } - bp.log.Info("Executed Block", "height", req.Height, "blkHash", req.BlockID, "appHash", nextHash) - bp.log.Infoln("network param updates:", bp.chainCtx.NetworkUpdates) + bp.log.Info("Executed Block", "height", req.Height, "blkID", req.BlockID, "appHash", nextHash, "numTxs", req.Block.Header.NumTxns) + if len(bp.chainCtx.NetworkUpdates) != 0 { + bp.log.Info("Consensus updates", "hash", paramUpdatesHash, "updates", bp.chainCtx.NetworkUpdates) + } return &ktypes.BlockExecResult{ TxResults: txResults, @@ -711,9 +724,9 @@ func (bp *BlockProcessor) Commit(ctx context.Context, req *ktypes.CommitRequest) bp.clearBlockExecutionStatus() // TODO: not very sure where to clear this // Announce final validators to subscribers - bp.announceValidators() // can be in goroutine? + bp.announceValidators() // can be in goroutine? no, because the modules state need to be updated by the next consensus round? 
- bp.log.Info("Committed Block", "height", req.Height, "appHash", req.AppHash.String()) + bp.log.Debug("Committed Block", "height", req.Height, "appHash", req.AppHash.String()) return nil } diff --git a/node/block_processor/transactions.go b/node/block_processor/transactions.go index fcb241956..a4902be0e 100644 --- a/node/block_processor/transactions.go +++ b/node/block_processor/transactions.go @@ -206,6 +206,14 @@ func (bp *BlockProcessor) prepareBlockTransactions(ctx context.Context, txs []*t return finalTxs, invalidTxs, nil } +func (bp *BlockProcessor) HasEvents() bool { + return bp.events.HasEvents() +} + +func (bp *BlockProcessor) UpdateStats(delectCnt int64) { + bp.events.UpdateStats(delectCnt) +} + // prepareValidatorVoteBodyTx authors the ValidatorVoteBody transaction to be included by the leader in the block. // It fetches the events which does not have resolutions yet and creates a validator vote body transaction. // The number of events to be included in a single transaction is limited either by MaxVotesPerTx or the maxTxSize diff --git a/node/block_processor/transactions_test.go b/node/block_processor/transactions_test.go index 87d6d20dd..bb31ee1fe 100644 --- a/node/block_processor/transactions_test.go +++ b/node/block_processor/transactions_test.go @@ -812,6 +812,13 @@ func (m *mockEventStore) MarkBroadcasted(ctx context.Context, ids []*types.UUID) return nil } +func (m *mockEventStore) HasEvents() bool { + return true +} + +func (m *mockEventStore) UpdateStats(cnt int64) { +} + type mockValidatorStore struct { valSet []*types.Validator } diff --git a/node/consensus/block.go b/node/consensus/block.go index 529ea8281..6a1c17ee3 100644 --- a/node/consensus/block.go +++ b/node/consensus/block.go @@ -148,6 +148,9 @@ func (ce *ConsensusEngine) executeBlock(ctx context.Context, blkProp *blockPropo paramUpdates: results.ParamUpdates, } + // reset the catchup timer as we have successfully processed a new block proposal + ce.catchupTicker.Reset(ce.catchupTimeout) + ce.log.Info("Executed block", "height", blkProp.height, "blkID", blkProp.blkHash, "numTxs", blkProp.blk.Header.NumTxns, "appHash", results.AppHash.String()) return nil } @@ -197,6 +200,9 @@ func (ce *ConsensusEngine) commit(ctx context.Context) error { // update the role of the node based on the final validator set at the end of the commit. ce.updateValidatorSetAndRole() + // reset the catchup timer as we have successfully processed a new block proposal + ce.catchupTicker.Reset(ce.catchupTimeout) + ce.log.Info("Committed Block", "height", height, "hash", blkProp.blkHash.String(), "appHash", appHash.String(), "updates", ce.state.blockRes.paramUpdates) return nil diff --git a/node/consensus/blocksync.go b/node/consensus/blocksync.go index e71e35770..e744bbeb1 100644 --- a/node/consensus/blocksync.go +++ b/node/consensus/blocksync.go @@ -3,6 +3,7 @@ package consensus import ( "context" "encoding/hex" + "errors" "fmt" "time" @@ -17,37 +18,41 @@ import ( // leading to a fork. func (ce *ConsensusEngine) doBlockSync(ctx context.Context) error { if ce.role.Load() == types.RoleLeader { - // TODO: The validator set info that leader might have at the time it starts - // blocksync is outdated. And if the previous validators - if len(ce.validatorSet) == 1 { - return nil // we are the network - } - ce.log.Info("Starting block sync", "height", ce.state.lc.height) + return ce.leaderBlockSync(ctx) + } - // figure out the best height to sync with the network - // before starting to request blocks from the network. 
- bestHeight, err := ce.discoverBestHeight(ctx) - if err != nil { - return fmt.Errorf("failed to discover the network's best height: %w", err) - } + // Validators and sentry nodes can do best effort block sync + // and start the consensus engine at the latest height. If they + // are behind, they can catch up as they process blocks. + return ce.replayBlockFromNetwork(ctx, ce.syncBlock) +} - if bestHeight <= ce.state.lc.height { - // replay blocks from the network to catch up with the network. - ce.log.Info("Leader is up to date with the network", "height", ce.state.lc.height) +func (ce *ConsensusEngine) leaderBlockSync(ctx context.Context) error { + if len(ce.validatorSet) == 1 { + return nil // we are the network + } - return nil - } + startHeight := ce.lastCommitHeight() + ce.log.Info("Starting block sync", "height", startHeight+1) + // figure out the best height to sync with the network + // before starting to request blocks from the network. + bestHeight, err := ce.discoverBestHeight(ctx) + if err != nil { + return fmt.Errorf("failed to discover the network's best height: %w", err) + } + + if bestHeight <= startHeight { // replay blocks from the network to catch up with the network. - return ce.syncBlocksUntilHeight(ctx, ce.state.lc.height+1, bestHeight) + ce.log.Info("Leader is up to date with the network", "height", startHeight) + return nil } - // Validators and sentry nodes can do best effort block sync - // and start the consensus engine at the latest height. If they - // are behind, they can catch up as they process blocks. - return ce.replayBlockFromNetwork(ctx) + return ce.syncBlocksUntilHeight(ctx, startHeight+1, bestHeight) } +// discoverBestHeight is a discovery process that leader uses to figure out the +// latest network height from the validators. func (ce *ConsensusEngine) discoverBestHeight(ctx context.Context) (int64, error) { cancelCtx, cancel := context.WithCancel(ctx) defer cancel() @@ -103,76 +108,83 @@ func (ce *ConsensusEngine) discoverBestHeight(ctx context.Context) (int64, error } } -// replayBlockFromNetwork requests the next blocks from the network and processes it -// until it catches up with its peers. -func (ce *ConsensusEngine) replayBlockFromNetwork(ctx context.Context) error { - startHeight := ce.state.lc.height + 1 +// replayBlockFromNetwork attempts to synchronize the local node with the network by fetching +// and processing blocks from peers. +func (ce *ConsensusEngine) replayBlockFromNetwork(ctx context.Context, requester func(context.Context, int64) error) error { + var startHeight, height int64 + startHeight = ce.lastCommitHeight() + 1 + height = startHeight t0 := time.Now() for { - ce.log.Info("Requesting block from network (replay mode)", "height", ce.state.lc.height+1) - _, rawblk, ci, err := ce.blkRequester(ctx, ce.state.lc.height+1) - if err != nil { // all kinds of errors? - ce.log.Info("Error requesting block from network", "height", ce.state.lc.height+1, "error", err) - break // no more blocks to sync from network. 
- } - - if ce.state.lc.height != 0 && ci != nil && ci.AppHash.IsZero() { - return nil - } - - blk, err := ktypes.DecodeBlock(rawblk) - if err != nil { - return fmt.Errorf("failed to decode block: %w", err) - } - - if err := ce.processAndCommit(ctx, blk, ci); err != nil { - return err + if err := requester(ctx, height); err != nil { + ce.log.Info("block request from the network failed", "height", height, "error", err) + break } + height++ } - ce.log.Info("Block sync completed", "startHeight", startHeight, "endHeight", ce.state.lc.height, "duration", time.Since(t0)) + ce.log.Info("Block sync completed", "startHeight", startHeight, "endHeight", height, "duration", time.Since(t0)) return nil } -// replayBlockFromNetwork requests the next blocks from the network and processes it -// until it catches up with its peers. +// syncBlocksUntilHeight fetches and processes blocks from startHeight to endHeight, +// retrying if necessary until successful or the maximum retries are reached. func (ce *ConsensusEngine) syncBlocksUntilHeight(ctx context.Context, startHeight, endHeight int64) error { - height := startHeight t0 := time.Now() for height <= endHeight { - // TODO: This is used in blocksync for leader, failure of fetching the block after certain retries should fail the node - _, rawblk, ci, err := ce.getBlock(ctx, height) - if err != nil { // all kinds of errors? - ce.log.Info("Error requesting block from network", "height", ce.state.lc.height+1, "error", err) - return fmt.Errorf("error requesting block from network: height : %d, error: %w", ce.state.lc.height+1, err) + if err := ce.syncBlockWithRetry(ctx, height); err != nil { + return err } + height++ + } - if ce.state.lc.height != 0 && ci != nil && ci.AppHash.IsZero() { - return nil - } + ce.log.Info("Block sync completed", "startHeight", startHeight, "endHeight", endHeight, "duration", time.Since(t0)) - blk, err := ktypes.DecodeBlock(rawblk) - if err != nil { - return fmt.Errorf("failed to decode block: %w", err) - } + return nil +} - if err := ce.processAndCommit(ctx, blk, ci); err != nil { - return err - } +// syncBlockWithRetry fetches the specified block from the network and keeps retrying until +// the block is successfully retrieved from the network. +func (ce *ConsensusEngine) syncBlockWithRetry(ctx context.Context, height int64) error { + _, rawblk, ci, err := ce.getBlock(ctx, height) + if err != nil { // all kinds of errors? + return fmt.Errorf("error requesting block from network: height : %d, error: %w", height, err) + } - height++ + return ce.applyBlock(ctx, rawblk, ci) +} + +// syncBlock fetches the specified block from the network +func (ce *ConsensusEngine) syncBlock(ctx context.Context, height int64) error { + _, rawblk, ci, err := ce.blkRequester(ctx, height) + if err != nil { // all kinds of errors? 
+ return fmt.Errorf("error requesting block from network: height : %d, error: %w", height, err) } - ce.log.Info("Block sync completed", "startHeight", startHeight, "endHeight", endHeight, "duration", time.Since(t0)) + return ce.applyBlock(ctx, rawblk, ci) +} + +func (ce *ConsensusEngine) applyBlock(ctx context.Context, rawBlk []byte, ci *ktypes.CommitInfo) error { + ce.state.mtx.Lock() + defer ce.state.mtx.Unlock() + + blk, err := ktypes.DecodeBlock(rawBlk) + if err != nil { + return fmt.Errorf("failed to decode block: %w", err) + } + + if err := ce.processAndCommit(ctx, blk, ci); err != nil { + return err + } return nil } func (ce *ConsensusEngine) getBlock(ctx context.Context, height int64) (blkID types.Hash, rawBlk []byte, ci *ktypes.CommitInfo, err error) { - err = retry(ctx, 15, func() error { + err = blkRetrier(ctx, 15, func() error { blkID, rawBlk, ci, err = ce.blkRequester(ctx, height) return err }) @@ -181,7 +193,7 @@ func (ce *ConsensusEngine) getBlock(ctx context.Context, height int64) (blkID ty } // retry will retry the function until it is successful, or reached the max retries -func retry(ctx context.Context, maxRetries int64, fn func() error) error { +func blkRetrier(ctx context.Context, maxRetries int64, fn func() error) error { retrier := &backoff.Backoff{ Min: 100 * time.Millisecond, Max: 500 * time.Millisecond, @@ -195,6 +207,10 @@ func retry(ctx context.Context, maxRetries int64, fn func() error) error { return nil } + if errors.Is(err, types.ErrBlkNotFound) { + return err + } + // fail after maxRetries retries if retrier.Attempt() > float64(maxRetries) { return err diff --git a/node/consensus/engine.go b/node/consensus/engine.go index ef8b2f936..19201b475 100644 --- a/node/consensus/engine.go +++ b/node/consensus/engine.go @@ -24,6 +24,8 @@ const ( // Use these accordingly MaxBlockSize = 4 * 1024 * 1024 // 1 MB blockTxCount = 50 + + defaultProposeTimeout = 1 * time.Second ) var zeroHash = types.Hash{} @@ -44,8 +46,11 @@ type ConsensusEngine struct { log log.Logger // proposeTimeout specifies the time duration to wait before proposing a new block for the next height. - // Default is 1 second. + // This timeout is used by the leader to propose a block if transactions are available. Default is 1 second. proposeTimeout time.Duration + // emptyBlockTimeout specifies the time duration to wait before proposing an empty block. + // This should always be greater than the proposeTimeout. Default is 1 minute. + emptyBlockTimeout time.Duration // blkProposalInterval specifies the time duration to wait before reannouncing the block proposal message. // This is only applicable for the leader. This timeout influences how quickly the out-of-sync nodes can @@ -77,6 +82,10 @@ type ConsensusEngine struct { numResets int64 // Channels + newBlockProposal chan struct{} // triggers block production in the leader + // newRound triggers the start of a new round in the consensus engine. + // leader waits for the minBlockInterval before proposing a new block if txs are available. + // if no txs are available for the maxBlockInterval duration, the leader proposes an empty block. newRound chan struct{} msgChan chan consensusMessage haltChan chan string // can take a msg or reason for halting the network @@ -109,6 +118,9 @@ type ConsensusEngine struct { // waitgroup to track all the consensus goroutines wg sync.WaitGroup + + catchupTicker *time.Ticker + catchupTimeout time.Duration } // Config is the struct given to the constructor, [New]. 
@@ -119,8 +131,13 @@ type Config struct { Leader crypto.PublicKey // GenesisHeight is the initial height of the network. GenesisHeight int64 - // ProposeTimeout is the timeout for proposing a block. + + // ProposeTimeout is the minimum time duration to wait before proposing a new block. + // Leader can propose a block with transactions as soon as this timeout is reached. Default is 1 second. ProposeTimeout time.Duration + // EmptyBlockTimeout is the maximum time duration to wait before proposing a new block without transactions. + // Default is 1 minute. + EmptyBlockTimeout time.Duration // BlkPropReannounceInterval is the frequency at which block proposal messages are reannounced by the Leader. BlockProposalInterval time.Duration // BlkAnnReannounceInterval is the frequency at which block commit messages are reannounced by the Leader. @@ -253,6 +270,7 @@ func New(cfg *Config) *ConsensusEngine { privKey: cfg.PrivateKey, leader: cfg.Leader, proposeTimeout: cfg.ProposeTimeout, + emptyBlockTimeout: cfg.EmptyBlockTimeout, blkProposalInterval: cfg.BlockProposalInterval, blkAnnInterval: cfg.BlockAnnInterval, broadcastTxTimeout: cfg.BroadcastTxTimeout, @@ -272,12 +290,14 @@ func New(cfg *Config) *ConsensusEngine { status: Committed, blkProp: nil, }, - genesisHeight: cfg.GenesisHeight, - msgChan: make(chan consensusMessage, 1), // buffer size?? - haltChan: make(chan string, 1), - resetChan: make(chan *resetMsg, 1), - bestHeightCh: make(chan *discoveryMsg, 1), - newRound: make(chan struct{}, 1), + genesisHeight: cfg.GenesisHeight, + msgChan: make(chan consensusMessage, 1), // buffer size?? + haltChan: make(chan string, 1), + resetChan: make(chan *resetMsg, 1), + bestHeightCh: make(chan *discoveryMsg, 1), + newRound: make(chan struct{}, 1), + newBlockProposal: make(chan struct{}, 1), + // interfaces mempool: cfg.Mempool, blockStore: cfg.BlockStore, @@ -291,6 +311,10 @@ func New(cfg *Config) *ConsensusEngine { // Status, etc. tries to access the role. ce.role.Store(types.RoleSentry) + if ce.proposeTimeout == 0 { // can't be zero + ce.proposeTimeout = defaultProposeTimeout + } + return ce } @@ -304,6 +328,8 @@ func (ce *ConsensusEngine) Start(ctx context.Context, fns BroadcastFns, peerFns ce.txAnnouncer = fns.TxAnnouncer ce.blockProcessor.SetCallbackFns(fns.TxBroadcaster, peerFns.AddPeer, peerFns.RemovePeer) + ce.catchupTimeout = min(5*time.Second, ce.emptyBlockTimeout+ce.blkProposalInterval) + ce.catchupTicker = time.NewTicker(ce.catchupTimeout) ce.log.Info("Starting the consensus engine") ctx, cancel := context.WithCancel(ctx) @@ -318,7 +344,7 @@ func (ce *ConsensusEngine) Start(ctx context.Context, fns BroadcastFns, peerFns // nodes are activated when they receive a block proposal or block announce msg. if ce.role.Load() == types.RoleLeader { ce.log.Infof("Starting the leader node") - ce.newRound <- struct{}{} // recv by runConsensusEventLoop, buffered + ce.newBlockProposal <- struct{}{} // recv by runConsensusEventLoop, buffered } else { ce.log.Infof("Starting the validator/sentry node") } @@ -392,10 +418,14 @@ func (ce *ConsensusEngine) Status() *ktypes.NodeStatus { // catchup with the network and reannounce the messages. func (ce *ConsensusEngine) runConsensusEventLoop(ctx context.Context) error { ce.log.Info("Starting the consensus event loop...") - catchUpTicker := time.NewTicker(5 * time.Second) // Should this be configurable?? 
reannounceTicker := time.NewTicker(ce.blkAnnInterval) // 3 secs (default) blkPropTicker := time.NewTicker(ce.blkProposalInterval) // 1 sec (default) + // If no messages are received within the below specified duration after the last consensus message, + // and given that the leader is expected to produce a block within the emptyBlockTimeout interval, + // initiate catchup mode to request any missed messages. + // The catchupticker resets with each processed consensus message that successfully advances the node's state + for { select { case <-ctx.Done(): @@ -407,18 +437,21 @@ func (ce *ConsensusEngine) runConsensusEventLoop(ctx context.Context) error { return nil case <-ce.newRound: + go ce.newBlockRound(ctx) + + case <-ce.newBlockProposal: params := ce.blockProcessor.ConsensusParams() if params.MigrationStatus == ktypes.MigrationCompleted { ce.log.Info("Network halted due to migration, no more blocks will be produced") } - if err := ce.startNewRound(ctx); err != nil { + if err := ce.proposeBlock(ctx); err != nil { ce.log.Error("Error starting a new round", "error", err) return err } - case <-catchUpTicker.C: - err := ce.doCatchup(ctx) // better name?? + case <-ce.catchupTicker.C: + err := ce.doCatchup(ctx) if err != nil { return err } @@ -477,8 +510,7 @@ func (ce *ConsensusEngine) handleConsensusMessages(ctx context.Context, msg cons } -// catchup syncs the node first with the local blockstore and then with the network. -func (ce *ConsensusEngine) catchup(ctx context.Context) error { +func (ce *ConsensusEngine) initializeState(ctx context.Context) (int64, int64, error) { // Figure out the app state and initialize the node state. ce.state.mtx.Lock() defer ce.state.mtx.Unlock() @@ -488,7 +520,7 @@ func (ce *ConsensusEngine) catchup(ctx context.Context) error { readTx, err := ce.db.BeginReadTx(ctx) if err != nil { - return err + return -1, -1, err } defer readTx.Rollback(ctx) @@ -498,18 +530,18 @@ func (ce *ConsensusEngine) catchup(ctx context.Context) error { // retrieve the app state from the meta table appHeight, appHash, dirty, err := meta.GetChainState(ctx, readTx) if err != nil { - return err + return -1, -1, err } if dirty { - return fmt.Errorf("app state is dirty, error in the blockprocessor initialization, height: %d, appHash: %x", appHeight, appHash) + return -1, -1, fmt.Errorf("app state is dirty, error in the blockprocessor initialization, height: %d, appHash: %x", appHeight, appHash) } ce.log.Info("Initial Node state: ", "appHeight", appHeight, "storeHeight", storeHeight, "appHash", appHash, "storeAppHash", storeAppHash) if appHeight > storeHeight && appHeight != ce.genesisHeight { // This is not possible, App can't be ahead of the store - return fmt.Errorf("app height %d is greater than the store height %d (did you forget to reset postgres?)", appHeight, storeHeight) + return -1, -1, fmt.Errorf("app height %d is greater than the store height %d (did you forget to reset postgres?)", appHeight, storeHeight) } if appHeight == -1 { @@ -517,7 +549,7 @@ func (ce *ConsensusEngine) catchup(ctx context.Context) error { // initialize the db with the genesis state appHeight, appHash, err = ce.blockProcessor.InitChain(ctx) if err != nil { - return fmt.Errorf("error initializing the chain: %w", err) + return -1, -1, fmt.Errorf("error initializing the chain: %w", err) } ce.setLastCommitInfo(appHeight, nil, appHash) @@ -526,7 +558,7 @@ func (ce *ConsensusEngine) catchup(ctx context.Context) error { // restart or statesync init or zdt init if appHeight == storeHeight && !bytes.Equal(appHash, 
storeAppHash[:]) { // This is not possible, PG mismatches with the Blockstore return error - return fmt.Errorf("AppHash mismatch, appHash: %x, storeAppHash: %v", appHash, storeAppHash) + return -1, -1, fmt.Errorf("AppHash mismatch, appHash: %x, storeAppHash: %v", appHash, storeAppHash) } ce.setLastCommitInfo(appHeight, blkHash[:], appHash) } @@ -534,6 +566,17 @@ func (ce *ConsensusEngine) catchup(ctx context.Context) error { // Set the role and validator set based on the initial state of the voters before starting the replay ce.updateValidatorSetAndRole() + return appHeight, storeHeight, nil +} + +// catchup syncs the node first with the local blockstore and then with the network. +func (ce *ConsensusEngine) catchup(ctx context.Context) error { + // initialize the chain state + appHeight, storeHeight, err := ce.initializeState(ctx) + if err != nil { + return err + } + // Replay the blocks from the blockstore if the app hasn't played all the blocks yet. if appHeight < storeHeight { if err := ce.replayFromBlockStore(ctx, appHeight+1, storeHeight); err != nil { @@ -605,6 +648,9 @@ func (ce *ConsensusEngine) setLastCommitInfo(height int64, blkHash []byte, appHa // replayBlocks replays all the blocks from the blockstore if the app hasn't played all the blocks yet. func (ce *ConsensusEngine) replayFromBlockStore(ctx context.Context, startHeight, bestHeight int64) error { + ce.state.mtx.Lock() + defer ce.state.mtx.Unlock() + height := startHeight t0 := time.Now() @@ -671,30 +717,50 @@ func (ce *ConsensusEngine) rebroadcastBlkProposal(ctx context.Context) { func (ce *ConsensusEngine) doCatchup(ctx context.Context) error { // status check, nodes halt here if the migration is completed + ce.log.Info("No consensus messages received recently, initiating network catchup.") params := ce.blockProcessor.ConsensusParams() if params.MigrationStatus == ktypes.MigrationCompleted { ce.log.Info("Network halted due to migration, no more blocks will be produced") return nil } - ce.state.mtx.Lock() - defer ce.state.mtx.Unlock() - if ce.role.Load() == types.RoleLeader { return nil } - startHeight := ce.state.lc.height + 1 + startHeight := ce.lastCommitHeight() t0 := time.Now() + if err := ce.processCurrentBlock(ctx); err != nil { + if errors.Is(err, types.ErrBlkNotFound) { + return nil // retry again + } + ce.log.Error("error during block processing in catchup", "height", startHeight+1, "error", err) + return err + } + + err := ce.replayBlockFromNetwork(ctx, ce.syncBlockWithRetry) + if err != nil { + return err + } + + endHeight := ce.lastCommitHeight() + ce.log.Info("Network Sync: ", "from", startHeight, "to", endHeight, "time", time.Since(t0), "appHash", ce.state.lc.appHash) + + return nil +} + +func (ce *ConsensusEngine) processCurrentBlock(ctx context.Context) error { + ce.state.mtx.Lock() + defer ce.state.mtx.Unlock() + if ce.role.Load() == types.RoleValidator { // If validator is in the middle of processing a block, finish it first - if ce.state.blkProp != nil && ce.state.blockRes != nil { // Waiting for the commit message - blkHash, rawBlk, ci, err := ce.blkRequester(ctx, ce.state.blkProp.height) + blkHash, rawBlk, ci, err := ce.getBlock(ctx, ce.state.blkProp.height) // retries it if err != nil { - ce.log.Warn("Error requesting block from network", "height", ce.state.blkProp.height, "error", err) - return nil // not an error, just retry later + ce.log.Debug("Error requesting block from network", "height", ce.state.blkProp.height, "error", err) + return err } if blkHash != ce.state.blkProp.blkHash { // 
processed incorrect block @@ -729,14 +795,6 @@ func (ce *ConsensusEngine) doCatchup(ctx context.Context) error { } } } - - err := ce.replayBlockFromNetwork(ctx) - if err != nil { - return err - } - - ce.log.Info("Network Sync: ", "from", startHeight, "to", ce.state.lc.height, "time", time.Since(t0), "appHash", ce.state.lc.appHash) - return nil } @@ -840,3 +898,10 @@ func (ce *ConsensusEngine) UnsubscribeTx(txHash ktypes.Hash) { delete(ce.txSubscribers, txHash) } + +func (ce *ConsensusEngine) lastCommitHeight() int64 { + ce.stateInfo.mtx.RLock() + defer ce.stateInfo.mtx.RUnlock() + + return ce.stateInfo.height +} diff --git a/node/consensus/engine_test.go b/node/consensus/engine_test.go index 5ed70c9d4..d7580b9d0 100644 --- a/node/consensus/engine_test.go +++ b/node/consensus/engine_test.go @@ -164,6 +164,7 @@ func generateTestCEConfig(t *testing.T, nodes int, leaderDB bool) ([]*Config, ma // ValidatorSet: validatorSet, Logger: logger, ProposeTimeout: 1 * time.Second, + EmptyBlockTimeout: 1 * time.Second, BlockProposalInterval: 1 * time.Second, BlockAnnInterval: 3 * time.Second, BroadcastTxTimeout: 10 * time.Second, @@ -601,6 +602,76 @@ func TestValidatorStateMachine(t *testing.T) { }, }, }, + { + name: "Catchup mode with blk request fail", + setup: func(t *testing.T) ([]*Config, map[string]ktypes.Validator) { + return generateTestCEConfig(t, 2, false) + }, + actions: []action{ + { + name: "blkPropNew", + trigger: func(t *testing.T, leader, val *ConsensusEngine) { + val.NotifyBlockProposal(blkProp2.blk) + }, + verify: func(t *testing.T, leader, val *ConsensusEngine) error { + return verifyStatus(t, val, Executed, 0, blkProp2.blkHash) + }, + }, + { + name: "catchup", + trigger: func(t *testing.T, leader, val *ConsensusEngine) { + ci := addVotes(t, blkProp2.blkHash, blockAppHash, leader, val) + + rawBlk := ktypes.EncodeBlock(blkProp2.blk) + cnt := 0 + val.blkRequester = func(ctx context.Context, height int64) (types.Hash, []byte, *ktypes.CommitInfo, error) { + defer func() { cnt += 1 }() + + if cnt <= 1 { + return zeroHash, nil, nil, types.ErrBlkNotFound + } + return blkProp2.blkHash, rawBlk, ci, nil + } + val.doCatchup(context.Background()) + }, + verify: func(t *testing.T, leader, val *ConsensusEngine) error { + return verifyStatus(t, val, Committed, 1, blkProp2.blkHash) + }, + }, + }, + }, + { + name: "Catchup mode with blk request success", + setup: func(t *testing.T) ([]*Config, map[string]ktypes.Validator) { + return generateTestCEConfig(t, 2, false) + }, + actions: []action{ + { + name: "blkPropNew", + trigger: func(t *testing.T, leader, val *ConsensusEngine) { + val.NotifyBlockProposal(blkProp2.blk) + }, + verify: func(t *testing.T, leader, val *ConsensusEngine) error { + return verifyStatus(t, val, Executed, 0, blkProp2.blkHash) + }, + }, + { + name: "catchup", + trigger: func(t *testing.T, leader, val *ConsensusEngine) { + ci := addVotes(t, blkProp2.blkHash, blockAppHash, leader, val) + + rawBlk := ktypes.EncodeBlock(blkProp2.blk) + val.blkRequester = func(ctx context.Context, height int64) (types.Hash, []byte, *ktypes.CommitInfo, error) { + return blkProp2.blkHash, rawBlk, ci, nil + } + val.doCatchup(context.Background()) + }, + verify: func(t *testing.T, leader, val *ConsensusEngine) error { + return verifyStatus(t, val, Committed, 1, blkProp2.blkHash) + }, + }, + }, + }, } for _, tc := range testcases { @@ -640,7 +711,7 @@ func TestValidatorStateMachine(t *testing.T) { return false } return true - }, 6*time.Second, 100*time.Millisecond) + }, 6*time.Second, 500*time.Millisecond) } 
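The two catchup test cases above drive doCatchup with a blkRequester stub that first fails with ErrBlkNotFound and later returns the block, which is how a temporarily missing block is treated as retryable rather than fatal. The standalone sketch below illustrates that retry pattern under the same assumption; all identifiers are local to the example, and the engine's actual syncBlockWithRetry helper is not reproduced here.

package main

import (
    "context"
    "errors"
    "fmt"
    "time"
)

var errBlkNotFound = errors.New("block not available") // mirrors the "block not available" sentinel used above

// requester mirrors the general shape of a block-request callback: it returns
// the raw block bytes for a height, or an error.
type requester func(ctx context.Context, height int64) ([]byte, error)

// fetchWithRetry keeps asking for a block, treating "not found" as a transient
// condition worth retrying and anything else as a hard failure.
func fetchWithRetry(ctx context.Context, req requester, height int64, every time.Duration) ([]byte, error) {
    for {
        blk, err := req(ctx, height)
        if err == nil {
            return blk, nil
        }
        if !errors.Is(err, errBlkNotFound) {
            return nil, err // real failure, give up
        }
        select {
        case <-ctx.Done():
            return nil, ctx.Err()
        case <-time.After(every):
        }
    }
}

func main() {
    attempts := 0
    stub := func(ctx context.Context, height int64) ([]byte, error) {
        attempts++
        if attempts <= 2 {
            return nil, errBlkNotFound // simulate peers that don't have the block yet
        }
        return []byte("raw block"), nil
    }
    blk, err := fetchWithRetry(context.Background(), stub, 1, 10*time.Millisecond)
    fmt.Println(string(blk), err, "attempts:", attempts)
}
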
}) } @@ -838,7 +909,7 @@ func TestCELeaderTwoNodesMajorityNacks(t *testing.T) { // MockBroadcasters func mockBlkRequester(ctx context.Context, height int64) (types.Hash, []byte, *ktypes.CommitInfo, error) { - return types.Hash{}, nil, nil, fmt.Errorf("not implemented") + return types.Hash{}, nil, nil, types.ErrBlkNotFound } func mockBlockPropBroadcaster(_ context.Context, blk *ktypes.Block) {} @@ -933,13 +1004,6 @@ func (m *mockAccounts) Updates() []*ktypes.Account { return nil } -func (ce *ConsensusEngine) lastCommitHeight() int64 { - ce.stateInfo.mtx.RLock() - defer ce.stateInfo.mtx.RUnlock() - - return ce.stateInfo.height -} - func (ce *ConsensusEngine) info() (int64, Status, *blockProposal) { ce.stateInfo.mtx.RLock() defer ce.stateInfo.mtx.RUnlock() @@ -994,6 +1058,12 @@ func (m *mockEventStore) GetUnbroadcastedEvents(ctx context.Context) ([]*ktypes. return ids, nil } +func (m *mockEventStore) HasEvents() bool { + return true +} + +func (m *mockEventStore) UpdateStats(cnt int64) {} + type mockMigrator struct{} func (m *mockMigrator) NotifyHeight(ctx context.Context, block *common.BlockContext, db migrations.Database, tx sql.Executor) error { diff --git a/node/consensus/follower.go b/node/consensus/follower.go index 22a701306..a27046ebf 100644 --- a/node/consensus/follower.go +++ b/node/consensus/follower.go @@ -308,7 +308,6 @@ func (ce *ConsensusEngine) processAndCommit(ctx context.Context, blk *ktypes.Blo ce.log.Info("Processing committed block", "height", blk.Header.Height, "hash", blkID, "appHash", ci.AppHash) if err := ce.validateBlock(blk); err != nil { - // ce.log.Errorf("Error validating block: %v", err) return err } @@ -331,12 +330,12 @@ func (ce *ConsensusEngine) processAndCommit(ctx context.Context, blk *ktypes.Blo if !ce.state.blockRes.paramUpdates.Equals(ci.ParamUpdates) { // this is absorbed in apphash anyway, but helps diagnostics haltR := fmt.Sprintf("processAndCommit: Incorrect ParamUpdates, halting the node. received: %s, computed: %s", ci.ParamUpdates, ce.state.blockRes.paramUpdates) ce.haltChan <- haltR - return fmt.Errorf("paramUpdates mismatch, expected: %v, received: %v", ce.state.blockRes.paramUpdates, ci.ParamUpdates) + return errors.New(haltR) } if ce.state.blockRes.appHash != ci.AppHash { // do in acceptCommitInfo? - haltR := fmt.Sprintf("processAndCommit: Incorrect AppHash, halting the node. received: %s, computed: %s", ci.AppHash, ce.state.blockRes.appHash) + haltR := fmt.Sprintf("processAndCommit: AppHash mismatch, halting the node. 
expected: %s, received: %s", ce.state.blockRes.appHash, ci.AppHash) ce.haltChan <- haltR - return fmt.Errorf("appHash mismatch, expected: %s, received: %s", ci.AppHash, ce.state.blockRes.appHash) + return errors.New(haltR) } // Commit the block if the appHash and commitInfo are valid diff --git a/node/consensus/interfaces.go b/node/consensus/interfaces.go index 7c7586562..7142b204a 100644 --- a/node/consensus/interfaces.go +++ b/node/consensus/interfaces.go @@ -27,6 +27,7 @@ type Mempool interface { Remove(txid types.Hash) RecheckTxs(ctx context.Context, checkFn mempool.CheckFn) Store(types.Hash, *ktypes.Transaction) + TxsAvailable() bool } // BlockStore includes both txns and blocks @@ -54,4 +55,5 @@ type BlockProcessor interface { ConsensusParams() *ktypes.NetworkParameters BlockExecutionStatus() *ktypes.BlockExecutionStatus + HasEvents() bool } diff --git a/node/consensus/leader.go b/node/consensus/leader.go index 4a49707ad..eddea4c87 100644 --- a/node/consensus/leader.go +++ b/node/consensus/leader.go @@ -32,8 +32,45 @@ import ( // the current block, revert any state changes made, and remove the problematic transactions from the mempool before // reproposing the block. -// startNewRound initiates a new round of the consensus process (Prepare Phase). -func (ce *ConsensusEngine) startNewRound(ctx context.Context) error { +func (ce *ConsensusEngine) newBlockRound(ctx context.Context) error { + ticker := time.NewTicker(ce.proposeTimeout) + now := time.Now() + + // If EmptyBlockTimeout is 0, the leader doesn't propose empty blocks. + // The behavior is then similar to the automine feature, where a block is produced + // as soon as transactions are available and the proposeTimeout has elapsed. + // If EmptyBlockTimeout is non-zero, the leader proposes an empty block + // once no transactions or events have been available for the emptyBlockTimeout duration. + allowEmptyBlocks := ce.emptyBlockTimeout != 0 + ce.log.Info("Starting a new consensus round", "height", ce.lastCommitHeight()+1) + + for { + select { + case <-ctx.Done(): + ce.log.Warn("Context cancelled, stopping the new block round") + return nil + case <-ticker.C: + // Check whether transactions are available in the mempool, or + // whether the leader has new events that need a voteID transaction broadcast. + if ce.mempool.TxsAvailable() || ce.blockProcessor.HasEvents() { + ce.newBlockProposal <- struct{}{} + return nil + } + + // If empty blocks are allowed and the emptyBlockTimeout duration has elapsed, + // propose an empty block. + if allowEmptyBlocks && time.Since(now) >= ce.emptyBlockTimeout { + ce.newBlockProposal <- struct{}{} + return nil + } + } + + // No transactions available yet; wait until the next tick to recheck the mempool. + } +} + +// proposeBlock is used by the leader to propose a new block to the network. 
+func (ce *ConsensusEngine) proposeBlock(ctx context.Context) error { ce.state.mtx.Lock() defer ce.state.mtx.Unlock() // Check if the network is halted due to migration or other reasons @@ -45,8 +82,6 @@ func (ce *ConsensusEngine) startNewRound(ctx context.Context) error { return nil } - ce.log.Info("Starting a new consensus round", "height", ce.state.lc.height+1) - blkProp, err := ce.createBlockProposal(ctx) if err != nil { ce.log.Errorf("Error creating a block proposal: %v", err) @@ -110,7 +145,9 @@ func (ce *ConsensusEngine) startNewRound(ctx context.Context) error { ce.mempoolMtx.Unlock() // signal ce to start a new round - ce.newRound <- struct{}{} + // ce.newRound <- struct{}{} + // repropse a new block + ce.newBlockProposal <- struct{}{} return nil } @@ -125,6 +162,8 @@ func (ce *ConsensusEngine) startNewRound(ctx context.Context) error { return err } + ce.log.Info("Waiting for votes from the validators", "height", blkProp.height, "hash", blkProp.blkHash) + ce.state.votes[string(ce.pubKey.Bytes())] = &ktypes.VoteInfo{ AppHash: &ce.state.blockRes.appHash, AckStatus: ktypes.AckStatusAgree, @@ -132,7 +171,6 @@ func (ce *ConsensusEngine) startNewRound(ctx context.Context) error { } ce.processVotes(ctx) - ce.log.Info("Waiting for votes from the validators", "height", blkProp.height, "hash", blkProp.blkHash) return nil } @@ -298,17 +336,8 @@ func (ce *ConsensusEngine) processVotes(ctx context.Context) error { // start the next round ce.nextState() - go func() { // must not sleep with ce.state mutex locked - // Wait for the timeout to start the next round - select { - case <-ctx.Done(): - return - case <-time.After(ce.proposeTimeout): - } - - // signal ce to start a new round - ce.newRound <- struct{}{} - }() + // signal ce to start a new round + ce.newRound <- struct{}{} } else if ce.hasMajorityCeil(nacks) { haltReason := fmt.Sprintf("Majority of the validators have rejected the block, halting the network: %d acks, %d nacks", acks, nacks) diff --git a/node/consensus/updates_test.go b/node/consensus/updates_test.go index 3988d5621..a671c9c92 100644 --- a/node/consensus/updates_test.go +++ b/node/consensus/updates_test.go @@ -5,6 +5,7 @@ import ( "encoding/binary" "reflect" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -45,11 +46,25 @@ func TestParamUpdatesDeclaration_MarshalBinary(t *testing.T) { types.ParamNameLeader: types.PublicKey{PublicKey: pubkey}, types.ParamNameDBOwner: "0x1234567890123456789012345678901234567890", types.ParamNameDisabledGasCosts: false, - types.ParamNameJoinExpiry: int64(444), + types.ParamNameJoinExpiry: types.Duration(10 * time.Second), types.ParamNameMigrationStatus: types.MigrationCompleted, }, }, }, + { + name: "invalid expiry param updates", + declaration: ParamUpdatesDeclaration{ + Description: "test update", + ParamUpdates: types.ParamUpdates{ + types.ParamNameLeader: types.PublicKey{PublicKey: pubkey}, + types.ParamNameDBOwner: "0x1234567890123456789012345678901234567890", + types.ParamNameDisabledGasCosts: false, + types.ParamNameJoinExpiry: int64(10), + types.ParamNameMigrationStatus: types.MigrationCompleted, + }, + }, + wantErr: true, + }, } for _, tt := range tests { diff --git a/node/mempool/mempool.go b/node/mempool/mempool.go index 2e0ab3312..50c5645eb 100644 --- a/node/mempool/mempool.go +++ b/node/mempool/mempool.go @@ -128,3 +128,9 @@ func (mp *Mempool) RecheckTxs(ctx context.Context, fn CheckFn) { } } } + +func (mp *Mempool) TxsAvailable() bool { + mp.mtx.RLock() + defer mp.mtx.RUnlock() + 
return len(mp.txQ) > 0 +} diff --git a/node/meta/meta_test.go b/node/meta/meta_test.go index a29a874d7..562b6d810 100644 --- a/node/meta/meta_test.go +++ b/node/meta/meta_test.go @@ -5,6 +5,7 @@ package meta_test import ( "context" "testing" + "time" "github.com/stretchr/testify/require" @@ -53,7 +54,7 @@ func Test_NetworkParams(t *testing.T) { param := &common.NetworkParameters{ Leader: types.PublicKey{PublicKey: pubkey}, MaxBlockSize: 1000, - JoinExpiry: 100, + JoinExpiry: types.Duration(100 * time.Second), DisabledGasCosts: true, MaxVotesPerTx: 100, } diff --git a/node/migrations/migrator.go b/node/migrations/migrator.go index 85ff717ac..3bcdbd63a 100644 --- a/node/migrations/migrator.go +++ b/node/migrations/migrator.go @@ -669,7 +669,7 @@ func formatGenesisInfoFileName(mdir string) string { // - Remove all the pending migration, changeset, validator join and validator remove resolutions // - Fix the expiry heights of all the pending resolutions // (how to handle this for offline migrations? we have no way to know the last height of the old chain) -func CleanupResolutionsAfterMigration(ctx context.Context, db sql.DB, adjustExpiration bool, snapshotHeight int64) error { +func CleanupResolutionsAfterMigration(ctx context.Context, db sql.DB) error { tx, err := db.BeginTx(ctx) if err != nil { return err @@ -688,13 +688,8 @@ func CleanupResolutionsAfterMigration(ctx context.Context, db sql.DB, adjustExpi return err } - if adjustExpiration { - // Fix the expiry heights of all the pending resolutions - err = voting.ReadjustExpirations(ctx, tx, snapshotHeight) - if err != nil { - return err - } - } + // probably no need to adjust expirations, they should automatically be expired as we use timestamps + // unlike block heights in Kwil V1.0 where the heights get reset to 0 return tx.Commit(ctx) } diff --git a/node/node_live_test.go b/node/node_live_test.go index 087fbe7c8..bff9b4392 100644 --- a/node/node_live_test.go +++ b/node/node_live_test.go @@ -128,6 +128,7 @@ func TestSingleNodeMocknet(t *testing.T) { BlockProcessor: bp, Logger: log.New(log.WithName("CE1"), log.WithWriter(os.Stdout), log.WithLevel(log.LevelDebug), log.WithFormat(log.FormatUnstructured)), ProposeTimeout: 1 * time.Second, + EmptyBlockTimeout: 1 * time.Second, BlockProposalInterval: 1 * time.Second, BlockAnnInterval: 3 * time.Second, BroadcastTxTimeout: 15 * time.Second, @@ -266,6 +267,7 @@ func TestDualNodeMocknet(t *testing.T) { BlockStore: bs1, Logger: log.New(log.WithName("CE1"), log.WithWriter(os.Stdout), log.WithLevel(log.LevelDebug), log.WithFormat(log.FormatUnstructured)), ProposeTimeout: 1 * time.Second, + EmptyBlockTimeout: 1 * time.Second, BlockProposalInterval: 1 * time.Second, BlockAnnInterval: 3 * time.Second, DB: db1, @@ -335,6 +337,7 @@ func TestDualNodeMocknet(t *testing.T) { BlockProcessor: bp2, Logger: log.New(log.WithName("CE2"), log.WithWriter(os.Stdout), log.WithLevel(log.LevelDebug), log.WithFormat(log.FormatUnstructured)), ProposeTimeout: 1 * time.Second, + EmptyBlockTimeout: 1 * time.Second, BlockProposalInterval: 1 * time.Second, BlockAnnInterval: 3 * time.Second, DB: db2, @@ -509,6 +512,12 @@ func (m *mockEventStore) GetUnbroadcastedEvents(ctx context.Context) ([]*ktypes. 
return ids, nil } +func (m *mockEventStore) HasEvents() bool { + return true +} + +func (m *mockEventStore) UpdateStats(cnt int64) {} + // TODO: can test with real migrator /*type mockMigrator struct{} diff --git a/node/node_test.go b/node/node_test.go index 47e176c07..ebce2b06e 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -76,7 +76,7 @@ func makeTestHosts(t *testing.T, nNodes, nExtraHosts int, blockInterval time.Dur }) defaultConfigSet := config.DefaultConfig() - defaultConfigSet.Consensus.ProposeTimeout = config.Duration(blockInterval) + defaultConfigSet.Consensus.ProposeTimeout = ktypes.Duration(blockInterval) var nodes []*Node var hosts []host.Host diff --git a/node/services/jsonrpc/adminsvc/service.go b/node/services/jsonrpc/adminsvc/service.go index f1b7ce1bc..bf31b90ed 100644 --- a/node/services/jsonrpc/adminsvc/service.go +++ b/node/services/jsonrpc/adminsvc/service.go @@ -474,7 +474,7 @@ func (svc *Service) toPendingInfo(resolution *resolutions.Resolution, allVoters KeyType: resolutionBody.PubKeyType, }, Power: resolutionBody.Power, - ExpiresAt: resolution.ExpirationHeight, + ExpiresAt: resolution.Expiration, Board: board, Approved: approvals, }, nil @@ -597,7 +597,7 @@ func (svc *Service) ResolutionStatus(ctx context.Context, req *adminjson.Resolut Status: &ktypes.PendingResolution{ Type: resolution.Type, ResolutionID: req.ResolutionID, - ExpiresAt: resolution.ExpirationHeight, + ExpiresAt: resolution.Expiration, Board: board, Approved: approvals, }, diff --git a/node/statesync_test.go b/node/statesync_test.go index 1d37d8df4..62692aab1 100644 --- a/node/statesync_test.go +++ b/node/statesync_test.go @@ -165,7 +165,7 @@ func testSSConfig(enable bool, providers []string) *config.StateSyncConfig { return &config.StateSyncConfig{ Enable: enable, TrustedProviders: providers, - DiscoveryTimeout: config.Duration(5 * time.Second), + DiscoveryTimeout: ktypes.Duration(5 * time.Second), MaxRetries: 3, } } diff --git a/node/tx.go b/node/tx.go index bc0dbc7e3..8517429ec 100644 --- a/node/tx.go +++ b/node/tx.go @@ -16,9 +16,9 @@ import ( var ( ErrNotFound = errors.New("resource not available") - ErrTxNotFound = errors.New("tx not available") - ErrBlkNotFound = errors.New("block not available") - ErrNoResponse = errors.New("stream closed without response") + ErrTxNotFound = types.ErrTxNotFound + ErrBlkNotFound = types.ErrBlkNotFound + ErrNoResponse = types.ErrNoResponse ) const ( diff --git a/node/txapp/routes.go b/node/txapp/routes.go index 527853f0c..b09431cb0 100644 --- a/node/txapp/routes.go +++ b/node/txapp/routes.go @@ -7,6 +7,7 @@ import ( "fmt" "math/big" "sync" + "time" "github.com/kwilteam/kwil-db/common" "github.com/kwilteam/kwil-db/core/crypto" @@ -458,7 +459,8 @@ func (d *validatorJoinRoute) InTx(ctx *common.TxContext, app *common.App, tx *ty Type: voting.ValidatorJoinEventType, } - expiry := ctx.BlockContext.Height + ctx.BlockContext.ChainContext.NetworkParameters.JoinExpiry + joinExpiry := time.Duration(ctx.BlockContext.ChainContext.NetworkParameters.JoinExpiry).Seconds() + expiry := ctx.BlockContext.Timestamp + int64(joinExpiry) err = createResolution(ctx.Ctx, app.DB, event, expiry, tx.Sender, keyType) if err != nil { return types.CodeUnknownError, err @@ -628,7 +630,8 @@ func (d *validatorRemoveRoute) InTx(ctx *common.TxContext, app *common.App, tx * // if the resolution does not exist, create it if !exists { - expiry := ctx.BlockContext.Height + ctx.BlockContext.ChainContext.NetworkParameters.JoinExpiry + joinExpiry := 
time.Duration(ctx.BlockContext.ChainContext.NetworkParameters.JoinExpiry).Seconds() + expiry := ctx.BlockContext.Timestamp + int64(joinExpiry) err = createResolution(ctx.Ctx, app.DB, event, expiry, tx.Sender, senderKeyType) if err != nil { return types.CodeUnknownError, err @@ -841,7 +844,7 @@ func (d *validatorVoteBodiesRoute) InTx(ctx *common.TxContext, app *common.App, return types.CodeInvalidSender, fmt.Errorf("failed to parse key type: %w", err) } - expiryHeight := ctx.BlockContext.Height + resCfg.ExpirationPeriod + expiryHeight := ctx.BlockContext.Timestamp + int64(resCfg.ExpirationPeriod.Seconds()) err = createResolution(ctx.Ctx, app.DB, ev, expiryHeight, tx.Sender, keyType) if err != nil { return types.CodeUnknownError, err @@ -917,7 +920,7 @@ func (d *createResolutionRoute) PreTx(ctx *common.TxContext, svc *common.Service } d.resolution = res.Resolution - d.expiry = resCfg.ExpirationPeriod + ctx.BlockContext.Height + d.expiry = int64(resCfg.ExpirationPeriod.Seconds()) + ctx.BlockContext.Timestamp return 0, nil } diff --git a/node/txapp/txapp.go b/node/txapp/txapp.go index 30f839b4e..35c28e0bc 100644 --- a/node/txapp/txapp.go +++ b/node/txapp/txapp.go @@ -162,6 +162,7 @@ func (r *TxApp) Execute(ctx *common.TxContext, db sql.DB, tx *types.Transaction) // no need to error out if we cannot track the validator join approval r.trackValidatorJoinApprovals(tx) + // track event count return route.Execute(ctx, r, db, tx) } @@ -319,7 +320,7 @@ func (r *TxApp) processVotes(ctx context.Context, db sql.DB, block *common.Block } // now we will expire resolutions - expired, err := getExpired(ctx, db, block.Height) + expired, err := getExpired(ctx, db, block.Timestamp) if err != nil { return nil, err } diff --git a/node/types/interfaces.go b/node/types/interfaces.go index a21f428c9..faafd5164 100644 --- a/node/types/interfaces.go +++ b/node/types/interfaces.go @@ -1,6 +1,7 @@ package types import ( + "errors" "time" "github.com/kwilteam/kwil-db/core/types" @@ -8,7 +9,12 @@ import ( var ErrNotFound = types.ErrNotFound -var HashBytes = types.HashBytes +var ( + HashBytes = types.HashBytes + ErrTxNotFound = errors.New("tx not available") + ErrBlkNotFound = errors.New("block not available") + ErrNoResponse = errors.New("stream closed without response") +) const HashLen = types.HashLen diff --git a/node/voting/events.go b/node/voting/events.go index 6cb0c7a78..6afa22c38 100644 --- a/node/voting/events.go +++ b/node/voting/events.go @@ -136,7 +136,13 @@ type EventStore struct { // connection. eventWriter DB - writerMtx sync.Mutex // protects eventWriter, not applicable to read-only operations + // protects eventWriter, not applicable to read-only operations + // also protects access to the numEvents + writerMtx sync.Mutex + + // numEvents tracks the count of events added by the listeners + // which doesn't have resolutions yet. 
+ numEvents int64 } // NewEventStore will initialize the event and vote store with the provided DB @@ -236,6 +242,7 @@ func (e *EventStore) Store(ctx context.Context, data []byte, eventType string) e } // fmt.Printf("inserted event new event: type %v, id %v\n", eventType, id) + e.numEvents++ return tx.Commit(ctx) } @@ -346,6 +353,24 @@ func DeleteEvents(ctx context.Context, db sql.DB, ids ...*types.UUID) error { return err } +func (e *EventStore) UpdateStats(deletedEvts int64) { + e.writerMtx.Lock() + defer e.writerMtx.Unlock() + + if e.numEvents > deletedEvts { + e.numEvents -= deletedEvts + } else { + e.numEvents = 0 + } +} + +func (e *EventStore) HasEvents() bool { + e.writerMtx.Lock() + defer e.writerMtx.Unlock() + + return e.numEvents > 0 +} + // KV returns a kv store that is scoped to the given prefix. // It allows the user to define their own semantics // for tracking committed data. For example, it can be used to diff --git a/node/voting/sql.go b/node/voting/sql.go index 7395cc573..6ae4a029e 100644 --- a/node/voting/sql.go +++ b/node/voting/sql.go @@ -45,7 +45,7 @@ const ( body BYTEA, -- body is the actual resolution info type BYTEA NOT NULL, -- type is the type of resolution vote_body_proposer BYTEA, -- vote_body_proposer is the identifier of the node that supplied the vote body - expiration INT8 NOT NULL, -- expiration is the blockheight at which the resolution expires + expiration INT8 NOT NULL, -- expiration is the UNIX epoch time in secs at which the resolution expires extra_vote_id BOOLEAN NOT NULL DEFAULT FALSE, -- If vote_body_proposer had sent VoteID before VoteBody, this is set to true UNIQUE (id, type), FOREIGN KEY(type) REFERENCES ` + votingSchemaName + `.resolution_types(id) ON UPDATE CASCADE ON DELETE CASCADE @@ -151,9 +151,6 @@ const ( deleteResolutionsByTypeSQL = `DELETE FROM ` + votingSchemaName + `.resolutions WHERE type = ANY($1);` - // Subtracts the start height from the expiration height of all resolutions. 
- readjustExpirationsSQL = `UPDATE ` + votingSchemaName + `.resolutions SET expiration = expiration - $1;` - // createResolutionType creates a resolution type createResolutionType = `INSERT INTO ` + votingSchemaName + `.resolution_types (id, name) VALUES ($1, $2) ON CONFLICT(id) DO NOTHING;` diff --git a/node/voting/vote_test.go b/node/voting/vote_test.go index 9f388db90..52ba77843 100644 --- a/node/voting/vote_test.go +++ b/node/voting/vote_test.go @@ -7,6 +7,7 @@ import ( "fmt" "math/big" "testing" + "time" "github.com/kwilteam/kwil-db/core/crypto" "github.com/kwilteam/kwil-db/core/types" @@ -54,18 +55,20 @@ func Test_Voting(t *testing.T) { fn: func(t *testing.T, db sql.DB, v *VoteStore) { ctx := context.Background() - err := CreateResolution(ctx, db, dummyEvent, 10, []byte("a"), crypto.KeyTypeEd25519) + now := time.Now() + expiration := now.Unix() + err := CreateResolution(ctx, db, dummyEvent, expiration, []byte("a"), crypto.KeyTypeEd25519) require.Error(t, err) // Can't approve non-existent resolutions err = ApproveResolution(ctx, db, testEvent.ID(), []byte("a"), crypto.KeyTypeEd25519) require.Error(t, err) - err = CreateResolution(ctx, db, testEvent, 10, []byte("a"), crypto.KeyTypeEd25519) + err = CreateResolution(ctx, db, testEvent, expiration, []byte("a"), crypto.KeyTypeEd25519) require.NoError(t, err) // duplicate creation should fail - err = CreateResolution(ctx, db, testEvent, 10, []byte("a"), crypto.KeyTypeEd25519) + err = CreateResolution(ctx, db, testEvent, expiration, []byte("a"), crypto.KeyTypeEd25519) require.Error(t, err) // voter doesn't exist (non existent pubkey) @@ -90,7 +93,6 @@ func Test_Voting(t *testing.T) { require.Equal(t, testEvent.Body, events[0].Body) require.Equal(t, testEvent.Type, events[0].Type) require.Equal(t, testEvent.ID(), events[0].ID) - require.Equal(t, int64(10), events[0].ExpirationHeight) require.Equal(t, int64(200), events[0].ApprovedPower) }, }, @@ -215,7 +217,6 @@ func Test_Voting(t *testing.T) { require.Equal(t, testEvent.Body, info.Body) require.Equal(t, testEvent.Type, info.Type) require.Equal(t, testEvent.ID(), info.ID) - require.Equal(t, int64(10), info.ExpirationHeight) require.Equal(t, int64(200), info.ApprovedPower) hasValidator1Info := false @@ -243,10 +244,15 @@ func Test_Voting(t *testing.T) { fn: func(t *testing.T, db sql.DB, v *VoteStore) { ctx := context.Background() - err := CreateResolution(ctx, db, testEvent, 10, []byte("a"), crypto.KeyTypeEd25519) + expiration := time.Now().Unix() + err := CreateResolution(ctx, db, testEvent, expiration, []byte("a"), crypto.KeyTypeEd25519) require.NoError(t, err) - expired, err := GetExpired(ctx, db, 10) + expired, err := GetExpired(ctx, db, expiration-5) + require.NoError(t, err) + require.Equal(t, 0, len(expired)) + + expired, err = GetExpired(ctx, db, expiration+5) require.NoError(t, err) require.Equal(t, 1, len(expired)) @@ -257,6 +263,7 @@ func Test_Voting(t *testing.T) { require.Equal(t, resolutionInfo.Proposer.Power, int64(100)) require.Equal(t, resolutionInfo.Proposer.KeyType, crypto.KeyTypeEd25519) require.Equal(t, []byte(resolutionInfo.Proposer.Identifier[:]), []byte("a")) + }, }, { diff --git a/node/voting/voting.go b/node/voting/voting.go index e0950845e..6c65a4422 100644 --- a/node/voting/voting.go +++ b/node/voting/voting.go @@ -11,6 +11,7 @@ import ( "os" "slices" "sync" + "time" "github.com/kwilteam/kwil-db/core/crypto" "github.com/kwilteam/kwil-db/core/types" @@ -200,7 +201,7 @@ func ApproveResolution(ctx context.Context, db sql.TxMaker, resolutionID *types. 
} // CreateResolution creates a resolution for a votable event. The expiration -// should be a block height. Resolution creation will fail if the resolution +// is the UNIX epoch timestamp in secs. Resolution creation will fail if the resolution // either already exists or has been processed. func CreateResolution(ctx context.Context, db sql.TxMaker, event *types.VotableEvent, expiration int64, voteBodyProposer []byte, proposerKeyType crypto.KeyType) error { tx, err := db.BeginTx(ctx) @@ -274,10 +275,11 @@ func fromRow(ctx context.Context, db sql.Executor, row []any) (*resolutions.Reso return nil, fmt.Errorf("invalid type for type (%T)", row[2]) } - v.ExpirationHeight, ok = sql.Int64(row[3]) + expiration, ok := sql.Int64(row[3]) if !ok { return nil, fmt.Errorf("invalid type for expiration (%T)", row[3]) } + v.Expiration = time.Unix(expiration, 0) if row[4] == nil { v.ApprovedPower = 0 @@ -376,10 +378,9 @@ func GetResolutionInfo(ctx context.Context, db sql.Executor, id *types.UUID) (*r return fromRow(ctx, db, res.Rows[0]) } -// GetExpired returns all resolutions with an expiration -// less than or equal to the given block height. -func GetExpired(ctx context.Context, db sql.Executor, blockHeight int64) ([]*resolutions.Resolution, error) { - res, err := db.Execute(ctx, getResolutionsFullInfoByExpiration, blockHeight) +// GetExpired returns all resolutions with an expiration time less than the currentTime +func GetExpired(ctx context.Context, db sql.Executor, currentTime int64) ([]*resolutions.Resolution, error) { + res, err := db.Execute(ctx, getResolutionsFullInfoByExpiration, currentTime) if err != nil { return nil, err } @@ -569,12 +570,6 @@ func DeleteResolutionsByType(ctx context.Context, db sql.Executor, resTypes []st return err } -func ReadjustExpirations(ctx context.Context, db sql.Executor, startHeight int64) error { - // Subtracts the start height from the expiration height of all resolutions - _, err := db.Execute(ctx, readjustExpirationsSQL, startHeight) - return err -} - // SetValidatorPower sets the power of a voter. // It will create the voter if it does not exist. // It will return an error if a negative power is given. diff --git a/test/integration/kwild_test.go b/test/integration/kwild_test.go index b8312852b..57c7c6050 100644 --- a/test/integration/kwild_test.go +++ b/test/integration/kwild_test.go @@ -11,6 +11,7 @@ import ( "github.com/kwilteam/kwil-db/config" "github.com/kwilteam/kwil-db/core/crypto" "github.com/kwilteam/kwil-db/core/crypto/auth" + "github.com/kwilteam/kwil-db/core/types" ethdeposits "github.com/kwilteam/kwil-db/extensions/listeners/eth_deposits" "github.com/kwilteam/kwil-db/test/setup" "github.com/kwilteam/kwil-db/test/specifications" @@ -163,7 +164,7 @@ func TestValidatorJoinExpirySpecification(t *testing.T) { }), }, ConfigureGenesis: func(genDoc *config.GenesisConfig) { - genDoc.JoinExpiry = 5 // 5 sec at 1block/sec + genDoc.JoinExpiry = types.Duration(5 * time.Second) }, DBOwner: "0xabc", },
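Since resolution expirations are now absolute UNIX timestamps rather than block heights, a join request's expiry is derived from the block timestamp plus the configured JoinExpiry duration, and GetExpired simply compares the stored expirations against the current block timestamp. The short sketch below illustrates that arithmetic; expirationFor is a hypothetical helper introduced only for this example, not a function in the codebase.

package main

import (
    "fmt"
    "time"
)

// expirationFor mirrors the pattern the routes now use: add the configured
// expiry duration (in whole seconds) to the block's UNIX timestamp to get an
// absolute expiration timestamp.
func expirationFor(blockTimestamp int64, joinExpiry time.Duration) int64 {
    return blockTimestamp + int64(joinExpiry.Seconds())
}

func main() {
    blockTS := time.Now().Unix()
    exp := expirationFor(blockTS, 5*time.Second) // e.g. a genesis JoinExpiry of 5s

    // A resolution is expired once a later block's timestamp passes its
    // expiration, which is what the timestamp comparison in GetExpired expresses.
    fmt.Println("expired at +10s:", blockTS+10 > exp) // true
    fmt.Println("expired at +3s:", blockTS+3 > exp)   // false
}
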