Skip to content

Commit

Permalink
BlockCollector detects repeated block (#172)
Browse files Browse the repository at this point in the history
  • Loading branch information
wuxuer authored May 1, 2021
1 parent d965232 commit e87f926
Show file tree
Hide file tree
Showing 9 changed files with 278 additions and 267 deletions.
54 changes: 9 additions & 45 deletions pkg/infra/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,62 +57,26 @@ func BenchmarkPeerEndorsement2(b *testing.B) { benchmarkNPeer(2, b) }
func BenchmarkPeerEndorsement4(b *testing.B) { benchmarkNPeer(4, b) }
func BenchmarkPeerEndorsement8(b *testing.B) { benchmarkNPeer(8, b) }

func benchmarkSyncCollector(concurrency int, b *testing.B) {
instance, _ := NewBlockCollector(concurrency, concurrency)
processed := make(chan struct{}, b.N)
defer close(processed)
now := time.Now()
finishCh := make(chan struct{})
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < concurrency; i++ {
go func() {
for j := 0; j < b.N; j++ {
ft := make([]*peer.FilteredTransaction, 1)
fb := &peer.FilteredBlock{
Number: uint64(j),
FilteredTransactions: ft,
}
block := &peer.DeliverResponse_FilteredBlock{
FilteredBlock: fb,
}
if instance.Commit(block, finishCh, now) {
processed <- struct{}{}
}
}
}()
}
var n int
for n < b.N {
<-processed
n++
}
b.StopTimer()
}

func BenchmarkSyncCollector1(b *testing.B) { benchmarkSyncCollector(1, b) }
func BenchmarkSyncCollector2(b *testing.B) { benchmarkSyncCollector(2, b) }
func BenchmarkSyncCollector4(b *testing.B) { benchmarkSyncCollector(4, b) }
func BenchmarkSyncCollector8(b *testing.B) { benchmarkSyncCollector(8, b) }
func BenchmarkSyncCollector16(b *testing.B) { benchmarkSyncCollector(16, b) }

func benchmarkAsyncCollector(concurrent int, b *testing.B) {
instance, _ := NewBlockCollector(concurrent, concurrent)
block := make(chan *peer.FilteredBlock, 100)
block := make(chan *AddressedBlock, 100)
done := make(chan struct{})
go instance.Start(context.Background(), block, done, b.N, time.Now(), false)

b.ReportAllocs()
b.ResetTimer()
for i := 0; i < concurrent; i++ {
go func() {
go func(idx int) {
for j := 0; j < b.N; j++ {
block <- &peer.FilteredBlock{
Number: uint64(j),
FilteredTransactions: make([]*peer.FilteredTransaction, 1),
block <- &AddressedBlock{
FilteredBlock: &peer.FilteredBlock{
Number: uint64(j),
FilteredTransactions: make([]*peer.FilteredTransaction, 1),
},
Address: idx,
}
}
}()
}(i)
}
<-done
b.StopTimer()
Expand Down
59 changes: 59 additions & 0 deletions pkg/infra/bitmap/bitmap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package bitmap

import "github.com/pkg/errors"

type BitMap struct {
count int // number of bits set
capability int // total number of bits
bits []uint64
}

// Has determine whether the specified position is set
func (b *BitMap) Has(num int) bool {
if num >= b.capability {
return false
}
c, bit := num/64, uint(num%64)
return (c < len(b.bits)) && (b.bits[c]&(1<<bit) != 0)
}

// Set set the specified position
// If the position has been set or exceeds the maximum number of bits, set is a no-op.
func (b *BitMap) Set(num int) {
if b.Has(num) {
return
}
if b.capability <= num {
return
}

c, bit := num/64, uint(num%64)
b.bits[c] |= 1 << bit
b.count++
return
}

func (b *BitMap) Count() int {
return b.count
}

func (b *BitMap) Cap() int {
return b.capability
}

func (b *BitMap) BitsLen() int {
return len(b.bits)
}

// NewBitsMap create a new BitsMap
func NewBitMap(cap int) (BitMap, error) {
if cap < 1 {
return BitMap{}, errors.New("cap should not be less than 1")
}
bitsLen := cap / 64
if cap%64 > 0 {
bitsLen++
}

return BitMap{bits: make([]uint64, bitsLen), capability: cap}, nil
}
13 changes: 13 additions & 0 deletions pkg/infra/bitmap/bitmap_suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package bitmap_test

import (
"testing"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)

func TestBitmap(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Bitmap Suite")
}
91 changes: 91 additions & 0 deletions pkg/infra/bitmap/bitmap_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package bitmap_test

import (
"tape/pkg/infra/bitmap"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)

var _ = Describe("Bitmap", func() {

Context("New BitsMap", func() {
It("the environment is properly set", func() {
b, err := bitmap.NewBitMap(4)
Expect(err).To(BeNil())
Expect(b.Cap()).To(Equal(4))
Expect(b.Count()).To(Equal(0))
Expect(b.BitsLen()).To(Equal(1))

b, err = bitmap.NewBitMap(65)
Expect(err).To(BeNil())
Expect(b.Cap()).To(Equal(65))
Expect(b.Count()).To(Equal(0))
Expect(b.BitsLen()).To(Equal(2))
})

It("should error which cap is less than 1", func() {
_, err := bitmap.NewBitMap(0)
Expect(err).NotTo(BeNil())

_, err = bitmap.NewBitMap(-1)
Expect(err).NotTo(BeNil())
})
})

Context("Operate BitsMap", func() {
It("the len of bits is just one ", func() {
b, err := bitmap.NewBitMap(4)
Expect(err).To(BeNil())
b.Set(0)
Expect(b.Count()).To(Equal(1))
b.Set(2)
Expect(b.Count()).To(Equal(2))
ok := b.Has(0)
Expect(ok).To(BeTrue())
ok = b.Has(2)
Expect(ok).To(BeTrue())
ok = b.Has(1)
Expect(ok).To(BeFalse())
ok = b.Has(4)
Expect(ok).To(BeFalse())

b.Set(4)
Expect(b.Count()).To(Equal(2))
b.Set(2)
Expect(b.Count()).To(Equal(2))
})

It("the len of bits is more than one", func() {
b, err := bitmap.NewBitMap(80)
Expect(err).To(BeNil())
b.Set(0)
Expect(b.Count()).To(Equal(1))
b.Set(2)
Expect(b.Count()).To(Equal(2))
b.Set(70)
Expect(b.Count()).To(Equal(3))
b.Set(79)
Expect(b.Count()).To(Equal(4))
ok := b.Has(0)
Expect(ok).To(BeTrue())
ok = b.Has(2)
Expect(ok).To(BeTrue())
ok = b.Has(70)
Expect(ok).To(BeTrue())
ok = b.Has(79)
Expect(ok).To(BeTrue())
ok = b.Has(1)
Expect(ok).To(BeFalse())
ok = b.Has(3)
Expect(ok).To(BeFalse())
ok = b.Has(69)
Expect(ok).To(BeFalse())

b.Set(80)
Expect(b.Count()).To(Equal(4))
b.Set(2)
Expect(b.Count()).To(Equal(4))
})
})
})
90 changes: 45 additions & 45 deletions pkg/infra/block_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"sync"
"tape/pkg/infra/bitmap"
"time"

"github.com/hyperledger/fabric-protos-go/peer"
Expand All @@ -17,12 +18,21 @@ type BlockCollector struct {
sync.Mutex
thresholdP, totalP int
totalTx int
registry map[uint64]int
registry map[uint64]*bitmap.BitMap
}

// AddressedBlock describe the source of block
type AddressedBlock struct {
*peer.FilteredBlock
Address int // source peer's number
}

// NewBlockCollector creates a BlockCollector
func NewBlockCollector(threshold int, total int) (*BlockCollector, error) {
registry := make(map[uint64]int)
registry := make(map[uint64]*bitmap.BitMap)
if threshold <= 0 || total <= 0 {
return nil, errors.New("threshold and total must be greater than zero")
}
if threshold > total {
return nil, errors.Errorf("threshold [%d] must be less than or equal to total [%d]", threshold, total)
}
Expand All @@ -35,70 +45,60 @@ func NewBlockCollector(threshold int, total int) (*BlockCollector, error) {

func (bc *BlockCollector) Start(
ctx context.Context,
blockCh <-chan *peer.FilteredBlock,
blockCh <-chan *AddressedBlock,
finishCh chan struct{},
totalTx int,
now time.Time,
printResult bool, // controls whether to print block commit message. Tests set this to false to avoid polluting stdout.
) {
// TODO block collector should be able to detect repeated block, and exclude it from total tx counting.
for {
select {
case block := <-blockCh:
cnt := bc.registry[block.Number] // cnt is default to 0 when key does not exist
cnt++

// newly committed block just hits threshold
if cnt == bc.thresholdP {
if printResult {
fmt.Printf("Time %8.2fs\tBlock %6d\tTx %6d\t \n", time.Since(now).Seconds(), block.Number, len(block.FilteredTransactions))
}

bc.totalTx += len(block.FilteredTransactions)
if bc.totalTx >= totalTx {
close(finishCh)
}
}

if cnt == bc.totalP {
// committed on all peers, remove from registry
delete(bc.registry, block.Number)
} else {
// upsert back to registry
bc.registry[block.Number] = cnt
}
bc.commit(block, finishCh, totalTx, now, printResult)
case <-ctx.Done():
return
}
}
}

// Deprecated
//
// Commit commits a block to collector. It returns true iff the number of peers on which
// this block has been committed has satisfied thresholdP.
func (bc *BlockCollector) Commit(block *peer.DeliverResponse_FilteredBlock, finishCh chan struct{}, now time.Time) (committed bool) {
bc.Lock()
defer bc.Unlock()
// TODO This function contains too many functions and needs further optimization
// commit commits a block to collector.
// If the number of peers on which this block has been committed has satisfied thresholdP,
// adds the number to the totalTx.
func (bc *BlockCollector) commit(block *AddressedBlock, finishCh chan struct{}, totalTx int, now time.Time, printResult bool) {
bitMap, ok := bc.registry[block.Number]
if !ok {
// The block with Number is received for the first time
b, err := bitmap.NewBitMap(bc.totalP)
if err != nil {
panic("Can not make new bitmap for BlockCollector" + err.Error())
}
bc.registry[block.Number] = &b
bitMap = &b
}
// When the block from Address has been received before, return directly.
if bitMap.Has(block.Address) {
return
}

cnt := bc.registry[block.FilteredBlock.Number] // cnt is default to 0 when key does not exist
cnt++
bitMap.Set(block.Address)
cnt := bitMap.Count()

// newly committed block just hits threshold
if cnt == bc.thresholdP {
committed = true
duration := time.Since(now)
bc.totalTx += len(block.FilteredBlock.FilteredTransactions)
fmt.Printf("tx: %d, duration: %+v, tps: %f\n", bc.totalTx, duration, float64(bc.totalTx)/duration.Seconds())
if printResult {
fmt.Printf("Time %8.2fs\tBlock %6d\tTx %6d\t \n", time.Since(now).Seconds(), block.Number, len(block.FilteredTransactions))
}

bc.totalTx += len(block.FilteredTransactions)
if bc.totalTx >= totalTx {
close(finishCh)
}
}

// TODO issue176
if cnt == bc.totalP {
// committed on all peers, remove from registry
delete(bc.registry, block.FilteredBlock.Number)
} else {
// upsert back to registry
bc.registry[block.FilteredBlock.Number] = cnt
delete(bc.registry, block.Number)
}

return
}
Loading

0 comments on commit e87f926

Please sign in to comment.