Skip to content

Commit 384dbc8

Browse files
committed
cmd, nodes: enable shared consensus
1 parent 777e51e commit 384dbc8

File tree

6 files changed

+192
-154
lines changed

6 files changed

+192
-154
lines changed

cmd/clusterd/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ func main() {
163163
defer s.Close()
164164
go s.Run(ctx)
165165

166-
nm := nodes.NewManager(dir, cm, s, log.Named("cluster"))
166+
nm := nodes.NewManager(dir, cm, s, nodes.WithLog(log.Named("cluster")))
167167
defer nm.Close()
168168

169169
server := &http.Server{

nodes/hostd.go

Lines changed: 52 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -74,12 +74,6 @@ func (m *Manager) StartHostd(ctx context.Context, sk types.PrivateKey, ready cha
7474
}
7575
defer httpListener.Close()
7676

77-
syncerListener, err := net.Listen("tcp", ":0")
78-
if err != nil {
79-
return fmt.Errorf("failed to listen on syncer address: %w", err)
80-
}
81-
defer syncerListener.Close()
82-
8377
rhp2Listener, err := net.Listen("tcp", ":0")
8478
if err != nil {
8579
return fmt.Errorf("failed to listen on rhp2 addr: %w", err)
@@ -98,57 +92,63 @@ func (m *Manager) StartHostd(ctx context.Context, sk types.PrivateKey, ready cha
9892
}
9993
defer rhp4Listener.Close()
10094

95+
network := m.chain.TipState().Network
96+
10197
var cm *chain.Manager
10298
var s *syncer.Syncer
99+
if m.shareConsensus {
100+
cm = m.chain
101+
s = m.syncer
102+
} else {
103+
// start a chain manager
104+
genesisIndex, ok := m.chain.BestIndex(0)
105+
if !ok {
106+
return errors.New("failed to get genesis index")
107+
}
108+
genesis, ok := m.chain.Block(genesisIndex.ID)
109+
if !ok {
110+
return errors.New("failed to get genesis block")
111+
}
112+
bdb, err := coreutils.OpenBoltChainDB(filepath.Join(dir, "consensus.db"))
113+
if err != nil {
114+
return fmt.Errorf("failed to open bolt db: %w", err)
115+
}
116+
defer bdb.Close()
117+
dbstore, tipState, err := chain.NewDBStore(bdb, network, genesis)
118+
if err != nil {
119+
return fmt.Errorf("failed to create dbstore: %w", err)
120+
}
103121

104-
// start a chain manager
105-
network := m.chain.TipState().Network
106-
genesisIndex, ok := m.chain.BestIndex(0)
107-
if !ok {
108-
return errors.New("failed to get genesis index")
109-
}
110-
genesis, ok := m.chain.Block(genesisIndex.ID)
111-
if !ok {
112-
return errors.New("failed to get genesis block")
113-
}
114-
bdb, err := coreutils.OpenBoltChainDB(filepath.Join(dir, "consensus.db"))
115-
if err != nil {
116-
return fmt.Errorf("failed to open bolt db: %w", err)
117-
}
118-
defer bdb.Close()
119-
dbstore, tipState, err := chain.NewDBStore(bdb, network, genesis)
120-
if err != nil {
121-
return fmt.Errorf("failed to create dbstore: %w", err)
122-
}
123-
cm = chain.NewManager(dbstore, tipState)
122+
cm = chain.NewManager(dbstore, tipState)
124123

125-
// start a syncer
126-
_, port, err := net.SplitHostPort(syncerListener.Addr().String())
127-
if err != nil {
128-
return fmt.Errorf("failed to split syncer address: %w", err)
129-
}
130-
s = syncer.New(syncerListener, cm, testutil.NewMemPeerStore(), gateway.Header{
131-
GenesisID: genesisIndex.ID,
132-
UniqueID: gateway.GenerateUniqueID(),
133-
NetAddress: "127.0.0.1:" + port,
134-
}, syncer.WithLogger(log.Named("syncer")),
135-
syncer.WithPeerDiscoveryInterval(5*time.Second),
136-
syncer.WithSyncInterval(5*time.Second),
137-
syncer.WithMaxInboundPeers(10000),
138-
syncer.WithMaxOutboundPeers(10000))
139-
defer s.Close()
140-
go s.Run(ctx)
141-
node.SyncerAddress = syncerListener.Addr().String()
142-
// connect to the cluster syncer
143-
_, err = m.syncer.Connect(ctx, node.SyncerAddress)
144-
if err != nil {
145-
return fmt.Errorf("failed to connect to cluster syncer: %w", err)
146-
}
147-
// connect to other nodes in the cluster
148-
for _, n := range m.Nodes() {
149-
_, err = s.Connect(ctx, n.SyncerAddress)
124+
syncerListener, err := net.Listen("tcp", ":0")
125+
if err != nil {
126+
return fmt.Errorf("failed to listen on syncer address: %w", err)
127+
}
128+
defer syncerListener.Close()
129+
130+
// start a syncer
131+
_, port, err := net.SplitHostPort(syncerListener.Addr().String())
132+
if err != nil {
133+
return fmt.Errorf("failed to split syncer address: %w", err)
134+
}
135+
s = syncer.New(syncerListener, cm, testutil.NewMemPeerStore(), gateway.Header{
136+
GenesisID: genesisIndex.ID,
137+
UniqueID: gateway.GenerateUniqueID(),
138+
NetAddress: "127.0.0.1:" + port,
139+
}, syncer.WithLogger(log.Named("syncer")),
140+
syncer.WithPeerDiscoveryInterval(5*time.Second),
141+
syncer.WithSyncInterval(5*time.Second),
142+
syncer.WithMaxInboundPeers(10000),
143+
syncer.WithMaxOutboundPeers(10000))
144+
defer s.Close()
145+
go s.Run(ctx)
146+
147+
node.SyncerAddress = syncerListener.Addr().String()
148+
// connect to the cluster syncer
149+
_, err = m.syncer.Connect(ctx, node.SyncerAddress)
150150
if err != nil {
151-
log.Debug("failed to connect to node", zap.String("node", n.ID.String()), zap.Error(err))
151+
return fmt.Errorf("failed to connect to cluster syncer: %w", err)
152152
}
153153
}
154154

nodes/manager.go

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -45,10 +45,11 @@ type (
4545

4646
// A Manager manages a set of nodes in the cluster.
4747
Manager struct {
48-
dir string
49-
chain *chain.Manager
50-
syncer *syncer.Syncer
51-
log *zap.Logger
48+
dir string
49+
chain *chain.Manager
50+
syncer *syncer.Syncer
51+
log *zap.Logger
52+
shareConsensus bool
5253

5354
mu sync.Mutex
5455
wg sync.WaitGroup
@@ -226,15 +227,19 @@ func createNodeDir(baseDir string, id NodeID) (dir string, err error) {
226227
}
227228

228229
// NewManager creates a new node manager.
229-
func NewManager(dir string, cm *chain.Manager, s *syncer.Syncer, log *zap.Logger) *Manager {
230-
return &Manager{
230+
func NewManager(dir string, cm *chain.Manager, s *syncer.Syncer, opts ...Option) *Manager {
231+
m := &Manager{
231232
dir: dir,
232233
chain: cm,
233234
syncer: s,
234-
log: log,
235+
log: zap.NewNop(),
235236

236237
nodes: make(map[NodeID]Node),
237238

238239
close: make(chan struct{}),
239240
}
241+
for _, opt := range opts {
242+
opt(m)
243+
}
244+
return m
240245
}

nodes/opts.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package nodes
2+
3+
import "go.uber.org/zap"
4+
5+
// Option is a functional option for a Manager.
6+
type Option func(*Manager)
7+
8+
// WithLog sets the logger for the Manager.
9+
func WithLog(l *zap.Logger) Option {
10+
return func(m *Manager) {
11+
m.log = l
12+
}
13+
}
14+
15+
func WithSharedConsensus(shared bool) Option {
16+
return func(m *Manager) {
17+
m.shareConsensus = shared
18+
}
19+
}

nodes/renterd.go

Lines changed: 54 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -61,62 +61,69 @@ func (m *Manager) StartRenterd(ctx context.Context, sk types.PrivateKey, ready c
6161
}
6262
defer os.RemoveAll(dir)
6363

64-
syncerListener, err := net.Listen("tcp", ":0")
65-
if err != nil {
66-
return fmt.Errorf("failed to listen on syncer address: %w", err)
67-
}
68-
defer syncerListener.Close()
69-
7064
apiListener, err := net.Listen("tcp", ":0")
7165
if err != nil {
7266
return fmt.Errorf("failed to listen on http address: %w", err)
7367
}
7468
defer apiListener.Close()
7569

76-
// start a chain manager
7770
network := m.chain.TipState().Network
78-
genesisIndex, ok := m.chain.BestIndex(0)
79-
if !ok {
80-
return errors.New("failed to get genesis index")
81-
}
82-
genesis, ok := m.chain.Block(genesisIndex.ID)
83-
if !ok {
84-
return errors.New("failed to get genesis block")
85-
}
86-
bdb, err := coreutils.OpenBoltChainDB(filepath.Join(dir, "consensus.db"))
87-
if err != nil {
88-
return fmt.Errorf("failed to open bolt db: %w", err)
89-
}
90-
defer bdb.Close()
91-
dbstore, tipState, err := chain.NewDBStore(bdb, network, genesis)
92-
if err != nil {
93-
return fmt.Errorf("failed to create dbstore: %w", err)
94-
}
95-
cm := chain.NewManager(dbstore, tipState)
9671

97-
// start a syncer
98-
_, port, err := net.SplitHostPort(syncerListener.Addr().String())
99-
if err != nil {
100-
return fmt.Errorf("failed to split syncer address: %w", err)
101-
}
102-
s := syncer.New(syncerListener, cm, testutil.NewMemPeerStore(), gateway.Header{
103-
GenesisID: genesisIndex.ID,
104-
UniqueID: gateway.GenerateUniqueID(),
105-
NetAddress: "127.0.0.1:" + port,
106-
}, syncer.WithLogger(log.Named("syncer")), syncer.WithPeerDiscoveryInterval(5*time.Second), syncer.WithSyncInterval(5*time.Second), syncer.WithMaxOutboundPeers(10000), syncer.WithMaxInboundPeers(10000))
107-
defer s.Close()
108-
go s.Run(ctx)
109-
node.SyncerAddress = syncerListener.Addr().String()
110-
// connect to the cluster syncer
111-
_, err = m.syncer.Connect(ctx, node.SyncerAddress)
112-
if err != nil {
113-
return fmt.Errorf("failed to connect to cluster syncer: %w", err)
114-
}
115-
// connect to other nodes in the cluster
116-
for _, n := range m.Nodes() {
117-
_, err = s.Connect(ctx, n.SyncerAddress)
72+
var cm *chain.Manager
73+
var s *syncer.Syncer
74+
if m.shareConsensus {
75+
cm = m.chain
76+
s = m.syncer
77+
} else {
78+
// start a chain manager
79+
genesisIndex, ok := m.chain.BestIndex(0)
80+
if !ok {
81+
return errors.New("failed to get genesis index")
82+
}
83+
genesis, ok := m.chain.Block(genesisIndex.ID)
84+
if !ok {
85+
return errors.New("failed to get genesis block")
86+
}
87+
bdb, err := coreutils.OpenBoltChainDB(filepath.Join(dir, "consensus.db"))
88+
if err != nil {
89+
return fmt.Errorf("failed to open bolt db: %w", err)
90+
}
91+
defer bdb.Close()
92+
dbstore, tipState, err := chain.NewDBStore(bdb, network, genesis)
93+
if err != nil {
94+
return fmt.Errorf("failed to create dbstore: %w", err)
95+
}
96+
97+
cm = chain.NewManager(dbstore, tipState)
98+
99+
syncerListener, err := net.Listen("tcp", ":0")
100+
if err != nil {
101+
return fmt.Errorf("failed to listen on syncer address: %w", err)
102+
}
103+
defer syncerListener.Close()
104+
105+
// start a syncer
106+
_, port, err := net.SplitHostPort(syncerListener.Addr().String())
107+
if err != nil {
108+
return fmt.Errorf("failed to split syncer address: %w", err)
109+
}
110+
s = syncer.New(syncerListener, cm, testutil.NewMemPeerStore(), gateway.Header{
111+
GenesisID: genesisIndex.ID,
112+
UniqueID: gateway.GenerateUniqueID(),
113+
NetAddress: "127.0.0.1:" + port,
114+
}, syncer.WithLogger(log.Named("syncer")),
115+
syncer.WithPeerDiscoveryInterval(5*time.Second),
116+
syncer.WithSyncInterval(5*time.Second),
117+
syncer.WithMaxInboundPeers(10000),
118+
syncer.WithMaxOutboundPeers(10000))
119+
defer s.Close()
120+
go s.Run(ctx)
121+
122+
node.SyncerAddress = syncerListener.Addr().String()
123+
// connect to the cluster syncer
124+
_, err = m.syncer.Connect(ctx, node.SyncerAddress)
118125
if err != nil {
119-
log.Debug("failed to connect to peer syncer", zap.Stringer("peer", n.ID), zap.Error(err))
126+
return fmt.Errorf("failed to connect to cluster syncer: %w", err)
120127
}
121128
}
122129

0 commit comments

Comments
 (0)