Skip to content

Commit

Permalink
Merge pull request #41 from kaleido-io/event-data
Browse files Browse the repository at this point in the history
Event data and filtering enhancements
  • Loading branch information
jimthematrix committed Sep 2, 2021
2 parents ad236ac + 8e54e5f commit 4c6c064
Show file tree
Hide file tree
Showing 14 changed files with 184 additions and 107 deletions.
5 changes: 4 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.16

require (
github.com/Shopify/sarama v1.29.1
github.com/cpuguy83/go-md2man/v2 v2.0.1 // indirect
github.com/fatih/color v1.12.0 // indirect
github.com/globalsign/mgo v0.0.0-20181015135952-eeefdecb41b8
github.com/golang/protobuf v1.5.2
Expand All @@ -16,6 +17,8 @@ require (
github.com/hyperledger/fabric-protos-go v0.0.0-20200707132912-fee30f3ccd23
github.com/hyperledger/fabric-sdk-go v1.0.1-0.20210729165856-3be4ed253dcf
github.com/julienschmidt/httprouter v1.3.0
github.com/mattn/go-isatty v0.0.13 // indirect
github.com/mattn/go-runewidth v0.0.13 // indirect
github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d // indirect
github.com/nu7hatch/gouuid v0.0.0-20131221200532-179d4d0c4d8d
github.com/oklog/ulid/v2 v2.0.2
Expand All @@ -31,7 +34,7 @@ require (
github.com/sykesm/zap-logfmt v0.0.4 // indirect
github.com/syndtr/goleveldb v1.0.0
github.com/x-cray/logrus-prefixed-formatter v0.5.2
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c // indirect
golang.org/x/sys v0.0.0-20210823070655-63515b42dcdf // indirect
golang.org/x/tools v0.1.5 // indirect
gopkg.in/yaml.v2 v2.4.0
)
Expand Down
19 changes: 12 additions & 7 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,9 @@ github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfc
github.com/cpuguy83/go-md2man v1.0.10 h1:BSKMNlYxDvnunlTymqtgONjNnaRV1sTpcovwwjF22jk=
github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE=
github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
github.com/cpuguy83/go-md2man/v2 v2.0.0 h1:EoUDS0afbrsXAZ9YQ9jdu/mZ2sXgT1/2yyNng4PGlyM=
github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
github.com/cpuguy83/go-md2man/v2 v2.0.1 h1:r/myEWzV9lfsM1tFLgDyu0atFtJ1fXn261LKYj/3DxU=
github.com/cpuguy83/go-md2man/v2 v2.0.1/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/daaku/go.zipexe v1.0.0/go.mod h1:z8IiR6TsVLEYKwXAoE/I+8ys/sDkgTzSL0CLnGVd57E=
Expand Down Expand Up @@ -416,13 +417,15 @@ github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNx
github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
github.com/mattn/go-isatty v0.0.10/go.mod h1:qgIWMr58cqv1PHHyhnkY9lrL7etaEgOFcMEpPG5Rm84=
github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY=
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
github.com/mattn/go-isatty v0.0.13 h1:qdl+GuBjcsKKDco5BsxPJlId98mSWNKqYA+Co0SC1yA=
github.com/mattn/go-isatty v0.0.13/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
github.com/mattn/go-runewidth v0.0.2/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU=
github.com/mattn/go-runewidth v0.0.4/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU=
github.com/mattn/go-runewidth v0.0.6/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI=
github.com/mattn/go-runewidth v0.0.9 h1:Lm995f3rfxdpd6TSmuVCHVb/QhupuXlYr8sCI/QdE+0=
github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI=
github.com/mattn/go-runewidth v0.0.13 h1:lTGmDsbAYt5DmK6OnoV7EuIF1wEIFAcxld6ypU4OSgU=
github.com/mattn/go-runewidth v0.0.13/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
github.com/mattn/go-sqlite3 v1.10.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc=
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
Expand Down Expand Up @@ -555,19 +558,21 @@ github.com/pseudomuto/protokit v0.2.0/go.mod h1:2PdH30hxVHsup8KpBTOXTBeMVhJZVio3
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM=
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY=
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rs/cors v1.7.0/go.mod h1:gFx+x8UowdsKA9AchylcLynDq+nNFfI8FkUZdN/jGCU=
github.com/russross/blackfriday v1.5.2 h1:HyvC0ARfnZBqnXwABFeSZHpKvJHJJfPz81GNueLj0oo=
github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g=
github.com/russross/blackfriday/v2 v2.0.1 h1:lPqVAte+HuHNfhJ/0LC98ESWRz8afy9tM/0RK8m9o+Q=
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM=
github.com/shurcooL/sanitized_anchor_name v1.0.0 h1:PdmoCO6wvbs+7yrJyMORt4/BmY5IYyJwS/kOiWx8mHo=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.3.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
Expand Down Expand Up @@ -875,8 +880,8 @@ golang.org/x/sys v0.0.0-20210403161142-5e06dd20ab57/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c h1:F1jZWGFhYfh0Ci55sIpILtKKK8p3i2/krTr0H1rg74I=
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210823070655-63515b42dcdf h1:2ucpDCmfkl8Bd/FsLtiD653Wf96cW37s+iGx93zsu4k=
golang.org/x/sys v0.0.0-20210823070655-63515b42dcdf/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1 h1:v+OssWQX+hTHEmOBgwxdZxK4zHq3yOs8F9J7mk0PY8E=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
Expand Down
41 changes: 40 additions & 1 deletion internal/events/api/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,46 @@

package api

const (
BlockType_TX = "tx" // corresponds to blocks containing regular transactions
BlockType_Config = "config" // corresponds to blocks containing channel configurations and updates
)

// persistedFilter is the part of the filter we record to storage
// BlockType: optional. only notify on blocks of a specific type
// types are defined in github.com/hyperledger/fabric-protos-go/common:
// "config": for HeaderType_CONFIG, HeaderType_CONFIG_UPDATE
// "tx": for HeaderType_ENDORSER_TRANSACTION
// ChaincodeId: optional, only notify on blocks containing events for chaincode Id
// Filter: optional. regexp applied to the event name. can be used independent of Chaincode ID
// FromBlock: optional. "newest", "oldest", a number. default is "newest"
type persistedFilter struct {
BlockType string `json:"blockType,omitempty"`
ChaincodeId string `json:"chaincodeId,omitempty"`
EventFilter string `json:"eventFilter,omitempty"`
Filter string `json:"filter,omitempty"`
ToBlock uint64 `json:"toBlock,omitempty"`
}

// SubscriptionInfo is the persisted data for the subscription
type SubscriptionInfo struct {
TimeSorted
ID string `json:"id,omitempty"`
ChannelId string `json:"channel,omitempty"`
Path string `json:"path"`
Summary string `json:"-"` // System generated name for the subscription
Name string `json:"name"` // User provided name for the subscription, set to Summary if missing
Stream string `json:"stream"` // the event stream this subscription is associated under
Signer string `json:"signer"`
FromBlock string `json:"fromBlock,omitempty"`
Filter persistedFilter `json:"filter"`
}

// GetID returns the ID (for sorting)
func (info *SubscriptionInfo) GetID() string {
return info.ID
}

type EventEntry struct {
ChaincodeId string `json:"chaincodeId"`
BlockNumber uint64 `json:"blockNumber"`
Expand All @@ -24,5 +64,4 @@ type EventEntry struct {
Payload []byte `json:"payload"`
Timestamp uint64 `json:"timestamp,omitempty"`
SubID string `json:"subId"`
Signature string `json:"signature"`
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package events
package api

// TimeSorted base structure for time sortable things
type TimeSorted struct {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package events
package api

import (
"sort"
Expand Down
10 changes: 5 additions & 5 deletions internal/events/eventstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (

"github.com/hyperledger-labs/firefly-fabconnect/internal/auth"
"github.com/hyperledger-labs/firefly-fabconnect/internal/errors"
"github.com/hyperledger-labs/firefly-fabconnect/internal/events/api"
eventsapi "github.com/hyperledger-labs/firefly-fabconnect/internal/events/api"
"github.com/hyperledger-labs/firefly-fabconnect/internal/ws"

log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -59,7 +59,7 @@ const (

// StreamInfo configures the stream to perform an action for each event
type StreamInfo struct {
TimeSorted
eventsapi.TimeSorted
ID string `json:"id"`
Name string `json:"name,omitempty"`
Path string `json:"path"`
Expand Down Expand Up @@ -111,7 +111,7 @@ type eventStream struct {
}

type eventStreamAction interface {
attemptBatch(batchNumber, attempt uint64, events []*api.EventEntry) error
attemptBatch(batchNumber, attempt uint64, events []*eventsapi.EventEntry) error
}

func validateWebSocket(w *webSocketActionInfo) error {
Expand Down Expand Up @@ -570,7 +570,7 @@ func (a *eventStream) processBatch(batchNumber uint64, events []*eventData) {
attempt++
log.Infof("%s: Batch %d initiated with %d events. FirstBlock=%d LastBlock=%d", a.spec.ID, batchNumber, len(events), events[0].event.BlockNumber, events[len(events)-1].event.BlockNumber)
a.updateWG.Add(1)
eventEntries := make([]*api.EventEntry, len(events))
eventEntries := make([]*eventsapi.EventEntry, len(events))
for i, entry := range events {
eventEntries[i] = entry.event
}
Expand Down Expand Up @@ -611,7 +611,7 @@ func (a *eventStream) processBatch(batchNumber uint64, events []*eventData) {

// performActionWithRetry performs an action, with exponential backoff retry up
// to a given threshold
func (a *eventStream) performActionWithRetry(batchNumber uint64, events []*api.EventEntry) (err error) {
func (a *eventStream) performActionWithRetry(batchNumber uint64, events []*eventsapi.EventEntry) (err error) {
startTime := time.Now()
endTime := startTime.Add(time.Duration(a.spec.RetryTimeoutSec) * time.Second)
delay := a.initialRetryDelay
Expand Down
5 changes: 3 additions & 2 deletions internal/events/evtprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,15 @@ func (ep *evtProcessor) initBlockHWM(intVal uint64) {
ep.hwmSync.Unlock()
}

func (ep *evtProcessor) processEventEntry(subInfo string, entry *api.EventEntry) (err error) {
func (ep *evtProcessor) processEventEntry(subID string, entry *api.EventEntry) (err error) {
entry.SubID = subID
result := eventData{
event: entry,
batchComplete: ep.batchComplete,
}

// Ok, now we have the full event in a friendly map output. Pass it down to the stream
log.Infof("%s: Dispatching event. BlockNumber=%d TxId=%s", subInfo, result.event.BlockNumber, result.event.TransactionId)
log.Infof("%s: Dispatching event. BlockNumber=%d TxId=%s", subID, result.event.BlockNumber, result.event.TransactionId)
ep.stream.handleEvent(&result)
return nil
}
29 changes: 18 additions & 11 deletions internal/events/submanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/hyperledger-labs/firefly-fabconnect/internal/conf"
"github.com/hyperledger-labs/firefly-fabconnect/internal/errors"
eventsapi "github.com/hyperledger-labs/firefly-fabconnect/internal/events/api"
"github.com/hyperledger-labs/firefly-fabconnect/internal/fabric"
"github.com/hyperledger-labs/firefly-fabconnect/internal/kvstore"
restutil "github.com/hyperledger-labs/firefly-fabconnect/internal/rest/utils"
Expand Down Expand Up @@ -59,9 +60,9 @@ type SubscriptionManager interface {
SuspendStream(res http.ResponseWriter, req *http.Request, params httprouter.Params) (*map[string]string, *restutil.RestError)
ResumeStream(res http.ResponseWriter, req *http.Request, params httprouter.Params) (*map[string]string, *restutil.RestError)
DeleteStream(res http.ResponseWriter, req *http.Request, params httprouter.Params) (*map[string]string, *restutil.RestError)
AddSubscription(res http.ResponseWriter, req *http.Request, params httprouter.Params) (*SubscriptionInfo, *restutil.RestError)
Subscriptions(res http.ResponseWriter, req *http.Request, params httprouter.Params) []*SubscriptionInfo
SubscriptionByID(res http.ResponseWriter, req *http.Request, params httprouter.Params) (*SubscriptionInfo, *restutil.RestError)
AddSubscription(res http.ResponseWriter, req *http.Request, params httprouter.Params) (*eventsapi.SubscriptionInfo, *restutil.RestError)
Subscriptions(res http.ResponseWriter, req *http.Request, params httprouter.Params) []*eventsapi.SubscriptionInfo
SubscriptionByID(res http.ResponseWriter, req *http.Request, params httprouter.Params) (*eventsapi.SubscriptionInfo, *restutil.RestError)
ResetSubscription(res http.ResponseWriter, req *http.Request, params httprouter.Params) (*map[string]string, *restutil.RestError)
DeleteSubscription(res http.ResponseWriter, req *http.Request, params httprouter.Params) (*map[string]string, *restutil.RestError)
Close()
Expand Down Expand Up @@ -102,7 +103,7 @@ func NewSubscriptionManager(config *conf.EventstreamConf, rpc fabric.RPCClient,
}

// SubscriptionByID used externally to get serializable details
func (s *subscriptionMGR) SubscriptionByID(res http.ResponseWriter, req *http.Request, params httprouter.Params) (*SubscriptionInfo, *restutil.RestError) {
func (s *subscriptionMGR) SubscriptionByID(res http.ResponseWriter, req *http.Request, params httprouter.Params) (*eventsapi.SubscriptionInfo, *restutil.RestError) {
id := params.ByName("subscriptionId")
sub, err := s.subscriptionByID(id)
if err != nil {
Expand All @@ -112,17 +113,17 @@ func (s *subscriptionMGR) SubscriptionByID(res http.ResponseWriter, req *http.Re
}

// Subscriptions used externally to get list subscriptions
func (s *subscriptionMGR) Subscriptions(res http.ResponseWriter, req *http.Request, params httprouter.Params) []*SubscriptionInfo {
l := make([]*SubscriptionInfo, 0, len(s.subscriptions))
func (s *subscriptionMGR) Subscriptions(res http.ResponseWriter, req *http.Request, params httprouter.Params) []*eventsapi.SubscriptionInfo {
l := make([]*eventsapi.SubscriptionInfo, 0, len(s.subscriptions))
for _, sub := range s.subscriptions {
l = append(l, sub.info)
}
return l
}

// AddSubscription adds a new subscription
func (s *subscriptionMGR) AddSubscription(res http.ResponseWriter, req *http.Request, params httprouter.Params) (*SubscriptionInfo, *restutil.RestError) {
var spec SubscriptionInfo
func (s *subscriptionMGR) AddSubscription(res http.ResponseWriter, req *http.Request, params httprouter.Params) (*eventsapi.SubscriptionInfo, *restutil.RestError) {
var spec eventsapi.SubscriptionInfo
if err := json.NewDecoder(req.Body).Decode(&spec); err != nil {
return nil, restutil.NewRestError(fmt.Sprintf(errors.RESTGatewaySubscriptionInvalid, err), 400)
}
Expand All @@ -132,7 +133,7 @@ func (s *subscriptionMGR) AddSubscription(res http.ResponseWriter, req *http.Req
if spec.Stream == "" {
return nil, restutil.NewRestError(`Missing required parameter "stream"`, 400)
}
spec.TimeSorted = TimeSorted{
spec.TimeSorted = eventsapi.TimeSorted{
CreatedISO8601: time.Now().UTC().Format(time.RFC3339),
}
spec.ID = subIDPrefix + utils.UUIDv4()
Expand All @@ -142,6 +143,12 @@ func (s *subscriptionMGR) AddSubscription(res http.ResponseWriter, req *http.Req
// user did not set an initial block, default to newest
spec.FromBlock = FromBlockNewest
}
if spec.Filter.BlockType == "" {
spec.Filter.BlockType = eventsapi.BlockType_TX
}
if spec.Filter.EventFilter == "" && spec.Filter.ChaincodeId != "" {
spec.Filter.EventFilter = ".*"
}
// Create it
sub, err := newSubscription(s, s.rpc, &spec)
if err != nil {
Expand Down Expand Up @@ -221,7 +228,7 @@ func (s *subscriptionMGR) deleteSubscription(sub *subscription) error {
return nil
}

func (s *subscriptionMGR) storeSubscription(info *SubscriptionInfo) (*SubscriptionInfo, error) {
func (s *subscriptionMGR) storeSubscription(info *eventsapi.SubscriptionInfo) (*eventsapi.SubscriptionInfo, error) {
infoBytes, _ := json.MarshalIndent(info, "", " ")
if err := s.db.Put(info.ID, infoBytes); err != nil {
return nil, errors.Errorf(errors.EventStreamsSubscribeStoreFailed, err)
Expand Down Expand Up @@ -472,7 +479,7 @@ func (s *subscriptionMGR) recoverSubscriptions() {
for iSub.Next() {
k := iSub.Key()
if strings.HasPrefix(k, subIDPrefix) {
var subInfo SubscriptionInfo
var subInfo eventsapi.SubscriptionInfo
err := json.Unmarshal(iSub.Value(), &subInfo)
if err != nil {
log.Errorf("Failed to recover subscription '%s': %s", string(iSub.Value()), err)
Expand Down
Loading

0 comments on commit 4c6c064

Please sign in to comment.