Skip to content

Commit

Permalink
Merge pull request #67 from kaleido-io/tx-index
Browse files Browse the repository at this point in the history
Add transaction index and timestamps to events
  • Loading branch information
peterbroadhurst authored Jan 15, 2022
2 parents 4e9a1a6 + e96ad59 commit afde10c
Show file tree
Hide file tree
Showing 25 changed files with 1,263 additions and 431 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ require (
github.com/google/certificate-transparency-go v1.1.1 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/gorilla/websocket v1.4.2
github.com/hashicorp/golang-lru v0.5.4
github.com/hyperledger/fabric-config v0.0.7 // indirect
github.com/hyperledger/fabric-protos-go v0.0.0-20201028172056-a3136dde2354
github.com/hyperledger/fabric-sdk-go v1.0.1-0.20210729165856-3be4ed253dcf
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,8 @@ github.com/hashicorp/go-version v1.2.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09
github.com/hashicorp/go.net v0.0.1/go.mod h1:hjKkEWcCURg++eb33jQU7oqQcI9XDCnUzHA0oac0k90=
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc=
github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO+LraFDTW64=
Expand Down
18 changes: 10 additions & 8 deletions internal/events/api/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ const (
EventPayloadType_Bytes = "bytes" // default data type of the event payload, no special processing is done before returning to the subscribing client
EventPayloadType_String = "string" // event payload will be an UTF-8 encoded string
EventPayloadType_StringifiedJSON = "stringifiedJSON" // event payload will be a structured map with UTF-8 encoded string values
EventPayloadType_JSON = "json" // equivalent to "stringifiedJSON"
)

// persistedFilter is the part of the filter we record to storage
Expand Down Expand Up @@ -52,7 +53,7 @@ type SubscriptionInfo struct {
Signer string `json:"signer"`
FromBlock string `json:"fromBlock,omitempty"`
Filter persistedFilter `json:"filter"`
PayloadType string `json:"payloadType,omitempty"` // optional. data type of the payload bytes; "bytes", "string" or "stringifiedJSON". Default to "bytes"
PayloadType string `json:"payloadType,omitempty"` // optional. data type of the payload bytes; "bytes", "string" or "stringifiedJSON/json". Default to "bytes"
}

// GetID returns the ID (for sorting)
Expand All @@ -61,11 +62,12 @@ func (info *SubscriptionInfo) GetID() string {
}

type EventEntry struct {
ChaincodeId string `json:"chaincodeId"`
BlockNumber uint64 `json:"blockNumber"`
TransactionId string `json:"transactionId"`
EventName string `json:"eventName"`
Payload interface{} `json:"payload"`
Timestamp uint64 `json:"timestamp,omitempty"`
SubID string `json:"subId"`
ChaincodeId string `json:"chaincodeId"`
BlockNumber uint64 `json:"blockNumber"`
TransactionId string `json:"transactionId"`
TransactionIndex int `json:"transactionIndex"`
EventName string `json:"eventName"`
Payload interface{} `json:"payload"`
Timestamp int64 `json:"timestamp,omitempty"`
SubID string `json:"subId"`
}
50 changes: 30 additions & 20 deletions internal/events/eventstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
eventsapi "github.com/hyperledger/firefly-fabconnect/internal/events/api"
"github.com/hyperledger/firefly-fabconnect/internal/ws"

lru "github.com/hashicorp/golang-lru"
log "github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -73,6 +74,7 @@ type StreamInfo struct {
Webhook *webhookActionInfo `json:"webhook,omitempty"`
WebSocket *webSocketActionInfo `json:"websocket,omitempty"`
Timestamps bool `json:"timestamps,omitempty"` // Include block timestamps in the events generated
TimestampCacheSize int `json:"timestampCacheSize,omitempty"`
}

type webhookActionInfo struct {
Expand All @@ -91,26 +93,27 @@ type webSocketActionInfo struct {
type eventHandler func(*eventData)

type eventStream struct {
sm subscriptionManager
allowPrivateIPs bool
spec *StreamInfo
eventStream chan *eventData
eventHandler eventHandler
stopped bool
processorDone bool
pollingInterval time.Duration
pollerDone bool
inFlight uint64
batchCond *sync.Cond
batchQueue *list.List
batchCount uint64
initialRetryDelay time.Duration
backoffFactor float64
updateInProgress bool
updateInterrupt chan struct{} // a zero-sized struct used only for signaling (hand rolled alternative to context)
updateWG *sync.WaitGroup // Wait group for the go routines to reply back after they have stopped
action eventStreamAction
wsChannels ws.WebSocketChannels
sm subscriptionManager
allowPrivateIPs bool
spec *StreamInfo
eventStream chan *eventData
eventHandler eventHandler
stopped bool
processorDone bool
pollingInterval time.Duration
pollerDone bool
inFlight uint64
batchCond *sync.Cond
batchQueue *list.List
batchCount uint64
initialRetryDelay time.Duration
backoffFactor float64
updateInProgress bool
updateInterrupt chan struct{} // a zero-sized struct used only for signaling (hand rolled alternative to context)
updateWG *sync.WaitGroup // Wait group for the go routines to reply back after they have stopped
action eventStreamAction
wsChannels ws.WebSocketChannels
blockTimestampCache *lru.Cache
}

type eventStreamAction interface {
Expand Down Expand Up @@ -149,6 +152,9 @@ func newEventStream(sm subscriptionManager, spec *StreamInfo, wsChannels ws.WebS
} else {
spec.ErrorHandling = ErrorHandlingSkip
}
if spec.TimestampCacheSize == 0 {
spec.TimestampCacheSize = DefaultTimestampCacheSize
}

a = &eventStream{
sm: sm,
Expand All @@ -164,6 +170,10 @@ func newEventStream(sm subscriptionManager, spec *StreamInfo, wsChannels ws.WebS
}
a.eventHandler = a.handleEvent

if a.blockTimestampCache, err = lru.New(spec.TimestampCacheSize); err != nil {
return nil, errors.Errorf(errors.EventStreamsCreateStreamResourceErr, err)
}

if a.pollingInterval == 0 {
// Let's us do this from UTs, without exposing it
a.pollingInterval = 10 * time.Millisecond
Expand Down
150 changes: 20 additions & 130 deletions internal/events/eventstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,12 @@
package events

import (
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"strings"
"sync"
"testing"
"time"

"github.com/hyperledger/fabric-protos-go/common"
"github.com/hyperledger/fabric-protos-go/peer"
"github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab"
eventmocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/service/mocks"
"github.com/hyperledger/firefly-fabconnect/internal/conf"
"github.com/hyperledger/firefly-fabconnect/internal/errors"
eventsapi "github.com/hyperledger/firefly-fabconnect/internal/events/api"
Expand All @@ -40,119 +33,6 @@ import (
"github.com/stretchr/testify/mock"
)

func newTestStreamForBatching(spec *StreamInfo, db kvstore.KVStore, status ...int) (*subscriptionMGR, *eventStream, *httptest.Server, chan []*eventsapi.EventEntry) {
mux := http.NewServeMux()
eventStream := make(chan []*eventsapi.EventEntry)
count := 0
mux.HandleFunc("/", func(res http.ResponseWriter, req *http.Request) {
var events []*eventsapi.EventEntry
_ = json.NewDecoder(req.Body).Decode(&events)
eventStream <- events
idx := count
if idx >= len(status) {
idx = len(status) - 1
}
res.WriteHeader(status[idx])
count++
})
svr := httptest.NewServer(mux)
if spec.Type == "" {
spec.Type = "webhook"
spec.Webhook.URL = svr.URL
spec.Webhook.Headers = map[string]string{"x-my-header": "my-value"}
}
sm := newTestSubscriptionManager()
sm.config.WebhooksAllowPrivateIPs = true
sm.config.PollingIntervalSec = 0
if db != nil {
sm.db = db
}
mockstore, ok := sm.db.(*mockkvstore.KVStore)
if ok {
mockstore.On("Get", mock.Anything).Return([]byte(""), nil)
mockstore.On("Put", mock.Anything, mock.Anything).Return(nil)
}

_ = sm.addStream(spec)
return sm, sm.streams[spec.ID], svr, eventStream
}

func newTestStreamForWebSocket(spec *StreamInfo, db kvstore.KVStore, status ...int) (*subscriptionMGR, *eventStream, *mockWebSocket) {
sm := newTestSubscriptionManager()
sm.config.PollingIntervalSec = 0
if db != nil {
sm.db = db
}
_ = sm.addStream(spec)
return sm, sm.streams[spec.ID], sm.wsChannels.(*mockWebSocket)
}

func testEvent(subID string) *eventData {
entry := &eventsapi.EventEntry{
SubID: subID,
}
return &eventData{
event: entry,
batchComplete: func(*eventsapi.EventEntry) {},
}
}

func mockRPCClient(fromBlock string, withReset ...bool) *mockfabric.RPCClient {
rpc := &mockfabric.RPCClient{}
blockEventChan := make(chan *fab.BlockEvent)
ccEventChan := make(chan *fab.CCEvent)
var roBlockEventChan <-chan *fab.BlockEvent = blockEventChan
var roCCEventChan <-chan *fab.CCEvent = ccEventChan
res := &fab.BlockchainInfoResponse{
BCI: &common.BlockchainInfo{
Height: 10,
},
}
rpc.On("SubscribeEvent", mock.Anything, mock.Anything).Return(nil, roBlockEventChan, roCCEventChan, nil)
rpc.On("QueryChainInfo", mock.Anything, mock.Anything).Return(res, nil)
rpc.On("Unregister", mock.Anything).Return()

go func() {
if fromBlock == "0" {
blockEventChan <- &fab.BlockEvent{
Block: constructBlock(1),
}
}
blockEventChan <- &fab.BlockEvent{
Block: constructBlock(11),
}
if len(withReset) > 0 {
blockEventChan <- &fab.BlockEvent{
Block: constructBlock(11),
}
}
}()

return rpc
}

func setupTestSubscription(sm *subscriptionMGR, stream *eventStream, subscriptionName, fromBlock string, withReset ...bool) *eventsapi.SubscriptionInfo {
rpc := mockRPCClient(fromBlock, withReset...)
sm.rpc = rpc
spec := &eventsapi.SubscriptionInfo{
Name: subscriptionName,
Stream: stream.spec.ID,
}
if fromBlock != "" {
spec.FromBlock = fromBlock
}
_ = sm.addSubscription(spec)

return spec
}

func constructBlock(number uint64) *common.Block {
mockTx := eventmocks.NewTransactionWithCCEvent("testTxID", peer.TxValidationCode_VALID, "testChaincodeID", "testCCEventName", []byte("testPayload"))
mockBlock := eventmocks.NewBlock("testChannelID", mockTx)
mockBlock.Header.Number = number
return mockBlock
}

func TestConstructorNoSpec(t *testing.T) {
assert := assert.New(t)
_, err := newEventStream(newTestSubscriptionManager(), nil, nil)
Expand Down Expand Up @@ -500,7 +380,7 @@ func TestProcessEventsEnd2EndWebhook(t *testing.T) {
&StreamInfo{
BatchSize: 1,
Webhook: &webhookActionInfo{},
Timestamps: false,
Timestamps: true,
}, db, 200)
defer svr.Close()

Expand All @@ -512,9 +392,15 @@ func TestProcessEventsEnd2EndWebhook(t *testing.T) {
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
// the block event
e1s := <-eventStream
assert.Equal(1, len(e1s))
assert.Equal(uint64(11), e1s[0].BlockNumber)
// the chaincode event
e2s := <-eventStream
assert.Equal(1, len(e2s))
assert.Equal(uint64(10), e2s[0].BlockNumber)
assert.Equal(int64(1000000), e2s[0].Timestamp)
wg.Done()
}()
wg.Wait()
Expand All @@ -537,7 +423,7 @@ func TestProcessEventsEnd2EndCatchupWebhook(t *testing.T) {
_ = db.Init()
sm, stream, svr, eventStream := newTestStreamForBatching(
&StreamInfo{
BatchSize: 1,
BatchSize: 2,
Webhook: &webhookActionInfo{},
Timestamps: false,
}, db, 200)
Expand All @@ -552,11 +438,9 @@ func TestProcessEventsEnd2EndCatchupWebhook(t *testing.T) {
wg.Add(1)
go func() {
e1s := <-eventStream
assert.Equal(1, len(e1s))
assert.Equal(2, len(e1s))
assert.Equal(uint64(1), e1s[0].BlockNumber)
e2s := <-eventStream
assert.Equal(1, len(e2s))
assert.Equal(uint64(11), e2s[0].BlockNumber)
assert.Equal(uint64(11), e1s[1].BlockNumber)
wg.Done()
}()
wg.Wait()
Expand Down Expand Up @@ -636,6 +520,10 @@ func TestProcessEventsEnd2EndWithReset(t *testing.T) {
e1s := <-eventStream
assert.Equal(1, len(e1s))
assert.Equal(uint64(11), e1s[0].BlockNumber)
// the chaincode event
e2s := <-eventStream
assert.Equal(1, len(e2s))
assert.Equal(uint64(10), e2s[0].BlockNumber)
wg.Done()
}()
wg.Wait()
Expand Down Expand Up @@ -745,7 +633,7 @@ func TestPauseResumeAfterCheckpoint(t *testing.T) {
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
for i := 0; i < 1; i++ {
for i := 0; i < 2; i++ {
<-eventStream
}
wg.Done()
Expand Down Expand Up @@ -811,7 +699,7 @@ func TestPauseResumeBeforeCheckpoint(t *testing.T) {
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
for i := 0; i < 1; i++ {
for i := 0; i < 2; i++ {
<-eventStream
}
wg.Done()
Expand Down Expand Up @@ -851,7 +739,7 @@ func TestMarkStaleOnError(t *testing.T) {
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
for i := 0; i < 1; i++ {
for i := 0; i < 2; i++ {
<-eventStream
}
wg.Done()
Expand Down Expand Up @@ -929,7 +817,7 @@ func TestStoreCheckpointStoreError(t *testing.T) {
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
for i := 0; i < 1; i++ {
for i := 0; i < 2; i++ {
<-eventStream
}
wg.Done()
Expand Down Expand Up @@ -1199,6 +1087,7 @@ func TestUpdateStreamMissingWebhookURL(t *testing.T) {
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
<-eventStream
<-eventStream
wg.Done()
}()
Expand Down Expand Up @@ -1242,6 +1131,7 @@ func TestUpdateStreamInvalidWebhookURL(t *testing.T) {
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
<-eventStream
<-eventStream
wg.Done()
}()
Expand Down
2 changes: 1 addition & 1 deletion internal/events/evtprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (ep *evtProcessor) processEventEntry(subInfo *api.SubscriptionInfo, entry *
switch payloadType {
case api.EventPayloadType_String:
entry.Payload = string(entry.Payload.([]byte))
case api.EventPayloadType_StringifiedJSON:
case api.EventPayloadType_StringifiedJSON, api.EventPayloadType_JSON:
structuredMap := make(map[string]interface{})
err := json.Unmarshal(entry.Payload.([]byte), &structuredMap)
if err != nil {
Expand Down
Loading

0 comments on commit afde10c

Please sign in to comment.