diff --git a/.gitmodules b/.gitmodules index f82c5ecf..f7c3adf9 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,4 +1,4 @@ [submodule "autobahn"] path = autobahn url = https://github.com/grapheo12/autobahn-artifact.git - branch = autobahn-pirateship-baseline-2 + branch = autobahn-tputvslatency diff --git a/Cargo.toml b/Cargo.toml index 39e093ef..8fc34cce 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -103,6 +103,7 @@ evil = [] extra_2pc = [] no_pipeline = [] extra_qc_check = [] +witness_forwarding = [] # Applications app_logger = [] @@ -116,8 +117,10 @@ chained_pbft = ["round_robin_leader", "always_sign", "view_change"] jolteon = ["round_robin_leader", "always_sign", "view_change", "no_pipeline"] hotstuff = ["round_robin_leader", "always_sign", "view_change", "no_pipeline", "extra_qc_check"] pirateship = ["round_robin_leader", "dynamic_sign", "view_change"] +peerreview = ["fixed_leader", "always_sign", "witness_forwarding"] -default = ["pirateship", "app_logger", "storage", "fast_path", "platforms"] +default = ["peerreview", "app_logger", "storage", "platforms", "fast_path"] +# default = ["pirateship", "app_logger", "storage", "platforms", "fast_path"] # default = ["jolteon", "app_logger", "storage"] # default = ["engraft", "app_kvs", "storage", "fast_path", "platforms"] diff --git a/Makefile b/Makefile index c6ef0091..57bb9d76 100644 --- a/Makefile +++ b/Makefile @@ -6,6 +6,11 @@ pirateship_logger: CC=clang CXX=clang++ cargo build --release + +.PHONY: peerreview +peerreview: + CC=clang CXX=clang++ cargo build --release --features peerreview,app_logger,storage,platforms,fast_path --no-default-features + .PHONY: contrib contrib: CC=clang CXX=clang++ cargo build --release --manifest-path contrib/Cargo.toml diff --git a/autobahn b/autobahn index d5441811..8a48c9af 160000 --- a/autobahn +++ b/autobahn @@ -1 +1 @@ -Subproject commit d5441811353f00db776d2c65f3e2bf32cb71e771 +Subproject commit 8a48c9af6cffebdeefc3de61110d72fff8c32a32 diff --git a/deployment/azure-tf/main.tf 
b/deployment/azure-tf/main.tf index 803627de..72149a32 100644 --- a/deployment/azure-tf/main.tf +++ b/deployment/azure-tf/main.tf @@ -74,7 +74,9 @@ resource "azurerm_public_ip" "sevpool_public_ip" { count = length(local.sevpool_ids_flattened_) location = var.platform_locations[local.sevpool_ids_flattened_[count.index][0]] resource_group_name = azurerm_resource_group.rg.name - allocation_method = "Dynamic" + # allocation_method = "Dynamic" + allocation_method = "Static" + sku = "Standard" } resource "azurerm_public_ip" "tdxpool_public_ip" { @@ -82,7 +84,9 @@ resource "azurerm_public_ip" "tdxpool_public_ip" { count = length(local.tdxpool_ids_flattened_) location = var.platform_locations[local.tdxpool_ids_flattened_[count.index][0]] resource_group_name = azurerm_resource_group.rg.name - allocation_method = "Dynamic" + # allocation_method = "Dynamic" + allocation_method = "Static" + sku = "Standard" } resource "azurerm_public_ip" "clientpool_public_ip" { @@ -90,7 +94,9 @@ resource "azurerm_public_ip" "clientpool_public_ip" { count = length(local.clientpool_ids_flattened_) location = var.platform_locations[local.clientpool_ids_flattened_[count.index][0]] resource_group_name = azurerm_resource_group.rg.name - allocation_method = "Dynamic" + # allocation_method = "Dynamic" + allocation_method = "Static" + sku = "Standard" } resource "azurerm_public_ip" "nonteepool_public_ip" { @@ -98,7 +104,9 @@ resource "azurerm_public_ip" "nonteepool_public_ip" { count = length(local.nonteepool_ids_flattened_) location = var.platform_locations[local.nonteepool_ids_flattened_[count.index][0]] resource_group_name = azurerm_resource_group.rg.name - allocation_method = "Dynamic" + # allocation_method = "Dynamic" + allocation_method = "Static" + sku = "Standard" } @@ -255,8 +263,8 @@ resource "azurerm_managed_disk" "sevpool_disk" { resource "azurerm_managed_disk" "tdxpool_disk" { name = "tdxpoolDataDisk${count.index}" - count = length(local.sevpool_ids_flattened_) - location = 
var.platform_locations[local.sevpool_ids_flattened_[count.index][0]] + count = length(local.tdxpool_ids_flattened_) + location = var.platform_locations[local.tdxpool_ids_flattened_[count.index][0]] resource_group_name = azurerm_resource_group.rg.name storage_account_type = "Premium_LRS" disk_size_gb = 2048 # P40 ssd Gives enough throughput for NIC bottleneck in 7+ machine clusters. @@ -265,8 +273,8 @@ resource "azurerm_managed_disk" "tdxpool_disk" { resource "azurerm_managed_disk" "nonteepool_disk" { name = "nonteepoolDataDisk${count.index}" - count = length(local.sevpool_ids_flattened_) - location = var.platform_locations[local.sevpool_ids_flattened_[count.index][0]] + count = length(local.nonteepool_ids_flattened_) + location = var.platform_locations[local.nonteepool_ids_flattened_[count.index][0]] resource_group_name = azurerm_resource_group.rg.name storage_account_type = "Premium_LRS" disk_size_gb = 2048 # P40 ssd Gives enough throughput for NIC bottleneck in 7+ machine clusters. @@ -400,7 +408,7 @@ resource "azurerm_linux_virtual_machine" "clientpool_vm" { location = var.platform_locations[local.clientpool_ids_flattened_[count.index][0]] resource_group_name = azurerm_resource_group.rg.name network_interface_ids = [azurerm_network_interface.clientpool_nic[count.index].id] - size = "Standard_D8ds_v5" + size = "Standard_D8ds_v6" # delete_os_disk_on_termination = true # delete_data_disks_on_termination = true diff --git a/deployment/azure-tf/setups/lan-2.tfvars b/deployment/azure-tf/setups/lan-2.tfvars new file mode 100644 index 00000000..711499bc --- /dev/null +++ b/deployment/azure-tf/setups/lan-2.tfvars @@ -0,0 +1,7 @@ +platform_locations = ["eastus"] +sevpool_count = [7] +# sevpool_count = [1] +tdxpool_count = [0] +nonteepool_count = [0] +clientpool_count = [1] +# clientpool_count = [1] diff --git a/deployment/azure-tf/setups/lan.tfvars b/deployment/azure-tf/setups/lan.tfvars index 5e5d3589..8fedd814 100644 --- a/deployment/azure-tf/setups/lan.tfvars +++ 
b/deployment/azure-tf/setups/lan.tfvars @@ -1,5 +1,7 @@ platform_locations = ["eastus"] sevpool_count = [7] +# sevpool_count = [1] tdxpool_count = [0] nonteepool_count = [0] clientpool_count = [3] +# clientpool_count = [1] diff --git a/experiments/lan_manual.toml b/experiments/lan_manual.toml new file mode 100644 index 00000000..1fe6fee2 --- /dev/null +++ b/experiments/lan_manual.toml @@ -0,0 +1,181 @@ +workdir = "deployment_artifacts" +project_home = "https://github.com/grapheo12/pirateship" + +[deployment_config] +mode = "manual" +ssh_key = "id_ed25519" +ssh_user = "shubham_mishra" +node_port_base = 3000 + + +[deployment_config.node_list] +[deployment_config.node_list.nodepool_vm0] +private_ip = "10.0.1.16" +public_ip = "34.9.210.58" +tee_type = "sev" +region_id = 0 + +[deployment_config.node_list.nodepool_vm1] +private_ip = "10.0.1.10" +public_ip = "34.132.89.189" +tee_type = "sev" +region_id = 0 + +[deployment_config.node_list.nodepool_vm2] +private_ip = "10.0.1.15" +public_ip = "34.170.163.79" +tee_type = "sev" +region_id = 0 + +[deployment_config.node_list.nodepool_vm3] +private_ip = "10.0.1.13" +public_ip = "34.45.236.240" +tee_type = "sev" +region_id = 0 + +[deployment_config.node_list.nodepool_vm4] +private_ip = "10.0.1.14" +public_ip = "34.57.92.93" +tee_type = "sev" +region_id = 0 + +[deployment_config.node_list.nodepool_vm5] +private_ip = "10.0.1.11" +public_ip = "34.10.69.175" +tee_type = "sev" +region_id = 0 + +[deployment_config.node_list.nodepool_vm6] +private_ip = "10.0.1.12" +public_ip = "104.198.56.254" +tee_type = "sev" +region_id = 0 + +[deployment_config.node_list.clientpool_vm0] +private_ip = "10.0.2.3" +public_ip = "34.31.76.85" +[deployment_config.node_list.clientpool_vm1] +private_ip = "10.0.2.4" +public_ip = "34.31.167.16" +[deployment_config.node_list.clientpool_vm2] +private_ip = "10.0.2.5" +public_ip = "35.239.110.106" + + +[node_config] + + +[node_config.net_config] +client_max_retry = 10 + +[node_config.rpc_config] +recv_buffer_size 
= 32768 +channel_depth = 1000 + +[node_config.consensus_config] +commit_index_gap_soft = 250 +commit_index_gap_hard = 500 +liveness_u = 1 +max_backlog_batch_size = 1000 +signature_max_delay_blocks = 10 +signature_max_delay_ms = 102 # roughly batch_max_delay_ms * signature_max_delay_blocks +num_crypto_workers = 5 +view_timeout_ms = 4000 +batch_max_delay_ms = 2 + +[node_config.consensus_config.log_storage_config.RocksDB] +write_buffer_size = 67_108_864 # 64 MiB +max_write_buffer_number = 8 +max_write_buffers_to_merge = 1 + +[node_config.app_config] +logger_stats_report_ms = 100 +checkpoint_interval_ms = 1000 + +[node_config.evil_config] +simulate_byzantine_behavior = false +byzantine_start_block = 0 + + +[client_config] +full_duplex = true +client_sub_id = 0 # This is filled up later by the caller code. + +[client_config.net_config] +client_max_retry = 10 + +[client_config.workload_config] +num_requests = 100_000_000 +request_config = "Blanks" +max_concurrent_requests = 16 + + +[[experiments]] +name = "pirateship" +repeats = 1 +num_nodes = 4 +node_distribution = "uniform" +build_command = "make signed_raft_logger" +duration = 60 + +[experiments.sweeping_parameters] +# num_clients = [10, 20, 30, 50, 100, 200, 400, 500, 600] +num_clients = [400, 500, 600] + +# [[experiments]] +# name = "autobahn" +# type = "autobahn" +# repeats = 1 +# num_nodes = 7 +# node_distribution = "uniform" +# build_command = "make -C autobahn" +# duration = 60 + +# [experiments.sweeping_parameters] +# # num_clients = [100_000, 120_000, 150_000, 200_000, 250_000, 300_000, 310_000, 320_000] +# num_clients = [250_000, 300_000] + + +# [[experiments]] +# name = "pirateship_sig_1" +# repeats = 1 +# num_nodes = 7 +# node_distribution = "uniform" +# build_command = "make" +# duration = 60 + +# [experiments.node_config.consensus_config] +# signature_max_delay_blocks = 1 + +# [experiments.sweeping_parameters] +# num_clients = [10, 20, 30, 50, 100, 200, 400, 500, 600] + + + + +[[results]] +name = 
"tput_latency_client_sweep" +plotter = "tput_latency_sweep" +ramp_up = 5 +ramp_down = 2 +output = "lan_final.pdf" +force_parse = true +# skip_indices = [0, 1] + +[results.legends] +# Experiment group to legend mapping +pirateship = "ps(sig=10)+ps(sig=10)-aud+byz" +pirateship_sig_1 = "ps(sig=1)+ps(sig=1)-aud+byz" +hotstuff = "hotstuff+onlybyz" +engraft = "engraft+onlybyz" +signed_raft = "signed_raft(n=5)" +autobahn = "autobahn" +signed_raft_n7 = "signed_raft(n=7)" + + +# [results.partitions] +# cft = ["ps(sig=10)", "ps(sig=1)", "engraft", "signed_raft(n=5)", "signed_raft(n=7)"] +# bft = ["ps(sig=10)-aud", "ps(sig=1)-aud", "hotstuff", "autobahn"] + +[results.font] +size = 55 diff --git a/experiments/lan_tee_experiments.toml b/experiments/lan_tee_experiments.toml index fac48bb7..b7671e58 100644 --- a/experiments/lan_tee_experiments.toml +++ b/experiments/lan_tee_experiments.toml @@ -132,9 +132,9 @@ num_clients = [10, 20, 30, 50, 100, 200, 400, 500, 600] [[results]] name = "tput_latency_client_sweep" plotter = "tput_latency_sweep" -ramp_up = 15 -ramp_down = 15 -output = "lan_final.pdf" +ramp_up = 12 +ramp_down = 12 +output = "lan_with_peerreview.pdf" force_parse = true skip_indices = [0, 1] @@ -147,10 +147,11 @@ engraft = "engraft+onlybyz" signed_raft = "signed_raft(n=5)" autobahn = "autobahn" signed_raft_n7 = "signed_raft(n=7)" +peerreview = "peerreview" [results.partitions] -cft = ["ps(sig=10)", "ps(sig=1)", "engraft", "signed_raft(n=5)", "signed_raft(n=7)"] +cft = ["ps(sig=10)", "ps(sig=1)", "engraft", "signed_raft(n=5)", "signed_raft(n=7)", "peerreview"] bft = ["ps(sig=10)-aud", "ps(sig=1)-aud", "hotstuff", "autobahn"] [results.font] diff --git a/experiments/lan_tee_experiments4.toml b/experiments/lan_tee_experiments4.toml index 035cb7d8..ad597ee0 100644 --- a/experiments/lan_tee_experiments4.toml +++ b/experiments/lan_tee_experiments4.toml @@ -64,22 +64,23 @@ build_command = "make" duration = 60 [experiments.sweeping_parameters] -num_clients = [10, 20, 30, 50, 
100, 200, 400, 500, 600] +# num_clients = [10, 20, 30, 50, 100, 200, 400, 500, 600] +num_clients = [100, 200, 400] -[[experiments]] -name = "pirateship_sig_1" -repeats = 1 -num_nodes = 7 -node_distribution = "uniform" -build_command = "make" -duration = 60 +# [[experiments]] +# name = "pirateship_sig_1" +# repeats = 1 +# num_nodes = 7 +# node_distribution = "uniform" +# build_command = "make" +# duration = 60 -[experiments.node_config.consensus_config] -signature_max_delay_blocks = 1 +# [experiments.node_config.consensus_config] +# signature_max_delay_blocks = 1 -[experiments.sweeping_parameters] -num_clients = [10, 20, 30, 50, 100, 200, 400, 500, 600] +# [experiments.sweeping_parameters] +# num_clients = [10, 20, 30, 50, 100, 200, 400, 500, 600] @@ -91,7 +92,7 @@ ramp_up = 15 ramp_down = 15 output = "lan_final.pdf" force_parse = true -skip_indices = [0, 1] +# skip_indices = [0, 1] [results.legends] # Experiment group to legend mapping diff --git a/experiments/lan_tee_experiments5.toml b/experiments/lan_tee_experiments5.toml new file mode 100644 index 00000000..788dc488 --- /dev/null +++ b/experiments/lan_tee_experiments5.toml @@ -0,0 +1,130 @@ +workdir = "deployment_artifacts" +project_home = "https://github.com/grapheo12/pirateship" + +[deployment_config] +mode = "lan" +ssh_key = "cluster_key.pem" +ssh_user = "pftadmin" +node_port_base = 3000 + + +[node_config] + +[node_config.net_config] +client_max_retry = 10 + +[node_config.rpc_config] +recv_buffer_size = 32768 +channel_depth = 1000 + +[node_config.consensus_config] +commit_index_gap_soft = 250 +commit_index_gap_hard = 500 +liveness_u = 2 +max_backlog_batch_size = 1000 +signature_max_delay_blocks = 1 +signature_max_delay_ms = 102 # roughly batch_max_delay_ms * signature_max_delay_blocks +num_crypto_workers = 5 +view_timeout_ms = 4000 +batch_max_delay_ms = 2 + +[node_config.consensus_config.log_storage_config.RocksDB] +write_buffer_size = 2147483648 +max_write_buffer_number = 1 +max_write_buffers_to_merge = 1 + 
+[node_config.app_config] +logger_stats_report_ms = 100 +checkpoint_interval_ms = 1000 + +[node_config.evil_config] +simulate_byzantine_behavior = false +byzantine_start_block = 0 + + +[client_config] +full_duplex = true +client_sub_id = 0 # This is filled up later by the caller code. + +[client_config.net_config] +client_max_retry = 10 + +[client_config.workload_config] +num_requests = 100_000_000 +request_config = "Blanks" +max_concurrent_requests = 32 + + +# [[experiments]] +# name = "peerreview" +# repeats = 1 +# num_nodes = 7 +# node_distribution = "uniform" +# build_command = "make peerreview" +# duration = 60 +# # seq_start = 5 + +# [experiments.sweeping_parameters] +# # num_clients = [100, 200, 400, 500, 600] +# num_clients = [10, 20, 50, 70, 100, 105, 110] +# num_clients = [105, 110, 115] +# num_clients = [100] + + +# [[experiments]] +# name = "pirateship_sig_1" +# repeats = 1 +# num_nodes = 7 +# node_distribution = "uniform" +# build_command = "make" +# duration = 60 + +# [experiments.node_config.consensus_config] +# signature_max_delay_blocks = 1 + +# [experiments.sweeping_parameters] +# num_clients = [100, 200, 400, 500, 600] + +[[experiments]] +name = "autobahn" +type = "autobahn" +repeats = 1 +num_nodes = 7 +node_distribution = "uniform" +build_command = "make -C autobahn" +duration = 60 + +[experiments.sweeping_parameters] +# num_clients = [150_000, 170_000, 200_000, 205_000, 210_000, 215_000] +num_clients = [10_000, 20_000, 50_000, 70_000, 100_000, 120_000, 150_000] +# num_clients = [250_000] + + + +[[results]] +name = "tput_latency_client_sweep" +plotter = "tput_latency_sweep" +ramp_up = 15 +ramp_down = 15 +output = "lan_final.pdf" +force_parse = true +# skip_indices = [6] + +[results.legends] +# Experiment group to legend mapping +pirateship = "ps(sig=10)+ps(sig=10)-aud+byz" +pirateship_sig_1 = "ps(sig=1)+ps(sig=1)-aud+byz" +hotstuff = "hotstuff+onlybyz" +engraft = "engraft+onlybyz" +signed_raft = "signed_raft(n=5)" +autobahn = "autobahn" 
+signed_raft_n7 = "signed_raft(n=7)" +peerreview = "peerreview" + + +# [results.partitions] +# cft = ["ps(sig=10)", "ps(sig=1)", "engraft", "signed_raft(n=5)", "signed_raft(n=7)"] +# bft = ["ps(sig=10)-aud", "ps(sig=1)-aud", "hotstuff", "autobahn"] + +[results.font] +size = 55 diff --git a/experiments/lan_tee_experiments6.toml b/experiments/lan_tee_experiments6.toml new file mode 100644 index 00000000..ae7a3566 --- /dev/null +++ b/experiments/lan_tee_experiments6.toml @@ -0,0 +1,136 @@ +workdir = "deployment_artifacts" +project_home = "https://github.com/grapheo12/pirateship" + +[deployment_config] +# mode = "lan-2" +mode = "lan" +ssh_key = "cluster_key.pem" +ssh_user = "pftadmin" +node_port_base = 3000 + + +[node_config] + +[node_config.net_config] +client_max_retry = 10 + +[node_config.rpc_config] +recv_buffer_size = 32768 +channel_depth = 1000 + +[node_config.consensus_config] +commit_index_gap_soft = 25000000 +commit_index_gap_hard = 50000000 +liveness_u = 2 +max_backlog_batch_size = 1000 +signature_max_delay_blocks = 1 +signature_max_delay_ms = 102 # roughly batch_max_delay_ms * signature_max_delay_blocks +num_crypto_workers = 5 +view_timeout_ms = 4000 +batch_max_delay_ms = 2 + +[node_config.consensus_config.log_storage_config.RocksDB] +write_buffer_size = 2147483648 +max_write_buffer_number = 1 +max_write_buffers_to_merge = 1 + +[node_config.app_config] +logger_stats_report_ms = 100 +checkpoint_interval_ms = 1000 + +[node_config.evil_config] +simulate_byzantine_behavior = false +byzantine_start_block = 0 + + +[client_config] +full_duplex = true +client_sub_id = 0 # This is filled up later by the caller code. 
+ +[client_config.net_config] +client_max_retry = 10 + +[client_config.workload_config] +num_requests = 100_000_000 +request_config = "Blanks" +max_concurrent_requests = 32 + + +[[experiments]] +name = "peerreview" +repeats = 1 +num_nodes = 7 +node_distribution = "uniform" +build_command = "make peerreview" +duration = 60 +# seq_start = 5 + +[experiments.sweeping_parameters] +# num_clients = [100, 200, 400, 500, 600] +# num_clients = [10, 20, 50, 70, 100, 105, 110] +# num_clients = [105, 110, 115] +# num_clients = [50, 55, 60, 65] +num_clients = [10, 20, 30, 50, 100, 200, 400, 500, 600] + + + +# [[experiments]] +# name = "pirateship_sig_1" +# repeats = 1 +# num_nodes = 7 +# node_distribution = "uniform" +# build_command = "make" +# duration = 60 + +# [experiments.node_config.consensus_config] +# signature_max_delay_blocks = 1 + +# [experiments.sweeping_parameters] +# # num_clients = [100, 200, 400, 500, 600] +# num_clients = [50, 55, 60, 65] + +# [[experiments]] +# name = "autobahn" +# type = "autobahn" +# repeats = 1 +# num_nodes = 7 +# node_distribution = "uniform" +# build_command = "make -C autobahn" +# duration = 60 +# seq_start = 7 + +# [experiments.sweeping_parameters] +# # num_clients = [150_000, 170_000, 200_000, 205_000, 210_000, 215_000] +# # num_clients = [10_000, 20_000, 50_000, 70_000, 100_000, 120_000, 150_000] +# num_clients = [170_000, 200_000, 250_000, 270_000, 300_000, 320_000, 350_000] +# # num_clients = [250_000] + + + +[[results]] +name = "tput_latency_client_sweep" +plotter = "tput_latency_sweep" +ramp_up = 12 +ramp_down = 12 +output = "lan_final.pdf" +force_parse = true +# skip_indices = [0, 1, 2, 3] + +[results.legends] +# Experiment group to legend mapping +pirateship = "ps(sig=10)+ps(sig=10)-aud+byz" +pirateship_sig_1 = "ps(sig=1)+ps(sig=1)-aud+byz" +hotstuff = "hotstuff+onlybyz" +engraft = "engraft+onlybyz" +signed_raft = "signed_raft(n=5)" +autobahn = "autobahn" +signed_raft_n7 = "signed_raft(n=7)" +peerreview = "peerreview" + + +# 
[results.partitions] +# cft = ["ps(sig=10)", "ps(sig=1)", "engraft", "signed_raft(n=5)", "signed_raft(n=7)"] +# bft = ["ps(sig=10)-aud", "ps(sig=1)-aud", "hotstuff", "autobahn"] + +[results.font] +size = 55 diff --git a/scripts/__main__.py b/scripts/__main__.py index bbe988fc..3c4d11b4 100644 --- a/scripts/__main__.py +++ b/scripts/__main__.py @@ -222,7 +222,7 @@ def main(ctx): ) @click.option( "-d", "--workdir", required=False, - type=click.Path(file_okay=False, resolve_path=True), + type=click.Path(file_okay=False, resolve_path=False), # Need relative paths here. default=None ) def all(config, workdir): diff --git a/scripts/autobahn_experiments.py b/scripts/autobahn_experiments.py index 30cbb30e..ca4604e9 100644 --- a/scripts/autobahn_experiments.py +++ b/scripts/autobahn_experiments.py @@ -76,6 +76,7 @@ def __init__(self, addresses, base_port): port = base_port self.json = {'authorities': OrderedDict()} + id_counter = 0 for name, hosts in addresses.items(): host = hosts.pop(0) @@ -101,10 +102,12 @@ def __init__(self, addresses, base_port): self.json['authorities'][name] = { 'stake': 1, + 'id': id_counter, 'consensus': consensus_addr, 'primary': primary_addr, 'workers': workers_addr } + id_counter += 1 def primary_addresses(self, faults=0): ''' Returns an ordered list of primaries' addresses. 
''' @@ -342,6 +345,10 @@ def db_path(i, j=None, path_prefix=""): worker_id = f'-{j}' if j is not None else '' return f'{path_prefix}.db-{i}{worker_id}' + @staticmethod + def db_path_client(i, path_prefix=""): + return f'{path_prefix}.db-client-{i}' + @staticmethod def logs_path(): return 'logs' @@ -363,6 +370,12 @@ def client_log_file(i, j): assert isinstance(j, int) and i >= 0 return join(PathMaker.logs_path(), f'client-{i}-{j}.log') + @staticmethod + def client_metrics_file(i, j): + assert isinstance(i, int) and i >= 0 + assert isinstance(j, int) and i >= 0 + return join(PathMaker.logs_path(), f'client-{i}-{j}.metrics') + @staticmethod def results_path(): return 'results' @@ -448,7 +461,7 @@ def printProgressBar(iteration): print() class RemoteCommittee(Committee): - def __init__(self, names, port, workers, ip_list): + def __init__(self, names, port, workers, ip_list, clients = None): assert isinstance(names, list) assert all(isinstance(x, str) for x in names) assert isinstance(port, int) @@ -458,49 +471,59 @@ def __init__(self, names, port, workers, ip_list): addresses = OrderedDict((x, [ip_list[i]]*(1+workers)) for i, x in enumerate(names)) super().__init__(addresses, port) + if clients is not None: + self.json['clients'] = clients + def get_default_node_params(num_nodes, repeats, seconds): bench_params = { - 'faults': 0, - 'nodes': num_nodes, + 'faults': 0, + 'nodes': 7, 'workers': 1, - 'rate': 200_000, + 'worker_fault_tolerance': 1, # Number of workers each client sends to + 'rate': 10_000, 'tx_size': 512, - 'duration': seconds, + 'duration': 60, + 'latency_warmup': 2, + 'latency_cooldown': 2, + 'transaction_timeout': 120, # ms - timeout for early ACKs before retry # Unused 'simulate_partition': False, - 'partition_start': seconds + 100, - 'partition_duration': 0, - 'partition_nodes': 0, + 'partition_start': 5, + 'partition_duration': 5, + 'partition_nodes': 1, } node_params = { - 'timeout_delay': 1_000, # ms + 'timeout_delay': 5_000, # ms 'header_size': 32, 
# bytes - 'max_header_delay': 200, # ms + 'max_header_delay': 5, # ms 'gc_depth': 50, # rounds - 'sync_retry_delay': 1_000, # ms + 'sync_retry_delay': 100, # ms 'sync_retry_nodes': 4, # number of nodes 'batch_size': 500_000, # bytes - 'max_batch_delay': 10, # ms - 'use_optimistic_tips': False, + 'max_batch_delay': 1, # ms + 'use_optimistic_tips': True, + 'use_threshold_random_coin': True, + 'optimistic_leader_only': False, 'use_parallel_proposals': True, - 'k': 1, + 'k': 4, 'use_fast_path': True, - 'fast_path_timeout': 200, + 'fast_path_timeout': 100, 'use_ride_share': False, - 'car_timeout': 2000, + 'car_timeout': 200, + 'start_slot_rounds': 1, 'simulate_asynchrony': False, - 'asynchrony_type': [], + 'asynchrony_type': [2], - 'asynchrony_start': [], #ms - 'asynchrony_duration': [], #ms - 'affected_nodes': [], - 'egress_penalty': 0, #ms + 'asynchrony_start': [10_000], #ms + 'asynchrony_duration': [4_800], #ms + 'affected_nodes': [1], + 'egress_penalty': 40, #ms - 'use_fast_sync': True, - 'use_exponential_timeouts': True, + 'use_fast_sync': False, + 'use_exponential_timeouts': False, } return bench_params, node_params @@ -552,15 +575,39 @@ def run_worker(keys, committee, store, parameters, id, debug=False, binary_name= return (f'{binary_name} {v} run --keys {keys} --committee {committee} ' f'--store {store} --parameters {parameters} worker --id {id}') + # @staticmethod + # def run_client(id, reply_addr, address, size, clients, binary_name="./benchmark_client"): + # assert isinstance(address, str) + # assert isinstance(size, int) and size > 0 + # assert isinstance(clients, int) and clients >= 0 + # # assert isinstance(nodes, list) + # # assert all(isinstance(x, str) for x in nodes) + # # nodes = f'--nodes {" ".join(nodes)}' if nodes else '' + # return f'{binary_name} {address} --size {size} --rate {clients} --client-id {id} --reply-addr {reply_addr}' + @staticmethod - def run_client(address, size, clients, nodes, binary_name="./benchmark_client"): - assert 
isinstance(address, str) + def run_client(client_id, reply_addr, ack_addr, committee, keys, store, size, rate, workers, threshold=1, duration=None, branch=None, metrics_file=None, transaction_timeout=150, binary_name="./node"): + assert isinstance(client_id, int) and 0 <= client_id <= 255 + assert isinstance(reply_addr, str) + assert isinstance(ack_addr, str) + assert isinstance(committee, str) + assert isinstance(keys, str) + # assert isinstance(threshold_keys, str) + assert isinstance(store, str) assert isinstance(size, int) and size > 0 - assert isinstance(clients, int) and clients >= 0 - assert isinstance(nodes, list) - assert all(isinstance(x, str) for x in nodes) - nodes = f'--nodes {" ".join(nodes)}' if nodes else '' - return f'{binary_name} {address} --size {size} --rate {clients} {nodes}' + assert isinstance(rate, int) and rate >= 0 + assert isinstance(workers, int) and workers > 0 + assert isinstance(threshold, int) and threshold > 0 + assert isinstance(transaction_timeout, int) and transaction_timeout > 0 + if duration is not None: + assert isinstance(duration, int) and duration >= 0 + if metrics_file is not None: + assert isinstance(metrics_file, str) + threshold_keys_flag = '' + metrics_flag = f' --metrics-file {metrics_file}' if metrics_file else '' + duration_flag = f' --duration {duration}' if duration is not None else '' + return f'{binary_name} -vvv run --keys {keys} {threshold_keys_flag}--committee {committee} --store {store} client --client-id {client_id} --reply-addr {reply_addr} --ack-addr {ack_addr} --size {size} --rate {rate} --workers {workers} --threshold {threshold} --transaction-timeout {transaction_timeout}{duration_flag}{metrics_flag}' + @staticmethod def kill(): @@ -572,7 +619,7 @@ def alias_binaries(origin): node, client = join(origin, 'node'), join(origin, 'benchmark_client') return f'rm node ; rm benchmark_client ; ln -s {node} . ; ln -s {client} .' 
-def gen_config(nodes: int, base_port: int, workers: int, node_parameters: NodeParameters, ip_list: List[str], path_prefix): +def gen_config(nodes: int, base_port: int, workers: int, node_parameters: NodeParameters, ip_list: List[str], path_prefix, clients=None): # Generate configuration files. keys = [] key_files = [PathMaker.key_file(i, path_prefix=path_prefix) for i in range(nodes)] @@ -583,7 +630,7 @@ def gen_config(nodes: int, base_port: int, workers: int, node_parameters: NodePa names = [x.name for x in keys] #print('num workers', self.workers) - committee = RemoteCommittee(names, base_port, workers, ip_list) + committee = RemoteCommittee(names, base_port, workers, ip_list, clients=clients) committee.print(PathMaker.committee_file(path_prefix=path_prefix)) node_parameters.print(PathMaker.parameters_file(path_prefix=path_prefix)) @@ -702,6 +749,30 @@ def generate_configs(self, deployment: Deployment, config_dir, log_dir): self.node_params = node_params self.bench_params = bench_params + __client_id = 0 + __client_reply_base_port = deployment.node_port_base + 2001 + + self.client_reply_mapping = defaultdict(dict) + + for client_num in range(len(client_vms)): + client = "client" + str(client_num + 1) + node_addrs = self.committee.workers_addresses(0) + for i, addresses in enumerate(node_addrs): + for (id, addr) in addresses: + self.client_reply_mapping[client][addr] = (__client_id, __client_reply_base_port + 2 * __client_id, __client_reply_base_port + 2 * __client_id + 1, client_vms[client_num].private_ip) + __client_id += 1 + + ____clients = {} + for _, v in self.client_reply_mapping.items(): + for _, (client_id, reply_addr, transaction_ack_addr, ip) in v.items(): + ____clients[client_id] = { + "replies": f"{ip}:{reply_addr}", + "transaction_acks": f"{ip}:{transaction_ack_addr}" + } + + self.committee = gen_config(self.num_nodes, deployment.node_port_base, num_workers, node_params, ip_list, config_dir, clients=____clients) + + def generate_arbiter_script(self): 
@@ -711,17 +782,31 @@ def generate_arbiter_script(self): # This script is generated by the experiment pipeline. DO NOT EDIT. SSH_CMD="ssh -o StrictHostKeyChecking=no -i {self.dev_ssh_key}" -SCP_CMD="rsync -avz -e 'ssh -o StrictHostKeyChecking=no -i {self.dev_ssh_key}'" + +scp_cmd() {{ + rsync -avz -e 'ssh -o StrictHostKeyChecking=no -i {self.dev_ssh_key}' "$@" +}} # SSH into each VM and run the binaries """ # Plan the binaries to run + + __node_vms = [x for x in self.binary_mapping.keys() if not("client" in x.name)] + __client_vms = [x for x in self.binary_mapping.keys() if "client" in x.name] + __binary_mapping = OrderedDict() + for vm in __node_vms: + __binary_mapping[vm] = self.binary_mapping[vm] + for vm in __client_vms: + __binary_mapping[vm] = self.binary_mapping[vm] + # All nodes spawned before clients. + clients_have_slept = False + for repeat_num in range(self.repeats): print("Running repeat", repeat_num) _script = script_base[:] curr_client_count = 0 - for vm, bin_list in self.binary_mapping.items(): + for vm, bin_list in __binary_mapping.items(): for bin in bin_list: if "node" in bin: binary_name = f"{self.remote_workdir}/build/node" @@ -730,12 +815,13 @@ def generate_arbiter_script(self): if not config_dir.endswith("/"): config_dir += "/" log_dir = os.path.join(self.remote_workdir, "logs") + db_dir = "/data/" if not log_dir.endswith("/"): log_dir += "/" primary_cmd = CommandMaker.run_primary( PathMaker.key_file(node_num, path_prefix=config_dir), PathMaker.committee_file(path_prefix=config_dir), - PathMaker.db_path(node_num, path_prefix=log_dir), + PathMaker.db_path(node_num, path_prefix=db_dir), PathMaker.parameters_file(path_prefix=config_dir), debug=False, binary_name=binary_name @@ -751,7 +837,7 @@ def generate_arbiter_script(self): worker_cmd = CommandMaker.run_worker( PathMaker.key_file(node_num, path_prefix=config_dir), PathMaker.committee_file(path_prefix=config_dir), - PathMaker.db_path(node_num, worker_num, path_prefix=log_dir), + 
PathMaker.db_path(node_num, worker_num, path_prefix=db_dir), PathMaker.parameters_file(path_prefix=config_dir), worker_num, debug=False, @@ -766,16 +852,33 @@ def generate_arbiter_script(self): """ elif "client" in bin: - binary_name = f"{self.remote_workdir}/build/benchmark_client" + if not clients_have_slept: + _script += f""" +sleep 10 +""" + clients_have_slept = True + # binary_name = f"{self.remote_workdir}/build/benchmark_client" + binary_name = f"{self.remote_workdir}/build/node" num_clients = self.clients_per_vm[curr_client_count] curr_client_count += 1 tx_size = self.bench_params['tx_size'] node_addrs = self.committee.workers_addresses(0) for i, addresses in enumerate(node_addrs): for (id, addr) in addresses: + client_id, reply_addr, transaction_ack_addr, _ = self.client_reply_mapping[bin][addr] cmd = CommandMaker.run_client( - addr, tx_size, num_clients, - [x for y in node_addrs for _, x in y], + client_id, f"0.0.0.0:{reply_addr}", f"0.0.0.0:{transaction_ack_addr}", + PathMaker.committee_file(path_prefix=config_dir), + PathMaker.key_file(node_num, path_prefix=config_dir), + # PathMaker.threshold_key_file(node_num, path_prefix=config_dir), + PathMaker.db_path_client(client_id, path_prefix=db_dir), + tx_size, num_clients, + self.bench_params['worker_fault_tolerance'], + threshold=1, + duration=self.duration, + branch=None, + metrics_file=f"{self.remote_workdir}/logs/{repeat_num}/{bin}-{i}-{id}.metrics", + transaction_timeout=150, binary_name=binary_name ) _script += f""" @@ -804,17 +907,31 @@ def generate_arbiter_script(self): if "node" in bin: binary_name = "node" elif "client" in bin: - binary_name = "benchmark" # "benchmark_client" is more than 15 chars and pkill doesn't like that + binary_name = "node" - # Copy the logs back + # Kill individually _script += f""" $SSH_CMD {self.dev_ssh_user}@{vm.public_ip} 'pkill -9 -c {binary_name}' || true -$SSH_CMD {self.dev_ssh_user}@{vm.public_ip} 'rm -rf {self.remote_workdir}/logs/.db-*' || true -$SCP_CMD 
{self.dev_ssh_user}@{vm.public_ip}:{self.remote_workdir}/logs/{repeat_num}/ {self.remote_workdir}/logs/{repeat_num}/ || true +$SSH_CMD {self.dev_ssh_user}@{vm.public_ip} 'rm -rf /data/.db-*' || true +""" + + _script += f""" +sleep 5 +""" + # Copy the logs back + for vm in set(list(self.binary_mapping.keys())): + _script += f""" +scp_cmd {self.dev_ssh_user}@{vm.public_ip}:{self.remote_workdir}/logs/{repeat_num}/ {self.remote_workdir}/logs/{repeat_num}/ || true """ _script += f""" -sleep 2 +sleep 5 +""" + + # Try again. Since metrics files somehow don't get copied the first time. + for vm in set(list(self.binary_mapping.keys())): + _script += f""" +scp_cmd {self.dev_ssh_user}@{vm.public_ip}:{self.remote_workdir}/logs/{repeat_num}/ {self.remote_workdir}/logs/{repeat_num}/ || true """ # pkill -9 -c server also kills tmux-server. So we can't run a server on the dev VM. diff --git a/scripts/autobahn_logs.py b/scripts/autobahn_logs.py index a62cfb78..1054fd2c 100644 --- a/scripts/autobahn_logs.py +++ b/scripts/autobahn_logs.py @@ -6,66 +6,559 @@ from os.path import join from re import findall, search from statistics import mean, median +import pandas as pd +import numpy as np +from pathlib import Path +import json + class ParseError(Exception): pass -class LogParser: - def __init__(self, clients, primaries, workers, faults=0): - inputs = [clients, primaries, workers] - assert all(isinstance(x, list) for x in inputs) - assert all(isinstance(x, str) for y in inputs for x in y) - assert all(x for x in inputs) +# class LogParser: +# def __init__(self, clients, primaries, workers, faults=0): +# inputs = [clients, primaries, workers] +# assert all(isinstance(x, list) for x in inputs) +# assert all(isinstance(x, str) for y in inputs for x in y) +# assert all(x for x in inputs) + +# self.faults = faults +# if isinstance(faults, int): +# self.committee_size = len(primaries) + int(faults) +# self.workers = len(workers) // len(primaries) +# else: +# self.committee_size = '?' 
+# self.workers = '?' + +# # Parse the clients logs. +# try: +# with Pool() as p: +# results = p.map(self._parse_clients, clients) +# except (ValueError, IndexError, AttributeError) as e: +# raise ParseError(f'Failed to parse clients\' logs: {e}') +# self.size, self.rate, self.start, misses, self.sent_samples \ +# = zip(*results) +# self.misses = sum(misses) + +# # Parse the primaries logs. +# try: +# with Pool() as p: +# results = p.map(self._parse_primaries, primaries) +# except (ValueError, IndexError, AttributeError) as e: +# raise ParseError(f'Failed to parse nodes\' logs: {e}') +# proposals, commits, self.configs, primary_ips, client_latencies = zip(*results) +# self.proposals = self._merge_results([x.items() for x in proposals]) +# self.commits = self._merge_results([x.items() for x in commits]) +# self.all_commits = self._pile_results([x.items() for x in commits]) + +# self.client_latencies = self._avg_results(client_latencies) + +# # Parse the workers logs. +# try: +# with Pool() as p: +# results = p.map(self._parse_workers, workers) +# except (ValueError, IndexError, AttributeError) as e: +# raise ParseError(f'Failed to parse workers\' logs: {e}') +# sizes, self.received_samples, workers_ips = zip(*results) +# self.sizes = { +# k: v for x in sizes for k, v in x.items() if k in self.commits +# } + +# # Determine whether the primary and the workers are collocated. +# self.collocate = set(primary_ips) == set(workers_ips) + +# # Check whether clients missed their target rate. +# if self.misses != 0: +# print( +# f'Clients missed their target rate {self.misses:,} time(s)' +# ) + +# def _merge_results(self, input): +# # Keep the earliest timestamp. +# merged = {} +# for x in input: +# for k, v in x: +# if not k in merged or merged[k] > v: +# merged[k] = v +# return merged + +# def _avg_results(self, input): +# # Keep the earliest timestamp. 
+# merged = {} +# for x in input: +# for k, v in x.items(): +# if not k in merged: +# merged[k] = [v] +# else: +# merged[k].append(v) + +# merged = {k: mean(v) for k, v in merged.items()} +# return merged + +# def _pile_results(self, input): +# merged = {} +# for x in input: +# for k, v in x: +# if not k in merged: +# merged[k] = [v] +# else: +# merged[k].append(v) - self.faults = faults - if isinstance(faults, int): - self.committee_size = len(primaries) + int(faults) - self.workers = len(workers) // len(primaries) - else: - self.committee_size = '?' - self.workers = '?' +# return merged - # Parse the clients logs. - try: - with Pool() as p: - results = p.map(self._parse_clients, clients) - except (ValueError, IndexError, AttributeError) as e: - raise ParseError(f'Failed to parse clients\' logs: {e}') - self.size, self.rate, self.start, misses, self.sent_samples \ - = zip(*results) - self.misses = sum(misses) +# def _parse_clients(self, log): +# # if search(r'Error', log) is not None: +# # raise ParseError('Client(s) panicked') + +# size = int(search(r'Transactions size: (\d+)', log).group(1)) +# rate = int(search(r'Transactions rate: (\d+)', log).group(1)) + +# tmp = search(r'\[(.*Z) .* Start ', log).group(1) +# start = self._to_posix(tmp) + +# misses = len(findall(r'rate too high', log)) + +# tmp = findall(r'\[(.*Z) .* sample transaction (\d+)', log) +# samples = {int(s): self._to_posix(t) for t, s in tmp} + +# # tmp = findall(r'Client latency: (\d+) ms', log) +# # client_latencies = [int(x) for x in tmp] +# # if len(client_latencies) == 0: +# # client_latencies = [0] + +# return size, rate, start, misses, samples + +# def _parse_primaries(self, log): +# # if search(r'(?:panicked|Error)', log) is not None: +# # raise ParseError('Primary(s) panicked') + +# tmp = findall(r'\[(.*Z) .* Created B\d+\([^ ]+\) -> ([^ ]+=)', log) +# tmp = [(d, self._to_posix(t)) for t, d in tmp] +# proposals = self._merge_results([tmp]) + +# tmp = findall(r'\[(.*Z) .* Committed B\d+\([^ 
]+\) -> ([^ ]+=)', log) +# tmp = [(d, self._to_posix(t)) for t, d in tmp] +# commits = self._merge_results([tmp]) + +# latencies = {} +# for d in commits.keys(): +# if d in proposals: +# latencies[d] = commits[d] - proposals[d] + + + + +# configs = { +# #'timeout_delay': int( +# # search(r'Timeout delay .* (\d+)', log).group(1) +# #), +# 'header_size': int( +# search(r'Header size .* (\d+)', log).group(1) +# ), +# 'max_header_delay': int( +# search(r'Max header delay .* (\d+)', log).group(1) +# ), +# 'gc_depth': int( +# search(r'Garbage collection depth .* (\d+)', log).group(1) +# ), +# 'sync_retry_delay': int( +# search(r'Sync retry delay .* (\d+)', log).group(1) +# ), +# 'sync_retry_nodes': int( +# search(r'Sync retry nodes .* (\d+)', log).group(1) +# ), +# 'batch_size': int( +# search(r'Batch size .* (\d+)', log).group(1) +# ), +# 'max_batch_delay': int( +# search(r'Max batch delay .* (\d+)', log).group(1) +# ), +# } + +# ip = search(r'booted on (\d+.\d+.\d+.\d+)', log).group(1) + +# return proposals, commits, configs, ip, latencies + +# def _parse_workers(self, log): +# # if search(r'(?:panic|Error)', log) is not None: +# # raise ParseError('Worker(s) panicked') + +# tmp = findall(r'Batch ([^ ]+) contains (\d+) B', log) +# sizes = {d: int(s) for d, s in tmp} + +# tmp = findall(r'Batch ([^ ]+) contains sample tx (\d+)', log) +# samples = {int(s): d for d, s in tmp} + +# ip = search(r'booted on (\d+.\d+.\d+.\d+)', log).group(1) + +# return sizes, samples, ip + +# def _to_posix(self, string): +# x = datetime.fromisoformat(string.replace('Z', '+00:00')) +# return datetime.timestamp(x) + +# def _consensus_throughput(self): +# if not self.commits: +# return 0, 0, 0 +# start, end = min(self.proposals.values()), max(self.commits.values()) +# duration = end - start +# bytes = sum(self.sizes.values()) +# bps = bytes / duration +# tps = bps / self.size[0] +# return tps, bps, duration + +# def _consensus_latency(self): +# # return 0 +# # latency = [c - self.proposals[d] 
for d, c in self.commits.items()] + +# # # latency = [x for x in latency if x > 0] +# # # print(mean(list(self.client_latencies.values()))) +# latency = list(self.client_latencies.values()) +# return mean(latency) if latency else 0 + +# def _end_to_end_throughput(self): +# if not self.commits: +# return 0, 0, 0 +# start, end = min(self.start), max(self.commits.values()) +# duration = end - start +# bytes = sum(self.sizes.values()) +# bps = bytes / duration +# tps = bps / self.size[0] +# return tps, bps, duration + +# def _end_to_end_latency(self): +# # return 0 +# latency = [] +# list_latencies = [] +# first_start = 0 +# set_first = True +# for received in self.received_samples: +# for tx_id, batch_id in received.items(): +# if batch_id in self.commits: +# # assert tx_id in sent # We receive txs that we sent. +# for _sent in self.sent_samples: +# if tx_id in _sent: +# start = _sent[tx_id] +# possible_ends = self.all_commits[batch_id] +# impossible_ends = [x for x in possible_ends if x < start] +# if len(impossible_ends) > 0: +# print("batch:", batch_id, "tx:", tx_id, "impossible_ends:", impossible_ends) +# possible_ends = [x for x in possible_ends if x > start] + +# if len(possible_ends) == 0: +# continue +# end = min(possible_ends) +# if set_first: +# first_start = start +# first_end = end +# set_first = False +# latency += [end-start] +# list_latencies += [(start-first_start, end-first_start, end-start)] + +# list_latencies.sort(key=lambda tup: tup[0]) +# with open('latencies.txt', 'w') as f: +# for line in list_latencies: +# f.write(str(line[0]) + ',' + str(line[1]) + ',' + str((line[2])) + '\n') +# # latency = [x for x in latency if x > 0] +# return mean(latency) if latency else 0 + +# def result(self): +# #timeout_delay = self.configs[0]['timeout_delay'] +# header_size = self.configs[0]['header_size'] +# max_header_delay = self.configs[0]['max_header_delay'] +# gc_depth = self.configs[0]['gc_depth'] +# sync_retry_delay = self.configs[0]['sync_retry_delay'] +# 
sync_retry_nodes = self.configs[0]['sync_retry_nodes'] +# batch_size = self.configs[0]['batch_size'] +# max_batch_delay = self.configs[0]['max_batch_delay'] + +# consensus_latency = self._consensus_latency() * 1_000 +# consensus_tps, consensus_bps, _ = self._consensus_throughput() +# end_to_end_tps, end_to_end_bps, duration = self._end_to_end_throughput() +# end_to_end_latency = self._end_to_end_latency() * 1_000 + +# # client_latencies = [] +# # for c in self.client_latencies: +# # client_latencies.extend(c) +# # client_latency = mean(client_latencies) + +# # if client_latency > 400: +# # raise ParseError('Client latency is too high') + +# return ( +# '\n' +# '-----------------------------------------\n' +# ' SUMMARY:\n' +# '-----------------------------------------\n' +# ' + CONFIG:\n' +# f' Faults: {self.faults} node(s)\n' +# f' Committee size: {self.committee_size} node(s)\n' +# f' Worker(s) per node: {self.workers} worker(s)\n' +# f' Collocate primary and workers: {self.collocate}\n' +# f' Input rate: {sum(self.rate):,} tx/s\n' +# f' Transaction size: {self.size[0]:,} B\n' +# f' Execution time: {round(duration):,} s\n' +# '\n' +# #f' Timeout delay: {timeout_delay:,} ms\n' +# f' Header size: {header_size:,} B\n' +# f' Max header delay: {max_header_delay:,} ms\n' +# f' GC depth: {gc_depth:,} round(s)\n' +# f' Sync retry delay: {sync_retry_delay:,} ms\n' +# f' Sync retry nodes: {sync_retry_nodes:,} node(s)\n' +# f' batch size: {batch_size:,} B\n' +# f' Max batch delay: {max_batch_delay:,} ms\n' +# '\n' +# ' + RESULTS:\n' +# f' Consensus TPS: {round(consensus_tps):,} tx/s\n' +# f' Consensus BPS: {round(consensus_bps):,} B/s\n' +# f' Consensus latency: {round(consensus_latency):,} ms\n' +# '\n' +# f' End-to-end TPS: {round(end_to_end_tps):,} tx/s\n' +# f' End-to-end BPS: {round(end_to_end_bps):,} B/s\n' +# f' End-to-end latency: {round(end_to_end_latency):,} ms\n' +# '\n' +# f' Client latency: {round(end_to_end_latency):,} ms\n' +# 
'-----------------------------------------\n' +# ) + +# def print(self, filename): +# assert isinstance(filename, str) +# with open(filename, 'a') as f: +# f.write(self.result()) + +# @classmethod +# def process(cls, directory, faults=0): +# assert isinstance(directory, str) + +# clients = [] +# for filename in sorted(glob(join(directory, 'client*.log'))): +# print(filename) +# with open(filename, 'r') as f: +# clients += [f.read()] +# primaries = [] +# for filename in sorted(glob(join(directory, 'primary*.log'))): +# with open(filename, 'r') as f: +# primaries += [f.read()] +# workers = [] +# for filename in sorted(glob(join(directory, 'worker*.log'))): +# with open(filename, 'r') as f: +# workers += [f.read()] + +# return cls(clients, primaries, workers, faults=faults) + +class Color: + HEADER = '\033[95m' + OK_BLUE = '\033[94m' + OK_GREEN = '\033[92m' + WARNING = '\033[93m' + FAIL = '\033[91m' + END = '\033[0m' + BOLD = '\033[1m' + UNDERLINE = '\033[4m' + +class Print: + @staticmethod + def heading(message): + assert isinstance(message, str) + print(f'{Color.OK_GREEN}{message}{Color.END}') + + @staticmethod + def info(message): + assert isinstance(message, str) + print(message) + + @staticmethod + def warn(message): + assert isinstance(message, str) + print(f'{Color.BOLD}{Color.WARNING}WARN{Color.END}: {message}') + + @staticmethod + def error(e): + # assert isinstance(e, BenchError) + print(f'\n{Color.BOLD}{Color.FAIL}ERROR{Color.END}: {e}\n') + causes, current_cause = [], e.cause + # while isinstance(current_cause, BenchError): + # causes += [f' {len(causes)}: {e.cause}\n'] + # current_cause = current_cause.cause + causes += [f' {len(causes)}: {type(current_cause)}\n'] + causes += [f' {len(causes)}: {current_cause}\n'] + print(f'Caused by: \n{"".join(causes)}\n') - # Parse the primaries logs. 
+ +class ParseError(Exception): + pass + + +class ClientMetricsData: + """Structured metrics extracted from the per-client metrics file using pandas.""" + + def __init__(self, client_id, df): + self.client_id = client_id + self.df = df + + @classmethod + def from_file(cls, path): try: - with Pool() as p: - results = p.map(self._parse_primaries, primaries) - except (ValueError, IndexError, AttributeError) as e: - raise ParseError(f'Failed to parse nodes\' logs: {e}') - proposals, commits, self.configs, primary_ips, client_latencies = zip(*results) - self.proposals = self._merge_results([x.items() for x in proposals]) - self.commits = self._merge_results([x.items() for x in commits]) - self.all_commits = self._pile_results([x.items() for x in commits]) - - self.client_latencies = self._avg_results(client_latencies) - - # Parse the workers logs. + # Read CSV with optimized dtypes + df = pd.read_csv( + path, + dtype={ + 'client_id': 'uint8', + 'tx_id': 'uint64', + 'is_sample': 'uint8', + 'send_ns': 'uint64', + 'commit_ns': 'uint64', + 'latency_ns': 'uint64' + }, + engine='c' + ) + + if df.empty: + return None + + # Ensure we keep the earliest commit per (client_id, tx_id) + df = df.sort_values(['commit_ns', 'send_ns']) + before = len(df) + df = df.drop_duplicates(subset=['client_id', 'tx_id'], keep='first') + df.attrs['dedup_dropped'] = before - len(df) + + # Vectorized time conversion from nanoseconds to seconds + df['send_time'] = df['send_ns'] / 1_000_000_000.0 + df['commit_time'] = df['commit_ns'] / 1_000_000_000.0 + + client_id = int(df['client_id'].iloc[0]) + return cls(client_id, df) + + except (OSError, pd.errors.EmptyDataError, KeyError): + return None + + +class LogParser: + def __init__( + self, + clients, + primaries, + workers, + metrics_map, + metrics_by_file=None, + faults=0, + parameters=None, + committee=None, + client_files=None, + warmup_seconds=0, + cooldown_seconds=0, + ): + assert isinstance(clients, list) and all(isinstance(x, str) for x in 
clients) + assert isinstance(primaries, list) and all(isinstance(x, str) for x in primaries) + assert isinstance(workers, list) and all(isinstance(x, str) for x in workers) + + self.metrics_map = metrics_map or {} + self.metrics_by_file = metrics_by_file or {} + self.faults = faults + self.parameters = parameters or {} + self.committee = committee or {} + self.client_files = [Path(p) for p in (client_files or [])] try: - with Pool() as p: - results = p.map(self._parse_workers, workers) - except (ValueError, IndexError, AttributeError) as e: - raise ParseError(f'Failed to parse workers\' logs: {e}') - sizes, self.received_samples, workers_ips = zip(*results) - self.sizes = { - k: v for x in sizes for k, v in x.items() if k in self.commits - } + self.latency_warmup = max(0.0, float(warmup_seconds)) + self.latency_cooldown = max(0.0, float(cooldown_seconds)) + except (TypeError, ValueError): + raise ParseError('Invalid warmup/cooldown durations') + + if self.committee: + authorities = self.committee.get('authorities', {}) + self.committee_size = len(authorities) + if authorities: + first = next(iter(authorities.values())) + self.workers = len(first.get('workers', {})) + else: + self.workers = '?' + else: + if isinstance(faults, int): + self.committee_size = len(primaries) + int(faults) + self.workers = len(workers) // len(primaries) if primaries else '?' + else: + self.committee_size = '?' + self.workers = '?' + + # Parse the clients. Prefer metrics files when available to avoid heavy log parsing. 
+ client_results = [] + missing = [] + for idx, log in enumerate(clients): + client_path = self.client_files[idx] if idx < len(self.client_files) else None + result = self._parse_clients_from_metrics(log, client_path) + if result is None: + name = ( + self.client_files[idx].name + if idx < len(self.client_files) + else f"client-{idx}" + ) + missing.append(name) + client_results.append(result) + + if missing: + names = ', '.join(missing) + raise ParseError(f'Metrics file missing for client log(s): {names}') + + # Pure pandas implementation - only store essential metadata + ( + self.size, + self.rate, + self.start, + misses, + ) = zip(*client_results) + self.misses = sum(misses) - # Determine whether the primary and the workers are collocated. - self.collocate = set(primary_ips) == set(workers_ips) + # Aggregate using metrics files (required). + if not self.metrics_map and self.metrics_by_file: + for data in self.metrics_by_file.values(): + self.metrics_map.setdefault(data.client_id, data) + + if not self.metrics_map: + raise ParseError('Metrics files are required for client aggregation') + + total_dedup = 0 + + # Concatenate all client DataFrames for efficient processing + all_dfs = [] + for data in self.metrics_map.values(): + if data.df is not None and not data.df.empty: + total_dedup += data.df.attrs.get('dedup_dropped', 0) + all_dfs.append(data.df) + + if total_dedup: + Print.warn(f'Dropped {total_dedup:,} duplicate commits while loading metrics') + + if all_dfs: + self.all_metrics_df = pd.concat(all_dfs, ignore_index=True) + + # Validate no duplicate transactions - each (client_id, tx_id) should be unique + duplicates = self.all_metrics_df.duplicated(subset=['client_id', 'tx_id'], keep=False) + if duplicates.any(): + dup_count = duplicates.sum() + dup_samples = self.all_metrics_df[duplicates][['client_id', 'tx_id']].head(10) + Print.warn(f'Found {dup_count} duplicate transactions! 
Sample: {dup_samples.values.tolist()}') + else: + self.all_metrics_df = pd.DataFrame() + + # NOTE: Primaries/workers parsing is disabled; use provided parameters instead. + self.proposals = {} + self.commits = {} + self.configs = [{ + 'header_size': self.parameters.get('header_size', 0), + 'max_header_delay': self.parameters.get('max_header_delay', 0), + 'gc_depth': self.parameters.get('gc_depth', 0), + 'sync_retry_delay': self.parameters.get('sync_retry_delay', 0), + 'sync_retry_nodes': self.parameters.get('sync_retry_nodes', 0), + 'batch_size': self.parameters.get('batch_size', 0), + 'max_batch_delay': self.parameters.get('max_batch_delay', 0), + }] + self.collocate = self._compute_collocate_from_committee() + self.sizes = {} # Check whether clients missed their target rate. if self.misses != 0: - print( + Print.warn( f'Clients missed their target rate {self.misses:,} time(s)' ) @@ -78,55 +571,91 @@ def _merge_results(self, input): merged[k] = v return merged - def _avg_results(self, input): - # Keep the earliest timestamp. 
- merged = {} - for x in input: - for k, v in x.items(): - if not k in merged: - merged[k] = [v] - else: - merged[k].append(v) - - merged = {k: mean(v) for k, v in merged.items()} - return merged + def _compute_collocate_from_committee(self): + authorities = self.committee.get('authorities', {}) + if not authorities: + return False + + for info in authorities.values(): + primary_addr = info.get('primary', {}).get('primary_to_primary') + if not primary_addr: + return False + primary_ip = primary_addr.split(':')[0] + + workers = info.get('workers', {}) + for worker_info in workers.values(): + tx_addr = worker_info.get('transactions') + if not tx_addr: + return False + worker_ip = tx_addr.split(':')[0] + if worker_ip != primary_ip: + return False + return True - def _pile_results(self, input): - merged = {} - for x in input: - for k, v in x: - if not k in merged: - merged[k] = [v] - else: - merged[k].append(v) + def _parse_clients_from_metrics(self, log, client_path=None): + size_match = search(r'Transactions size: (\d+)', log) + size = int(size_match.group(1)) if size_match else 0 + rate_match = search(r'Transactions rate: (\d+)', log) + rate = int(rate_match.group(1)) if rate_match else 0 + misses = len(findall(r'rate too high', log)) - return merged + client_match = search(r'Client (\d+) Start sending transactions', log) + if client_match is None: + client_match = search(r'Client (\d+) sending sample transaction', log) + if client_match is None: + client_match = search(r'Client (\d+) successfully started', log) + client_id = int(client_match.group(1)) if client_match else None + + metrics_entry = None + if client_id is not None: + metrics_entry = self.metrics_map.get(client_id) + if metrics_entry is None and client_path is not None: + metrics_entry = self.metrics_by_file.get(client_path.name) + if metrics_entry is None and client_path is not None: + metrics_entry = ClientMetricsData.from_file(client_path.with_suffix('.metrics')) + if metrics_entry is not None: + 
self.metrics_map.setdefault(metrics_entry.client_id, metrics_entry) + self.metrics_by_file.setdefault(client_path.name, metrics_entry) + if metrics_entry is None: + return None + + # Pure pandas implementation - only return essential metadata + start = metrics_entry.df['send_time'].min() if not metrics_entry.df.empty else 0 + return size, rate, start, misses def _parse_clients(self, log): - # if search(r'Error', log) is not None: - # raise ParseError('Client(s) panicked') + if search(r'Error', log) is not None: + raise ParseError('Client(s) panicked') - size = int(search(r'Transactions size: (\d+)', log).group(1)) - rate = int(search(r'Transactions rate: (\d+)', log).group(1)) + size_match = search(r'Transactions size: (\d+)', log) + size = int(size_match.group(1)) if size_match else 0 + rate_match = search(r'Transactions rate: (\d+)', log) + rate = int(rate_match.group(1)) if rate_match else 0 - tmp = search(r'\[(.*Z) .* Start ', log).group(1) - start = self._to_posix(tmp) + start_match = search(r'\[(.*Z) .* Start ', log) + start = self._to_posix(start_match.group(1)) if start_match else 0 misses = len(findall(r'rate too high', log)) - tmp = findall(r'\[(.*Z) .* sample transaction (\d+)', log) - samples = {int(s): self._to_posix(t) for t, s in tmp} + client_match = search(r'Client (\d+) Start sending transactions', log) + if client_match is None: + client_match = search(r'Client (\d+) sending sample transaction', log) + if client_match is None: + client_match = search(r'Client (\d+) successfully started', log) + client_id = int(client_match.group(1)) if client_match else None - # tmp = findall(r'Client latency: (\d+) ms', log) - # client_latencies = [int(x) for x in tmp] - # if len(client_latencies) == 0: - # client_latencies = [0] + if client_id is not None: + metrics_entry = self.metrics_map.get(client_id) + if metrics_entry: + # Pandas-only implementation - fallback not supported + raise ParseError('Fallback parsing from logs not supported - metrics files 
required') - return size, rate, start, misses, samples + # Old regex-based parsing - not supported anymore + raise ParseError('Fallback parsing from logs not supported - metrics files required') def _parse_primaries(self, log): - # if search(r'(?:panicked|Error)', log) is not None: - # raise ParseError('Primary(s) panicked') + if search(r'(?:panicked|Error)', log) is not None: + raise ParseError('Primary(s) panicked') tmp = findall(r'\[(.*Z) .* Created B\d+\([^ ]+\) -> ([^ ]+=)', log) tmp = [(d, self._to_posix(t)) for t, d in tmp] @@ -136,14 +665,6 @@ def _parse_primaries(self, log): tmp = [(d, self._to_posix(t)) for t, d in tmp] commits = self._merge_results([tmp]) - latencies = {} - for d in commits.keys(): - if d in proposals: - latencies[d] = commits[d] - proposals[d] - - - - configs = { #'timeout_delay': int( # search(r'Timeout delay .* (\d+)', log).group(1) @@ -173,90 +694,146 @@ def _parse_primaries(self, log): ip = search(r'booted on (\d+.\d+.\d+.\d+)', log).group(1) - return proposals, commits, configs, ip, latencies + return proposals, commits, configs, ip def _parse_workers(self, log): - # if search(r'(?:panic|Error)', log) is not None: - # raise ParseError('Worker(s) panicked') + if search(r'(?:panic|Error)', log) is not None: + raise ParseError('Worker(s) panicked') tmp = findall(r'Batch ([^ ]+) contains (\d+) B', log) sizes = {d: int(s) for d, s in tmp} - tmp = findall(r'Batch ([^ ]+) contains sample tx (\d+)', log) - samples = {int(s): d for d, s in tmp} - ip = search(r'booted on (\d+.\d+.\d+.\d+)', log).group(1) - return sizes, samples, ip + return sizes, ip def _to_posix(self, string): x = datetime.fromisoformat(string.replace('Z', '+00:00')) return datetime.timestamp(x) - def _consensus_throughput(self): - if not self.commits: + def _consensus_latency(self): + latency = [c - self.proposals[d] for d, c in self.commits.items()] + return mean(latency) if latency else 0 + + def _per_client_throughput(self): + """Compute throughput by summing 
per-client contributions using vectorized operations.""" + if not self.metrics_map or not self.size: return 0, 0, 0 - start, end = min(self.proposals.values()), max(self.commits.values()) - duration = end - start - bytes = sum(self.sizes.values()) - bps = bytes / duration - tps = bps / self.size[0] - return tps, bps, duration - def _consensus_latency(self): - # return 0 - # latency = [c - self.proposals[d] for d, c in self.commits.items()] + size_bytes = self.size[0] + total_tps = 0.0 + total_bps = 0.0 + durations = [] - # # latency = [x for x in latency if x > 0] - # # print(mean(list(self.client_latencies.values()))) - latency = list(self.client_latencies.values()) - return mean(latency) if latency else 0 + for data in self.metrics_map.values(): + if data.df is None or data.df.empty: + continue + + # Vectorized operations on DataFrame + df = data.df + start = df['send_time'].min() + end = df['commit_time'].max() + duration = end - start + + if duration <= 0: + continue + + tx_count = len(df) + client_tps = tx_count / duration + client_bps = (tx_count * size_bytes) / duration + + total_tps += client_tps + total_bps += client_bps + durations.append(duration) + + overall_duration = max(durations) if durations else 0 + return total_tps, total_bps, overall_duration def _end_to_end_throughput(self): - if not self.commits: - return 0, 0, 0 - start, end = min(self.start), max(self.commits.values()) - duration = end - start - bytes = sum(self.sizes.values()) - bps = bytes / duration - tps = bps / self.size[0] - return tps, bps, duration + return self._per_client_throughput() def _end_to_end_latency(self): - # return 0 - latency = [] - list_latencies = [] - first_start = 0 - set_first = True - for received in self.received_samples: - for tx_id, batch_id in received.items(): - if batch_id in self.commits: - # assert tx_id in sent # We receive txs that we sent. 
- for _sent in self.sent_samples: - if tx_id in _sent: - start = _sent[tx_id] - possible_ends = self.all_commits[batch_id] - impossible_ends = [x for x in possible_ends if x < start] - if len(impossible_ends) > 0: - print("batch:", batch_id, "tx:", tx_id, "impossible_ends:", impossible_ends) - possible_ends = [x for x in possible_ends if x > start] - - if len(possible_ends) == 0: - continue - end = min(possible_ends) - if set_first: - first_start = start - first_end = end - set_first = False - latency += [end-start] - list_latencies += [(start-first_start, end-first_start, end-start)] - - list_latencies.sort(key=lambda tup: tup[0]) - with open('latencies.txt', 'w') as f: - for line in list_latencies: - f.write(str(line[0]) + ',' + str(line[1]) + ',' + str((line[2])) + '\n') - # latency = [x for x in latency if x > 0] - return mean(latency) if latency else 0 + """Compute end-to-end latency using vectorized pandas operations.""" + if self.all_metrics_df.empty: + return {'mean': 0, 'p50': 0, 'p95': 0, 'p99': 0, 'p999': 0} + + df = self.all_metrics_df.copy() + + # Vectorized latency calculation + df['latency'] = df['commit_time'] - df['send_time'] + + # Track totals before trimming + total_samples = int(df['is_sample'].sum()) + total_transactions = len(df) + first_send = df['send_time'].min() + last_commit = df['commit_time'].max() + + # Apply warmup/cooldown trimming + lower_bound = None + upper_bound = None + if self.latency_warmup > 0: + lower_bound = first_send + self.latency_warmup + if self.latency_cooldown > 0: + upper_bound = last_commit - self.latency_cooldown + if lower_bound is not None and upper_bound is not None and upper_bound <= lower_bound: + lower_bound = upper_bound = None + + # Vectorized filtering + original_len = len(df) + if lower_bound is not None: + df = df[df['send_time'] >= lower_bound] + if upper_bound is not None: + df = df[df['commit_time'] <= upper_bound] + + trimmed_transactions = original_len - len(df) + trimmed_samples = total_samples - 
int(df['is_sample'].sum()) + + # Handle case where all transactions were trimmed + if df.empty and original_len > 0: + Print.warn('Warmup/cooldown trimming removed all transaction latencies; recomputing without trimming.') + df = self.all_metrics_df.copy() + df['latency'] = df['commit_time'] - df['send_time'] + trimmed_samples = 0 + trimmed_transactions = 0 + + # Write latencies.txt for sample transactions + sample_df = df[df['is_sample'] == 1].copy() + if not sample_df.empty: + first_start = sample_df['send_time'].min() + sample_df['start_offset'] = sample_df['send_time'] - first_start + sample_df['commit_offset'] = sample_df['commit_time'] - first_start + sample_df = sample_df.sort_values('start_offset') + sample_df[['start_offset', 'commit_offset', 'latency']].to_csv( + 'latencies.txt', index=False, header=False + ) + else: + # Write empty file if no samples + with open('latencies.txt', 'w') as f: + pass + + print('Num sent samples:', total_samples, 'Trimmed:', trimmed_samples) + print('Num sent transactions:', total_transactions, 'Trimmed:', trimmed_transactions) + print('Num sample latencies:', int(df['is_sample'].sum())) + print('Num all latencies:', len(df)) + + if df.empty: + return {'mean': 0, 'p50': 0, 'p95': 0, 'p99': 0, 'p999': 0} + + # Vectorized percentile calculations + latencies = df['latency'].values + mean_latency = float(latencies.mean()) + p50 = float(np.percentile(latencies, 50)) + p95 = float(np.percentile(latencies, 95)) + p99 = float(np.percentile(latencies, 99)) + p999 = float(np.percentile(latencies, 99.9)) + + return { + 'mean': mean_latency, + 'p50': p50, + 'p95': p95, + 'p99': p99, + 'p999': p999 + } def result(self): #timeout_delay = self.configs[0]['timeout_delay'] @@ -268,18 +845,13 @@ def result(self): batch_size = self.configs[0]['batch_size'] max_batch_delay = self.configs[0]['max_batch_delay'] - consensus_latency = self._consensus_latency() * 1_000 - consensus_tps, consensus_bps, _ = self._consensus_throughput() end_to_end_tps, 
end_to_end_bps, duration = self._end_to_end_throughput() - end_to_end_latency = self._end_to_end_latency() * 1_000 - - # client_latencies = [] - # for c in self.client_latencies: - # client_latencies.extend(c) - # client_latency = mean(client_latencies) - - # if client_latency > 400: - # raise ParseError('Client latency is too high') + latency_stats = self._end_to_end_latency() + end_to_end_latency = latency_stats['mean'] * 1_000 + end_to_end_p50 = latency_stats['p50'] * 1_000 + end_to_end_p95 = latency_stats['p95'] * 1_000 + end_to_end_p99 = latency_stats['p99'] * 1_000 + end_to_end_p999 = latency_stats['p999'] * 1_000 return ( '\n' @@ -305,15 +877,16 @@ def result(self): f' Max batch delay: {max_batch_delay:,} ms\n' '\n' ' + RESULTS:\n' - f' Consensus TPS: {round(consensus_tps):,} tx/s\n' - f' Consensus BPS: {round(consensus_bps):,} B/s\n' - f' Consensus latency: {round(consensus_latency):,} ms\n' - '\n' + #f' Consensus TPS: {round(consensus_tps):,} tx/s\n' + #f' Consensus BPS: {round(consensus_bps):,} B/s\n' + #f' Consensus latency: {round(consensus_latency):,} ms\n' f' End-to-end TPS: {round(end_to_end_tps):,} tx/s\n' f' End-to-end BPS: {round(end_to_end_bps):,} B/s\n' - f' End-to-end latency: {round(end_to_end_latency):,} ms\n' - '\n' - f' Client latency: {round(end_to_end_latency):,} ms\n' + f' End-to-end latency (mean): {round(end_to_end_latency):,} ms\n' + f' End-to-end latency (P50): {round(end_to_end_p50):,} ms\n' + f' End-to-end latency (P95): {round(end_to_end_p95):,} ms\n' + f' End-to-end latency (P99): {round(end_to_end_p99):,} ms\n' + f' End-to-end latency (P99.9): {round(end_to_end_p999):,} ms\n' '-----------------------------------------\n' ) @@ -323,21 +896,52 @@ def print(self, filename): f.write(self.result()) @classmethod - def process(cls, directory, faults=0): + def process(cls, directory, faults=0, warmup_seconds=0, cooldown_seconds=0): assert isinstance(directory, str) - clients = [] - for filename in sorted(glob(join(directory, 
'client*.log'))): - print(filename) - with open(filename, 'r') as f: - clients += [f.read()] + client_files = sorted(glob(join(directory, 'client*.log'))) + print(client_files) + clients = [Path(path).read_text() for path in client_files] primaries = [] - for filename in sorted(glob(join(directory, 'primary*.log'))): - with open(filename, 'r') as f: - primaries += [f.read()] workers = [] - for filename in sorted(glob(join(directory, 'worker*.log'))): - with open(filename, 'r') as f: - workers += [f.read()] - return cls(clients, primaries, workers, faults=faults) + directory_path = Path(directory) + root = directory_path.parent + params_path = root / '.parameters.json' + committee_path = root / '.committee.json' + + parameters = {} + committee = {} + if params_path.exists(): + parameters = json.loads(params_path.read_text()) + if committee_path.exists(): + committee = json.loads(committee_path.read_text()) + + metrics_map, metrics_by_file = cls._load_metrics(client_files) + + return cls( + clients, + primaries, + workers, + metrics_map, + metrics_by_file=metrics_by_file, + faults=faults, + parameters=parameters, + committee=committee, + client_files=client_files, + warmup_seconds=warmup_seconds, + cooldown_seconds=cooldown_seconds, + ) + + @staticmethod + def _load_metrics(client_files): + metrics_by_id = {} + metrics_by_file = {} + for log_path in client_files: + log_path = Path(log_path) + metrics_path = log_path.with_suffix('.metrics') + data = ClientMetricsData.from_file(metrics_path) + if data is not None: + metrics_by_id[data.client_id] = data + metrics_by_file[log_path.name] = data + return metrics_by_id, metrics_by_file diff --git a/scripts/deployment.py b/scripts/deployment.py index ad8bfa72..f57e2aab 100644 --- a/scripts/deployment.py +++ b/scripts/deployment.py @@ -158,6 +158,13 @@ def deploy(self): if self.mode == "manual": # Manual must mean there is a nodelist specified in the toml file. # There is no need to deploy. 
+ # Save myself + with open(os.path.join(self.workdir, "deployment", "deployment.pkl"), "wb") as f: + pickle.dump(self, f) + + # Rewrite deployment.txt + with open(os.path.join(self.workdir, "deployment", "deployment.txt"), "w") as f: + pprint(self, f) return # Terraform deploy diff --git a/scripts/gcloud-fetch.py b/scripts/gcloud-fetch.py new file mode 100644 index 00000000..189174c0 --- /dev/null +++ b/scripts/gcloud-fetch.py @@ -0,0 +1,43 @@ +from subprocess import Popen, PIPE + +GCLOUD = "/opt/homebrew/share/google-cloud-sdk/bin/gcloud" + +def run_gcloud(command): + return Popen( + f'{GCLOUD} {command}', + shell=True, + stdout=PIPE, + stderr=PIPE, + ).stdout.read().decode('utf-8').split("\n") + +all_node_list = run_gcloud( + 'compute instances list --filter="Name ~ pool" --format="table(Name, INTERNAL_IP, EXTERNAL_IP)"' +) +all_node_list = [line.split() for line in all_node_list if line.strip()][1:] # Skip header + + +client_list = [x for x in all_node_list if "client" in x[0]] +node_list = [x for x in all_node_list if not("client" in x[0])] + +print("[deployment_config.node_list]") + +for i, node in enumerate(node_list): + print(f"[deployment_config.node_list.nodepool_vm{i}]") + print(f"private_ip = \"{node[1]}\"") + print(f"public_ip = \"{node[2]}\"") + + if "sev" in node[0]: + tee_type = "sev" + elif "tdx" in node[0]: + tee_type = "tdx" + else: + tee_type = "nontee" + + print(f"tee_type = \"{tee_type}\"") + print(f"region_id = 0") + print() + +for i, node in enumerate(client_list): + print(f"[deployment_config.node_list.clientpool_vm{i}]") + print(f"private_ip = \"{node[1]}\"") + print(f"public_ip = \"{node[2]}\"") \ No newline at end of file diff --git a/scripts/results.py b/scripts/results.py index f1e0119a..bdbbb2a1 100644 --- a/scripts/results.py +++ b/scripts/results.py @@ -307,17 +307,19 @@ def process_autobahn_experiment(self, experiment, ramp_up, ramp_down, byz, tput_ log_dir = os.path.join(experiment.local_workdir, "logs", "0") print(log_dir) try: - 
result = AutobahnLogParser.process(log_dir).result() + result = AutobahnLogParser.process(log_dir, faults=0, warmup_seconds=ramp_up, cooldown_seconds=ramp_down).result() mean_tput = 0.0 mean_latency = 0.0 + ___i = 0 for line in result.split("\n"): - print(line) - if line.startswith(" Consensus TPS"): + print(___i, line) + ___i += 1 + if line.startswith(" End-to-end TPS"): mean_tput = line.split(":")[-1].strip() mean_tput = mean_tput.replace(",", "") mean_tput = float(mean_tput.split(" ")[0]) / tput_scale - if line.startswith(" Client latency"): + if line.startswith(" End-to-end latency (mean)"): mean_latency = line.split(":")[-1].strip() mean_latency = mean_latency.replace(",", "") mean_latency = float(mean_latency.split(" ")[0]) / latency_scale @@ -764,7 +766,7 @@ def tput_latency_sweep_plot(self, plot_dict: Dict[str, List[Stats]], output: str y_range_total = max([v[3] for v in bounding_boxes.values()]) - min([v[2] for v in bounding_boxes.values()]) # if y_range_total > 200: # plt.yscale("log") - # plt.ylim((0, 125)) + plt.ylim((0, 125)) # plt.xlim((50, 550)) plt.legend(loc='upper center', bbox_to_anchor=(0.5, 1.4), ncol=legends_ncols, fontsize=70) plt.xticks(fontsize=70) diff --git a/src/client/worker.rs b/src/client/worker.rs index 907b08bc..a87bbcb3 100644 --- a/src/client/worker.rs +++ b/src/client/worker.rs @@ -381,7 +381,7 @@ impl ClientWorker }; if res.is_err() { - debug!("Error: {:?}", res); + error!("Error: {:?}", res); match __executor_mode { Executor::Leader => { *curr_leader_id = (*curr_leader_id + 1) % node_list.len(); diff --git a/src/config/mod.rs b/src/config/mod.rs index 5fd4ba21..47b67cc7 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -3,7 +3,7 @@ use serde::{Deserialize, Serialize}; -use serde_json::Result; +use serde_json::{Result, Value}; use std::collections::HashMap; #[cfg(test)] @@ -114,6 +114,10 @@ impl ConsensusConfig { pub struct AppConfig { pub logger_stats_report_ms: u64, pub checkpoint_interval_ms: u64, + + 
#[serde(flatten)] + #[serde(default = "HashMap::new")] + pub app_specific: HashMap, } #[derive(Serialize, Deserialize, Clone, Debug)] @@ -222,6 +226,7 @@ impl ClientConfig { app_config: AppConfig { logger_stats_report_ms: 100, checkpoint_interval_ms: 60000, + app_specific: HashMap::new(), }, #[cfg(feature = "evil")] diff --git a/src/config/tests.rs b/src/config/tests.rs index 8445db34..b3081966 100644 --- a/src/config/tests.rs +++ b/src/config/tests.rs @@ -70,6 +70,7 @@ fn test_nodeconfig_serialize() { let app_config = AppConfig { logger_stats_report_ms: 100, checkpoint_interval_ms: 60000, + app_specific: HashMap::new(), }; let evil_config = EvilConfig { @@ -226,6 +227,7 @@ async fn test_atomic_config_access() { let app_config = AppConfig { logger_stats_report_ms: 100, checkpoint_interval_ms: 60000, + app_specific: HashMap::new(), }; let evil_config = EvilConfig { diff --git a/src/consensus/block_sequencer.rs b/src/consensus/block_sequencer.rs index 87210796..c7840f2b 100644 --- a/src/consensus/block_sequencer.rs +++ b/src/consensus/block_sequencer.rs @@ -392,6 +392,7 @@ impl BlockSequencer { } self.current_qc_list.push(qc); + // warn!("current_qc_list size: {}", self.current_qc_list.len()); } } diff --git a/src/consensus/engines/kvs.rs b/src/consensus/engines/kvs.rs index c295e27f..930fa9d8 100644 --- a/src/consensus/engines/kvs.rs +++ b/src/consensus/engines/kvs.rs @@ -391,238 +391,4 @@ impl KVSAppEngine { return None; } } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::config::AtomicConfig; - - #[test] - fn test_setup() { - let engine = setup_engine(); - println!("{}", engine.last_bci); - println!("{}", engine.last_ci); - println!("{}", engine.quit_signal); - } - - #[test] - fn test_crash_commit() { - let mut engine: KVSAppEngine = setup_engine(); - let blocks = vec![ - create_dummy_cached_block(false, true, false), - create_dummy_cached_block(false, true, false), - ]; - - let result: Vec> = engine.handle_crash_commit(blocks); - 
dbg!(&engine.state.ci_state); - dbg!(&result); - println!("{}", engine.last_ci) - } - #[test] - fn test_byz_commit() { - let mut engine: KVSAppEngine = setup_engine(); - let blocks = vec![ - create_dummy_cached_block(false, false, true), - create_dummy_cached_block(false, false, true), - ]; - - let result: Vec> = engine.handle_byz_commit(blocks); - dbg!(&engine.state.bci_state); - dbg!(&result); - println!("potsto{}", engine.last_bci) - } - - fn test_rollback() { - let mut engine: KVSAppEngine = setup_engine(); - - todo!() - } - #[test] - fn test_reads() { - let mut engine = setup_engine(); - - let blocks = vec![ - create_dummy_cached_block(false, true, false), - create_dummy_cached_block(false, true, false), - ]; - - let result: Vec> = engine.handle_crash_commit(blocks); - dbg!(&engine.state.ci_state); - dbg!(&result); - - let txn = create_dummy_tx(true, false, false, ProtoTransactionOpType::Read); - let result= engine.handle_unlogged_request(txn); - dbg!(&engine.state.ci_state); - dbg!(&result); - } - - - - - fn setup_engine() -> KVSAppEngine { - let mut net_config = NetConfig { - name: "node1".to_string(), - addr: "0.0.0.0:3001".to_string(), - tls_cert_path: String::from("blah"), - tls_key_path: String::from("blah"), - tls_root_ca_cert_path: String::from("blah"), - nodes: HashMap::new(), - client_max_retry: 10, - }; - - let rpc_config = RpcConfig { - allowed_keylist_path: String::from("blah/blah"), - signing_priv_key_path: String::from("blah/blah"), - recv_buffer_size: (1 << 15), - channel_depth: 32, - }; - - let consensus_config = ConsensusConfig { - node_list: vec![ - String::from("node1"), - String::from("node2"), - String::from("node3"), - ], - learner_list: vec![String::from("node4"), String::from("node5")], - quorum_diversity_k: 3, - max_backlog_batch_size: 1000, - signature_max_delay_blocks: 128, - signature_max_delay_ms: 100, - vote_processing_workers: 128, - view_timeout_ms: 150, - batch_max_delay_ms: 10, - - #[cfg(feature = "storage")] - 
log_storage_config: crate::config::StorageConfig::RocksDB(RocksDBConfig::default()), - - #[cfg(feature = "platforms")] - liveness_u: 1, - }; - - let app_config = AppConfig { - logger_stats_report_ms: 100, - checkpoint_interval_ms: 60000, - }; - - let evil_config = EvilConfig { - simulate_byzantine_behavior: true, - byzantine_start_block: 20000, - }; - - let config = Config { - net_config, - rpc_config, - consensus_config, - app_config, - }; - - let atomic_config = AtomicConfig::new(config); - - KVSAppEngine::new(atomic_config) - } - -fn create_dummy_tx(on_receieve: bool, on_crash_commit: bool, on_byzantine_commit: bool, op_type: ProtoTransactionOpType) -> ProtoTransaction { - let dummy_op = ProtoTransactionOp { - op_type: op_type as i32, - operands: vec![vec![1], vec![4]] - }; - - let dummy_phase = ProtoTransactionPhase { - ops: vec![dummy_op.clone()], - }; - - let mut dummy_transaction = ProtoTransaction { - on_receive:None, - on_crash_commit: None, - on_byzantine_commit: None, - is_reconfiguration: false, - }; - - if on_receieve == true { - dummy_transaction.on_receive = Some(dummy_phase.clone()); - } - - if on_crash_commit == true { - - dummy_transaction.on_crash_commit = Some(dummy_phase.clone()); - } - if on_byzantine_commit == true { - - dummy_transaction.on_byzantine_commit = Some(dummy_phase.clone()); - } - return dummy_transaction; -} - -fn create_dummy_cached_block(on_receieve: bool, on_crash_commit: bool, on_byzantine_commit: bool) -> CachedBlock { - - - // let dummy_op = ProtoTransactionOp { - // op_type: ProtoTransactionOpType::Write as i32, - // operands: vec![vec![1], vec![4]] - // }; - - // let dummy_phase = ProtoTransactionPhase { - // ops: vec![dummy_op.clone()], - // }; - - // let dummy_txn = ProtoTransaction { - // on_receive:None, - // on_crash_commit: None, - // on_byzantine_commit: Some(dummy_phase.clone()), - // is_reconfiguration: false, - // }; - - let dummy_txn = create_dummy_tx(on_receieve, on_crash_commit, on_byzantine_commit, 
ProtoTransactionOpType::Write); - - - let dummy_qc = ProtoQuorumCertificate { - digest: vec![0xaa, 0xbb, 0xcc], - n: 1, - sig: vec![ProtoNameWithSignature { - name: "Validator1".to_string(), - sig: vec![0xde, 0xad, 0xbe, 0xef], - }], - view: 1, - }; - - let dummy_fork_validation = ProtoForkValidation { - view: 3, - fork_hash: vec![0xba, 0xad, 0xf0, 0x0d], - fork_sig: vec![0xca, 0xfe, 0xba, 0xbe], - fork_len: 2, - fork_last_qc: Some(dummy_qc.clone()), - name: "ForkValidator".to_string(), - }; - - // let dummy_vote = ProtoVote { - // sig_array: vec![ProtoSignatureArrayEntry { - // n: 10, - // sig: vec![0xbe, 0xef, 0xfa, 0xce], - // }], - // fork_digest: vec![0x12, 0x34, 0x56, 0x78], - // n: 10, - // view: 4, - // config_num: 1, - // }; - - let dummy_proto_block = ProtoBlock { - tx_list: vec![dummy_txn.clone(), dummy_txn.clone()], - n: 42, - parent: vec![0xde, 0xad, 0xbe, 0xef], - view: 2, - qc: vec![dummy_qc], - fork_validation: vec![dummy_fork_validation], - view_is_stable: true, - config_num: 1, - sig: Some(Sig::NoSig(DefferedSignature {})), - }; - - let block_ser = vec![0; 100]; - let block_hash = vec![0xaa; 32]; - - return CachedBlock::new(dummy_proto_block, block_ser, block_hash); - } -} - - +} \ No newline at end of file diff --git a/src/consensus/mod.rs b/src/consensus/mod.rs index 43fe5b09..70b23807 100644 --- a/src/consensus/mod.rs +++ b/src/consensus/mod.rs @@ -10,6 +10,9 @@ mod logserver; mod pacemaker; pub mod extra_2pc; +#[cfg(feature = "witness_forwarding")] +mod witness_receiver; + // #[cfg(test)] // mod tests; @@ -27,7 +30,14 @@ use logserver::LogServer; use pacemaker::Pacemaker; use prost::Message; use staging::{Staging, VoteWithSender}; +#[cfg(feature = "witness_forwarding")] +use tokio::sync::mpsc::UnboundedSender; use tokio::{sync::{mpsc::unbounded_channel, Mutex}, task::JoinSet}; +#[cfg(feature = "witness_forwarding")] +use crate::proto::consensus::ProtoWitness; +#[cfg(feature = "witness_forwarding")] +use witness_receiver::WitnessReceiver; + use 
crate::{proto::{checkpoint::ProtoBackfillNack, consensus::{ProtoAppendEntries, ProtoViewChange}}, rpc::{client::Client, SenderType}, utils::{channel::{make_channel, Receiver, Sender}, RocksDBStorageEngine, StorageService}}; use crate::{config::{AtomicConfig, Config}, crypto::{AtomicKeyStore, CryptoService, KeyStore}, proto::rpc::ProtoPayload, rpc::{server::{MsgAckChan, RespType, Server, ServerContextType}, MessageRef}}; @@ -41,6 +51,9 @@ pub struct ConsensusServerContext { vote_receiver_tx: Sender, view_change_receiver_tx: Sender<(ProtoViewChange, SenderType)>, backfill_request_tx: Sender, + + #[cfg(feature = "witness_forwarding")] + witness_receiver_tx: UnboundedSender, } @@ -56,12 +69,16 @@ impl PinnedConsensusServerContext { vote_receiver_tx: Sender, view_change_receiver_tx: Sender<(ProtoViewChange, SenderType)>, backfill_request_tx: Sender, + #[cfg(feature = "witness_forwarding")] + witness_receiver_tx: UnboundedSender, ) -> Self { Self(Arc::new(Box::pin(ConsensusServerContext { config, keystore, batch_proposal_tx, fork_receiver_tx, fork_receiver_command_tx, vote_receiver_tx, view_change_receiver_tx, backfill_request_tx, + #[cfg(feature = "witness_forwarding")] + witness_receiver_tx, }))) } } @@ -143,6 +160,14 @@ impl ServerContextType for PinnedConsensusServerContext { .expect("Channel send error"); return Ok(RespType::NoResp); }, + crate::proto::rpc::proto_payload::Message::Witness(proto_witness) => { + #[cfg(feature = "witness_forwarding")] + { + self.witness_receiver_tx.send(proto_witness) + .expect("Channel send error"); + } + return Ok(RespType::NoResp); + }, } @@ -172,6 +197,9 @@ pub struct ConsensusNode { logserver: Arc>, pacemaker: Arc>, + #[cfg(feature = "witness_forwarding")] + witness_receiver: Arc>, + #[cfg(feature = "extra_2pc")] extra_2pc: Arc>, @@ -226,6 +254,9 @@ impl ConsensusNode { let pacemaker_client = Client::new_atomic(config.clone(), keystore.clone(), false, 0); let fork_receiver_client = Client::new_atomic(config.clone(), 
keystore.clone(), false, 0); + #[cfg(feature = "witness_forwarding")] + let witness_client = Client::new_atomic(config.clone(), keystore.clone(), false, 0); + #[cfg(feature = "extra_2pc")] let extra_2pc_client = Client::new_atomic(config.clone(), keystore.clone(), true, 50); @@ -272,7 +303,13 @@ impl ConsensusNode { #[cfg(feature = "extra_2pc")] let (extra_2pc_staging_tx, extra_2pc_staging_rx) = make_channel(10 * _chan_depth); - let ctx = PinnedConsensusServerContext::new(config.clone(), keystore.clone(), batch_proposer_tx.clone(), fork_tx, fork_receiver_command_tx.clone(), vote_tx, view_change_tx, backfill_request_tx); + #[cfg(feature = "witness_forwarding")] + let (witness_tx, witness_rx) = unbounded_channel(); + + let ctx = PinnedConsensusServerContext::new(config.clone(), keystore.clone(), batch_proposer_tx.clone(), fork_tx, fork_receiver_command_tx.clone(), vote_tx, view_change_tx, backfill_request_tx, + #[cfg(feature = "witness_forwarding")] + witness_tx, + ); let batch_proposer = BatchProposer::new(config.clone(), batch_proposer_rx, block_maker_tx, client_reply_command_tx.clone(), unlogged_tx, batch_proposer_command_rx); let block_sequencer = BlockSequencer::new(config.clone(), control_command_rx, block_maker_rx, qc_rx, block_broadcaster_tx, client_reply_tx, block_maker_crypto); let block_broadcaster = BlockBroadcaster::new(config.clone(), client.into(), block_broadcaster_crypto2, block_broadcaster_rx, other_block_rx, broadcaster_control_command_rx, block_broadcaster_storage, staging_tx, fork_receiver_command_tx.clone(), app_tx.clone()); @@ -297,6 +334,10 @@ impl ConsensusNode { #[cfg(feature = "extra_2pc")] let extra_2pc = extra_2pc::TwoPCHandler::new(config.clone(), extra_2pc_client.into(), storage.get_connector(crypto.get_connector()), storage.get_connector(crypto.get_connector()), extra_2pc_command_rx, extra_2pc_phase_message_rx, extra_2pc_staging_tx); + #[cfg(feature = "witness_forwarding")] + let witness_receiver = WitnessReceiver::new(config.clone(), 
witness_client.into(), keystore.clone(), witness_rx); + + let mut handles = JoinSet::new(); @@ -316,6 +357,9 @@ impl ConsensusNode { #[cfg(feature = "extra_2pc")] extra_2pc: Arc::new(Mutex::new(extra_2pc)), + #[cfg(feature = "witness_forwarding")] + witness_receiver: Arc::new(Mutex::new(witness_receiver)), + crypto, storage: Arc::new(Mutex::new(storage)), __sink_handles: handles, @@ -387,6 +431,14 @@ impl ConsensusNode { Pacemaker::run(pacemaker).await; }); + #[cfg(feature = "witness_forwarding")] + { + let witness_receiver = self.witness_receiver.clone(); + handles.spawn(async move { + WitnessReceiver::run(witness_receiver).await; + }); + } + #[cfg(feature = "extra_2pc")] { let extra_2pc = self.extra_2pc.clone(); diff --git a/src/consensus/staging/mod.rs b/src/consensus/staging/mod.rs index b8f5f13c..7fcf296f 100644 --- a/src/consensus/staging/mod.rs +++ b/src/consensus/staging/mod.rs @@ -4,6 +4,8 @@ use futures::{future::BoxFuture, stream::FuturesOrdered, StreamExt as _}; use log::{debug, error, info, trace, warn}; use tokio::sync::{mpsc::UnboundedSender, oneshot, Mutex}; +#[cfg(feature = "witness_forwarding")] +use crate::{crypto::{HashType, default_hash}, rpc::client::Client}; use crate::{config::AtomicConfig, crypto::{CachedBlock, CryptoServiceConnector}, proto::consensus::{ProtoQuorumCertificate, ProtoSignatureArrayEntry, ProtoVote}, rpc::{client::PinnedClient, SenderType}, utils::{channel::{Receiver, Sender}, timer::ResettableTimer, PerfCounter, StorageAck}}; use super::{app::AppCommand, batch_proposal::BatchProposerCommand, block_broadcaster::BlockBroadcasterCommand, block_sequencer::BlockSequencerControlCommand, client_reply::ClientReplyCommand, extra_2pc::{EngraftActionAfterFutureDone, EngraftTwoPCFuture, TwoPCCommand}, fork_receiver::{AppendEntriesStats, ForkReceiverCommand}, logserver::{self, LogServerCommand}, pacemaker::PacemakerCommand}; @@ -78,6 +80,16 @@ pub struct Staging { #[cfg(feature = "extra_2pc")] engraft_2pc_futures_rx: Receiver, + + 
#[cfg(feature = "witness_forwarding")] + witness_set_map: HashMap>, + + #[cfg(feature = "witness_forwarding")] + last_vote_hash: HashType, + + #[cfg(feature = "witness_forwarding")] + witness_client: PinnedClient, + } impl Staging { @@ -130,9 +142,26 @@ impl Staging { &leader_staging_event_order, )); + #[cfg(feature = "witness_forwarding")] + let witness_set_map = { + use crate::consensus::witness_receiver::WitnessReceiver; + + let config = config.get(); + let node_list = config.consensus_config.node_list.clone(); + let r_plus_one = config.consensus_config.node_list.len() - 2 * (config.consensus_config.liveness_u as usize); + WitnessReceiver::find_witness_set_map(node_list, r_plus_one) + }; + + #[cfg(feature = "witness_forwarding")] + let witness_client = Client::new_atomic(config.clone(), client.0.key_store.clone(), false, 0).into(); + let mut ret = Self { config, client, + + #[cfg(feature = "witness_forwarding")] + witness_client, + crypto, ci: 0, bci: 0, @@ -168,6 +197,13 @@ impl Staging { #[cfg(feature = "extra_2pc")] engraft_2pc_futures_rx, + + #[cfg(feature = "witness_forwarding")] + witness_set_map, + + #[cfg(feature = "witness_forwarding")] + last_vote_hash: default_hash(), + }; #[cfg(not(feature = "view_change"))] diff --git a/src/consensus/staging/steady_state.rs b/src/consensus/staging/steady_state.rs index 74d382aa..ec5627b3 100644 --- a/src/consensus/staging/steady_state.rs +++ b/src/consensus/staging/steady_state.rs @@ -6,14 +6,15 @@ use log::{debug, error, info, trace, warn}; use prost::Message; use tokio::{sync::oneshot, task::spawn_local}; +#[cfg(feature = "witness_forwarding")] +use crate::crypto::HashType; use crate::{ - consensus::{extra_2pc::{EngraftActionAfterFutureDone, EngraftTwoPCFuture, TwoPCCommand}, logserver::LogServerCommand, pacemaker::PacemakerCommand}, crypto::{CachedBlock, DIGEST_LENGTH}, proto::{ + consensus::{extra_2pc::{EngraftActionAfterFutureDone, EngraftTwoPCFuture, TwoPCCommand}, logserver::LogServerCommand, 
pacemaker::PacemakerCommand}, crypto::{CachedBlock, DIGEST_LENGTH, hash}, proto::{ consensus::{ - proto_block::Sig, ProtoNameWithSignature, ProtoQuorumCertificate, - ProtoSignatureArrayEntry, ProtoVote, + ProtoNameWithSignature, ProtoQuorumCertificate, ProtoSignatureArrayEntry, ProtoVote, proto_block::Sig }, rpc::ProtoPayload, - }, rpc::{client::PinnedClient, PinnedMessage, SenderType}, utils::StorageAck + }, rpc::{PinnedMessage, SenderType, client::PinnedClient}, utils::StorageAck }; use super::{ @@ -225,8 +226,21 @@ impl Staging { n: last_block.block.block.n, view: self.view, config_num: self.config_num, + + #[cfg(feature = "witness_forwarding")] + last_vote_hash: self.last_vote_hash.clone(), + + #[cfg(not(feature = "witness_forwarding"))] + last_vote_hash: Vec::new(), }; + #[cfg(feature = "witness_forwarding")] + { + // self.last_vote_hash = hash(&vote.encode_to_vec()); + use crate::crypto::default_hash; + self.last_vote_hash = default_hash(); + } + #[cfg(feature = "extra_2pc")] let (_vote_n, _vote_view, _vote_digest) = (vote.n, vote.view, vote.fork_digest.clone()); @@ -318,8 +332,21 @@ impl Staging { n: last_block.block.block.n, view: self.view, config_num: self.config_num, + + #[cfg(feature = "witness_forwarding")] + last_vote_hash: self.last_vote_hash.clone(), + + #[cfg(not(feature = "witness_forwarding"))] + last_vote_hash: Vec::new(), }; + #[cfg(feature = "witness_forwarding")] + { + use crate::crypto::default_hash; + // self.last_vote_hash = hash(&vote.encode_to_vec()); + self.last_vote_hash = default_hash(); + } + #[cfg(feature = "extra_2pc")] let (_vote_n, _vote_view, _vote_digest) = (vote.n, vote.view, vote.fork_digest.clone()); @@ -557,6 +584,49 @@ impl Staging { Ok(()) } + #[cfg(feature = "witness_forwarding")] + async fn send_block_to_witness_set(&mut self, block: CachedBlock) { + use ed25519_dalek::SIGNATURE_LENGTH; + + use crate::{proto::consensus::{ProtoBlockWitness, ProtoWitness, proto_witness::Body}, rpc::server::LatencyProfile}; + + 
#[cfg(not(feature = "always_sign"))] + { + panic!("Misconfigured protocol!"); + } + // return; + + let leader = self.config.get().consensus_config.get_leader_for_view(self.view); + let witness_set = self.witness_set_map.get(&leader).unwrap(); + let my_name = self.config.get().net_config.name.clone(); + let sig = match &block.block.sig { + Some(Sig::ProposerSig(sig)) => sig.clone(), + _ => panic!("Block is not signed!"), + }; + + let witness = ProtoWitness { + sender: leader, + receiver: my_name, + body: Some(Body::BlockWitness(ProtoBlockWitness { + block_hash: block.block_hash.clone(), + block_partial_hash: hash(&block.block_ser[SIGNATURE_LENGTH..]), + block_sig: sig, + parent_hash: block.block.parent.clone(), + n: block.block.n, + qc: block.block.qc.clone(), + })), + }; + + let payload = ProtoPayload { + message: Some(crate::proto::rpc::proto_payload::Message::Witness(witness)), + }; + let buf = payload.encode_to_vec(); + let sz = buf.len(); + let msg = PinnedMessage::from(buf, sz, SenderType::Anon); + let mut profile = LatencyProfile::new(); + let _res = PinnedClient::broadcast(&self.witness_client, witness_set, &msg, &mut profile, 0).await; + } + /// This has a lot of similarities with process_block_as_leader. #[async_recursion] pub(super) async fn process_block_as_follower( @@ -662,6 +732,10 @@ impl Staging { } } + + #[cfg(feature = "witness_forwarding")] + self.send_block_to_witness_set(block.clone()).await; + self.logserver_tx.send(LogServerCommand::NewBlock(block.clone())).await.unwrap(); self.__ae_seen_in_this_view += if this_is_final_block { 1 } else { 0 }; @@ -689,14 +763,18 @@ impl Staging { .map(|e| e.clone()) .collect::>(); + // warn!("qc_list size: {}", qc_list.len()); + for qc in qc_list.drain(..) { if !old_view_is_stable { // Try to see if this QC can stabilize the view. 
trace!("Trying to stabilize view {} with QC", self.view); self.maybe_stabilize_view(&qc).await; } - + + self.maybe_byzantine_commit(qc).await?; + } #[cfg(feature = "no_qc")] @@ -776,8 +854,53 @@ impl Staging { self.process_vote(sender, vote).await } + #[cfg(feature = "witness_forwarding")] + async fn send_vote_to_witness_set(&mut self, sender: String, vote: ProtoVote, block_hash: HashType) { + + #[cfg(not(feature = "always_sign"))] + { + panic!("Misconfigured protocol!"); + } + // return; + + use crate::{proto::consensus::{ProtoVoteWitness, ProtoWitness, proto_witness::Body}, rpc::server::LatencyProfile}; + + let witness_set = self.witness_set_map.get(&sender).unwrap(); + let my_name = self.config.get().net_config.name.clone(); + // This only works when "alway_sign" is set. + let n = vote.n; + // Find the signature with the matching sequence number. + let sig = vote.sig_array.iter().find(|e| e.n == n).unwrap(); + let vote_sig = sig.sig.clone(); + let witness = ProtoWitness { + sender, + receiver: my_name.clone(), + body: Some(Body::VoteWitness(ProtoVoteWitness { + block_hash, + n, + vote_sig, + })), + }; + let payload = ProtoPayload { + message: Some(crate::proto::rpc::proto_payload::Message::Witness(witness)), + }; + let buf = payload.encode_to_vec(); + let sz = buf.len(); + let msg = PinnedMessage::from(buf, sz, SenderType::Anon); + let mut profile = LatencyProfile::new(); + let _res = PinnedClient::broadcast(&self.witness_client, witness_set, &msg, &mut profile, 0).await; + } + + /// Precondition: The vote has been cryptographically verified to be from sender. 
async fn process_vote(&mut self, sender: String, mut vote: ProtoVote) -> Result<(), ()> { + #[cfg(feature = "witness_forwarding")] + { + // let block = self.pending_blocks.iter().find(|e| e.block.block.n == vote.n).unwrap(); + let block_hash = vote.fork_digest.clone(); + self.send_vote_to_witness_set(sender.clone(), vote.clone(), block_hash).await; + } + if !self.view_is_stable { info!("Processing vote on {} from {}", vote.n, sender); } @@ -823,6 +946,8 @@ impl Staging { self.do_byzantine_commit(self.bci, self.ci).await; // This is needed to prevent a memory leak. + + Ok(()) } @@ -901,7 +1026,9 @@ impl Staging { let mut qcs = Vec::new(); let thresh = self.byzantine_commit_threshold(); + let fast_thresh = self.byzantine_fast_path_threshold(); + for block in &mut self.pending_blocks { if block.qc_is_proposed && block.fast_qc_is_proposed { continue; @@ -919,7 +1046,6 @@ impl Staging { thresh }; - if block.vote_sigs.len() >= thresh { let qc = ProtoQuorumCertificate { n: block.block.block.n, @@ -957,7 +1083,10 @@ impl Staging { // But since this thread will block on block_broadcaster_tx.send, it will not be able to consume from qc_rx. // Once the queues are saturated, the system will deadlock. 
let _ = self.qc_tx.send(qc.clone()); + self.maybe_byzantine_commit(qc).await?; + + } Ok(()) diff --git a/src/consensus/witness_receiver.rs b/src/consensus/witness_receiver.rs new file mode 100644 index 00000000..d501607e --- /dev/null +++ b/src/consensus/witness_receiver.rs @@ -0,0 +1,318 @@ +use std::{collections::{BTreeMap, HashMap, HashSet}, sync::Arc, time::Duration}; +use log::{error, info, trace}; +use prost::Message as _; +use rand::{SeedableRng as _, seq::IteratorRandom}; +use rand_chacha::ChaCha20Rng; +use tokio::{sync::Mutex, task::JoinSet, sync::mpsc::UnboundedReceiver}; + +use crate::{config::AtomicConfig, crypto::{AtomicKeyStore, HashType, default_hash, hash}, proto::{consensus::{ProtoVoteWitness, ProtoWitness, proto_witness::Body}, rpc::ProtoPayload}, rpc::{PinnedMessage, SenderType, client::PinnedClient, server::LatencyProfile}, utils::{channel::{Receiver, Sender, make_channel}, timer::ResettableTimer}}; + +pub struct WitnessReceiver { + config: AtomicConfig, + client: PinnedClient, + witness_set_map: HashMap<String, Vec<String>>, // sender -> list of witness sets. + my_audit_responsibility: HashSet<String>, // list of nodes that I am responsible for auditing. + witness_rx: UnboundedReceiver<ProtoWitness>, + witness_audit_txs: HashMap<String, Sender<ProtoWitness>>, // If the load is too high, might split the responsibility into multiple tasks.
+ + handles: JoinSet<()>, + key_store: AtomicKeyStore, +} + + +struct AuditorState { + sender: String, + key_store: AtomicKeyStore, + block_hashes: BTreeMap<u64, HashType>, + votes: HashMap<String, BTreeMap<u64, HashType>>, +} + +impl AuditorState { + pub fn new(sender: String, key_store: AtomicKeyStore) -> Self { + Self { sender, key_store, block_hashes: BTreeMap::new(), votes: HashMap::new() } + } + + fn display_hash(hash: &HashType) -> String { + hex::encode(hash.as_slice()).get(..5).unwrap().to_string() + } + + pub fn log_stats(&mut self) { + let (last_block_n, last_block_hash) = match self.block_hashes.last_entry() { + Some(entry) => (*entry.key(), Self::display_hash(entry.get())), + None => (0, Self::display_hash(&default_hash())), + }; + + let mut vote_last = HashMap::new(); + for (sender, votes) in self.votes.iter_mut() { + let last_vote = match votes.last_entry() { + Some(entry) => (*entry.key(), Self::display_hash(entry.get())), + None => (0, Self::display_hash(&default_hash())), + }; + vote_last.insert(sender.clone(), last_vote); + } + + let vote_stat_str = vote_last.iter().map(|(sender, (n, hash))| format!("{}: {} -> {}", sender, n, hash)).collect::<Vec<String>>().join(", "); + + info!("Auditor stats for node: {}, last block: {} -> {}, last vote: {}", self.sender, last_block_n, last_block_hash, vote_stat_str); + + // Garbage collect + + self.block_hashes.retain(|n, _| if last_block_n > 10_000 {*n > last_block_n - 10_000 } else {true}); + for (sender, votes) in self.votes.iter_mut() { + votes.retain(|n, _| if last_block_n > 10_000 {*n > last_block_n - 10_000 } else {true}); + } + } + + pub fn process_witness(&mut self, witness: ProtoWitness) { + match witness.body { + Some(Body::BlockWitness(block_witness)) => { + let _sig = block_witness.block_sig.try_into(); + let Ok(_sig) = _sig else { + error!("Block signature is malformed for block n: {}, sender: {}", block_witness.n, witness.sender); + return; + }; + if !self.key_store.get().verify(&witness.sender, &_sig, &block_witness.block_partial_hash.as_slice()) { + 
error!("Block signature verification failed for block n: {}, sender: {}", block_witness.n, witness.sender); + } + if self.block_hashes.contains_key(&block_witness.n) { + let old_hash = self.block_hashes.get(&block_witness.n).unwrap(); + if old_hash != &block_witness.block_hash { + error!("Block hash mismatch for block n: {}, old hash: {}, new hash: {}", block_witness.n, Self::display_hash(old_hash), Self::display_hash(&block_witness.block_hash)); + } + } + + for (_, vote_buffer) in self.votes.iter() { + if vote_buffer.contains_key(&block_witness.n) { + let old_hash = vote_buffer.get(&block_witness.n).unwrap(); + if old_hash != &block_witness.block_hash { + error!("Vote hash mismatch for block n: {}, old hash: {}, new hash: {}", block_witness.n, Self::display_hash(old_hash), Self::display_hash(&block_witness.block_hash)); + } else { + trace!("Vote hash matches block hash for vote n: {} and sender: {}", block_witness.n, witness.sender); + } + } + } + + if block_witness.n >= 2 { + let prev_block_hash = self.block_hashes.get(&(block_witness.n - 1)); + let Some(prev_block_hash) = prev_block_hash else { + error!("Previous block hash not found for block n: {}", block_witness.n); + return; + }; + if prev_block_hash != &block_witness.parent_hash { + error!("Parent hash mismatch for block n: {}, old hash: {}, new hash: {}", block_witness.n, Self::display_hash(prev_block_hash), Self::display_hash(&block_witness.parent_hash)); + } else { + trace!("Parent hash matches for block n: {}, parent hash: {}", block_witness.n, Self::display_hash(prev_block_hash)); + } + } + self.block_hashes.insert(block_witness.n, block_witness.block_hash.clone()); + } + Some(Body::VoteWitness(vote_witness)) => { + let entry = self.votes.entry(witness.sender.clone()).or_insert_with(BTreeMap::new); + if entry.contains_key(&vote_witness.n) { + let old_hash = entry.get(&vote_witness.n).unwrap(); + if old_hash != &vote_witness.block_hash { + error!("Vote hash mismatch for vote n: {} and sender: {}, old 
hash: {}, new hash: {}", vote_witness.n, witness.sender, Self::display_hash(old_hash), Self::display_hash(&vote_witness.block_hash)); + } + } + if self.block_hashes.contains_key(&vote_witness.n) { + let block_hash = self.block_hashes.get(&vote_witness.n).unwrap(); + if block_hash != &vote_witness.block_hash { + error!("Block hash mismatch for vote n: {} and sender: {}, block hash: {}, vote hash: {}", vote_witness.n, witness.sender, Self::display_hash(block_hash), Self::display_hash(&vote_witness.block_hash)); + } else { + trace!("Vote hash matches block hash for vote n: {} and sender: {}", vote_witness.n, witness.sender); + } + } + entry.insert(vote_witness.n, vote_witness.block_hash.clone()); + + // Verify the vote signature. + let _sig = vote_witness.vote_sig.try_into(); + let Ok(_sig) = _sig else { + error!("Vote signature is malformed for vote n: {}, sender: {}", vote_witness.n, witness.sender); + return; + }; + if !self.key_store.get().verify(&witness.sender, &_sig, &vote_witness.block_hash.as_slice()) { + error!("Vote signature verification failed for vote n: {}, sender: {}", vote_witness.n, witness.sender); + } + } + None => { + error!("Witness has no body!"); + } + } + } +} + + +impl WitnessReceiver { + pub fn find_witness_set_map(mut node_list: Vec, r_plus_one: usize) -> HashMap> { + let mut res = HashMap::new(); + + let mut load_on_each_node: HashMap = HashMap::new(); + let max_load = r_plus_one; + + node_list.sort(); + + + for node in &node_list { + // Randomly select r_plus_one nodes from the list. + // Exclude the current node from the list. + // Seed the RNG with the node's name. 
+ + let _node_list = node_list.iter() + .filter_map(|n| { + if n.eq(node) { + None + } else if *load_on_each_node.get(n).unwrap_or(&0) >= max_load { + None + } else { + Some(n.clone()) + } + }) + .collect::<Vec<String>>(); + + let seed: [u8; 32] = hash(node.as_bytes())[..32].try_into().unwrap(); + let mut rng = ChaCha20Rng::from_seed(seed); + let witness_set = _node_list.iter() + .choose_multiple(&mut rng, r_plus_one) + .into_iter() + .map(|n| n.clone()) + .collect::<Vec<String>>(); + for n in witness_set.iter() { + *load_on_each_node.entry(n.clone()).or_insert(0) += 1; + } + + res.insert(node.clone(), witness_set); + } + + trace!("Witness set map: {:?}", res); + + res + } + + fn find_my_audit_responsibility(name: &String, witness_set_map: &HashMap<String, Vec<String>>) -> HashSet<String> { + let mut res = HashSet::new(); + + for (sender, witness_set) in witness_set_map.iter() { + if witness_set.contains(name) { + res.insert(sender.clone()); + } + } + + res + } + + + pub fn new(config: AtomicConfig, client: PinnedClient, key_store: AtomicKeyStore, witness_rx: UnboundedReceiver<ProtoWitness>) -> Self { + let _config = config.get(); + let node_list = _config.consensus_config.node_list.clone(); + let r_plus_one = _config.consensus_config.node_list.len() - 2 * (_config.consensus_config.liveness_u as usize); + let witness_set_map = Self::find_witness_set_map(node_list, r_plus_one); + let my_audit_responsibility = Self::find_my_audit_responsibility(&_config.net_config.name, &witness_set_map); + let handles = JoinSet::new(); + let witness_audit_txs = HashMap::new(); + Self { config, client, witness_set_map, my_audit_responsibility, witness_rx, witness_audit_txs, handles, key_store } + } + + pub async fn run(witness_receiver: Arc<Mutex<WitnessReceiver>>) { + let mut witness_receiver = witness_receiver.lock().await; + let _chan_depth = witness_receiver.config.get().rpc_config.channel_depth as usize; + + let _audit_responsibility = witness_receiver.my_audit_responsibility.clone(); + trace!("Audit responsibility: {:?} Witness set map: {:?}", 
_audit_responsibility, witness_receiver.witness_set_map); + + // Auditing threads. + // for node in _audit_responsibility.iter() { + for _ in 0..1 { // TODO: Handle load-balancing logic. + let (witness_audit_tx, witness_audit_rx) = make_channel(_chan_depth); + witness_receiver.witness_audit_txs.insert("*".to_string(), witness_audit_tx); + // let _node = node.clone(); + let log_timeout = witness_receiver.config.get().app_config.logger_stats_report_ms; + let log_timer = ResettableTimer::new(Duration::from_millis(log_timeout)); + log_timer.run().await; + let key_store = witness_receiver.key_store.clone(); + witness_receiver.handles.spawn(async move { + // TODO: Handle load-balancing logic. + let mut state = AuditorState::new("*".to_string(), key_store); + loop { + tokio::select! { + _ = log_timer.wait() => { + state.log_stats(); + } + witness = witness_audit_rx.recv() => { + if let Some(witness) = witness { + state.process_witness(witness); + } + } + } + } + }); + } + + + // Forward to other witness thread. + let (witness_forward_tx, witness_forward_rx) = make_channel::<ProtoWitness>(_chan_depth); + let _witness_set_map = witness_receiver.witness_set_map.clone(); + let _client = witness_receiver.client.clone(); + witness_receiver.handles.spawn(async move { + while let Some(witness) = witness_forward_rx.recv().await { + // Broadcast this witness to the witness set of the sender.
+ let witness_set = _witness_set_map.get(&witness.sender).unwrap(); + + let payload = ProtoPayload { + message: Some(crate::proto::rpc::proto_payload::Message::Witness(witness)), + }; + let buf = payload.encode_to_vec(); + + let sz = buf.len(); + let msg = PinnedMessage::from(buf, sz, SenderType::Anon); + + let mut profile = LatencyProfile::new(); + let _res = PinnedClient::broadcast(&_client, witness_set, &msg, &mut profile, 0).await; + } + }); + + while let Some(witness) = witness_receiver.witness_rx.recv().await { + witness_receiver.maybe_forward_witness(&witness, &witness_forward_tx).await; + witness_receiver.maybe_audit_witness(witness).await; + } + } + + async fn maybe_forward_witness(&mut self, witness: &ProtoWitness, witness_forward_tx: &Sender<ProtoWitness>) { + let Some(body) = witness.body.as_ref() else { + return; + }; + + let Body::BlockWitness(block_witness) = body else { + return; + }; + + let sender = witness.sender.clone(); + + // Decompose the vote qc into a vector of votes and create a witness for each vote. + for qc in block_witness.qc.iter() { + for sig in qc.sig.iter() { + let vote_witness = ProtoVoteWitness { + block_hash: qc.digest.clone(), + vote_sig: sig.sig.clone(), + n: qc.n, + }; + + let witness = ProtoWitness { + sender: sig.name.clone(), + receiver: sender.clone(), + body: Some(Body::VoteWitness(vote_witness)), + }; + + witness_forward_tx.send(witness).await.unwrap(); + } + } + + } + + async fn maybe_audit_witness(&mut self, witness: ProtoWitness) { + for (_, witness_audit_tx) in self.witness_audit_txs.iter() { + // TODO: Handle load-balancing logic.
+ witness_audit_tx.send(witness.clone()).await.unwrap(); + } + } + +} \ No newline at end of file diff --git a/src/crypto/service.rs b/src/crypto/service.rs index e2e9100d..18ba1a1b 100644 --- a/src/crypto/service.rs +++ b/src/crypto/service.rs @@ -1,7 +1,7 @@ use std::{io::{BufReader, Error, ErrorKind}, ops::Deref, pin::Pin, sync::{atomic::fence, Arc}}; use bytes::{BufMut, BytesMut}; -use ed25519_dalek::{verify_batch, Signature, SIGNATURE_LENGTH}; +use ed25519_dalek::{SIGNATURE_LENGTH, Signature, VerifyingKey, verify_batch}; use futures::SinkExt; use itertools::min; use log::{info, trace, warn}; @@ -101,6 +101,8 @@ fn verify_qc(keystore: &KeyStore, qc: &ProtoQuorumCertificate, min_len: usize) - let res = verify_batch(&msgs, sigs.as_slice(), &keys) .is_ok(); + // let res = dummy_verify_batch(&msgs, sigs.as_slice(), &keys); + if !res { warn!("QC verification failed"); } @@ -109,6 +111,16 @@ fn verify_qc(keystore: &KeyStore, qc: &ProtoQuorumCertificate, min_len: usize) - } +fn dummy_verify_batch(msgs: &[&[u8]], sigs: &[Signature], keys: &[VerifyingKey]) -> bool { + use ed25519_dalek::Verifier; + for i in 0..msgs.len() { + if !keys[i].verify(msgs[i], &sigs[i]).is_ok() { + return false; + } + } + true +} + enum CryptoServiceCommand { Hash(Vec, oneshot::Sender>), Sign(Vec, oneshot::Sender<[u8; SIGNATURE_LENGTH]>), diff --git a/src/proto/consensus.proto b/src/proto/consensus.proto index 2229ba3d..2de26664 100644 --- a/src/proto/consensus.proto +++ b/src/proto/consensus.proto @@ -108,4 +108,33 @@ message ProtoVote { uint64 n = 3; uint64 view = 4; uint64 config_num = 5; + + bytes last_vote_hash = 6; // Only for PeerReview. 
+} + + +message ProtoBlockWitness { + bytes block_hash = 1; + bytes block_partial_hash = 2; + bytes block_sig = 3; + bytes parent_hash = 4; + uint64 n = 5; + repeated ProtoQuorumCertificate qc = 6; +} + +message ProtoVoteWitness { + bytes block_hash = 1; + bytes vote_sig = 2; + uint64 n = 3; +} + +message ProtoWitness { + string sender = 1; + string receiver = 2; + + oneof body { + ProtoBlockWitness block_witness = 3; + ProtoVoteWitness vote_witness = 4; + } + } \ No newline at end of file diff --git a/src/proto/rpc.proto b/src/proto/rpc.proto index bd278a2c..399f9491 100644 --- a/src/proto/rpc.proto +++ b/src/proto/rpc.proto @@ -23,5 +23,7 @@ message ProtoPayload { proto.checkpoint.ProtoBackfillNack backfill_nack = 7; + proto.consensus.ProtoWitness witness = 8; + } } \ No newline at end of file