diff --git a/src/app/archive/lib/dune b/src/app/archive/lib/dune index 0117fce5bb8..385408aca97 100644 --- a/src/app/archive/lib/dune +++ b/src/app/archive/lib/dune @@ -27,8 +27,8 @@ child_processes precomputed_values coda_genesis_ledger + consensus.vrf mina_runtime_config - hex sgn mina_base.util kimchi_backend.pasta @@ -72,9 +72,10 @@ mina_version staged_ledger_diff error_json + ppx_deriving_yojson.runtime ppx_version.runtime ) (inline_tests (flags -verbose -show-counts)) (modes native) (instrumentation (backend bisect_ppx)) - (preprocess (pps ppx_mina ppx_version ppx_jane ppx_custom_printf h_list.ppx))) + (preprocess (pps ppx_mina ppx_version ppx_jane ppx_custom_printf ppx_deriving_yojson h_list.ppx))) diff --git a/src/app/archive/lib/extensional.ml b/src/app/archive/lib/extensional.ml index bd4c962e21a..30d02cce6f4 100644 --- a/src/app/archive/lib/extensional.ml +++ b/src/app/archive/lib/extensional.ml @@ -111,7 +111,7 @@ module Block = struct ; parent_hash : State_hash.Stable.V1.t ; creator : Public_key.Compressed.Stable.V1.t ; block_winner : Public_key.Compressed.Stable.V1.t - ; last_vrf_output : string + ; last_vrf_output : Consensus_vrf.Output.Truncated.Stable.V1.t ; snarked_ledger_hash : Frozen_ledger_hash.Stable.V1.t ; staking_epoch_data : Mina_base.Epoch_data.Value.Stable.V1.t ; next_epoch_data : Mina_base.Epoch_data.Value.Stable.V1.t diff --git a/src/app/archive/lib/processor.ml b/src/app/archive/lib/processor.ml index bb1b346640f..1b1cf4161e6 100644 --- a/src/app/archive/lib/processor.ml +++ b/src/app/archive/lib/processor.ml @@ -1864,7 +1864,7 @@ module User_command = struct (Caqti_request.find typ Caqti_type.int (Mina_caqti.insert_into_cols ~returning:"id" ~table_name ~tannot:(function - | "typ" -> Some "user_command_type" | _ -> None ) + | "command_type" -> Some "user_command_type" | _ -> None ) ~cols:Fields.names () ) ) { command_type = user_cmd.command_type ; fee_payer_id @@ -2013,7 +2013,8 @@ module Internal_command = struct (Caqti_request.find typ Caqti_type.int (Mina_caqti.insert_into_cols ~returning:"id" ~table_name ~tannot:(function - | "typ" -> Some "internal_command_type" | _ -> None ) + | "command_type" -> Some "internal_command_type" | _ -> None + ) ~cols:Fields.names () ) ) { command_type = internal_cmd.command_type ; receiver_id @@ -2786,9 +2787,9 @@ module Block = struct (Consensus.Data.Consensus_state.block_stake_winner consensus_state) in let last_vrf_output = - (* encode as hex, Postgresql won't accept arbitrary bitstrings *) + (* encode as base64, same as in precomputed blocks JSON *) Consensus.Data.Consensus_state.last_vrf_output consensus_state - |> Hex.Safe.to_hex + |> Base64.encode_exn ~alphabet:Base64.uri_safe_alphabet in let%bind snarked_ledger_hash_id = Snarked_ledger_hash.add_if_doesn't_exist @@ -3192,8 +3193,9 @@ module Block = struct Public_key.add_if_doesn't_exist (module Conn) block.block_winner in let last_vrf_output = - (* already encoded as hex *) + (* encode as base64, same as in precomputed blocks JSON *) block.last_vrf_output + |> Base64.encode_exn ~alphabet:Base64.uri_safe_alphabet in let%bind snarked_ledger_hash_id = Snarked_ledger_hash.add_if_doesn't_exist @@ -3233,19 +3235,15 @@ module Block = struct in Conn.find (Caqti_request.find typ Caqti_type.int - {sql| INSERT INTO blocks - (state_hash, parent_id, parent_hash, - creator_id, block_winner_id,last_vrf_output, - snarked_ledger_hash_id, staking_epoch_data_id, - next_epoch_data_id, - min_window_density, sub_window_densities, total_currency, - ledger_hash, height, - global_slot_since_hard_fork, global_slot_since_genesis, - protocol_version, proposed_protocol_version, - timestamp, chain_status) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?::bigint[], ?, ?, ?, ?, ?, ?, ?, ?, ?::chain_status_type) - RETURNING id - |sql} ) + (Mina_caqti.insert_into_cols ~returning:"id" ~table_name + ~tannot:(function + | "sub_window_densities" -> + Some "bigint[]" + | "chain_status" -> + Some "chain_status_type" + | _ -> + None ) + ~cols:Fields.names () ) ) { state_hash = block.state_hash |> State_hash.to_base58_check ; parent_id ; parent_hash = block.parent_hash |> State_hash.to_base58_check diff --git a/src/app/berkeley_migration/berkeley_migration.ml b/src/app/berkeley_migration/berkeley_migration.ml new file mode 100644 index 00000000000..6fa1476ae30 --- /dev/null +++ b/src/app/berkeley_migration/berkeley_migration.ml @@ -0,0 +1,551 @@ +(* berkeley_migration.ml -- migrate archive db from original Mina mainnet to berkeley schema *) + +open Core_kernel +open Async + +(* before running this program for the first time, import the berkeley schema to the + migrated database name +*) + +let mainnet_transaction_failure_of_string s : + Mina_base.Transaction_status.Failure.t = + match s with + | "Predicate" -> + Predicate + | "Source_not_present" -> + Source_not_present + | "Receiver_not_present" -> + Receiver_not_present + | "Amount_insufficient_to_create_account" -> + Amount_insufficient_to_create_account + | "Cannot_pay_creation_fee_in_token" -> + Cannot_pay_creation_fee_in_token + | "Source_insufficient_balance" -> + Source_insufficient_balance + | "Source_minimum_balance_violation" -> + Source_minimum_balance_violation + | "Receiver_already_exists" -> + Receiver_already_exists + | "Overflow" -> + Overflow + | "Incorrect_nonce" -> + Incorrect_nonce + (* these should never have occurred *) + | "Signed_command_on_snapp_account" + | "Not_token_owner" + | "Snapp_account_not_present" + | "Update_not_permitted" -> + failwith "Transaction failure unrepresentable in Berkeley" + | _ -> + failwith "No such transaction failure" + +let get_created paid pk = + match paid with + | None -> + [] + | Some fee -> + let acct_id = Mina_base.Account_id.create pk Mina_base.Token_id.default in + [ (acct_id, Unsigned.UInt64.of_int64 fee |> Currency.Fee.of_uint64) ] + +let internal_commands_from_block_id ~mainnet_pool block_id = + let query_mainnet_db ~f = Mina_caqti.query ~f mainnet_pool in + let pk_of_id id = + let%map pk_str = + query_mainnet_db ~f:(fun db -> Sql.Mainnet.Public_key.find_by_id db id) + in + Signature_lib.Public_key.Compressed.of_base58_check_exn pk_str + in + let%bind block_internal_cmds = + query_mainnet_db ~f:(fun db -> + Sql.Mainnet.Block_internal_command.load_block db ~block_id ) + in + Deferred.List.fold block_internal_cmds ~init:([], []) + ~f:(fun (created, cmds) block_internal_cmd -> + let%bind internal_cmd = + query_mainnet_db ~f:(fun db -> + Sql.Mainnet.Internal_command.load db + ~id:block_internal_cmd.internal_command_id ) + in + (* some fields come from blocks_internal_commands, others from internal_commands *) + let sequence_no = block_internal_cmd.sequence_no in + let secondary_sequence_no = block_internal_cmd.secondary_sequence_no in + let command_type = internal_cmd.typ in + let%bind receiver = pk_of_id internal_cmd.receiver_id in + let fee = + internal_cmd.fee |> Unsigned.UInt64.of_int64 |> Currency.Fee.of_uint64 + in + let hash = + Mina_transaction.Transaction_hash.of_base58_check_exn internal_cmd.hash + in + let status = "applied" in + let failure_reason = None in + let cmd : Archive_lib.Extensional.Internal_command.t = + { sequence_no + ; secondary_sequence_no + ; command_type + ; receiver + ; fee + ; hash + ; status + ; failure_reason + } + in + let created_by_cmd = + get_created block_internal_cmd.receiver_account_creation_fee_paid + receiver + in + return (created_by_cmd @ created, cmd :: cmds) ) + +let user_commands_from_block_id ~mainnet_pool block_id = + let query_mainnet_db ~f = Mina_caqti.query ~f mainnet_pool in + let%bind block_user_cmds = + query_mainnet_db ~f:(fun db -> + Sql.Mainnet.Block_user_command.load_block db ~block_id ) + in + let pk_of_id id = + let%map pk_str = + query_mainnet_db ~f:(fun db -> Sql.Mainnet.Public_key.find_by_id db id) + in + Signature_lib.Public_key.Compressed.of_base58_check_exn pk_str + in + Deferred.List.fold block_user_cmds ~init:([], []) + ~f:(fun (created, cmds) block_user_cmd -> + let%bind user_cmd = + query_mainnet_db ~f:(fun db -> + Sql.Mainnet.User_command.load db ~id:block_user_cmd.user_command_id ) + in + (* some fields come from blocks_user_commands, others from user_commands *) + let sequence_no = block_user_cmd.sequence_no in + let command_type = user_cmd.typ in + let%bind fee_payer = pk_of_id user_cmd.fee_payer_id in + let%bind source = pk_of_id user_cmd.source_id in + let%bind receiver = pk_of_id user_cmd.receiver_id in + let nonce = Mina_numbers.Account_nonce.of_int user_cmd.nonce in + let amount = + Option.map user_cmd.amount ~f:(fun amount -> + Unsigned.UInt64.of_int64 amount |> Currency.Amount.of_uint64 ) + in + let fee = + user_cmd.fee |> Unsigned.UInt64.of_int64 |> Currency.Fee.of_uint64 + in + let valid_until = + Option.map user_cmd.valid_until ~f:(fun valid_until -> + Unsigned.UInt32.of_int64 valid_until + |> Mina_numbers.Global_slot_since_genesis.of_uint32 ) + in + let memo = + Mina_base.Signed_command_memo.of_base58_check_exn user_cmd.memo + in + let status = block_user_cmd.status in + let failure_reason = + Option.map block_user_cmd.failure_reason + ~f:mainnet_transaction_failure_of_string + in + let hash = + Mina_transaction.Transaction_hash.of_base58_check_exn user_cmd.hash + in + let cmd : Archive_lib.Extensional.User_command.t = + { sequence_no + ; command_type + ; fee_payer + ; source + ; receiver + ; nonce + ; amount + ; fee + ; valid_until + ; memo + ; hash + ; status + ; failure_reason + } + in + let created_by_cmd = + let fee_payer_created = + get_created block_user_cmd.fee_payer_account_creation_fee_paid + fee_payer + in + let receiver_created = + get_created block_user_cmd.receiver_account_creation_fee_paid receiver + in + fee_payer_created @ receiver_created + in + return (created_by_cmd @ created, cmd :: cmds) ) + +let first_batch = ref true + +let mainnet_protocol_version = + (* It would be more accurate to posit distinct patches for each + mainnet release, but it's sufficient to have a protocol version + earlier than the berkeley hard fork protocol version. After the + hard fork, the replayer won't check ledger hashes for blocks with + an earlier protocol version. + *) + Protocol_version.create ~transaction:1 ~network:0 ~patch:0 + +let mainnet_block_to_extensional ~logger ~mainnet_pool + ~(genesis_block : Mina_block.t) (block : Sql.Mainnet.Block.t) = + let query_mainnet_db ~f = Mina_caqti.query ~f mainnet_pool in + let is_genesis_block = Int64.equal block.height Int64.one in + let genesis_consensus_state = + lazy + (let protocol_state = + Mina_block.Header.protocol_state (Mina_block.header genesis_block) + in + let body = Mina_state.Protocol_state.body protocol_state in + Mina_state.Protocol_state.Body.consensus_state body ) + in + let%bind () = + (* we may try to be fetching more blocks than exist + gsutil seems to get the ones that do exist, in that exist + *) + let batch_size = 1000L in + if is_genesis_block then Deferred.unit + else if !first_batch then ( + let num_blocks = Int64.(batch_size - (block.height % batch_size)) in + [%log info] "Fetching first batch of precomputed blocks" + ~metadata: + [ ("height", `Int (Int64.to_int_exn block.height)) + ; ("num_blocks", `Int (Int64.to_int_exn num_blocks)) + ] ; + let%bind () = + Precomputed_block.fetch_batch ~height:block.height ~num_blocks + in + [%log info] "Done fetching first batch of precomputed blocks" ; + first_batch := false ; + Deferred.unit ) + else if Int64.(equal (block.height % batch_size)) 0L then ( + [%log info] "Deleting all precomputed blocks" ; + let%bind () = Precomputed_block.delete_fetched () in + [%log info] "Fetching batch of precomputed blocks" + ~metadata: + [ ("height", `Int (Int64.to_int_exn block.height)) + ; ("num_blocks", `Int (Int64.to_int_exn batch_size)) + ] ; + let%bind () = + Precomputed_block.fetch_batch ~height:block.height + ~num_blocks:batch_size + in + [%log info] "Done fetching batch of precomputed blocks" ; + Deferred.unit ) + else Deferred.unit + in + let pk_of_id id = + let%map pk_str = + query_mainnet_db ~f:(fun db -> Sql.Mainnet.Public_key.find_by_id db id) + in + Signature_lib.Public_key.Compressed.of_base58_check_exn pk_str + in + let state_hash = Mina_base.State_hash.of_base58_check_exn block.state_hash in + let parent_hash = + Mina_base.State_hash.of_base58_check_exn block.parent_hash + in + let%bind creator = pk_of_id block.creator_id in + let%bind block_winner = pk_of_id block.block_winner_id in + let%bind last_vrf_output = + (* the unencoded string, not base64 *) + if is_genesis_block then + let consensus = Lazy.force genesis_consensus_state in + return @@ Consensus.Data.Consensus_state.last_vrf_output consensus + else + let%map json = + Precomputed_block.last_vrf_output ~state_hash:block.state_hash + ~height:block.height + in + Consensus_vrf.Output.Truncated.of_yojson json |> Result.ok_or_failwith + in + let%bind snarked_ledger_hash = + let%map hash_str = + query_mainnet_db ~f:(fun db -> + Sql.Mainnet.Snarked_ledger_hash.find_by_id db + block.snarked_ledger_hash_id ) + in + Mina_base.Frozen_ledger_hash.of_base58_check_exn hash_str + in + (* TODO: confirm these match mainnet db ? *) + let%bind staking_epoch_data = + if is_genesis_block then + let consensus = Lazy.force genesis_consensus_state in + return @@ Consensus.Data.Consensus_state.staking_epoch_data consensus + else + let%map json = + Precomputed_block.staking_epoch_data ~state_hash:block.state_hash + ~height:block.height + in + match Mina_base.Epoch_data.Value.of_yojson json with + | Ok epoch_data -> + epoch_data + | Error err -> + failwithf "Could not get staking epoch data, error: %s" err () + in + let%bind next_epoch_data = + if is_genesis_block then + let consensus = Lazy.force genesis_consensus_state in + return @@ Consensus.Data.Consensus_state.next_epoch_data consensus + else + let%map json = + Precomputed_block.next_epoch_data ~state_hash:block.state_hash + ~height:block.height + in + match Mina_base.Epoch_data.Value.of_yojson json with + | Ok epoch_data -> + epoch_data + | Error err -> + failwithf "Could not get next epoch data, error: %s" err () + in + let%bind min_window_density = + if is_genesis_block then + let consensus = Lazy.force genesis_consensus_state in + return @@ Consensus.Data.Consensus_state.min_window_density consensus + else + match%map + Precomputed_block.min_window_density ~state_hash:block.state_hash + ~height:block.height + with + | `String s -> + Mina_numbers.Length.of_string s + | _ -> + failwith "min_window_density: unexpected JSON" + in + let%bind total_currency = + if is_genesis_block then + let consensus = Lazy.force genesis_consensus_state in + return @@ Consensus.Data.Consensus_state.total_currency consensus + else + match%map + Precomputed_block.total_currency ~state_hash:block.state_hash + ~height:block.height + with + | `String s -> + Currency.Amount.of_string s + | _ -> + failwith "total currency: unexpected JSON" + in + let%bind sub_window_densities = + if is_genesis_block then + let consensus = Lazy.force genesis_consensus_state in + return @@ Consensus.Data.Consensus_state.sub_window_densities consensus + else + match%map + Precomputed_block.subwindow_densities ~state_hash:block.state_hash + ~height:block.height + with + | `List items -> + List.map items ~f:(function + | `String s -> + Mina_numbers.Length.of_string s + | _ -> + failwith "Expected string for subwindow density" ) + | _ -> + failwith "sub_window_densities: unexpected JSON" + in + let ledger_hash = + Mina_base.Frozen_ledger_hash.of_base58_check_exn block.ledger_hash + in + let height = Unsigned.UInt32.of_int64 block.height in + let global_slot_since_hard_fork = + block.global_slot_since_hard_fork |> Unsigned.UInt32.of_int64 + |> Mina_numbers.Global_slot_since_hard_fork.of_uint32 + in + let global_slot_since_genesis = + block.global_slot_since_genesis |> Unsigned.UInt32.of_int64 + |> Mina_numbers.Global_slot_since_genesis.of_uint32 + in + let timestamp = Block_time.of_int64 block.timestamp in + let%bind block_id = + query_mainnet_db ~f:(fun db -> + Sql.Mainnet.Block.id_from_state_hash db block.state_hash ) + in + let%bind user_accounts_created, user_cmds_rev = + user_commands_from_block_id ~mainnet_pool block_id + in + let user_cmds = List.rev user_cmds_rev in + let%bind internal_accounts_created, internal_cmds_rev = + internal_commands_from_block_id ~mainnet_pool block_id + in + let internal_cmds = List.rev internal_cmds_rev in + let accounts_created = user_accounts_created @ internal_accounts_created in + let zkapp_cmds = [] in + let protocol_version = mainnet_protocol_version in + let proposed_protocol_version = None in + let chain_status = Archive_lib.Chain_status.of_string block.chain_status in + (* always the default token *) + let tokens_used = + if is_genesis_block then [] else [ (Mina_base.Token_id.default, None) ] + in + return + ( { state_hash + ; parent_hash + ; creator + ; block_winner + ; last_vrf_output + ; snarked_ledger_hash + ; staking_epoch_data + ; next_epoch_data + ; min_window_density + ; total_currency + ; sub_window_densities + ; ledger_hash + ; height + ; global_slot_since_hard_fork + ; global_slot_since_genesis + ; timestamp + ; user_cmds + ; internal_cmds + ; zkapp_cmds + ; protocol_version + ; proposed_protocol_version + ; chain_status + ; accounts_accessed = [] + ; accounts_created + ; tokens_used + } + : Archive_lib.Extensional.Block.t ) + +let main ~mainnet_archive_uri ~migrated_archive_uri ~runtime_config_file + ~end_global_slot () = + let logger = Logger.create () in + let mainnet_archive_uri = Uri.of_string mainnet_archive_uri in + let migrated_archive_uri = Uri.of_string migrated_archive_uri in + let mainnet_pool = + Caqti_async.connect_pool ~max_size:128 mainnet_archive_uri + in + let migrated_pool = + Caqti_async.connect_pool ~max_size:128 migrated_archive_uri + in + match (mainnet_pool, migrated_pool) with + | Error e, _ | _, Error e -> + [%log fatal] + ~metadata:[ ("error", `String (Caqti_error.show e)) ] + "Failed to create Caqti pools for Postgresql" ; + exit 1 + | Ok mainnet_pool, Ok migrated_pool -> + [%log info] "Successfully created Caqti pools for databases" ; + (* use Processor to write to migrated db; separate code to read from mainnet db *) + let query_mainnet_db ~f = Mina_caqti.query ~f mainnet_pool in + let query_migrated_db ~f = Mina_caqti.query ~f migrated_pool in + let runtime_config = + Yojson.Safe.from_file runtime_config_file + |> Runtime_config.of_yojson |> Result.ok_or_failwith + in + [%log info] "Getting precomputed values from runtime config" ; + let proof_level = Genesis_constants.Proof_level.compiled in + let%bind precomputed_values = + match%map + Genesis_ledger_helper.init_from_config_file ~logger + ~proof_level:(Some proof_level) runtime_config + with + | Ok (precomputed_values, _) -> + precomputed_values + | Error err -> + failwithf "Could not get precomputed values, error: %s" + (Error.to_string_hum err) () + in + [%log info] "Got precomputed values from runtime config" ; + let With_hash.{ data = genesis_block; _ }, _validation = + Mina_block.genesis ~precomputed_values + in + let%bind mainnet_block_ids = + match end_global_slot with + | None -> + [%log info] "Querying mainnet canonical blocks" ; + query_mainnet_db ~f:(fun db -> + Sql.Mainnet.Block.canonical_blocks db () ) + | Some slot -> + [%log info] + "Querying mainnet blocks at or below global slot since genesis \ + %d (but not orphaned blocks)" + slot ; + query_mainnet_db ~f:(fun db -> + Sql.Mainnet.Block.blocks_at_or_below db slot ) + in + [%log info] "Found %d mainnet blocks" (List.length mainnet_block_ids) ; + let%bind mainnet_blocks_unsorted = + Deferred.List.map mainnet_block_ids ~f:(fun id -> + query_mainnet_db ~f:(fun db -> Sql.Mainnet.Block.load db ~id) ) + in + (* remove blocks we've already migrated *) + let%bind greatest_migrated_height = + let%bind count = + query_migrated_db ~f:(fun db -> Sql.Berkeley.Block.count db ()) + in + if count = 0 then return Int64.zero + else + query_migrated_db ~f:(fun db -> + Sql.Berkeley.Block.greatest_block_height db () ) + in + if Int64.is_positive greatest_migrated_height then + [%log info] + "Already migrated blocks through height %Ld, resuming migration" + greatest_migrated_height ; + let mainnet_blocks_unmigrated = + if Int64.equal greatest_migrated_height Int64.zero then + mainnet_blocks_unsorted + else + List.filter mainnet_blocks_unsorted ~f:(fun block -> + Int64.( > ) block.height greatest_migrated_height ) + in + [%log info] "Will migrate %d mainnet blocks" + (List.length mainnet_blocks_unmigrated) ; + (* blocks in height order *) + let mainnet_blocks_to_migrate = + List.sort mainnet_blocks_unmigrated ~compare:(fun block1 block2 -> + Int64.compare block1.height block2.height ) + in + [%log info] "Migrating mainnet blocks" ; + let%bind () = + Deferred.List.iter mainnet_blocks_to_migrate ~f:(fun block -> + [%log info] + "Migrating mainnet block at height %Ld with state hash %s" + block.height block.state_hash ; + let%bind extensional_block = + mainnet_block_to_extensional ~logger ~mainnet_pool ~genesis_block + block + in + query_migrated_db ~f:(fun db -> + match%map + Archive_lib.Processor.Block.add_from_extensional db + extensional_block + with + | Ok _id -> + Ok () + | Error err -> + failwithf + "Could not archive extensional block from mainnet block \ + with state hash %s, error: %s" + block.state_hash (Caqti_error.show err) () ) ) + in + [%log info] "Deleting all precomputed blocks" ; + let%bind () = Precomputed_block.delete_fetched () in + [%log info] "Done migrating mainnet blocks!" ; + Deferred.unit + +let () = + Command.( + run + (let open Let_syntax in + Command.async + ~summary:"Migrate mainnet archive database to Berkeley schema" + (let%map mainnet_archive_uri = + Param.flag "--mainnet-archive-uri" + ~doc:"URI URI for connecting to the mainnet archive database" + Param.(required string) + and migrated_archive_uri = + Param.flag "--migrated-archive-uri" + ~doc:"URI URI for connecting to the migrated archive database" + Param.(required string) + and runtime_config_file = + Param.flag "--config-file" ~aliases:[ "-config-file" ] + Param.(required string) + ~doc: + "PATH to the configuration file containing the berkeley genesis \ + ledger" + and end_global_slot = + Param.flag "--end-global-slot" ~aliases:[ "-end-global-slot" ] + Param.(optional int) + ~doc: + "NN Last global slot since genesis to include in the migration \ + (if omitted, only canonical blocks will be migrated)" + in + main ~mainnet_archive_uri ~migrated_archive_uri ~runtime_config_file + ~end_global_slot ))) diff --git a/src/app/berkeley_migration/dune b/src/app/berkeley_migration/dune new file mode 100644 index 00000000000..d47fbc5f739 --- /dev/null +++ b/src/app/berkeley_migration/dune @@ -0,0 +1,48 @@ +(executable + (package berkeley_migration) + (name berkeley_migration) + (public_name berkeley_migration) + (libraries + ;; opam libraries + async_unix + core + result + async_kernel + uri + stdio + caqti-driver-postgresql + caqti + async + core_kernel + caqti-async + base + base.caml + async.async_command + integers + ;; local libraries + logger + archive_lib + block_time + consensus + consensus_vrf + currency + genesis_constants + genesis_ledger_helper + mina_base + mina_base.import + mina_block + mina_caqti + mina_numbers + mina_state + mina_transaction + mina_wire_types + one_or_two + protocol_version + runtime_config + signature_lib + unsigned_extended + with_hash + ) + (preprocessor_deps ../../config.mlh) + (instrumentation (backend bisect_ppx)) + (preprocess (pps ppx_version ppx_mina ppx_let ppx_hash ppx_compare ppx_sexp_conv h_list.ppx))) diff --git a/src/app/berkeley_migration/precomputed_block.ml b/src/app/berkeley_migration/precomputed_block.ml new file mode 100644 index 00000000000..b423250aeab --- /dev/null +++ b/src/app/berkeley_migration/precomputed_block.ml @@ -0,0 +1,83 @@ +(* precomputed_block.ml *) + +open Core +open Async + +(* Precomputed_block.t type has changed from mainnet, too hard to recreate old type + get what we need by traversing JSON +*) + +let make_target ~state_hash ~height = + sprintf "mainnet-%Ld-%s.json" height state_hash + +(* no precomputed genesis block at height 1 *) +let min_fetchable_height = 2L + +let make_batch_args ~height ~num_blocks = + let start_height = Int64.max min_fetchable_height height in + let actual_num_blocks = + if Int64.( < ) height min_fetchable_height then + Int64.(num_blocks - min_fetchable_height) + else num_blocks + in + List.init (Int64.to_int_exn actual_num_blocks) ~f:(fun n -> + sprintf "mainnet-%Ld-*.json" Int64.(start_height + Int64.of_int n) ) + +let fetch_batch ~height ~num_blocks = + let batch_args = make_batch_args ~height ~num_blocks in + let block_uris = + List.map batch_args ~f:(fun arg -> + sprintf "gs://mina_network_block_data/%s" arg ) + in + match%map + Process.run ~prog:"gsutil" ~args:([ "-m"; "cp" ] @ block_uris @ [ "." ]) () + with + | Ok _ -> + () + | Error err -> + failwithf + "Could not download batch of precomputed blocks at height %Ld, error: \ + %s" + height (Error.to_string_hum err) () + +let block_re = Str.regexp "mainnet-[0-9]+-.+\\.json" + +let delete_fetched () : unit Deferred.t = + let%bind files = Sys.readdir "." in + let block_files = + Array.filter files ~f:(fun file -> Str.string_match block_re file 0) + in + let args = Array.to_list block_files in + match%map Process.run ~prog:"rm" ~args () with + | Ok _ -> + () + | Error err -> + failwithf "Could not delete fetched precomputed blocks, error %s" + (Error.to_string_hum err) () + +let get_json_item filter ~state_hash ~height = + let target = make_target ~state_hash ~height in + match%map Process.run ~prog:"jq" ~args:[ filter; target ] () with + | Ok s -> + Yojson.Safe.from_string s + | Error err -> + failwithf + "Could not get JSON item with filter %s for state hash %s, error: %s" + filter state_hash (Error.to_string_hum err) () + +let consensus_state_item s = sprintf ".protocol_state.body.consensus_state.%s" s + +let last_vrf_output = get_json_item (consensus_state_item "last_vrf_output") + +let staking_epoch_data = + get_json_item (consensus_state_item "staking_epoch_data") + +let next_epoch_data = get_json_item (consensus_state_item "next_epoch_data") + +let min_window_density = + get_json_item (consensus_state_item "min_window_density") + +let subwindow_densities = + get_json_item (consensus_state_item "sub_window_densities") + +let total_currency = get_json_item (consensus_state_item "total_currency") diff --git a/src/app/berkeley_migration/sql.ml b/src/app/berkeley_migration/sql.ml new file mode 100644 index 00000000000..ed9eda2ae90 --- /dev/null +++ b/src/app/berkeley_migration/sql.ml @@ -0,0 +1,288 @@ +(* sql.ml -- for reading the mainnet and berkeley databases (no writing!) *) + +open Core +open Caqti_async + +module Mainnet = struct + module Public_key = struct + let find_by_id (module Conn : CONNECTION) id = + Conn.find + (Caqti_request.find Caqti_type.int Caqti_type.string + "SELECT value FROM public_keys WHERE id = ?" ) + id + end + + module Snarked_ledger_hash = struct + let find_by_id (module Conn : CONNECTION) id = + Conn.find + (Caqti_request.find Caqti_type.int Caqti_type.string + "SELECT value FROM snarked_ledger_hashes WHERE id = ?" ) + id + end + + module Block = struct + type t = + { state_hash : string + ; parent_id : int option + ; parent_hash : string + ; creator_id : int + ; block_winner_id : int + ; snarked_ledger_hash_id : int + ; staking_epoch_data_id : int + ; next_epoch_data_id : int + ; ledger_hash : string + ; height : int64 + ; global_slot_since_hard_fork : int64 + ; global_slot_since_genesis : int64 + ; timestamp : int64 + ; chain_status : string + } + [@@deriving hlist] + + let typ = + let open Mina_caqti.Type_spec in + let spec = + Caqti_type. + [ string + ; option int + ; string + ; int + ; int + ; int + ; int + ; int + ; string + ; int64 + ; int64 + ; int64 + ; int64 + ; string + ] + in + let encode t = Ok (hlist_to_tuple spec (to_hlist t)) in + let decode t = Ok (of_hlist (tuple_to_hlist spec t)) in + Caqti_type.custom ~encode ~decode (to_rep spec) + + let id_from_state_hash (module Conn : CONNECTION) state_hash = + Conn.find + (Caqti_request.find Caqti_type.string Caqti_type.int + {sql| SELECT id + FROM blocks + WHERE state_hash = ? + |sql} ) + state_hash + + let load (module Conn : CONNECTION) ~(id : int) = + Conn.find + (Caqti_request.find Caqti_type.int typ + {sql| SELECT state_hash, parent_id, parent_hash, creator_id, + block_winner_id, snarked_ledger_hash_id, staking_epoch_data_id, + next_epoch_data_id, ledger_hash, height, global_slot, + global_slot_since_genesis, timestamp, chain_status FROM blocks + WHERE id = ? |sql} ) + id + + let canonical_blocks (module Conn : CONNECTION) = + Conn.collect_list + (Caqti_request.collect Caqti_type.unit Caqti_type.int + {sql| SELECT id + FROM blocks + WHERE chain_status = 'canonical' + |sql} ) + + let blocks_at_or_below (module Conn : CONNECTION) slot = + Conn.collect_list + (Caqti_request.collect Caqti_type.int Caqti_type.int + {sql| SELECT id,parent_id,global_slot_since_genesis FROM blocks + WHERE global_slot_since_genesis <= $1 + AND chain_status <> 'orphaned' + |sql} ) + slot + end + + module Block_user_command = struct + type t = + { block_id : int + ; user_command_id : int + ; sequence_no : int + ; status : string + ; failure_reason : string option + ; fee_payer_account_creation_fee_paid : int64 option + ; receiver_account_creation_fee_paid : int64 option + ; created_token : int64 option + ; fee_payer_balance_id : int + ; source_balance_id : int option + ; receiver_balance_id : int option + } + [@@deriving hlist] + + let typ = + let open Mina_caqti.Type_spec in + let spec = + Caqti_type. + [ int + ; int + ; int + ; string + ; option string + ; option int64 + ; option int64 + ; option int64 + ; int + ; option int + ; option int + ] + in + let encode t = Ok (hlist_to_tuple spec (to_hlist t)) in + let decode t = Ok (of_hlist (tuple_to_hlist spec t)) in + Caqti_type.custom ~encode ~decode (to_rep spec) + + let load_block (module Conn : CONNECTION) ~block_id = + Conn.collect_list + (Caqti_request.collect Caqti_type.int typ + {sql| SELECT block_id, user_command_id, + sequence_no, + status,failure_reason, + fee_payer_account_creation_fee_paid, + receiver_account_creation_fee_paid, + created_token, + fee_payer_balance, + source_balance, + receiver_balance + FROM blocks_user_commands + WHERE block_id = $1 + ORDER BY sequence_no + |sql} ) + block_id + end + + module Block_internal_command = struct + type t = + { block_id : int + ; internal_command_id : int + ; sequence_no : int + ; secondary_sequence_no : int + ; receiver_account_creation_fee_paid : int64 option + ; receiver_balance : int + } + [@@deriving hlist] + + let typ = + let open Mina_caqti.Type_spec in + let spec = Caqti_type.[ int; int; int; int; option int64; int ] in + let encode t = Ok (hlist_to_tuple spec (to_hlist t)) in + let decode t = Ok (of_hlist (tuple_to_hlist spec t)) in + Caqti_type.custom ~encode ~decode (to_rep spec) + + let load_block (module Conn : CONNECTION) ~block_id = + Conn.collect_list + (Caqti_request.collect Caqti_type.int typ + {sql| SELECT block_id, internal_command_id, + sequence_no, secondary_sequence_no, + receiver_account_creation_fee_paid, + receiver_balance + FROM blocks_internal_commands + WHERE block_id = $1 + ORDER BY sequence_no, secondary_sequence_no + |sql} ) + block_id + end + + module Internal_command = struct + type t = + { typ : string + ; receiver_id : int + ; fee : int64 + ; token : int64 + ; hash : string + } + + let typ = + let encode t = Ok ((t.typ, t.receiver_id, t.fee, t.token), t.hash) in + let decode ((typ, receiver_id, fee, token), hash) = + Ok { typ; receiver_id; fee; token; hash } + in + let rep = Caqti_type.(tup2 (tup4 string int int64 int64) string) in + Caqti_type.custom ~encode ~decode rep + + let load (module Conn : CONNECTION) ~(id : int) = + Conn.find + (Caqti_request.find Caqti_type.int typ + {sql| SELECT type,receiver_id,fee,token,hash + FROM internal_commands + WHERE id = ? + |sql} ) + id + end + + module User_command = struct + type t = + { typ : string + ; fee_payer_id : int + ; source_id : int + ; receiver_id : int + ; fee_token : int64 + ; token : int64 + ; nonce : int + ; amount : int64 option + ; fee : int64 + ; valid_until : int64 option + ; memo : string + ; hash : string + } + [@@deriving hlist] + + let typ = + let open Mina_caqti.Type_spec in + let spec = + Caqti_type. + [ string + ; int + ; int + ; int + ; int64 + ; int64 + ; int + ; option int64 + ; int64 + ; option int64 + ; string + ; string + ] + in + let encode t = Ok (hlist_to_tuple spec (to_hlist t)) in + let decode t = Ok (of_hlist (tuple_to_hlist spec t)) in + Caqti_type.custom ~encode ~decode (to_rep spec) + + let load (module Conn : CONNECTION) ~(id : int) = + Conn.find + (Caqti_request.find Caqti_type.int typ + {sql| SELECT type,fee_payer_id,source_id,receiver_id, + fee_token,token, + nonce,amount,fee,valid_until,memo,hash + FROM user_commands + WHERE id = ? |sql} ) + id + end +end + +module Berkeley = struct + module Block = struct + let count (module Conn : CONNECTION) = + Conn.find + (Caqti_request.find Caqti_type.unit Caqti_type.int + {sql| SELECT count (*) + FROM blocks + |sql} ) + + let greatest_block_height (module Conn : CONNECTION) = + Conn.find + (Caqti_request.find Caqti_type.unit Caqti_type.int64 + {sql| SELECT height + FROM blocks + WHERE chain_status <> 'orphaned' + ORDER BY height DESC + LIMIT 1 + |sql} ) + end +end diff --git a/src/app/extract_blocks/dune b/src/app/extract_blocks/dune index a0df1831b25..e3bf8046c9e 100644 --- a/src/app/extract_blocks/dune +++ b/src/app/extract_blocks/dune @@ -18,6 +18,7 @@ uri async.async_command ;; local libraries + consensus_vrf mina_wire_types mina_base mina_base.import diff --git a/src/app/extract_blocks/extract_blocks.ml b/src/app/extract_blocks/extract_blocks.ml index f4fa0ac2e77..3605538ae44 100644 --- a/src/app/extract_blocks/extract_blocks.ml +++ b/src/app/extract_blocks/extract_blocks.ml @@ -49,8 +49,7 @@ let fill_in_block pool (block : Archive_lib.Processor.Block.t) : let%bind creator = pk_of_id block.creator_id in let%bind block_winner = pk_of_id block.block_winner_id in let last_vrf_output = - (* keep hex encoding *) - block.last_vrf_output + Base64.decode_exn ~alphabet:Base64.uri_safe_alphabet block.last_vrf_output in let%bind snarked_ledger_hash_str = query_db ~f:(fun db -> diff --git a/src/app/replayer/replayer.ml b/src/app/replayer/replayer.ml index a47e9eaeca4..e35e3622d2f 100644 --- a/src/app/replayer/replayer.ml +++ b/src/app/replayer/replayer.ml @@ -1574,7 +1574,7 @@ let () = Command.async ~summary:"Replay transactions from Mina archive database" (let%map input_file = Param.flag "--input-file" - ~doc:"file File containing the genesis ledger" + ~doc:"file File containing the starting ledger" Param.(required string) and output_file_opt = Param.flag "--output-file" diff --git a/src/dune-project b/src/dune-project index d67327cecab..6ec170959d6 100644 --- a/src/dune-project +++ b/src/dune-project @@ -6,6 +6,7 @@ (package (name base58_check)) (package (name bash_colors)) (package (name berkeley_account_tables)) +(package (name berkeley_migration)) (package (name best_tip_merger)) (package (name best_tip_prover)) (package (name bignum_bigint)) diff --git a/src/lib/consensus/vrf/consensus_vrf.ml b/src/lib/consensus/vrf/consensus_vrf.ml index 550f59dd770..c3cb981f21c 100644 --- a/src/lib/consensus/vrf/consensus_vrf.ml +++ b/src/lib/consensus/vrf/consensus_vrf.ml @@ -172,13 +172,17 @@ module Output = struct let of_yojson = function | `String s -> - Result.map_error - (Base64.decode ~alphabet:Base64.uri_safe_alphabet s) - ~f:(function `Msg err -> - sprintf - "Error decoding vrf output in \ - Vrf.Output.Truncated.Stable.V1.of_yojson: %s" - err ) + (* missing type equation somewhere, add explicit type *) + ( match Base64.decode ~alphabet:Base64.uri_safe_alphabet s with + | Ok b64 -> + Ppx_deriving_yojson_runtime.Result.Ok b64 + | Error (`Msg err) -> + Error + (sprintf + "Error decoding vrf output in \ + Vrf.Output.Truncated.Stable.V1.of_yojson: %s" + err ) + : (t, string) Ppx_deriving_yojson_runtime.Result.result ) | _ -> Error "Vrf.Output.Truncated.Stable.V1.of_yojson: Expected a string" @@ -195,6 +199,9 @@ module Output = struct let description = "Vrf Truncated Output" end) + (* don't want the yojson functions from Make_base58_check *) + [%%define_locally Stable.Latest.(of_yojson, to_yojson)] + open Tick let length_in_bits = Int.min 256 (Field.size_in_bits - 2) diff --git a/src/lib/consensus/vrf/dune b/src/lib/consensus/vrf/dune index d9057e3abc1..9b76dd6b1a2 100644 --- a/src/lib/consensus/vrf/dune +++ b/src/lib/consensus/vrf/dune @@ -47,6 +47,7 @@ kimchi_bindings kimchi_types pasta_bindings + ppx_deriving_yojson.runtime ppx_version.runtime ) (inline_tests (flags -verbose -show-counts))