Commit: Cleanup workers

Alberto Sonnino committed Jun 17, 2021
1 parent 9488eba · commit 77e735a

Showing 126 changed files with 6,013 additions and 8,288 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -6,6 +6,7 @@
.venv/
.idea/*
rust/.idea/*
+.db-*

target/
!/**/src/**/target/
70 changes: 65 additions & 5 deletions README.md
@@ -1,14 +1,74 @@
-# Narwhal HotStuff
# Narwhal and Tusk

[![rustc](https://img.shields.io/badge/rustc-1.48+-blue?style=flat-square&logo=rust)](https://www.rust-lang.org)
[![license](https://img.shields.io/badge/license-Apache-blue.svg?style=flat-square)](LICENSE)

-The code in this branch is a prototype of Narwhal HotStuff (Hotstuff-over-Narwhal). It supplements the paper [Narwhal and Tusk: A DAG-based Mempool and Efficient BFT Consensus](https://arxiv.org/pdf/2105.11827.pdf) enabling reproducible results. There are no plans to maintain this branch. The [master branch](https://github.com/facebookresearch/narwhal) contains the most recent and polished version of this codebase.
This repo contains a prototype of Narwhal and Tusk. It supplements the paper [Narwhal and Tusk: A DAG-based Mempool and Efficient BFT Consensus](https://arxiv.org/pdf/2105.11827.pdf).

-**Note:** Please run tests on a single thread:
## Overview
We propose separating the task of transaction dissemination from transaction ordering to enable high-performance Byzantine fault-tolerant consensus in a permissioned setting. To this end, we design and evaluate a mempool protocol, Narwhal, specializing in high-throughput reliable dissemination and storage of causal histories of transactions. Narwhal tolerates an asynchronous network and maintains its performance despite failures. We demonstrate that composing Narwhal with a partially synchronous consensus protocol (HotStuff) yields significantly better throughput even in the presence of faults; however, loss of liveness during view-changes can result in high latency. To achieve good overall performance when faults occur, we propose Tusk, a zero-message-overhead asynchronous consensus protocol embedded within Narwhal, and demonstrate its high performance under a variety of configurations and faults. Further, Narwhal is designed to easily scale out using multiple workers at each validator, and we demonstrate that there is no foreseeable limit to the throughput we can achieve for consensus, with a latency of a few seconds.

As a summary of results, on a Wide Area Network (WAN), HotStuff over Narwhal achieves 170,000 tx/sec with a 2.5-sec latency, compared to HotStuff's 1,800 tx/sec with a 1-sec latency. Additional workers increase throughput linearly to 600,000 tx/sec without any increase in latency. Tusk achieves 140,000 tx/sec with a 4-sec latency, 20x better than the state-of-the-art asynchronous protocol. Under faults, both Narwhal-based protocols maintain high throughput, but the HotStuff variant suffers from slightly higher latency.

## Getting Started
The core protocols are written in Rust, but all benchmarking scripts are written in Python and run with [Fabric](http://www.fabfile.org/).
To deploy and benchmark a testbed of 4 nodes on your local machine, clone the repo and compile it in release mode:
```
$ git clone https://github.com/facebookresearch/narwhal.git
$ cd narwhal/rust
$ cargo build --release
```
Then install the Python dependencies:
```
$ cd ../scripts
$ pip install -r requirements.txt
```
-cargo test -- --test-threads 1
You also need to install Clang (required by rocksdb) and [tmux](https://linuxize.com/post/getting-started-with-tmux/#installing-tmux), which runs all nodes and clients in the background. Finally, run a local benchmark using Fabric:
```
$ fab local
```
This command may take a long time the first time you run it (compiling Rust code in `release` mode may be slow), and you can customize a number of benchmark parameters in `fabfile.py`. When the benchmark terminates, it displays a summary of the execution similar to the one below.
```
-----------------------------------------
SUMMARY:
-----------------------------------------
Committee size: 4 nodes
Number of workers: 1 worker(s) per node
Faults: 0 nodes
Transaction size: 512 B
Max batch size: 1,000 txs
Transaction rate: 60,000 tx/s
Dag Results:
+ Total certified bytes: 799,468,544 B
+ Execution time: 29,646 ms
+ Estimated BPS: 26,967,619 B/s
+ Estimated TPS: 52,671 txs/s
+ Block Latency: 6 ms
+ Client Latency: 93 ms
Consensus Results:
+ Total committed bytes: 786,986,496 B
+ Execution time: 29,542 ms
+ Estimated BPS: 26,639,130 B/s
+ Estimated TPS: 52,030 txs/s
+ Block Latency: 395 ms
+ Client Latency: 482 ms
-----------------------------------------
```
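
The benchmark parameters themselves live in `fabfile.py`. As a rough, hypothetical sketch of what such a Fabric task can look like (the parameter names below are assumptions chosen to mirror the summary above, not necessarily the ones used in the repo):

```python
# Hypothetical sketch of a Fabric benchmark task (assumed parameter
# names; check fabfile.py for the authoritative ones).
from fabric import task


@task
def local(ctx):
    bench_params = {
        'nodes': 4,        # committee size
        'workers': 1,      # workers per node
        'faults': 0,       # crashed nodes
        'tx_size': 512,    # bytes per transaction
        'rate': 60_000,    # input transactions per second
        'duration': 30,    # benchmark duration in seconds
    }
    # ... boot nodes and clients in tmux, then parse their logs
    # into a summary like the one shown above.
```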

## License
This software is licensed as [Apache 2.0](LICENSE).
11 changes: 1 addition & 10 deletions rust/Cargo.toml
@@ -1,11 +1,2 @@
[workspace]
-members = [
-"dag_core",
-"bench_worker",
-"hotstuff",
-"network_hs",
-]

-[profile.release]
-debug = true

members = ["primary", "node", "store", "crypto", "worker", "consensus", "network", "config"]
23 changes: 0 additions & 23 deletions rust/bench_worker/Cargo.toml

This file was deleted.

4 changes: 4 additions & 0 deletions scripts/.gitignore → rust/benchmark/.gitignore
@@ -1,3 +1,7 @@
node
benchmark_client
results

+# Byte-compiled / optimized / DLL files
+__pycache__/
+*.py[cod]
Empty file added rust/benchmark/aws/__init__
242 changes: 242 additions & 0 deletions rust/benchmark/aws/instance.py
@@ -0,0 +1,242 @@
import boto3
from botocore.exceptions import ClientError
from collections import defaultdict, OrderedDict
from time import sleep

from benchmark.utils import Print, BenchError, progress_bar
from aws.settings import Settings, SettingsError


class AWSError(Exception):
def __init__(self, error):
assert isinstance(error, ClientError)
self.message = error.response['Error']['Message']
self.code = error.response['Error']['Code']
super().__init__(self.message)


class InstanceManager:
INSTANCE_NAME = 'dag-node'
SECURITY_GROUP_NAME = 'dag'

def __init__(self, settings):
assert isinstance(settings, Settings)
self.settings = settings
self.clients = OrderedDict()
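        # One boto3 EC2 client per configured AWS region, in a stable order.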
for region in settings.aws_regions:
self.clients[region] = boto3.client('ec2', region_name=region)

@classmethod
def make(cls, settings_file='settings.json'):
try:
return cls(Settings.load(settings_file))
except SettingsError as e:
raise BenchError('Failed to load settings', e)

def _get(self, state):
# Possible states are: 'pending', 'running', 'shutting-down',
# 'terminated', 'stopping', and 'stopped'.
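        # Gather ids and public IPs of tagged instances, grouped by region.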
ids, ips = defaultdict(list), defaultdict(list)
for region, client in self.clients.items():
r = client.describe_instances(
Filters=[
{
'Name': 'tag:Name',
'Values': [self.INSTANCE_NAME]
},
{
'Name': 'instance-state-name',
'Values': state
}
]
)
instances = [y for x in r['Reservations'] for y in x['Instances']]
for x in instances:
ids[region] += [x['InstanceId']]
if 'PublicIpAddress' in x:
ips[region] += [x['PublicIpAddress']]
return ids, ips

def _wait(self, state):
# Possible states are: 'pending', 'running', 'shutting-down',
# 'terminated', 'stopping', and 'stopped'.
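        # Poll until no instance remains in any of the given transient states.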
while True:
sleep(1)
ids, _ = self._get(state)
if sum(len(x) for x in ids.values()) == 0:
break

def _create_security_group(self, client):
client.create_security_group(
Description='HotStuff node',
GroupName=self.SECURITY_GROUP_NAME,
)

client.authorize_security_group_ingress(
GroupName=self.SECURITY_GROUP_NAME,
IpPermissions=[
{
'IpProtocol': 'tcp',
'FromPort': 22,
'ToPort': 22,
'IpRanges': [{
'CidrIp': '0.0.0.0/0',
'Description': 'Debug SSH access',
}],
'Ipv6Ranges': [{
'CidrIpv6': '::/0',
'Description': 'Debug SSH access',
}],
},
{
'IpProtocol': 'tcp',
'FromPort': self.settings.base_port,
'ToPort': self.settings.base_port + 2_000,
'IpRanges': [{
'CidrIp': '0.0.0.0/0',
'Description': 'Dag port',
}],
'Ipv6Ranges': [{
'CidrIpv6': '::/0',
'Description': 'Dag port',
}],
}
]
)

def _get_ami(self, client):
# The AMI changes with regions.
response = client.describe_images(
Filters=[{
'Name': 'description',
'Values': ['Canonical, Ubuntu, 20.04 LTS, amd64 focal image build on 2020-10-26']
}]
)
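        # Assumes the filter matches at least one image; picks the first result.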
return response['Images'][0]['ImageId']

def create_instances(self, instances):
assert isinstance(instances, int) and instances > 0

# Create the security group in every region.
for client in self.clients.values():
try:
self._create_security_group(client)
except ClientError as e:
error = AWSError(e)
if error.code != 'InvalidGroup.Duplicate':
raise BenchError('Failed to create security group', error)

try:
# Create all instances.
size = instances * len(self.clients)
progress = progress_bar(
self.clients.values(), prefix=f'Creating {size} instances'
)
for client in progress:
client.run_instances(
ImageId=self._get_ami(client),
InstanceType=self.settings.instance_type,
KeyName=self.settings.key_name,
MaxCount=instances,
MinCount=instances,
SecurityGroups=[self.SECURITY_GROUP_NAME],
TagSpecifications=[{
'ResourceType': 'instance',
'Tags': [{
'Key': 'Name',
'Value': self.INSTANCE_NAME
}]
}],
EbsOptimized=True,
BlockDeviceMappings=[{
'DeviceName': '/dev/sda1',
'Ebs': {
'VolumeType': 'gp2',
'VolumeSize': 200,
'DeleteOnTermination': True
}
}],
)

# Wait for the instances to boot.
Print.info('Waiting for all instances to boot...')
self._wait(['pending'])
Print.heading(f'Successfully created {size} new instances')
except ClientError as e:
raise BenchError('Failed to create AWS instances', AWSError(e))

def terminate_instances(self):
try:
ids, _ = self._get(['pending', 'running', 'stopping', 'stopped'])
size = sum(len(x) for x in ids.values())
if size == 0:
                Print.heading('All instances are shut down')
return

# Terminate instances.
for region, client in self.clients.items():
if ids[region]:
client.terminate_instances(InstanceIds=ids[region])

# Wait for all instances to properly shut down.
Print.info('Waiting for all instances to shut down...')
self._wait(['shutting-down'])
for client in self.clients.values():
client.delete_security_group(
GroupName=self.SECURITY_GROUP_NAME
)

Print.heading(f'Testbed of {size} instances destroyed')
except ClientError as e:
raise BenchError('Failed to terminate instances', AWSError(e))

def start_instances(self, max):
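        # Start at most 'max' stopped instances in each region.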
size = 0
try:
ids, _ = self._get(['stopping', 'stopped'])
for region, client in self.clients.items():
if ids[region]:
target = ids[region]
target = target if len(target) < max else target[:max]
size += len(target)
client.start_instances(InstanceIds=target)
Print.heading(f'Starting {size} instances')
except ClientError as e:
raise BenchError('Failed to start instances', AWSError(e))

def stop_instances(self):
try:
ids, _ = self._get(['pending', 'running'])
for region, client in self.clients.items():
if ids[region]:
client.stop_instances(InstanceIds=ids[region])
size = sum(len(x) for x in ids.values())
Print.heading(f'Stopping {size} instances')
except ClientError as e:
            raise BenchError('Failed to stop instances', AWSError(e))

def hosts(self, flat=False):
try:
_, ips = self._get(['pending', 'running'])
return [x for y in ips.values() for x in y] if flat else ips
except ClientError as e:
raise BenchError('Failed to gather instances IPs', AWSError(e))

def print_info(self):
hosts = self.hosts()
key = self.settings.key_path
text = ''
for region, ips in hosts.items():
text += f'\n Region: {region.upper()}\n'
for i, ip in enumerate(ips):
new_line = '\n' if (i+1) % 6 == 0 else ''
text += f'{new_line} {i}\tssh -i {key} ubuntu@{ip}\n'
print(
'\n'
'----------------------------------------------------------------\n'
' INFO:\n'
'----------------------------------------------------------------\n'
f' Available machines: {sum(len(x) for x in hosts.values())}\n'
f'{text}'
'----------------------------------------------------------------\n'
)
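
For context, here is a hedged usage sketch of `InstanceManager` (assuming a valid `settings.json` and AWS credentials configured for boto3; this snippet is illustrative and not part of the commit):

```python
# Hypothetical usage sketch: create a small testbed, print the SSH
# command for each machine, then tear everything down.
from aws.instance import InstanceManager

manager = InstanceManager.make(settings_file='settings.json')
manager.create_instances(2)    # 2 instances in every configured region
manager.print_info()           # prints 'ssh -i <key> ubuntu@<ip>' per host
manager.terminate_instances()  # also deletes the security groups
```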