P2p synchronization protocol #89

Open
wants to merge 11 commits into
base: dev
from
@@ -3,7 +3,7 @@ name: Deploy dev images to GHCR
on:
push:
branches:
- 'db-compactions'
- 'p2p-rewrite'

jobs:
push-store-image:
86 changes: 70 additions & 16 deletions README.md
@@ -23,26 +23,80 @@ This can be configured using the `QUBIC_NODES_QUBIC_PEER_LIST` environment varia
## Other optional configuration parameters for qubic-archiver can be specified as env variable by adding them to docker compose:

```bash
$QUBIC_ARCHIVER_SERVER_READ_TIMEOUT <duration> (default: 5s)
$QUBIC_ARCHIVER_SERVER_WRITE_TIMEOUT <duration> (default: 5s)
$QUBIC_ARCHIVER_SERVER_SHUTDOWN_TIMEOUT <duration> (default: 5s)
$QUBIC_ARCHIVER_SERVER_HTTP_HOST <string> (default: 0.0.0.0:8000)
$QUBIC_ARCHIVER_SERVER_GRPC_HOST <string> (default: 0.0.0.0:8001)
$QUBIC_ARCHIVER_SERVER_NODE_SYNC_THRESHOLD <int> (default: 3)
$QUBIC_ARCHIVER_SERVER_CHAIN_TICK_FETCH_URL <string> (default: http://127.0.0.1:8080/max-tick)
$QUBIC_ARCHIVER_SERVER_READ_TIMEOUT <duration> (default: 5s)
$QUBIC_ARCHIVER_SERVER_WRITE_TIMEOUT <duration> (default: 5s)
$QUBIC_ARCHIVER_SERVER_SHUTDOWN_TIMEOUT <duration> (default: 5s)
$QUBIC_ARCHIVER_SERVER_HTTP_HOST <string> (default: 0.0.0.0:8000)
$QUBIC_ARCHIVER_SERVER_GRPC_HOST <string> (default: 0.0.0.0:8001)
$QUBIC_ARCHIVER_SERVER_NODE_SYNC_THRESHOLD <int> (default: 3)
$QUBIC_ARCHIVER_SERVER_CHAIN_TICK_FETCH_URL <string> (default: http://127.0.0.1:8080/max-tick)

$QUBIC_ARCHIVER_POOL_NODE_FETCHER_URL <string> (default: http://127.0.0.1:8080/status)
$QUBIC_ARCHIVER_POOL_NODE_FETCHER_TIMEOUT <duration> (default: 2s)
$QUBIC_ARCHIVER_POOL_INITIAL_CAP <int> (default: 5)
$QUBIC_ARCHIVER_POOL_MAX_IDLE <int> (default: 20)
$QUBIC_ARCHIVER_POOL_MAX_CAP <int> (default: 30)
$QUBIC_ARCHIVER_POOL_IDLE_TIMEOUT <duration> (default: 15s)

$QUBIC_ARCHIVER_QUBIC_NODE_PORT <string> (default: 21841)
$QUBIC_ARCHIVER_QUBIC_STORAGE_FOLDER <string> (default: store)
$QUBIC_ARCHIVER_QUBIC_PROCESS_TICK_TIMEOUT <duration> (default: 5s)

$QUBIC_ARCHIVER_POOL_NODE_FETCHER_URL <string> (default: http://127.0.0.1:8080/status)
$QUBIC_ARCHIVER_POOL_NODE_FETCHER_TIMEOUT <duration> (default: 2s)
$QUBIC_ARCHIVER_POOL_INITIAL_CAP <int> (default: 5)
$QUBIC_ARCHIVER_POOL_MAX_IDLE <int> (default: 20)
$QUBIC_ARCHIVER_POOL_MAX_CAP <int> (default: 30)
$QUBIC_ARCHIVER_POOL_IDLE_TIMEOUT <duration> (default: 15s)
$QUBIC_ARCHIVER_STORE_RESET_EMPTY_TICK_KEYS <bool> (default: false)

$QUBIC_ARCHIVER_QUBIC_NODE_PORT <string> (default: 21841)
$QUBIC_ARCHIVER_QUBIC_STORAGE_FOLDER <string> (default: store)
$QUBIC_ARCHIVER_QUBIC_PROCESS_TICK_TIMEOUT <duration> (default: 5s)
$QUBIC_ARCHIVER_SYNC_ENABLE <bool> (default: false)
$QUBIC_ARCHIVER_SYNC_SOURCES <string>,[string...] (default: localhost:8001) // TODO: To be changed with official bootstrap node list
$QUBIC_ARCHIVER_SYNC_RESPONSE_TIMEOUT <duration> (default: 1m)
$QUBIC_ARCHIVER_SYNC_ENABLE_COMPRESSION <bool> (default: true)
$QUBIC_ARCHIVER_SYNC_RETRY_COUNT <int> (default: 10)

$QUBIC_ARCHIVER_BOOTSTRAP_ENABLE <bool> (default: true)
$QUBIC_ARCHIVER_BOOTSTRAP_MAX_REQUESTED_ITEMS <int> (default: 1000)
$QUBIC_ARCHIVER_BOOTSTRAP_MAX_CONCURRENT_CONNECTIONS <int> (default: 20)
$QUBIC_ARCHIVER_BOOTSTRAP_BATCH_SIZE <int> (default: 10)
```

## Peer-to-peer data synchronization

Archiver supports data synchronization between nodes.
Nodes can be configured to either synchronize from other nodes (client nodes), or provide information to other nodes once they are up to date (bootstrap nodes).

### Overview
When a node is started with the `QUBIC_ARCHIVER_SYNC_ENABLE` environment variable set to `true`, it starts synchronizing information from the bootstrap nodes specified in the `QUBIC_ARCHIVER_SYNC_SOURCES` variable.
The synchronization works as follows:
1. The client will attempt to establish a connection to the specified bootstrap nodes.
2. The client will verify that its version is compatible with each node. Incompatible nodes will not be used to synchronize information.
3. Metadata will be fetched from one of the nodes in order to calculate the synchronization delta (the difference in information between the client and the bootstrap) and to determine how many ticks the bootstrap can provide per request.
4. Missing epoch related information such as the computor list will be fetched and saved.
5. The client will fetch missing tick ranges in batches. This is done concurrently in order to lower the synchronization duration.
6. Upon fetching a batch of ticks, they are cryptographically verified to ensure data accuracy. This is also done concurrently to save time.
7. After verification is finished, the ticks are saved to the database, and the last two steps are repeated until all the information has been synchronized.
8. After the synchronization is finished, the client will resume normal operation and synchronize the current epoch directly from the Qubic network.
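
Steps 5 to 7 above can be sketched in Go roughly as follows. This is a minimal illustration and not the code from this pull request: `TickRange`, `splitIntoBatches`, `syncRange` and the `fetch`, `verify` and `save` callbacks are hypothetical names, and ticks are reduced to plain numbers.

```go
package main

import "sync"

// TickRange is an inclusive range of tick numbers (hypothetical type).
type TickRange struct {
	First, Last uint32
}

// splitIntoBatches cuts [first, last] into consecutive ranges that contain
// at most batchSize ticks each.
func splitIntoBatches(first, last, batchSize uint32) []TickRange {
	var batches []TickRange
	if batchSize == 0 || first > last {
		return batches
	}
	start := first
	for {
		end := start + batchSize - 1
		if end > last || end < start { // clamp; also guards against overflow
			end = last
		}
		batches = append(batches, TickRange{First: start, Last: end})
		if end == last {
			return batches
		}
		start = end + 1
	}
}

// syncRange fetches and verifies batches concurrently, with at most `workers`
// batches in flight, and then saves the verified batches in order.
// fetch, verify and save are stand-ins for the real network, crypto and
// database code.
func syncRange(
	batches []TickRange,
	workers int,
	fetch func(TickRange) ([]uint32, error),
	verify func([]uint32) error,
	save func([]uint32) error,
) error {
	if workers < 1 {
		workers = 1
	}
	results := make([][]uint32, len(batches))
	errs := make([]error, len(batches))
	sem := make(chan struct{}, workers) // bounds the number of running goroutines
	var wg sync.WaitGroup

	for i, b := range batches {
		wg.Add(1)
		sem <- struct{}{} // wait for a free worker slot
		go func(i int, b TickRange) {
			defer wg.Done()
			defer func() { <-sem }()
			ticks, err := fetch(b)
			if err == nil {
				err = verify(ticks)
			}
			results[i], errs[i] = ticks, err
		}(i, b)
	}
	wg.Wait()

	// Save sequentially so that ticks reach the store in ascending order.
	for i := range batches {
		if errs[i] != nil {
			return errs[i]
		}
		if err := save(results[i]); err != nil {
			return err
		}
	}
	return nil
}
```

Saving only after all batches have been verified keeps the sketch short; an actual implementation would more likely save each verified group before requesting the next one, as step 7 describes.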

Depending on the hardware and network conditions, the synchronization duration can vary.
Our tests show that a machine with a 16-core, 5 GHz CPU can synchronize an epoch in about 20 to 30 minutes, at a rate of 3,000 to 4,000 ticks per minute.
Storage speed is also a factor to consider, and in some cases may become a bottleneck.
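
For a rough feel of these numbers, the duration can be estimated from the number of missing ticks and the observed rate. The helper below is a hypothetical sketch; the figure of 90,000 ticks used in the note after it is an assumed example and not a measured epoch length.

```go
package main

import "time"

// estimateSyncDuration returns the approximate time needed to synchronize
// missingTicks ticks at a steady rate of ticksPerMinute.
// It returns 0 when the rate is unknown (zero).
func estimateSyncDuration(missingTicks, ticksPerMinute uint64) time.Duration {
	if ticksPerMinute == 0 {
		return 0
	}
	return time.Duration(missingTicks) * time.Minute / time.Duration(ticksPerMinute)
}
```

With these inputs, `estimateSyncDuration(90000, 3000)` evaluates to 30 minutes, which matches the upper end of the range quoted above.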

>[!WARNING]
> It is not recommended to synchronize a node from zero close to the epoch transition.
> While synchronization of past epochs may finish before the transition, synchronization from the network itself is a couple of times slower, thus the current epoch may not be synchronized in time.

### Configuration

#### Client

- `QUBIC_ARCHIVER_SYNC_ENABLE`: Whether to enable the synchronization feature or not.
- `QUBIC_ARCHIVER_SYNC_SOURCES`: The list of bootstrap nodes to fetch from.
- `QUBIC_ARCHIVER_SYNC_RESPONSE_TIMEOUT`: The maximum time fetching a tick batch should take.
- `QUBIC_ARCHIVER_SYNC_ENABLE_COMPRESSION`: Whether to enable compression for data transferred during synchronization.
- `QUBIC_ARCHIVER_SYNC_RETRY_COUNT`: The number of times to retry fetching a tick range, in the event that the bootstrap has reached the maximum number of connections.

#### Bootstrap

- `QUBIC_ARCHIVER_BOOTSTRAP_ENABLE`: Whether to enable the bootstrap functionality or not.
- `QUBIC_ARCHIVER_BOOTSTRAP_MAX_REQUESTED_ITEMS`: The maximum number of ticks per request.
- `QUBIC_ARCHIVER_BOOTSTRAP_MAX_CONCURRENT_CONNECTIONS`: The maximum number of concurrent connections across all clients.
- `QUBIC_ARCHIVER_BOOTSTRAP_BATCH_SIZE`: The number of ticks that are sent to the client at a time.
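
On the bootstrap side, the three limits could interact roughly as sketched below. All names (`limiter`, `serveTicks`, `errTooManyConnections`) are hypothetical, and truncating an oversized request instead of rejecting it is an assumption made for the example.

```go
package main

import "errors"

// errTooManyConnections is a hypothetical error for a saturated bootstrap.
var errTooManyConnections = errors.New("maximum number of concurrent connections reached")

// limiter caps the number of requests served at the same time.
type limiter struct {
	slots chan struct{}
}

func newLimiter(maxConcurrent int) *limiter {
	if maxConcurrent < 1 {
		maxConcurrent = 1
	}
	return &limiter{slots: make(chan struct{}, maxConcurrent)}
}

// tryAcquire takes a slot without blocking and reports whether it succeeded.
func (l *limiter) tryAcquire() bool {
	select {
	case l.slots <- struct{}{}:
		return true
	default:
		return false
	}
}

// release frees a slot previously taken with tryAcquire.
func (l *limiter) release() { <-l.slots }

// serveTicks sends at most maxRequestedItems ticks to the client, in chunks
// of batchSize, and fails fast when no connection slot is free.
func serveTicks(l *limiter, ticks []uint32, maxRequestedItems, batchSize int, send func([]uint32) error) error {
	if !l.tryAcquire() {
		return errTooManyConnections
	}
	defer l.release()

	if maxRequestedItems >= 0 && len(ticks) > maxRequestedItems {
		ticks = ticks[:maxRequestedItems] // assumption: truncate, do not reject
	}
	if batchSize < 1 {
		batchSize = 1
	}
	for len(ticks) > 0 {
		n := batchSize
		if n > len(ticks) {
			n = len(ticks)
		}
		if err := send(ticks[:n]); err != nil {
			return err
		}
		ticks = ticks[n:]
	}
	return nil
}
```

Failing fast when no slot is free pairs naturally with the client-side retry count described in the previous section.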

## Run with docker-compose:

```bash
1 change: 1 addition & 0 deletions go.mod
@@ -8,6 +8,7 @@ require (
github.com/cockroachdb/pebble v1.1.2
github.com/google/go-cmp v0.6.0
github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0
github.com/pingcap/errors v0.11.4
github.com/pkg/errors v0.9.1
github.com/qubic/go-node-connector v0.10.2
github.com/qubic/go-schnorrq v1.0.1
34 changes: 31 additions & 3 deletions main.go
@@ -56,6 +56,19 @@ func run() error {
Store struct {
ResetEmptyTickKeys bool `conf:"default:false"`
}
Sync struct {
Enable bool `conf:"default:false"`
Sources []string `conf:"default:localhost:8001"`
ResponseTimeout time.Duration `conf:"default:1m"`
EnableCompression bool `conf:"default:true"`
RetryCount int `conf:"default:10"`
}
Bootstrap struct {
Enable bool `conf:"default:true"`
MaxRequestedItems int `conf:"default:1000"`
MaxConcurrentConnections int `conf:"default:20"`
BatchSize int `conf:"default:10"`
}
}

if err := conf.Parse(os.Args[1:], prefix, &cfg); err != nil {
@@ -148,7 +161,7 @@ func run() error {
}
}

err = tick.CalculateEmptyTicksForAllEpochs(ps)
err = tick.CalculateEmptyTicksForAllEpochs(ps, false)
if err != nil {
return errors.Wrap(err, "calculating empty ticks for all epochs")
}
@@ -166,7 +179,22 @@ func run() error {
return errors.Wrap(err, "creating qubic pool")
}

rpcServer := rpc.NewServer(cfg.Server.GrpcHost, cfg.Server.HttpHost, cfg.Server.NodeSyncThreshold, cfg.Server.ChainTickFetchUrl, ps, p)
bootstrapConfiguration := rpc.BootstrapConfiguration{
Enable: cfg.Bootstrap.Enable,
MaximumRequestedItems: cfg.Bootstrap.MaxRequestedItems,
BatchSize: cfg.Bootstrap.BatchSize,
MaxConcurrentConnections: cfg.Bootstrap.MaxConcurrentConnections,
}

syncConfiguration := processor.SyncConfiguration{
Enable: cfg.Sync.Enable,
Sources: cfg.Sync.Sources,
ResponseTimeout: cfg.Sync.ResponseTimeout,
EnableCompression: cfg.Sync.EnableCompression,
RetryCount: cfg.Sync.RetryCount,
}

rpcServer := rpc.NewServer(cfg.Server.GrpcHost, cfg.Server.HttpHost, cfg.Server.NodeSyncThreshold, cfg.Server.ChainTickFetchUrl, ps, p, bootstrapConfiguration, syncConfiguration)
err = rpcServer.Start()
if err != nil {
return errors.Wrap(err, "starting rpc server")
@@ -175,7 +203,7 @@ func run() error {
shutdown := make(chan os.Signal, 1)
signal.Notify(shutdown, os.Interrupt, syscall.SIGTERM)

proc := processor.NewProcessor(p, ps, cfg.Qubic.ProcessTickTimeout)
proc := processor.NewProcessor(p, ps, cfg.Qubic.ProcessTickTimeout, syncConfiguration)
procErrors := make(chan error, 1)

// Start the service listening for requests.
21 changes: 20 additions & 1 deletion processor/processor.go
@@ -25,21 +25,40 @@ func (e *TickInTheFutureError) Error() string {
return errors.Errorf("Requested tick %d is in the future. Latest tick is: %d", e.requestedTick, e.latestTick).Error()
}

type SyncConfiguration struct {
Enable bool
Sources []string
ResponseTimeout time.Duration
EnableCompression bool
RetryCount int
}

type Processor struct {
pool *qubic.Pool
ps *store.PebbleStore
processTickTimeout time.Duration
SyncConfiguration SyncConfiguration
}

func NewProcessor(p *qubic.Pool, ps *store.PebbleStore, processTickTimeout time.Duration) *Processor {
func NewProcessor(p *qubic.Pool, ps *store.PebbleStore, processTickTimeout time.Duration, syncConfiguration SyncConfiguration) *Processor {
return &Processor{
pool: p,
ps: ps,
processTickTimeout: processTickTimeout,
SyncConfiguration: syncConfiguration,
}
}

func (p *Processor) Start() error {

if p.SyncConfiguration.Enable {
syncProcessor := NewSyncProcessor(p.SyncConfiguration, p.ps)
err := syncProcessor.Start()
if err != nil {
return errors.Wrap(err, "performing synchronization")
}
}

for {
err := p.processOneByOne()
if err != nil {