From 216536e258e9459e43a7c69cac78387dd2be869c Mon Sep 17 00:00:00 2001 From: Paul Steckler Date: Thu, 23 Mar 2023 11:35:58 -0700 Subject: [PATCH 01/12] Db migration app for berkeley --- src/app/archive/lib/dune | 5 +- src/app/archive/lib/extensional.ml | 2 +- src/app/archive/lib/processor.ml | 24 +- .../berkeley_migration/berkeley_migration.ml | 574 ++++++++++++++++++ src/app/berkeley_migration/dune | 48 ++ .../berkeley_migration/precomputed_block.ml | 83 +++ src/app/berkeley_migration/sql.ml | 287 +++++++++ src/app/replayer/replayer.ml | 1 + src/berkeley_migration.opam | 5 + src/lib/consensus/vrf/consensus_vrf.ml | 21 +- src/lib/consensus/vrf/dune | 1 + 11 files changed, 1032 insertions(+), 19 deletions(-) create mode 100644 src/app/berkeley_migration/berkeley_migration.ml create mode 100644 src/app/berkeley_migration/dune create mode 100644 src/app/berkeley_migration/precomputed_block.ml create mode 100644 src/app/berkeley_migration/sql.ml create mode 100644 src/berkeley_migration.opam diff --git a/src/app/archive/lib/dune b/src/app/archive/lib/dune index b89a0b91e15..cccb4788cfc 100644 --- a/src/app/archive/lib/dune +++ b/src/app/archive/lib/dune @@ -27,8 +27,8 @@ child_processes precomputed_values coda_genesis_ledger + consensus.vrf mina_runtime_config - hex sgn mina_base.util kimchi_backend.pasta @@ -72,9 +72,10 @@ mina_version staged_ledger_diff error_json + ppx_deriving_yojson.runtime ppx_version.runtime ) (inline_tests) (modes native) (instrumentation (backend bisect_ppx)) - (preprocess (pps ppx_mina ppx_version ppx_jane ppx_custom_printf h_list.ppx))) + (preprocess (pps ppx_mina ppx_version ppx_jane ppx_custom_printf ppx_deriving_yojson h_list.ppx))) diff --git a/src/app/archive/lib/extensional.ml b/src/app/archive/lib/extensional.ml index 421e21b16ea..ad4482a5a79 100644 --- a/src/app/archive/lib/extensional.ml +++ b/src/app/archive/lib/extensional.ml @@ -110,7 +110,7 @@ module Block = struct ; parent_hash : State_hash.Stable.V1.t ; creator : Public_key.Compressed.Stable.V1.t ; block_winner : Public_key.Compressed.Stable.V1.t - ; last_vrf_output : string + ; last_vrf_output : Consensus_vrf.Output.Truncated.Stable.V1.t ; snarked_ledger_hash : Frozen_ledger_hash.Stable.V1.t ; staking_epoch_data : Mina_base.Epoch_data.Value.Stable.V1.t ; next_epoch_data : Mina_base.Epoch_data.Value.Stable.V1.t diff --git a/src/app/archive/lib/processor.ml b/src/app/archive/lib/processor.ml index f72cc236eb1..8fccf4f14dd 100644 --- a/src/app/archive/lib/processor.ml +++ b/src/app/archive/lib/processor.ml @@ -1876,7 +1876,7 @@ module User_command = struct (Caqti_request.find typ Caqti_type.int (Mina_caqti.insert_into_cols ~returning:"id" ~table_name ~tannot:(function - | "typ" -> Some "user_command_type" | _ -> None ) + | "command_type" -> Some "user_command_type" | _ -> None ) ~cols:Fields.names () ) ) { command_type = user_cmd.command_type ; fee_payer_id @@ -2025,7 +2025,8 @@ module Internal_command = struct (Caqti_request.find typ Caqti_type.int (Mina_caqti.insert_into_cols ~returning:"id" ~table_name ~tannot:(function - | "typ" -> Some "internal_command_type" | _ -> None ) + | "command_type" -> Some "internal_command_type" | _ -> None + ) ~cols:Fields.names () ) ) { command_type = internal_cmd.command_type ; receiver_id @@ -2796,9 +2797,9 @@ module Block = struct (Consensus.Data.Consensus_state.block_stake_winner consensus_state) in let last_vrf_output = - (* encode as hex, Postgresql won't accept arbitrary bitstrings *) + (* encode as base64, same as in precomputed blocks JSON *) Consensus.Data.Consensus_state.last_vrf_output consensus_state - |> Hex.Safe.to_hex + |> Base64.encode_exn ~alphabet:Base64.uri_safe_alphabet in let%bind snarked_ledger_hash_id = Snarked_ledger_hash.add_if_doesn't_exist @@ -3200,8 +3201,9 @@ module Block = struct Public_key.add_if_doesn't_exist (module Conn) block.block_winner in let last_vrf_output = - (* already encoded as hex *) + (* encode as base64, same as in precomputed blocks JSON *) block.last_vrf_output + |> Base64.encode_exn ~alphabet:Base64.uri_safe_alphabet in let%bind snarked_ledger_hash_id = Snarked_ledger_hash.add_if_doesn't_exist @@ -3245,9 +3247,11 @@ module Block = struct snarked_ledger_hash_id, staking_epoch_data_id, next_epoch_data_id, min_window_density, sub_window_densities, total_currency, - ledger_hash, height, global_slot_since_hard_fork, - global_slot_since_genesis, protocol_version, timestamp, chain_status) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?::chain_status_type) + ledger_hash, height, + global_slot_since_hard_fork, global_slot_since_genesis, + protocol_version_id, proposed_protocol_version_id, + timestamp, chain_status) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?::bigint[], ?, ?, ?, ?, ?, ?, ?, ?, ?::chain_status_type) RETURNING id |sql} ) { state_hash = block.state_hash |> State_hash.to_base58_check @@ -4011,7 +4015,9 @@ let setup_server ~metrics_server_port ~constraint_constants ~logger let where_to_listen = Async.Tcp.Where_to_listen.bind_to All_addresses (On_port server_port) in - let reader, writer = Strict_pipe.create ~name:"archive" Synchronous in + let reader, writer = + Strict_pipe.create ~name:"archive_migrated_to_berkeley" Synchronous + in let precomputed_block_reader, precomputed_block_writer = Strict_pipe.create ~name:"precomputed_archive_block" Synchronous in diff --git a/src/app/berkeley_migration/berkeley_migration.ml b/src/app/berkeley_migration/berkeley_migration.ml new file mode 100644 index 00000000000..79bce73ae75 --- /dev/null +++ b/src/app/berkeley_migration/berkeley_migration.ml @@ -0,0 +1,574 @@ +(* berkeley_migration.ml -- migrate archive db from original Mina mainnet to berkeley schema *) + +open Core_kernel +open Async + +(* before running this program for the first time, import the berkeley schema to the + migrated database name +*) + +(* TODO: table of old, new txn hashes *) + +let mainnet_transaction_failure_of_string s : + Mina_base.Transaction_status.Failure.t = + match s with + | "Predicate" -> + Predicate + | "Source_not_present" -> + Source_not_present + | "Receiver_not_present" -> + Receiver_not_present + | "Amount_insufficient_to_create_account" -> + Amount_insufficient_to_create_account + | "Cannot_pay_creation_fee_in_token" -> + Cannot_pay_creation_fee_in_token + | "Source_insufficient_balance" -> + Source_insufficient_balance + | "Source_minimum_balance_violation" -> + Source_minimum_balance_violation + | "Receiver_already_exists" -> + Receiver_already_exists + | "Overflow" -> + Overflow + | "Incorrect_nonce" -> + Incorrect_nonce + (* these should never have occurred *) + | "Signed_command_on_snapp_account" + | "Not_token_owner" + | "Snapp_account_not_present" + | "Update_not_permitted" -> + failwith "Transaction failure unrepresentable in Berkeley" + | _ -> + failwith "No such transaction failure" + +let get_created paid pk = + match paid with + | None -> + [] + | Some fee -> + let acct_id = Mina_base.Account_id.create pk Mina_base.Token_id.default in + [ (acct_id, Unsigned.UInt64.of_int64 fee |> Currency.Fee.of_uint64) ] + +let internal_commands_from_block_id ~mainnet_pool block_id = + let query_mainnet_db ~f = Mina_caqti.query ~f mainnet_pool in + let pk_of_id id = + let%map pk_str = + query_mainnet_db ~f:(fun db -> Sql.Mainnet.Public_key.find_by_id db id) + in + Signature_lib.Public_key.Compressed.of_base58_check_exn pk_str + in + let%bind block_internal_cmds = + query_mainnet_db ~f:(fun db -> + Sql.Mainnet.Block_internal_command.load_block db ~block_id ) + in + Deferred.List.fold block_internal_cmds ~init:([], []) + ~f:(fun (created, cmds) block_internal_cmd -> + let%bind internal_cmd = + query_mainnet_db ~f:(fun db -> + Sql.Mainnet.Internal_command.load db + ~id:block_internal_cmd.internal_command_id ) + in + (* some fields come from blocks_internal_commands, others from user_commands *) + let sequence_no = block_internal_cmd.sequence_no in + let secondary_sequence_no = block_internal_cmd.secondary_sequence_no in + let command_type = internal_cmd.typ in + let%bind receiver = pk_of_id internal_cmd.receiver_id in + let fee = + internal_cmd.fee |> Unsigned.UInt64.of_int64 |> Currency.Fee.of_uint64 + in + let hash = + match command_type with + | "coinbase" -> + let coinbase = + (* always valid, since fee transfer is None *) + let amount = Currency.Amount.of_fee fee in + Mina_base.Coinbase.create ~amount ~receiver ~fee_transfer:None + |> Or_error.ok_exn + in + Mina_transaction.Transaction_hash.hash_coinbase coinbase + | "fee_transfer" | "fee_transfer_via_coinbase" -> ( + let fee_transfer = + Mina_base.Fee_transfer.create_single ~receiver_pk:receiver ~fee + ~fee_token:Mina_base.Token_id.default + in + match Mina_base.Fee_transfer.to_singles fee_transfer with + | `One single -> + Mina_transaction.Transaction_hash.hash_fee_transfer single + | `Two _ -> + failwith "Expected one single in fee transfer" ) + | s -> + failwithf "Unknown command type %s" s () + in + let status = "applied" in + let failure_reason = None in + let cmd : Archive_lib.Extensional.Internal_command.t = + { sequence_no + ; secondary_sequence_no + ; command_type + ; receiver + ; fee + ; hash + ; status + ; failure_reason + } + in + let created_by_cmd = + get_created block_internal_cmd.receiver_account_creation_fee_paid + receiver + in + return (created_by_cmd @ created, cmd :: cmds) ) + +let user_commands_from_block_id ~mainnet_pool block_id = + let query_mainnet_db ~f = Mina_caqti.query ~f mainnet_pool in + let%bind block_user_cmds = + query_mainnet_db ~f:(fun db -> + Sql.Mainnet.Block_user_command.load_block db ~block_id ) + in + let pk_of_id id = + let%map pk_str = + query_mainnet_db ~f:(fun db -> Sql.Mainnet.Public_key.find_by_id db id) + in + Signature_lib.Public_key.Compressed.of_base58_check_exn pk_str + in + Deferred.List.fold block_user_cmds ~init:([], []) + ~f:(fun (created, cmds) block_user_cmd -> + let%bind user_cmd = + query_mainnet_db ~f:(fun db -> + Sql.Mainnet.User_command.load db ~id:block_user_cmd.user_command_id ) + in + (* some fields come from blocks_user_commands, others from user_commands *) + let sequence_no = block_user_cmd.sequence_no in + let command_type = user_cmd.typ in + let%bind fee_payer = pk_of_id user_cmd.fee_payer_id in + let%bind source = pk_of_id user_cmd.source_id in + let%bind receiver = pk_of_id user_cmd.receiver_id in + let nonce = Mina_numbers.Account_nonce.of_int user_cmd.nonce in + let amount = + Option.map user_cmd.amount ~f:(fun amount -> + Unsigned.UInt64.of_int64 amount |> Currency.Amount.of_uint64 ) + in + let fee = + user_cmd.fee |> Unsigned.UInt64.of_int64 |> Currency.Fee.of_uint64 + in + let valid_until = + Option.map user_cmd.valid_until ~f:(fun valid_until -> + Unsigned.UInt32.of_int64 valid_until + |> Mina_numbers.Global_slot.of_uint32 ) + in + let memo = + Mina_base.Signed_command_memo.of_base58_check_exn user_cmd.memo + in + let status = block_user_cmd.status in + let failure_reason = + Option.map block_user_cmd.failure_reason + ~f:mainnet_transaction_failure_of_string + in + (* a V2 signed command *) + let signed_cmd = + let payload = + let body = + match command_type with + | "payment" -> + let amount = Option.value_exn amount in + Mina_base.Signed_command_payload.Body.Payment + { source_pk = source; receiver_pk = receiver; amount } + | "delegation" -> + let set_delegate = + Mina_base.Stake_delegation.Set_delegate + { delegator = source; new_delegate = receiver } + in + Mina_base.Signed_command_payload.Body.Stake_delegation + set_delegate + | s -> + failwithf "Unknown command type: %s" s () + in + Mina_base.Signed_command_payload.create ~fee ~fee_payer_pk:fee_payer + ~nonce ~valid_until ~memo ~body + in + let signer = Signature_lib.Public_key.decompress_exn source in + let signature = Mina_base.Signature.dummy in + ({ payload; signer; signature } : Mina_base.Signed_command.t) + in + let hash = + Mina_transaction.Transaction_hash.hash_signed_command signed_cmd + in + let cmd : Archive_lib.Extensional.User_command.t = + { sequence_no + ; command_type + ; fee_payer + ; source + ; receiver + ; nonce + ; amount + ; fee + ; valid_until + ; memo + ; hash + ; status + ; failure_reason + } + in + let created_by_cmd = + let fee_payer_created = + get_created block_user_cmd.fee_payer_account_creation_fee_paid + fee_payer + in + let receiver_created = + get_created block_user_cmd.receiver_account_creation_fee_paid receiver + in + fee_payer_created @ receiver_created + in + return (created_by_cmd @ created, cmd :: cmds) ) + +let first_batch = ref true + +(* from src/config/protocol_version/current.mlh in `compatible` *) +let mainnet_protocol_version = + Protocol_version.create_exn ~major:2 ~minor:0 ~patch:0 + +let mainnet_block_to_extensional ~logger ~mainnet_pool + ~(genesis_block : Mina_block.t) (block : Sql.Mainnet.Block.t) = + let query_mainnet_db ~f = Mina_caqti.query ~f mainnet_pool in + let is_genesis_block = Int64.equal block.height Int64.one in + let genesis_consensus_state = + lazy + (let protocol_state = + Mina_block.Header.protocol_state (Mina_block.header genesis_block) + in + let body = Mina_state.Protocol_state.body protocol_state in + Mina_state.Protocol_state.Body.consensus_state body ) + in + let%bind () = + (* we may try to be fetching more blocks than exist + gsutil seems to get the ones that do exist, in that exist + *) + let batch_size = 1000L in + if is_genesis_block then Deferred.unit + else if !first_batch then ( + let num_blocks = Int64.(batch_size - (block.height % batch_size)) in + [%log info] "Fetching first batch of precomputed blocks" + ~metadata: + [ ("height", `Int (Int64.to_int_exn block.height)) + ; ("num_blocks", `Int (Int64.to_int_exn num_blocks)) + ] ; + let%bind () = + Precomputed_block.fetch_batch ~height:block.height ~num_blocks + in + [%log info] "Done fetching first batch of precomputed blocks" ; + first_batch := false ; + Deferred.unit ) + else if Int64.(equal (block.height % batch_size)) 0L then ( + [%log info] "Deleting all precomputed blocks" ; + let%bind () = Precomputed_block.delete_fetched () in + [%log info] "Fetching batch of precomputed blocks" + ~metadata: + [ ("height", `Int (Int64.to_int_exn block.height)) + ; ("num_blocks", `Int (Int64.to_int_exn batch_size)) + ] ; + let%bind () = + Precomputed_block.fetch_batch ~height:block.height + ~num_blocks:batch_size + in + [%log info] "Done fetching batch of precomputed blocks" ; + Deferred.unit ) + else Deferred.unit + in + let pk_of_id id = + let%map pk_str = + query_mainnet_db ~f:(fun db -> Sql.Mainnet.Public_key.find_by_id db id) + in + Signature_lib.Public_key.Compressed.of_base58_check_exn pk_str + in + let state_hash = Mina_base.State_hash.of_base58_check_exn block.state_hash in + let parent_hash = + Mina_base.State_hash.of_base58_check_exn block.parent_hash + in + let%bind creator = pk_of_id block.creator_id in + let%bind block_winner = pk_of_id block.block_winner_id in + let%bind last_vrf_output = + (* the unencoded string, not base64 *) + if is_genesis_block then + let consensus = Lazy.force genesis_consensus_state in + return @@ Consensus.Data.Consensus_state.last_vrf_output consensus + else + let%map json = + Precomputed_block.last_vrf_output ~state_hash:block.state_hash + ~height:block.height + in + Consensus_vrf.Output.Truncated.of_yojson json |> Result.ok_or_failwith + in + let%bind snarked_ledger_hash = + let%map hash_str = + query_mainnet_db ~f:(fun db -> + Sql.Mainnet.Snarked_ledger_hash.find_by_id db + block.snarked_ledger_hash_id ) + in + Mina_base.Frozen_ledger_hash.of_base58_check_exn hash_str + in + (* TODO: confirm these match mainnet db ? *) + let%bind staking_epoch_data = + if is_genesis_block then + let consensus = Lazy.force genesis_consensus_state in + return @@ Consensus.Data.Consensus_state.staking_epoch_data consensus + else + let%map json = + Precomputed_block.staking_epoch_data ~state_hash:block.state_hash + ~height:block.height + in + match Mina_base.Epoch_data.Value.of_yojson json with + | Ok epoch_data -> + epoch_data + | Error err -> + failwithf "Could not get staking epoch data, error: %s" err () + in + let%bind next_epoch_data = + if is_genesis_block then + let consensus = Lazy.force genesis_consensus_state in + return @@ Consensus.Data.Consensus_state.next_epoch_data consensus + else + let%map json = + Precomputed_block.next_epoch_data ~state_hash:block.state_hash + ~height:block.height + in + match Mina_base.Epoch_data.Value.of_yojson json with + | Ok epoch_data -> + epoch_data + | Error err -> + failwithf "Could not get next epoch data, error: %s" err () + in + let%bind min_window_density = + if is_genesis_block then + let consensus = Lazy.force genesis_consensus_state in + return @@ Consensus.Data.Consensus_state.min_window_density consensus + else + match%map + Precomputed_block.min_window_density ~state_hash:block.state_hash + ~height:block.height + with + | `String s -> + Mina_numbers.Length.of_string s + | _ -> + failwith "min_window_density: unexpected JSON" + in + let%bind total_currency = + if is_genesis_block then + let consensus = Lazy.force genesis_consensus_state in + return @@ Consensus.Data.Consensus_state.total_currency consensus + else + match%map + Precomputed_block.total_currency ~state_hash:block.state_hash + ~height:block.height + with + | `String s -> + Currency.Amount.of_string s + | _ -> + failwith "total currency: unexpected JSON" + in + let%bind sub_window_densities = + if is_genesis_block then + let consensus = Lazy.force genesis_consensus_state in + return @@ Consensus.Data.Consensus_state.sub_window_densities consensus + else + match%map + Precomputed_block.subwindow_densities ~state_hash:block.state_hash + ~height:block.height + with + | `List items -> + List.map items ~f:(function + | `String s -> + Mina_numbers.Length.of_string s + | _ -> + failwith "Expected string for subwindow density" ) + | _ -> + failwith "sub_window_densities: unexpected JSON" + in + let ledger_hash = + Mina_base.Frozen_ledger_hash.of_base58_check_exn block.ledger_hash + in + let height = Unsigned.UInt32.of_int64 block.height in + let global_slot_since_hard_fork = + block.global_slot_since_hard_fork |> Unsigned.UInt32.of_int64 + |> Mina_numbers.Global_slot.of_uint32 + in + let global_slot_since_genesis = + block.global_slot_since_genesis |> Unsigned.UInt32.of_int64 + |> Mina_numbers.Global_slot.of_uint32 + in + let timestamp = Block_time.of_int64 block.timestamp in + let%bind block_id = + query_mainnet_db ~f:(fun db -> + Sql.Mainnet.Block.id_from_state_hash db block.state_hash ) + in + let%bind user_accounts_created, user_cmds_rev = + user_commands_from_block_id ~mainnet_pool block_id + in + let user_cmds = List.rev user_cmds_rev in + let%bind internal_accounts_created, internal_cmds_rev = + internal_commands_from_block_id ~mainnet_pool block_id + in + let internal_cmds = List.rev internal_cmds_rev in + let accounts_created = user_accounts_created @ internal_accounts_created in + let zkapp_cmds = [] in + let protocol_version = mainnet_protocol_version in + let proposed_protocol_version = None in + let chain_status = Archive_lib.Chain_status.of_string block.chain_status in + (* always the default token *) + let tokens_used = + if is_genesis_block then [] else [ (Mina_base.Token_id.default, None) ] + in + return + ( { state_hash + ; parent_hash + ; creator + ; block_winner + ; last_vrf_output + ; snarked_ledger_hash + ; staking_epoch_data + ; next_epoch_data + ; min_window_density + ; total_currency + ; sub_window_densities + ; ledger_hash + ; height + ; global_slot_since_hard_fork + ; global_slot_since_genesis + ; timestamp + ; user_cmds + ; internal_cmds + ; zkapp_cmds + ; protocol_version + ; proposed_protocol_version + ; chain_status + ; accounts_accessed = [] + ; accounts_created + ; tokens_used + } + : Archive_lib.Extensional.Block.t ) + +(* archive_uri is for the db to be migrated *) +let main ~mainnet_archive_uri ~migrated_archive_uri ~runtime_config_file () = + let logger = Logger.create () in + let mainnet_archive_uri = Uri.of_string mainnet_archive_uri in + let migrated_archive_uri = Uri.of_string migrated_archive_uri in + let mainnet_pool = + Caqti_async.connect_pool ~max_size:128 mainnet_archive_uri + in + let migrated_pool = + Caqti_async.connect_pool ~max_size:128 migrated_archive_uri + in + match (mainnet_pool, migrated_pool) with + | Error e, _ | _, Error e -> + [%log fatal] + ~metadata:[ ("error", `String (Caqti_error.show e)) ] + "Failed to create Caqti pools for Postgresql" ; + exit 1 + | Ok mainnet_pool, Ok migrated_pool -> + [%log info] "Successfully created Caqti pools for databases" ; + (* use Processor to write to migrated db; separate code to read from mainnet db *) + let query_mainnet_db ~f = Mina_caqti.query ~f mainnet_pool in + let query_migrated_db ~f = Mina_caqti.query ~f migrated_pool in + (* dummy protocol version *) + Protocol_version.(set_current @@ create_exn ~major:1 ~minor:0 ~patch:0) ; + let runtime_config = + Yojson.Safe.from_file runtime_config_file + |> Runtime_config.of_yojson |> Result.ok_or_failwith + in + let proof_level = Genesis_constants.Proof_level.compiled in + let%bind precomputed_values = + match%map + Genesis_ledger_helper.init_from_config_file ~logger + ~proof_level:(Some proof_level) runtime_config + with + | Ok (precomputed_values, _) -> + precomputed_values + | Error err -> + failwithf "Could not get precomputed values, error: %s" + (Error.to_string_hum err) () + in + let With_hash.{ data = genesis_block; _ }, _validation = + Mina_block.genesis ~precomputed_values + in + [%log info] "Querying mainnet canonical blocks" ; + let%bind mainnet_block_ids = + query_mainnet_db ~f:(fun db -> Sql.Mainnet.Block.canonical_blocks db ()) + in + let%bind mainnet_blocks_unsorted = + Deferred.List.map mainnet_block_ids ~f:(fun id -> + query_mainnet_db ~f:(fun db -> Sql.Mainnet.Block.load db ~id) ) + in + (* remove blocks we've already migrated + we don't migrate genesis block at height 1, the archive process will write that + *) + let%bind greatest_migrated_height = + let%bind count = + query_migrated_db ~f:(fun db -> Sql.Berkeley.Block.count db ()) + in + if count = 0 then return Int64.zero + else + query_migrated_db ~f:(fun db -> + Sql.Berkeley.Block.greatest_block_height db () ) + in + if Int64.is_positive greatest_migrated_height then + [%log info] + "Already migrated blocks through height %Ld, resuming migration" + greatest_migrated_height ; + let mainnet_blocks_unmigrated = + if Int64.equal greatest_migrated_height Int64.zero then + mainnet_blocks_unsorted + else + List.filter mainnet_blocks_unsorted ~f:(fun block -> + Int64.( > ) block.height greatest_migrated_height ) + in + (* blocks in height order *) + let mainnet_blocks_to_migrate = + List.sort mainnet_blocks_unmigrated ~compare:(fun block1 block2 -> + Int64.compare block1.height block2.height ) + in + [%log info] "Migrating mainnet blocks" ; + let%bind () = + Deferred.List.iter mainnet_blocks_to_migrate ~f:(fun block -> + [%log info] + "Migrating mainnet block at height %Ld with state hash %s" + block.height block.state_hash ; + let%bind extensional_block = + mainnet_block_to_extensional ~logger ~mainnet_pool ~genesis_block + block + in + query_migrated_db ~f:(fun db -> + match%map + Archive_lib.Processor.Block.add_from_extensional db + extensional_block + with + | Ok _id -> + Ok () + | Error err -> + failwithf + "Could not archive extensional block from mainnet block \ + with state hash %s, error: %s" + block.state_hash (Caqti_error.show err) () ) ) + in + [%log info] "Deleting all precomputed blocks" ; + let%bind () = Precomputed_block.delete_fetched () in + [%log info] "Done migrating mainnet blocks!" ; + Deferred.unit + +let () = + Command.( + run + (let open Let_syntax in + Command.async + ~summary:"Migrate mainnet archive database to Berkeley schema" + (let%map mainnet_archive_uri = + Param.flag "--mainnet-archive-uri" + ~doc:"URI URI for connecting to the mainnet archive database" + Param.(required string) + and migrated_archive_uri = + Param.flag "--migrated-archive-uri" + ~doc:"URI URI for connecting to the migrated archive database" + Param.(required string) + and runtime_config_file = + Param.flag "--config-file" ~aliases:[ "-config-file" ] + Param.(required string) + ~doc:"PATH to the configuration file containing the genesis ledger" + in + main ~mainnet_archive_uri ~migrated_archive_uri ~runtime_config_file ))) diff --git a/src/app/berkeley_migration/dune b/src/app/berkeley_migration/dune new file mode 100644 index 00000000000..d47fbc5f739 --- /dev/null +++ b/src/app/berkeley_migration/dune @@ -0,0 +1,48 @@ +(executable + (package berkeley_migration) + (name berkeley_migration) + (public_name berkeley_migration) + (libraries + ;; opam libraries + async_unix + core + result + async_kernel + uri + stdio + caqti-driver-postgresql + caqti + async + core_kernel + caqti-async + base + base.caml + async.async_command + integers + ;; local libraries + logger + archive_lib + block_time + consensus + consensus_vrf + currency + genesis_constants + genesis_ledger_helper + mina_base + mina_base.import + mina_block + mina_caqti + mina_numbers + mina_state + mina_transaction + mina_wire_types + one_or_two + protocol_version + runtime_config + signature_lib + unsigned_extended + with_hash + ) + (preprocessor_deps ../../config.mlh) + (instrumentation (backend bisect_ppx)) + (preprocess (pps ppx_version ppx_mina ppx_let ppx_hash ppx_compare ppx_sexp_conv h_list.ppx))) diff --git a/src/app/berkeley_migration/precomputed_block.ml b/src/app/berkeley_migration/precomputed_block.ml new file mode 100644 index 00000000000..b423250aeab --- /dev/null +++ b/src/app/berkeley_migration/precomputed_block.ml @@ -0,0 +1,83 @@ +(* precomputed_block.ml *) + +open Core +open Async + +(* Precomputed_block.t type has changed from mainnet, too hard to recreate old type + get what we need by traversing JSON +*) + +let make_target ~state_hash ~height = + sprintf "mainnet-%Ld-%s.json" height state_hash + +(* no precomputed genesis block at height 1 *) +let min_fetchable_height = 2L + +let make_batch_args ~height ~num_blocks = + let start_height = Int64.max min_fetchable_height height in + let actual_num_blocks = + if Int64.( < ) height min_fetchable_height then + Int64.(num_blocks - min_fetchable_height) + else num_blocks + in + List.init (Int64.to_int_exn actual_num_blocks) ~f:(fun n -> + sprintf "mainnet-%Ld-*.json" Int64.(start_height + Int64.of_int n) ) + +let fetch_batch ~height ~num_blocks = + let batch_args = make_batch_args ~height ~num_blocks in + let block_uris = + List.map batch_args ~f:(fun arg -> + sprintf "gs://mina_network_block_data/%s" arg ) + in + match%map + Process.run ~prog:"gsutil" ~args:([ "-m"; "cp" ] @ block_uris @ [ "." ]) () + with + | Ok _ -> + () + | Error err -> + failwithf + "Could not download batch of precomputed blocks at height %Ld, error: \ + %s" + height (Error.to_string_hum err) () + +let block_re = Str.regexp "mainnet-[0-9]+-.+\\.json" + +let delete_fetched () : unit Deferred.t = + let%bind files = Sys.readdir "." in + let block_files = + Array.filter files ~f:(fun file -> Str.string_match block_re file 0) + in + let args = Array.to_list block_files in + match%map Process.run ~prog:"rm" ~args () with + | Ok _ -> + () + | Error err -> + failwithf "Could not delete fetched precomputed blocks, error %s" + (Error.to_string_hum err) () + +let get_json_item filter ~state_hash ~height = + let target = make_target ~state_hash ~height in + match%map Process.run ~prog:"jq" ~args:[ filter; target ] () with + | Ok s -> + Yojson.Safe.from_string s + | Error err -> + failwithf + "Could not get JSON item with filter %s for state hash %s, error: %s" + filter state_hash (Error.to_string_hum err) () + +let consensus_state_item s = sprintf ".protocol_state.body.consensus_state.%s" s + +let last_vrf_output = get_json_item (consensus_state_item "last_vrf_output") + +let staking_epoch_data = + get_json_item (consensus_state_item "staking_epoch_data") + +let next_epoch_data = get_json_item (consensus_state_item "next_epoch_data") + +let min_window_density = + get_json_item (consensus_state_item "min_window_density") + +let subwindow_densities = + get_json_item (consensus_state_item "sub_window_densities") + +let total_currency = get_json_item (consensus_state_item "total_currency") diff --git a/src/app/berkeley_migration/sql.ml b/src/app/berkeley_migration/sql.ml new file mode 100644 index 00000000000..6cb0ce5c894 --- /dev/null +++ b/src/app/berkeley_migration/sql.ml @@ -0,0 +1,287 @@ +(* sql.ml -- for reading the mainnet database (no writing!) *) + +open Core +open Caqti_async + +module Mainnet = struct + module Public_key = struct + let find_by_id (module Conn : CONNECTION) id = + Conn.find + (Caqti_request.find Caqti_type.int Caqti_type.string + "SELECT value FROM public_keys WHERE id = ?" ) + id + end + + module Snarked_ledger_hash = struct + let find_by_id (module Conn : CONNECTION) id = + Conn.find + (Caqti_request.find Caqti_type.int Caqti_type.string + "SELECT value FROM snarked_ledger_hashes WHERE id = ?" ) + id + end + + module Block = struct + type t = + { state_hash : string + ; parent_id : int option + ; parent_hash : string + ; creator_id : int + ; block_winner_id : int + ; snarked_ledger_hash_id : int + ; staking_epoch_data_id : int + ; next_epoch_data_id : int + ; ledger_hash : string + ; height : int64 + ; global_slot_since_hard_fork : int64 + ; global_slot_since_genesis : int64 + ; timestamp : int64 + ; chain_status : string + } + [@@deriving hlist] + + let typ = + let open Mina_caqti.Type_spec in + let spec = + Caqti_type. + [ string + ; option int + ; string + ; int + ; int + ; int + ; int + ; int + ; string + ; int64 + ; int64 + ; int64 + ; int64 + ; string + ] + in + let encode t = Ok (hlist_to_tuple spec (to_hlist t)) in + let decode t = Ok (of_hlist (tuple_to_hlist spec t)) in + Caqti_type.custom ~encode ~decode (to_rep spec) + + let id_from_state_hash (module Conn : CONNECTION) state_hash = + Conn.find + (Caqti_request.find Caqti_type.string Caqti_type.int + {sql| SELECT id + FROM blocks + WHERE state_hash = ? + |sql} ) + state_hash + + let load (module Conn : CONNECTION) ~(id : int) = + Conn.find + (Caqti_request.find Caqti_type.int typ + {sql| SELECT state_hash, parent_id, parent_hash, creator_id, + block_winner_id, snarked_ledger_hash_id, staking_epoch_data_id, + next_epoch_data_id, ledger_hash, height, global_slot, + global_slot_since_genesis, timestamp, chain_status FROM blocks + WHERE id = ? |sql} ) + id + + let canonical_blocks (module Conn : CONNECTION) = + Conn.collect_list + (Caqti_request.collect Caqti_type.unit Caqti_type.int + {sql| SELECT id + FROM blocks + WHERE chain_status = 'canonical' + |sql} ) + + let pending_blocks (module Conn : CONNECTION) = + Conn.collect_list + (Caqti_request.collect Caqti_type.unit Caqti_type.int + {sql| SELECT id + FROM blocks + WHERE chain_status = 'pending' + |sql} ) + end + + module Block_user_command = struct + type t = + { block_id : int + ; user_command_id : int + ; sequence_no : int + ; status : string + ; failure_reason : string option + ; fee_payer_account_creation_fee_paid : int64 option + ; receiver_account_creation_fee_paid : int64 option + ; created_token : int64 option + ; fee_payer_balance_id : int + ; source_balance_id : int option + ; receiver_balance_id : int option + } + [@@deriving hlist] + + let typ = + let open Mina_caqti.Type_spec in + let spec = + Caqti_type. + [ int + ; int + ; int + ; string + ; option string + ; option int64 + ; option int64 + ; option int64 + ; int + ; option int + ; option int + ] + in + let encode t = Ok (hlist_to_tuple spec (to_hlist t)) in + let decode t = Ok (of_hlist (tuple_to_hlist spec t)) in + Caqti_type.custom ~encode ~decode (to_rep spec) + + let load_block (module Conn : CONNECTION) ~block_id = + Conn.collect_list + (Caqti_request.collect Caqti_type.int typ + {sql| SELECT block_id, user_command_id, + sequence_no, + status,failure_reason, + fee_payer_account_creation_fee_paid, + receiver_account_creation_fee_paid, + created_token, + fee_payer_balance, + source_balance, + receiver_balance + FROM blocks_user_commands + WHERE block_id = $1 + ORDER BY sequence_no + |sql} ) + block_id + end + + module Block_internal_command = struct + type t = + { block_id : int + ; internal_command_id : int + ; sequence_no : int + ; secondary_sequence_no : int + ; receiver_account_creation_fee_paid : int64 option + ; receiver_balance : int + } + [@@deriving hlist] + + let typ = + let open Mina_caqti.Type_spec in + let spec = Caqti_type.[ int; int; int; int; option int64; int ] in + let encode t = Ok (hlist_to_tuple spec (to_hlist t)) in + let decode t = Ok (of_hlist (tuple_to_hlist spec t)) in + Caqti_type.custom ~encode ~decode (to_rep spec) + + let load_block (module Conn : CONNECTION) ~block_id = + Conn.collect_list + (Caqti_request.collect Caqti_type.int typ + {sql| SELECT block_id, internal_command_id, + sequence_no, secondary_sequence_no, + receiver_account_creation_fee_paid, + receiver_balance + FROM blocks_internal_commands + WHERE block_id = $1 + ORDER BY sequence_no, secondary_sequence_no + |sql} ) + block_id + end + + module Internal_command = struct + type t = + { typ : string + ; receiver_id : int + ; fee : int64 + ; token : int64 + ; hash : string + } + + let typ = + let encode t = Ok ((t.typ, t.receiver_id, t.fee, t.token), t.hash) in + let decode ((typ, receiver_id, fee, token), hash) = + Ok { typ; receiver_id; fee; token; hash } + in + let rep = Caqti_type.(tup2 (tup4 string int int64 int64) string) in + Caqti_type.custom ~encode ~decode rep + + let load (module Conn : CONNECTION) ~(id : int) = + Conn.find + (Caqti_request.find Caqti_type.int typ + {sql| SELECT type,receiver_id,fee,token,hash + FROM internal_commands + WHERE id = ? + |sql} ) + id + end + + module User_command = struct + type t = + { typ : string + ; fee_payer_id : int + ; source_id : int + ; receiver_id : int + ; fee_token : int64 + ; token : int64 + ; nonce : int + ; amount : int64 option + ; fee : int64 + ; valid_until : int64 option + ; memo : string + ; hash : string + } + [@@deriving hlist] + + let typ = + let open Mina_caqti.Type_spec in + let spec = + Caqti_type. + [ string + ; int + ; int + ; int + ; int64 + ; int64 + ; int + ; option int64 + ; int64 + ; option int64 + ; string + ; string + ] + in + let encode t = Ok (hlist_to_tuple spec (to_hlist t)) in + let decode t = Ok (of_hlist (tuple_to_hlist spec t)) in + Caqti_type.custom ~encode ~decode (to_rep spec) + + let load (module Conn : CONNECTION) ~(id : int) = + Conn.find + (Caqti_request.find Caqti_type.int typ + {sql| SELECT type,fee_payer_id,source_id,receiver_id, + fee_token,token, + nonce,amount,fee,valid_until,memo,hash + FROM user_commands + WHERE id = ? |sql} ) + id + end +end + +module Berkeley = struct + module Block = struct + let count (module Conn : CONNECTION) = + Conn.find + (Caqti_request.find Caqti_type.unit Caqti_type.int + {sql| SELECT count (*) + FROM blocks + |sql} ) + + let greatest_block_height (module Conn : CONNECTION) = + Conn.find + (Caqti_request.find Caqti_type.unit Caqti_type.int64 + {sql| SELECT height + FROM blocks + WHERE chain_status <> 'orphaned' + ORDER BY height DESC + LIMIT 1 + |sql} ) + end +end diff --git a/src/app/replayer/replayer.ml b/src/app/replayer/replayer.ml index 6ee72e5ff41..623e2f12854 100644 --- a/src/app/replayer/replayer.ml +++ b/src/app/replayer/replayer.ml @@ -1131,6 +1131,7 @@ let main ~input_file ~output_file_opt ~archive_uri ~continue_on_error () = , `String (Int64.to_string last_global_slot_since_genesis) ) ; ("block_id", `Int last_block_id) ] ; + check_ledger_hash_at_slot () ; Deferred.unit ) else let%bind () = run_transactions () in diff --git a/src/berkeley_migration.opam b/src/berkeley_migration.opam new file mode 100644 index 00000000000..7be19e3d612 --- /dev/null +++ b/src/berkeley_migration.opam @@ -0,0 +1,5 @@ +opam-version: "2.0" +version: "0.1" +build: [ + ["dune" "build" "--only" "src" "--root" "." "-j" jobs "@install"] +] diff --git a/src/lib/consensus/vrf/consensus_vrf.ml b/src/lib/consensus/vrf/consensus_vrf.ml index 25eedac8a13..4a95e7bc7c7 100644 --- a/src/lib/consensus/vrf/consensus_vrf.ml +++ b/src/lib/consensus/vrf/consensus_vrf.ml @@ -167,13 +167,17 @@ module Output = struct let of_yojson = function | `String s -> - Result.map_error - (Base64.decode ~alphabet:Base64.uri_safe_alphabet s) - ~f:(function `Msg err -> - sprintf - "Error decoding vrf output in \ - Vrf.Output.Truncated.Stable.V1.of_yojson: %s" - err ) + (* missing type equation somewhere, add explicit type *) + ( match Base64.decode ~alphabet:Base64.uri_safe_alphabet s with + | Ok b64 -> + Ppx_deriving_yojson_runtime.Result.Ok b64 + | Error (`Msg err) -> + Error + (sprintf + "Error decoding vrf output in \ + Vrf.Output.Truncated.Stable.V1.of_yojson: %s" + err ) + : (t, string) Ppx_deriving_yojson_runtime.Result.result ) | _ -> Error "Vrf.Output.Truncated.Stable.V1.of_yojson: Expected a string" @@ -190,6 +194,9 @@ module Output = struct let description = "Vrf Truncated Output" end) + (* don't want the yojson functions from Make_base58_check *) + [%%define_locally Stable.Latest.(of_yojson, to_yojson)] + open Tick let length_in_bits = Int.min 256 (Field.size_in_bits - 2) diff --git a/src/lib/consensus/vrf/dune b/src/lib/consensus/vrf/dune index 84676835dd0..901c031e78b 100644 --- a/src/lib/consensus/vrf/dune +++ b/src/lib/consensus/vrf/dune @@ -47,6 +47,7 @@ kimchi_bindings kimchi_types pasta_bindings + ppx_deriving_yojson.runtime ppx_version.runtime ) (inline_tests) From 30c28886ba62978ddd5cbdef05aff1646bf97dd0 Mon Sep 17 00:00:00 2001 From: Paul Steckler Date: Thu, 23 Mar 2023 12:00:22 -0700 Subject: [PATCH 02/12] revert name --- src/app/archive/lib/processor.ml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/app/archive/lib/processor.ml b/src/app/archive/lib/processor.ml index 8fccf4f14dd..4e466f5e92e 100644 --- a/src/app/archive/lib/processor.ml +++ b/src/app/archive/lib/processor.ml @@ -4016,7 +4016,7 @@ let setup_server ~metrics_server_port ~constraint_constants ~logger Async.Tcp.Where_to_listen.bind_to All_addresses (On_port server_port) in let reader, writer = - Strict_pipe.create ~name:"archive_migrated_to_berkeley" Synchronous + Strict_pipe.create ~name:"archive" Synchronous in let precomputed_block_reader, precomputed_block_writer = Strict_pipe.create ~name:"precomputed_archive_block" Synchronous From fb2ac80a7cc1d4be3f9e0e18f55d25e30a4362dc Mon Sep 17 00:00:00 2001 From: Paul Steckler Date: Thu, 23 Mar 2023 12:07:30 -0700 Subject: [PATCH 03/12] reformat --- src/app/archive/lib/processor.ml | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/app/archive/lib/processor.ml b/src/app/archive/lib/processor.ml index 4e466f5e92e..f0735e62002 100644 --- a/src/app/archive/lib/processor.ml +++ b/src/app/archive/lib/processor.ml @@ -4015,9 +4015,7 @@ let setup_server ~metrics_server_port ~constraint_constants ~logger let where_to_listen = Async.Tcp.Where_to_listen.bind_to All_addresses (On_port server_port) in - let reader, writer = - Strict_pipe.create ~name:"archive" Synchronous - in + let reader, writer = Strict_pipe.create ~name:"archive" Synchronous in let precomputed_block_reader, precomputed_block_writer = Strict_pipe.create ~name:"precomputed_archive_block" Synchronous in From aeb1a0cb55959434ddca32349dc200956db3853d Mon Sep 17 00:00:00 2001 From: Paul Steckler Date: Thu, 23 Mar 2023 12:29:48 -0700 Subject: [PATCH 04/12] fix extract_blocks --- src/app/extract_blocks/dune | 1 + src/app/extract_blocks/extract_blocks.ml | 3 +-- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/app/extract_blocks/dune b/src/app/extract_blocks/dune index a0df1831b25..e3bf8046c9e 100644 --- a/src/app/extract_blocks/dune +++ b/src/app/extract_blocks/dune @@ -18,6 +18,7 @@ uri async.async_command ;; local libraries + consensus_vrf mina_wire_types mina_base mina_base.import diff --git a/src/app/extract_blocks/extract_blocks.ml b/src/app/extract_blocks/extract_blocks.ml index 774821f2439..d65698f758a 100644 --- a/src/app/extract_blocks/extract_blocks.ml +++ b/src/app/extract_blocks/extract_blocks.ml @@ -48,8 +48,7 @@ let fill_in_block pool (block : Archive_lib.Processor.Block.t) : let%bind creator = pk_of_id block.creator_id in let%bind block_winner = pk_of_id block.block_winner_id in let last_vrf_output = - (* keep hex encoding *) - block.last_vrf_output + Base64.decode_exn ~alphabet:Base64.uri_safe_alphabet block.last_vrf_output in let%bind snarked_ledger_hash_str = query_db ~f:(fun db -> From 0d9b605f5fc6c84ada9f9381f2109b8eb5cd14fc Mon Sep 17 00:00:00 2001 From: Paul Steckler Date: Sat, 1 Apr 2023 12:24:27 -0700 Subject: [PATCH 05/12] move berkeley_migration OPAM package to dune-project --- src/berkeley_migration.opam | 5 ----- src/dune-project | 1 + 2 files changed, 1 insertion(+), 5 deletions(-) delete mode 100644 src/berkeley_migration.opam diff --git a/src/berkeley_migration.opam b/src/berkeley_migration.opam deleted file mode 100644 index 7be19e3d612..00000000000 --- a/src/berkeley_migration.opam +++ /dev/null @@ -1,5 +0,0 @@ -opam-version: "2.0" -version: "0.1" -build: [ - ["dune" "build" "--only" "src" "--root" "." "-j" jobs "@install"] -] diff --git a/src/dune-project b/src/dune-project index 59b72424864..0e4882405e5 100644 --- a/src/dune-project +++ b/src/dune-project @@ -5,6 +5,7 @@ (package (name archive_blocks)) (package (name base58_check)) (package (name bash_colors)) +(package (name berkeley_migration)) (package (name best_tip_merger)) (package (name best_tip_prover)) (package (name bignum_bigint)) From 80ad0d5b6d40a423b93e4e774eb237666745e628 Mon Sep 17 00:00:00 2001 From: Paul Steckler Date: Wed, 6 Sep 2023 16:14:10 -0700 Subject: [PATCH 06/12] Patch over bit-rot --- src/app/berkeley_migration/berkeley_migration.ml | 10 +++++----- src/app/replayer/replayer.ml | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/app/berkeley_migration/berkeley_migration.ml b/src/app/berkeley_migration/berkeley_migration.ml index 79bce73ae75..d6bf75470a0 100644 --- a/src/app/berkeley_migration/berkeley_migration.ml +++ b/src/app/berkeley_migration/berkeley_migration.ml @@ -153,7 +153,7 @@ let user_commands_from_block_id ~mainnet_pool block_id = let valid_until = Option.map user_cmd.valid_until ~f:(fun valid_until -> Unsigned.UInt32.of_int64 valid_until - |> Mina_numbers.Global_slot.of_uint32 ) + |> Mina_numbers.Global_slot_since_genesis.of_uint32 ) in let memo = Mina_base.Signed_command_memo.of_base58_check_exn user_cmd.memo @@ -171,11 +171,11 @@ let user_commands_from_block_id ~mainnet_pool block_id = | "payment" -> let amount = Option.value_exn amount in Mina_base.Signed_command_payload.Body.Payment - { source_pk = source; receiver_pk = receiver; amount } + { receiver_pk = receiver; amount } | "delegation" -> let set_delegate = Mina_base.Stake_delegation.Set_delegate - { delegator = source; new_delegate = receiver } + { new_delegate = receiver } in Mina_base.Signed_command_payload.Body.Stake_delegation set_delegate @@ -388,11 +388,11 @@ let mainnet_block_to_extensional ~logger ~mainnet_pool let height = Unsigned.UInt32.of_int64 block.height in let global_slot_since_hard_fork = block.global_slot_since_hard_fork |> Unsigned.UInt32.of_int64 - |> Mina_numbers.Global_slot.of_uint32 + |> Mina_numbers.Global_slot_since_hard_fork.of_uint32 in let global_slot_since_genesis = block.global_slot_since_genesis |> Unsigned.UInt32.of_int64 - |> Mina_numbers.Global_slot.of_uint32 + |> Mina_numbers.Global_slot_since_genesis.of_uint32 in let timestamp = Block_time.of_int64 block.timestamp in let%bind block_id = diff --git a/src/app/replayer/replayer.ml b/src/app/replayer/replayer.ml index b1f17d58773..ca9492c319c 100644 --- a/src/app/replayer/replayer.ml +++ b/src/app/replayer/replayer.ml @@ -1574,7 +1574,7 @@ let () = Command.async ~summary:"Replay transactions from Mina archive database" (let%map input_file = Param.flag "--input-file" - ~doc:"file File containing the genesis ledger" + ~doc:"file File containing the starting ledger" Param.(required string) and output_file_opt = Param.flag "--output-file" From b8f8a356120eea5cfb62820d54f10df25d60ac42 Mon Sep 17 00:00:00 2001 From: Paul Steckler Date: Fri, 8 Sep 2023 17:09:50 -0700 Subject: [PATCH 07/12] add VALUES with type annos --- src/app/archive/lib/processor.ml | 1 + 1 file changed, 1 insertion(+) diff --git a/src/app/archive/lib/processor.ml b/src/app/archive/lib/processor.ml index cfaf9be9acd..fa9af2d2600 100644 --- a/src/app/archive/lib/processor.ml +++ b/src/app/archive/lib/processor.ml @@ -3291,6 +3291,7 @@ module Block = struct global_slot_since_hard_fork, global_slot_since_genesis, protocol_version_id, proposed_protocol_version_id, timestamp, chain_status) + VALUES (?,?,?,?,?,?,?,?,?,?,?::bigint[],?,?,?,?,?,?,?,?,?::chain_status_type) RETURNING id |sql} ) { state_hash = block.state_hash |> State_hash.to_base58_check From 4a90ec1e5a21f2831da1f8a53c58bd641eb9361c Mon Sep 17 00:00:00 2001 From: "It's me, CI" Date: Fri, 20 Oct 2023 14:25:08 -0700 Subject: [PATCH 08/12] Add end-global-slot arg --- .../berkeley_migration/berkeley_migration.ml | 46 +++++++++++++------ src/app/berkeley_migration/sql.ml | 35 +++++++------- 2 files changed, 50 insertions(+), 31 deletions(-) diff --git a/src/app/berkeley_migration/berkeley_migration.ml b/src/app/berkeley_migration/berkeley_migration.ml index d6bf75470a0..45f8018f4d3 100644 --- a/src/app/berkeley_migration/berkeley_migration.ml +++ b/src/app/berkeley_migration/berkeley_migration.ml @@ -222,9 +222,7 @@ let user_commands_from_block_id ~mainnet_pool block_id = let first_batch = ref true -(* from src/config/protocol_version/current.mlh in `compatible` *) -let mainnet_protocol_version = - Protocol_version.create_exn ~major:2 ~minor:0 ~patch:0 +let mainnet_protocol_version = Protocol_version.current let mainnet_block_to_extensional ~logger ~mainnet_pool ~(genesis_block : Mina_block.t) (block : Sql.Mainnet.Block.t) = @@ -445,8 +443,8 @@ let mainnet_block_to_extensional ~logger ~mainnet_pool } : Archive_lib.Extensional.Block.t ) -(* archive_uri is for the db to be migrated *) -let main ~mainnet_archive_uri ~migrated_archive_uri ~runtime_config_file () = +let main ~mainnet_archive_uri ~migrated_archive_uri ~runtime_config_file + ~end_global_slot () = let logger = Logger.create () in let mainnet_archive_uri = Uri.of_string mainnet_archive_uri in let migrated_archive_uri = Uri.of_string migrated_archive_uri in @@ -467,12 +465,11 @@ let main ~mainnet_archive_uri ~migrated_archive_uri ~runtime_config_file () = (* use Processor to write to migrated db; separate code to read from mainnet db *) let query_mainnet_db ~f = Mina_caqti.query ~f mainnet_pool in let query_migrated_db ~f = Mina_caqti.query ~f migrated_pool in - (* dummy protocol version *) - Protocol_version.(set_current @@ create_exn ~major:1 ~minor:0 ~patch:0) ; let runtime_config = Yojson.Safe.from_file runtime_config_file |> Runtime_config.of_yojson |> Result.ok_or_failwith in + [%log info] "Getting precomputed values from runtime config" ; let proof_level = Genesis_constants.Proof_level.compiled in let%bind precomputed_values = match%map @@ -485,20 +482,30 @@ let main ~mainnet_archive_uri ~migrated_archive_uri ~runtime_config_file () = failwithf "Could not get precomputed values, error: %s" (Error.to_string_hum err) () in + [%log info] "Got precomputed values from runtime config" ; let With_hash.{ data = genesis_block; _ }, _validation = Mina_block.genesis ~precomputed_values in - [%log info] "Querying mainnet canonical blocks" ; let%bind mainnet_block_ids = - query_mainnet_db ~f:(fun db -> Sql.Mainnet.Block.canonical_blocks db ()) + match end_global_slot with + | None -> + [%log info] "Querying mainnet canonical blocks" ; + query_mainnet_db ~f:(fun db -> + Sql.Mainnet.Block.canonical_blocks db () ) + | Some slot -> + [%log info] + "Querying mainnet blocks at or below global slot since genesis \ + %d (but not orphaned blocks)" + slot ; + query_mainnet_db ~f:(fun db -> + Sql.Mainnet.Block.blocks_at_or_below db slot ) in + [%log info] "Found %d mainnet blocks" (List.length mainnet_block_ids) ; let%bind mainnet_blocks_unsorted = Deferred.List.map mainnet_block_ids ~f:(fun id -> query_mainnet_db ~f:(fun db -> Sql.Mainnet.Block.load db ~id) ) in - (* remove blocks we've already migrated - we don't migrate genesis block at height 1, the archive process will write that - *) + (* remove blocks we've already migrated *) let%bind greatest_migrated_height = let%bind count = query_migrated_db ~f:(fun db -> Sql.Berkeley.Block.count db ()) @@ -519,6 +526,8 @@ let main ~mainnet_archive_uri ~migrated_archive_uri ~runtime_config_file () = List.filter mainnet_blocks_unsorted ~f:(fun block -> Int64.( > ) block.height greatest_migrated_height ) in + [%log info] "Will migrate %d mainnet blocks" + (List.length mainnet_blocks_unmigrated) ; (* blocks in height order *) let mainnet_blocks_to_migrate = List.sort mainnet_blocks_unmigrated ~compare:(fun block1 block2 -> @@ -569,6 +578,15 @@ let () = and runtime_config_file = Param.flag "--config-file" ~aliases:[ "-config-file" ] Param.(required string) - ~doc:"PATH to the configuration file containing the genesis ledger" + ~doc: + "PATH to the configuration file containing the berkeley genesis \ + ledger" + and end_global_slot = + Param.flag "--end-global-slot" ~aliases:[ "-end-global-slot" ] + Param.(optional int) + ~doc: + "NN Last global slot since genesis to include in the migration \ + (if omitted, only canonical blocks will be migrated)" in - main ~mainnet_archive_uri ~migrated_archive_uri ~runtime_config_file ))) + main ~mainnet_archive_uri ~migrated_archive_uri ~runtime_config_file + ~end_global_slot ))) diff --git a/src/app/berkeley_migration/sql.ml b/src/app/berkeley_migration/sql.ml index 6cb0ce5c894..ed9eda2ae90 100644 --- a/src/app/berkeley_migration/sql.ml +++ b/src/app/berkeley_migration/sql.ml @@ -1,4 +1,4 @@ -(* sql.ml -- for reading the mainnet database (no writing!) *) +(* sql.ml -- for reading the mainnet and berkeley databases (no writing!) *) open Core open Caqti_async @@ -66,7 +66,7 @@ module Mainnet = struct let id_from_state_hash (module Conn : CONNECTION) state_hash = Conn.find (Caqti_request.find Caqti_type.string Caqti_type.int - {sql| SELECT id + {sql| SELECT id FROM blocks WHERE state_hash = ? |sql} ) @@ -76,27 +76,28 @@ module Mainnet = struct Conn.find (Caqti_request.find Caqti_type.int typ {sql| SELECT state_hash, parent_id, parent_hash, creator_id, - block_winner_id, snarked_ledger_hash_id, staking_epoch_data_id, - next_epoch_data_id, ledger_hash, height, global_slot, - global_slot_since_genesis, timestamp, chain_status FROM blocks - WHERE id = ? |sql} ) + block_winner_id, snarked_ledger_hash_id, staking_epoch_data_id, + next_epoch_data_id, ledger_hash, height, global_slot, + global_slot_since_genesis, timestamp, chain_status FROM blocks + WHERE id = ? |sql} ) id let canonical_blocks (module Conn : CONNECTION) = Conn.collect_list (Caqti_request.collect Caqti_type.unit Caqti_type.int {sql| SELECT id - FROM blocks - WHERE chain_status = 'canonical' + FROM blocks + WHERE chain_status = 'canonical' |sql} ) - let pending_blocks (module Conn : CONNECTION) = + let blocks_at_or_below (module Conn : CONNECTION) slot = Conn.collect_list - (Caqti_request.collect Caqti_type.unit Caqti_type.int - {sql| SELECT id - FROM blocks - WHERE chain_status = 'pending' - |sql} ) + (Caqti_request.collect Caqti_type.int Caqti_type.int + {sql| SELECT id,parent_id,global_slot_since_genesis FROM blocks + WHERE global_slot_since_genesis <= $1 + AND chain_status <> 'orphaned' + |sql} ) + slot end module Block_user_command = struct @@ -144,9 +145,9 @@ module Mainnet = struct status,failure_reason, fee_payer_account_creation_fee_paid, receiver_account_creation_fee_paid, - created_token, - fee_payer_balance, - source_balance, + created_token, + fee_payer_balance, + source_balance, receiver_balance FROM blocks_user_commands WHERE block_id = $1 From 922bd6e0d35cf127250f21a048b1464c70489ccc Mon Sep 17 00:00:00 2001 From: "It's me, CI" Date: Sat, 21 Oct 2023 20:09:28 -0700 Subject: [PATCH 09/12] set mainnet protocol version --- src/app/berkeley_migration/berkeley_migration.ml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/app/berkeley_migration/berkeley_migration.ml b/src/app/berkeley_migration/berkeley_migration.ml index 45f8018f4d3..bcfae49c734 100644 --- a/src/app/berkeley_migration/berkeley_migration.ml +++ b/src/app/berkeley_migration/berkeley_migration.ml @@ -222,7 +222,8 @@ let user_commands_from_block_id ~mainnet_pool block_id = let first_batch = ref true -let mainnet_protocol_version = Protocol_version.current +let mainnet_protocol_version = + Protocol_version.create ~transaction:2 ~network:1 ~patch:0 let mainnet_block_to_extensional ~logger ~mainnet_pool ~(genesis_block : Mina_block.t) (block : Sql.Mainnet.Block.t) = From ce8e0422ab2783d6fa0d308f88f465af60e8f206 Mon Sep 17 00:00:00 2001 From: "It's me, CI" Date: Mon, 23 Oct 2023 11:24:41 -0700 Subject: [PATCH 10/12] Mainnet protocol version is 1.0 --- src/app/berkeley_migration/berkeley_migration.ml | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/app/berkeley_migration/berkeley_migration.ml b/src/app/berkeley_migration/berkeley_migration.ml index bcfae49c734..65e3cb7a6c7 100644 --- a/src/app/berkeley_migration/berkeley_migration.ml +++ b/src/app/berkeley_migration/berkeley_migration.ml @@ -223,7 +223,13 @@ let user_commands_from_block_id ~mainnet_pool block_id = let first_batch = ref true let mainnet_protocol_version = - Protocol_version.create ~transaction:2 ~network:1 ~patch:0 + (* It would be more accurate to posit distinct patches for each + mainnet release, but it's sufficient to have a protocol version + earlier than the berkeley hard fork protocol version. After the + hard fork, the replayer won't check ledger hashes for blocks with + an earlier protocol version. + *) + Protocol_version.create ~transaction:1 ~network:0 ~patch:0 let mainnet_block_to_extensional ~logger ~mainnet_pool ~(genesis_block : Mina_block.t) (block : Sql.Mainnet.Block.t) = From c97ae58202c4e1897aa81c8b6a26755502ac28f3 Mon Sep 17 00:00:00 2001 From: "It's me, CI" Date: Mon, 30 Oct 2023 13:46:04 -0700 Subject: [PATCH 11/12] Pass thru old txn hashes --- .../berkeley_migration/berkeley_migration.ml | 54 ++----------------- 1 file changed, 3 insertions(+), 51 deletions(-) diff --git a/src/app/berkeley_migration/berkeley_migration.ml b/src/app/berkeley_migration/berkeley_migration.ml index 65e3cb7a6c7..6fa1476ae30 100644 --- a/src/app/berkeley_migration/berkeley_migration.ml +++ b/src/app/berkeley_migration/berkeley_migration.ml @@ -7,8 +7,6 @@ open Async migrated database name *) -(* TODO: table of old, new txn hashes *) - let mainnet_transaction_failure_of_string s : Mina_base.Transaction_status.Failure.t = match s with @@ -68,7 +66,7 @@ let internal_commands_from_block_id ~mainnet_pool block_id = Sql.Mainnet.Internal_command.load db ~id:block_internal_cmd.internal_command_id ) in - (* some fields come from blocks_internal_commands, others from user_commands *) + (* some fields come from blocks_internal_commands, others from internal_commands *) let sequence_no = block_internal_cmd.sequence_no in let secondary_sequence_no = block_internal_cmd.secondary_sequence_no in let command_type = internal_cmd.typ in @@ -77,27 +75,7 @@ let internal_commands_from_block_id ~mainnet_pool block_id = internal_cmd.fee |> Unsigned.UInt64.of_int64 |> Currency.Fee.of_uint64 in let hash = - match command_type with - | "coinbase" -> - let coinbase = - (* always valid, since fee transfer is None *) - let amount = Currency.Amount.of_fee fee in - Mina_base.Coinbase.create ~amount ~receiver ~fee_transfer:None - |> Or_error.ok_exn - in - Mina_transaction.Transaction_hash.hash_coinbase coinbase - | "fee_transfer" | "fee_transfer_via_coinbase" -> ( - let fee_transfer = - Mina_base.Fee_transfer.create_single ~receiver_pk:receiver ~fee - ~fee_token:Mina_base.Token_id.default - in - match Mina_base.Fee_transfer.to_singles fee_transfer with - | `One single -> - Mina_transaction.Transaction_hash.hash_fee_transfer single - | `Two _ -> - failwith "Expected one single in fee transfer" ) - | s -> - failwithf "Unknown command type %s" s () + Mina_transaction.Transaction_hash.of_base58_check_exn internal_cmd.hash in let status = "applied" in let failure_reason = None in @@ -163,34 +141,8 @@ let user_commands_from_block_id ~mainnet_pool block_id = Option.map block_user_cmd.failure_reason ~f:mainnet_transaction_failure_of_string in - (* a V2 signed command *) - let signed_cmd = - let payload = - let body = - match command_type with - | "payment" -> - let amount = Option.value_exn amount in - Mina_base.Signed_command_payload.Body.Payment - { receiver_pk = receiver; amount } - | "delegation" -> - let set_delegate = - Mina_base.Stake_delegation.Set_delegate - { new_delegate = receiver } - in - Mina_base.Signed_command_payload.Body.Stake_delegation - set_delegate - | s -> - failwithf "Unknown command type: %s" s () - in - Mina_base.Signed_command_payload.create ~fee ~fee_payer_pk:fee_payer - ~nonce ~valid_until ~memo ~body - in - let signer = Signature_lib.Public_key.decompress_exn source in - let signature = Mina_base.Signature.dummy in - ({ payload; signer; signature } : Mina_base.Signed_command.t) - in let hash = - Mina_transaction.Transaction_hash.hash_signed_command signed_cmd + Mina_transaction.Transaction_hash.of_base58_check_exn user_cmd.hash in let cmd : Archive_lib.Extensional.User_command.t = { sequence_no From fc599db8f5817e2c18e0b64ff0d34e80f26b0378 Mon Sep 17 00:00:00 2001 From: "It's me, CI" Date: Mon, 30 Oct 2023 17:42:05 -0700 Subject: [PATCH 12/12] use generated field names for extensional blocks --- src/app/archive/lib/processor.ml | 22 +++++++++------------- 1 file changed, 9 insertions(+), 13 deletions(-) diff --git a/src/app/archive/lib/processor.ml b/src/app/archive/lib/processor.ml index 73812e2e14b..1b1cf4161e6 100644 --- a/src/app/archive/lib/processor.ml +++ b/src/app/archive/lib/processor.ml @@ -3235,19 +3235,15 @@ module Block = struct in Conn.find (Caqti_request.find typ Caqti_type.int - {sql| INSERT INTO blocks - (state_hash, parent_id, parent_hash, - creator_id, block_winner_id,last_vrf_output, - snarked_ledger_hash_id, staking_epoch_data_id, - next_epoch_data_id, - min_window_density, sub_window_densities, total_currency, - ledger_hash, height, - global_slot_since_hard_fork, global_slot_since_genesis, - protocol_version_id, proposed_protocol_version_id, - timestamp, chain_status) - VALUES (?,?,?,?,?,?,?,?,?,?,?::bigint[],?,?,?,?,?,?,?,?,?::chain_status_type) - RETURNING id - |sql} ) + (Mina_caqti.insert_into_cols ~returning:"id" ~table_name + ~tannot:(function + | "sub_window_densities" -> + Some "bigint[]" + | "chain_status" -> + Some "chain_status_type" + | _ -> + None ) + ~cols:Fields.names () ) ) { state_hash = block.state_hash |> State_hash.to_base58_check ; parent_id ; parent_hash = block.parent_hash |> State_hash.to_base58_check