Skip to content

Commit c7aa94e

Browse files
committed
fix up
Signed-off-by: Sam Yuan <yy19902439@126.com>
1 parent 8fa0a8c commit c7aa94e

File tree

2 files changed

+91
-83
lines changed

2 files changed

+91
-83
lines changed

pkg/infra/broadcaster.go

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
package infra
2+
3+
import (
4+
"context"
5+
"io"
6+
"tape/pkg/infra/basic"
7+
8+
"github.com/hyperledger/fabric-protos-go/common"
9+
"github.com/hyperledger/fabric-protos-go/orderer"
10+
"github.com/pkg/errors"
11+
log "github.com/sirupsen/logrus"
12+
)
13+
14+
type Broadcasters struct {
15+
workers []*Broadcaster
16+
ctx context.Context
17+
envs chan *Elements
18+
errorCh chan error
19+
}
20+
21+
type Broadcaster struct {
22+
c orderer.AtomicBroadcast_BroadcastClient
23+
logger *log.Logger
24+
}
25+
26+
func CreateBroadcasters(ctx context.Context, envs chan *Elements, errorCh chan error, config basic.Config, logger *log.Logger) (*Broadcasters, error) {
27+
var workers []*Broadcaster
28+
for i := 0; i < config.NumOfConn; i++ {
29+
broadcaster, err := CreateBroadcaster(ctx, config.Orderer, logger)
30+
if err != nil {
31+
return nil, err
32+
}
33+
workers = append(workers, broadcaster)
34+
}
35+
36+
return &Broadcasters{
37+
workers: workers,
38+
ctx: ctx,
39+
envs: envs,
40+
errorCh: errorCh,
41+
}, nil
42+
}
43+
44+
func (bs Broadcasters) Start() {
45+
for _, b := range bs.workers {
46+
go b.StartDraining(bs.errorCh)
47+
go b.Start(bs.ctx, bs.envs, bs.errorCh)
48+
}
49+
}
50+
51+
func CreateBroadcaster(ctx context.Context, node basic.Node, logger *log.Logger) (*Broadcaster, error) {
52+
client, err := basic.CreateBroadcastClient(ctx, node, logger)
53+
if err != nil {
54+
return nil, err
55+
}
56+
57+
return &Broadcaster{c: client, logger: logger}, nil
58+
}
59+
60+
func (b *Broadcaster) Start(ctx context.Context, envs <-chan *Elements, errorCh chan error) {
61+
b.logger.Debugf("Start sending broadcast")
62+
for {
63+
select {
64+
case e := <-envs:
65+
err := b.c.Send(e.Envelope)
66+
if err != nil {
67+
errorCh <- err
68+
}
69+
case <-ctx.Done():
70+
return
71+
}
72+
}
73+
}
74+
75+
func (b *Broadcaster) StartDraining(errorCh chan error) {
76+
for {
77+
res, err := b.c.Recv()
78+
if err != nil {
79+
if err == io.EOF {
80+
return
81+
}
82+
b.logger.Errorf("recv broadcast err: %+v, status: %+v\n", err, res)
83+
return
84+
}
85+
86+
if res.Status != common.Status_SUCCESS {
87+
errorCh <- errors.Errorf("recv errouneous status %s", res.Status)
88+
return
89+
}
90+
}
91+
}

pkg/infra/proposer.go

Lines changed: 0 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,9 @@ package infra
22

33
import (
44
"context"
5-
"io"
65
"tape/pkg/infra/basic"
76

8-
"github.com/hyperledger/fabric-protos-go/common"
9-
"github.com/hyperledger/fabric-protos-go/orderer"
107
"github.com/hyperledger/fabric-protos-go/peer"
11-
"github.com/pkg/errors"
128
log "github.com/sirupsen/logrus"
139
)
1410

@@ -91,82 +87,3 @@ func (p *Proposer) Start(ctx context.Context, signed, processed chan *Elements,
9187
}
9288
}
9389
}
94-
95-
type Broadcasters struct {
96-
workers []*Broadcaster
97-
ctx context.Context
98-
envs chan *Elements
99-
errorCh chan error
100-
}
101-
102-
type Broadcaster struct {
103-
c orderer.AtomicBroadcast_BroadcastClient
104-
logger *log.Logger
105-
}
106-
107-
func CreateBroadcasters(ctx context.Context, envs chan *Elements, errorCh chan error, config basic.Config, logger *log.Logger) (*Broadcasters, error) {
108-
var workers []*Broadcaster
109-
for i := 0; i < config.NumOfConn; i++ {
110-
broadcaster, err := CreateBroadcaster(ctx, config.Orderer, logger)
111-
if err != nil {
112-
return nil, err
113-
}
114-
workers = append(workers, broadcaster)
115-
}
116-
117-
return &Broadcasters{
118-
workers: workers,
119-
ctx: ctx,
120-
envs: envs,
121-
errorCh: errorCh,
122-
}, nil
123-
}
124-
125-
func (bs Broadcasters) Start() {
126-
for _, b := range bs.workers {
127-
go b.StartDraining(bs.errorCh)
128-
go b.Start(bs.ctx, bs.envs, bs.errorCh)
129-
}
130-
}
131-
132-
func CreateBroadcaster(ctx context.Context, node basic.Node, logger *log.Logger) (*Broadcaster, error) {
133-
client, err := basic.CreateBroadcastClient(ctx, node, logger)
134-
if err != nil {
135-
return nil, err
136-
}
137-
138-
return &Broadcaster{c: client, logger: logger}, nil
139-
}
140-
141-
func (b *Broadcaster) Start(ctx context.Context, envs <-chan *Elements, errorCh chan error) {
142-
b.logger.Debugf("Start sending broadcast")
143-
for {
144-
select {
145-
case e := <-envs:
146-
err := b.c.Send(e.Envelope)
147-
if err != nil {
148-
errorCh <- err
149-
}
150-
case <-ctx.Done():
151-
return
152-
}
153-
}
154-
}
155-
156-
func (b *Broadcaster) StartDraining(errorCh chan error) {
157-
for {
158-
res, err := b.c.Recv()
159-
if err != nil {
160-
if err == io.EOF {
161-
return
162-
}
163-
b.logger.Errorf("recv broadcast err: %+v, status: %+v\n", err, res)
164-
return
165-
}
166-
167-
if res.Status != common.Status_SUCCESS {
168-
errorCh <- errors.Errorf("recv errouneous status %s", res.Status)
169-
return
170-
}
171-
}
172-
}

0 commit comments

Comments
 (0)