Skip to content

Commit 34d9664

Browse files
committed
refactor for worker interface
Signed-off-by: Sam Yuan <yy19902439@126.com>
1 parent 95f608d commit 34d9664

File tree

8 files changed

+155
-132
lines changed

8 files changed

+155
-132
lines changed

pkg/infra/benchmark_test.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package infra
33
import (
44
"context"
55
"testing"
6-
"time"
76

87
"tape/e2e/mock"
98

@@ -58,10 +57,10 @@ func BenchmarkPeerEndorsement4(b *testing.B) { benchmarkNPeer(4, b) }
5857
func BenchmarkPeerEndorsement8(b *testing.B) { benchmarkNPeer(8, b) }
5958

6059
func benchmarkAsyncCollector(concurrent int, b *testing.B) {
61-
instance, _ := NewBlockCollector(concurrent, concurrent)
6260
block := make(chan *AddressedBlock, 100)
6361
done := make(chan struct{})
64-
go instance.Start(context.Background(), block, done, b.N, time.Now(), false)
62+
instance, _ := NewBlockCollector(concurrent, concurrent, context.Background(), block, done, b.N, false)
63+
go instance.Start()
6564

6665
b.ReportAllocs()
6766
b.ResetTimer()

pkg/infra/block_collector.go

Lines changed: 32 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,12 @@ import (
1616
// on a certain number of peers within network.
1717
type BlockCollector struct {
1818
sync.Mutex
19-
thresholdP, totalP int
20-
totalTx int
21-
registry map[uint64]*bitmap.BitMap
19+
thresholdP, totalP, totalTx int
20+
registry map[uint64]*bitmap.BitMap
21+
ctx context.Context
22+
blockCh <-chan *AddressedBlock
23+
finishCh chan struct{}
24+
printResult bool // controls whether to print block commit message. Tests set this to false to avoid polluting stdout.
2225
}
2326

2427
// AddressedBlock describe the source of block
@@ -28,34 +31,37 @@ type AddressedBlock struct {
2831
}
2932

3033
// NewBlockCollector creates a BlockCollector
31-
func NewBlockCollector(threshold int, total int) (*BlockCollector, error) {
34+
func NewBlockCollector(threshold int, totalP int,
35+
ctx context.Context,
36+
blockCh <-chan *AddressedBlock,
37+
finishCh chan struct{},
38+
totalTx int,
39+
printResult bool) (*BlockCollector, error) {
3240
registry := make(map[uint64]*bitmap.BitMap)
33-
if threshold <= 0 || total <= 0 {
41+
if threshold <= 0 || totalP <= 0 {
3442
return nil, errors.New("threshold and total must be greater than zero")
3543
}
36-
if threshold > total {
37-
return nil, errors.Errorf("threshold [%d] must be less than or equal to total [%d]", threshold, total)
44+
if threshold > totalP {
45+
return nil, errors.Errorf("threshold [%d] must be less than or equal to total [%d]", threshold, totalP)
3846
}
3947
return &BlockCollector{
40-
thresholdP: threshold,
41-
totalP: total,
42-
registry: registry,
48+
thresholdP: threshold,
49+
totalP: totalP,
50+
totalTx: totalTx,
51+
registry: registry,
52+
ctx: ctx,
53+
blockCh: blockCh,
54+
finishCh: finishCh,
55+
printResult: printResult,
4356
}, nil
4457
}
4558

46-
func (bc *BlockCollector) Start(
47-
ctx context.Context,
48-
blockCh <-chan *AddressedBlock,
49-
finishCh chan struct{},
50-
totalTx int,
51-
now time.Time,
52-
printResult bool, // controls whether to print block commit message. Tests set this to false to avoid polluting stdout.
53-
) {
59+
func (bc *BlockCollector) Start() {
5460
for {
5561
select {
56-
case block := <-blockCh:
57-
bc.commit(block, finishCh, totalTx, now, printResult)
58-
case <-ctx.Done():
62+
case block := <-bc.blockCh:
63+
bc.commit(block, time.Now())
64+
case <-bc.ctx.Done():
5965
return
6066
}
6167
}
@@ -65,7 +71,7 @@ func (bc *BlockCollector) Start(
6571
// commit commits a block to collector.
6672
// If the number of peers on which this block has been committed has satisfied thresholdP,
6773
// adds the number to the totalTx.
68-
func (bc *BlockCollector) commit(block *AddressedBlock, finishCh chan struct{}, totalTx int, now time.Time, printResult bool) {
74+
func (bc *BlockCollector) commit(block *AddressedBlock, now time.Time) {
6975
bitMap, ok := bc.registry[block.Number]
7076
if !ok {
7177
// The block with Number is received for the first time
@@ -86,13 +92,12 @@ func (bc *BlockCollector) commit(block *AddressedBlock, finishCh chan struct{},
8692

8793
// newly committed block just hits threshold
8894
if cnt == bc.thresholdP {
89-
if printResult {
95+
if bc.printResult {
9096
fmt.Printf("Time %8.2fs\tBlock %6d\tTx %6d\t \n", time.Since(now).Seconds(), block.Number, len(block.FilteredTransactions))
9197
}
92-
93-
bc.totalTx += len(block.FilteredTransactions)
94-
if bc.totalTx >= totalTx {
95-
close(finishCh)
98+
bc.totalTx -= len(block.FilteredTransactions)
99+
if bc.totalTx <= 0 {
100+
close(bc.finishCh)
96101
}
97102
}
98103

pkg/infra/block_collector_test.go

Lines changed: 38 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"context"
55
"sync"
66
"tape/pkg/infra"
7-
"time"
87

98
"github.com/hyperledger/fabric-protos-go/peer"
109
. "github.com/onsi/ginkgo"
@@ -19,12 +18,12 @@ var _ = Describe("BlockCollector", func() {
1918

2019
Context("Async Commit", func() {
2120
It("should work with threshold 1 and observer 1", func() {
22-
instance, err := infra.NewBlockCollector(1, 1)
23-
Expect(err).NotTo(HaveOccurred())
24-
2521
block := make(chan *infra.AddressedBlock)
2622
done := make(chan struct{})
27-
go instance.Start(context.Background(), block, done, 2, time.Now(), false)
23+
instance, err := infra.NewBlockCollector(1, 1, context.Background(), block, done, 2, false)
24+
Expect(err).NotTo(HaveOccurred())
25+
26+
go instance.Start()
2827

2928
block <- newAddressedBlock(0, 0)
3029
Consistently(done).ShouldNot(BeClosed())
@@ -33,12 +32,12 @@ var _ = Describe("BlockCollector", func() {
3332
})
3433

3534
It("should work with threshold 1 and observer 2", func() {
36-
instance, err := infra.NewBlockCollector(1, 2)
37-
Expect(err).NotTo(HaveOccurred())
38-
3935
block := make(chan *infra.AddressedBlock)
4036
done := make(chan struct{})
41-
go instance.Start(context.Background(), block, done, 2, time.Now(), false)
37+
instance, err := infra.NewBlockCollector(1, 2, context.Background(), block, done, 2, false)
38+
Expect(err).NotTo(HaveOccurred())
39+
40+
go instance.Start()
4241

4342
block <- newAddressedBlock(0, 0)
4443
Consistently(done).ShouldNot(BeClosed())
@@ -55,12 +54,12 @@ var _ = Describe("BlockCollector", func() {
5554
})
5655

5756
It("should work with threshold 4 and observer 4", func() {
58-
instance, err := infra.NewBlockCollector(4, 4)
59-
Expect(err).NotTo(HaveOccurred())
60-
6157
block := make(chan *infra.AddressedBlock)
6258
done := make(chan struct{})
63-
go instance.Start(context.Background(), block, done, 2, time.Now(), false)
59+
instance, err := infra.NewBlockCollector(4, 4, context.Background(), block, done, 2, false)
60+
Expect(err).NotTo(HaveOccurred())
61+
62+
go instance.Start()
6463

6564
block <- newAddressedBlock(0, 1)
6665
Consistently(done).ShouldNot(BeClosed())
@@ -82,12 +81,12 @@ var _ = Describe("BlockCollector", func() {
8281
})
8382

8483
It("should work with threshold 2 and observer 4", func() {
85-
instance, err := infra.NewBlockCollector(2, 4)
86-
Expect(err).NotTo(HaveOccurred())
87-
8884
block := make(chan *infra.AddressedBlock)
8985
done := make(chan struct{})
90-
go instance.Start(context.Background(), block, done, 1, time.Now(), false)
86+
instance, err := infra.NewBlockCollector(2, 4, context.Background(), block, done, 1, false)
87+
Expect(err).NotTo(HaveOccurred())
88+
89+
go instance.Start()
9190

9291
block <- newAddressedBlock(0, 0)
9392
Consistently(done).ShouldNot(BeClosed())
@@ -96,12 +95,12 @@ var _ = Describe("BlockCollector", func() {
9695
})
9796

9897
PIt("should not count tx for repeated block", func() {
99-
instance, err := infra.NewBlockCollector(1, 1)
100-
Expect(err).NotTo(HaveOccurred())
101-
10298
block := make(chan *infra.AddressedBlock)
10399
done := make(chan struct{})
104-
go instance.Start(context.Background(), block, done, 2, time.Now(), false)
100+
instance, err := infra.NewBlockCollector(1, 1, context.Background(), block, done, 2, false)
101+
Expect(err).NotTo(HaveOccurred())
102+
103+
go instance.Start()
105104

106105
block <- newAddressedBlock(0, 0)
107106
Consistently(done).ShouldNot(BeClosed())
@@ -113,28 +112,32 @@ var _ = Describe("BlockCollector", func() {
113112
})
114113

115114
It("should return err when threshold is greater than total", func() {
116-
instance, err := infra.NewBlockCollector(2, 1)
115+
block := make(chan *infra.AddressedBlock)
116+
done := make(chan struct{})
117+
instance, err := infra.NewBlockCollector(2, 1, context.Background(), block, done, 2, false)
117118
Expect(err).Should(MatchError("threshold [2] must be less than or equal to total [1]"))
118119
Expect(instance).Should(BeNil())
119120
})
120121

121122
It("should return err when threshold or total is zero", func() {
122-
instance, err := infra.NewBlockCollector(0, 1)
123+
block := make(chan *infra.AddressedBlock)
124+
done := make(chan struct{})
125+
instance, err := infra.NewBlockCollector(0, 1, context.Background(), block, done, 2, false)
123126
Expect(err).Should(MatchError("threshold and total must be greater than zero"))
124127
Expect(instance).Should(BeNil())
125128

126-
instance, err = infra.NewBlockCollector(1, 0)
129+
instance, err = infra.NewBlockCollector(1, 0, context.Background(), block, done, 2, false)
127130
Expect(err).Should(MatchError("threshold and total must be greater than zero"))
128131
Expect(instance).Should(BeNil())
129132
})
130133

131134
It("Should supports parallel committers", func() {
132-
instance, err := infra.NewBlockCollector(100, 100)
133-
Expect(err).NotTo(HaveOccurred())
134-
135135
block := make(chan *infra.AddressedBlock)
136136
done := make(chan struct{})
137-
go instance.Start(context.Background(), block, done, 1, time.Now(), false)
137+
instance, err := infra.NewBlockCollector(100, 100, context.Background(), block, done, 1, false)
138+
Expect(err).NotTo(HaveOccurred())
139+
140+
go instance.Start()
138141

139142
var wg sync.WaitGroup
140143
wg.Add(100)
@@ -149,12 +152,12 @@ var _ = Describe("BlockCollector", func() {
149152
})
150153

151154
It("Should supports threshold 3 and observer 5 as parallel committers", func() {
152-
instance, err := infra.NewBlockCollector(3, 5)
153-
Expect(err).NotTo(HaveOccurred())
154-
155155
block := make(chan *infra.AddressedBlock)
156156
done := make(chan struct{})
157-
go instance.Start(context.Background(), block, done, 10, time.Now(), false)
157+
instance, err := infra.NewBlockCollector(3, 5, context.Background(), block, done, 10, false)
158+
Expect(err).NotTo(HaveOccurred())
159+
160+
go instance.Start()
158161

159162
for i := 0; i < 3; i++ {
160163
go func(idx int) {
@@ -167,13 +170,12 @@ var _ = Describe("BlockCollector", func() {
167170
})
168171

169172
It("Should supports threshold 5 and observer 5 as parallel committers", func() {
170-
instance, err := infra.NewBlockCollector(5, 5)
171-
Expect(err).NotTo(HaveOccurred())
172-
173173
block := make(chan *infra.AddressedBlock)
174174
done := make(chan struct{})
175+
instance, err := infra.NewBlockCollector(5, 5, context.Background(), block, done, 10, false)
176+
Expect(err).NotTo(HaveOccurred())
175177

176-
go instance.Start(context.Background(), block, done, 10, time.Now(), false)
178+
go instance.Start()
177179
for i := 0; i < 5; i++ {
178180
go func(idx int) {
179181
for j := 0; j < 10; j++ {

pkg/infra/interface.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ type Crypto interface {
1717
}
1818

1919
/*
20-
type consmuer and producer interface
2120
as Tape major as Producer and Consumer pattern
2221
define an interface here as Worker with start here
2322
as for #56 and #174,in cli imp adjust sequence of P&C impl to control workflow.

pkg/infra/observer.go

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,10 @@ import (
1010
)
1111

1212
type Observers struct {
13-
workers []*Observer
13+
workers []*Observer
14+
errorCh chan error
15+
blockCh chan *AddressedBlock
16+
StartTime time.Time
1417
}
1518

1619
type Observer struct {
@@ -20,22 +23,27 @@ type Observer struct {
2023
logger *log.Logger
2124
}
2225

23-
func CreateObservers(ctx context.Context, channel string, nodes []Node, crypto Crypto, logger *log.Logger) (*Observers, error) {
26+
func CreateObservers(ctx context.Context, crypto Crypto, errorCh chan error, blockCh chan *AddressedBlock, config Config, logger *log.Logger) (*Observers, error) {
2427
var workers []*Observer
25-
for i, node := range nodes {
26-
worker, err := CreateObserver(ctx, channel, node, crypto, logger)
28+
for i, node := range config.Committers {
29+
worker, err := CreateObserver(ctx, config.Channel, node, crypto, logger)
2730
if err != nil {
2831
return nil, err
2932
}
3033
worker.index = i
3134
workers = append(workers, worker)
3235
}
33-
return &Observers{workers: workers}, nil
36+
return &Observers{
37+
workers: workers,
38+
errorCh: errorCh,
39+
blockCh: blockCh,
40+
}, nil
3441
}
3542

36-
func (o *Observers) Start(errorCh chan error, blockCh chan<- *AddressedBlock, now time.Time) {
43+
func (o *Observers) Start() {
44+
o.StartTime = time.Now()
3745
for i := 0; i < len(o.workers); i++ {
38-
go o.workers[i].Start(errorCh, blockCh, now)
46+
go o.workers[i].Start(o.errorCh, o.blockCh, o.StartTime)
3947
}
4048
}
4149

0 commit comments

Comments
 (0)