Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Event data and filtering enhancements #41

Merged
merged 4 commits into from
Sep 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.16

require (
github.com/Shopify/sarama v1.29.1
github.com/cpuguy83/go-md2man/v2 v2.0.1 // indirect
github.com/fatih/color v1.12.0 // indirect
github.com/globalsign/mgo v0.0.0-20181015135952-eeefdecb41b8
github.com/golang/protobuf v1.5.2
Expand All @@ -16,6 +17,8 @@ require (
github.com/hyperledger/fabric-protos-go v0.0.0-20200707132912-fee30f3ccd23
github.com/hyperledger/fabric-sdk-go v1.0.1-0.20210729165856-3be4ed253dcf
github.com/julienschmidt/httprouter v1.3.0
github.com/mattn/go-isatty v0.0.13 // indirect
github.com/mattn/go-runewidth v0.0.13 // indirect
github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d // indirect
github.com/nu7hatch/gouuid v0.0.0-20131221200532-179d4d0c4d8d
github.com/oklog/ulid/v2 v2.0.2
Expand All @@ -31,7 +34,7 @@ require (
github.com/sykesm/zap-logfmt v0.0.4 // indirect
github.com/syndtr/goleveldb v1.0.0
github.com/x-cray/logrus-prefixed-formatter v0.5.2
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c // indirect
golang.org/x/sys v0.0.0-20210823070655-63515b42dcdf // indirect
golang.org/x/tools v0.1.5 // indirect
gopkg.in/yaml.v2 v2.4.0
)
Expand Down
19 changes: 12 additions & 7 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,9 @@ github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfc
github.com/cpuguy83/go-md2man v1.0.10 h1:BSKMNlYxDvnunlTymqtgONjNnaRV1sTpcovwwjF22jk=
github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE=
github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
github.com/cpuguy83/go-md2man/v2 v2.0.0 h1:EoUDS0afbrsXAZ9YQ9jdu/mZ2sXgT1/2yyNng4PGlyM=
github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
github.com/cpuguy83/go-md2man/v2 v2.0.1 h1:r/myEWzV9lfsM1tFLgDyu0atFtJ1fXn261LKYj/3DxU=
github.com/cpuguy83/go-md2man/v2 v2.0.1/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/daaku/go.zipexe v1.0.0/go.mod h1:z8IiR6TsVLEYKwXAoE/I+8ys/sDkgTzSL0CLnGVd57E=
Expand Down Expand Up @@ -416,13 +417,15 @@ github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNx
github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
github.com/mattn/go-isatty v0.0.10/go.mod h1:qgIWMr58cqv1PHHyhnkY9lrL7etaEgOFcMEpPG5Rm84=
github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY=
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
github.com/mattn/go-isatty v0.0.13 h1:qdl+GuBjcsKKDco5BsxPJlId98mSWNKqYA+Co0SC1yA=
github.com/mattn/go-isatty v0.0.13/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
github.com/mattn/go-runewidth v0.0.2/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU=
github.com/mattn/go-runewidth v0.0.4/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU=
github.com/mattn/go-runewidth v0.0.6/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI=
github.com/mattn/go-runewidth v0.0.9 h1:Lm995f3rfxdpd6TSmuVCHVb/QhupuXlYr8sCI/QdE+0=
github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI=
github.com/mattn/go-runewidth v0.0.13 h1:lTGmDsbAYt5DmK6OnoV7EuIF1wEIFAcxld6ypU4OSgU=
github.com/mattn/go-runewidth v0.0.13/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
github.com/mattn/go-sqlite3 v1.10.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc=
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
Expand Down Expand Up @@ -555,19 +558,21 @@ github.com/pseudomuto/protokit v0.2.0/go.mod h1:2PdH30hxVHsup8KpBTOXTBeMVhJZVio3
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM=
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY=
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rs/cors v1.7.0/go.mod h1:gFx+x8UowdsKA9AchylcLynDq+nNFfI8FkUZdN/jGCU=
github.com/russross/blackfriday v1.5.2 h1:HyvC0ARfnZBqnXwABFeSZHpKvJHJJfPz81GNueLj0oo=
github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g=
github.com/russross/blackfriday/v2 v2.0.1 h1:lPqVAte+HuHNfhJ/0LC98ESWRz8afy9tM/0RK8m9o+Q=
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM=
github.com/shurcooL/sanitized_anchor_name v1.0.0 h1:PdmoCO6wvbs+7yrJyMORt4/BmY5IYyJwS/kOiWx8mHo=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.3.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
Expand Down Expand Up @@ -875,8 +880,8 @@ golang.org/x/sys v0.0.0-20210403161142-5e06dd20ab57/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c h1:F1jZWGFhYfh0Ci55sIpILtKKK8p3i2/krTr0H1rg74I=
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210823070655-63515b42dcdf h1:2ucpDCmfkl8Bd/FsLtiD653Wf96cW37s+iGx93zsu4k=
golang.org/x/sys v0.0.0-20210823070655-63515b42dcdf/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1 h1:v+OssWQX+hTHEmOBgwxdZxK4zHq3yOs8F9J7mk0PY8E=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
Expand Down
41 changes: 40 additions & 1 deletion internal/events/api/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,46 @@

package api

const (
BlockType_TX = "tx" // corresponds to blocks containing regular transactions
BlockType_Config = "config" // corresponds to blocks containing channel configurations and updates
)

// persistedFilter is the part of the filter we record to storage
// BlockType: optional. only notify on blocks of a specific type
// types are defined in github.com/hyperledger/fabric-protos-go/common:
// "config": for HeaderType_CONFIG, HeaderType_CONFIG_UPDATE
// "tx": for HeaderType_ENDORSER_TRANSACTION
// ChaincodeId: optional, only notify on blocks containing events for chaincode Id
// Filter: optional. regexp applied to the event name. can be used independent of Chaincode ID
// FromBlock: optional. "newest", "oldest", a number. default is "newest"
type persistedFilter struct {
BlockType string `json:"blockType,omitempty"`
ChaincodeId string `json:"chaincodeId,omitempty"`
EventFilter string `json:"eventFilter,omitempty"`
Filter string `json:"filter,omitempty"`
ToBlock uint64 `json:"toBlock,omitempty"`
}

// SubscriptionInfo is the persisted data for the subscription
type SubscriptionInfo struct {
TimeSorted
ID string `json:"id,omitempty"`
ChannelId string `json:"channel,omitempty"`
Path string `json:"path"`
Summary string `json:"-"` // System generated name for the subscription
Name string `json:"name"` // User provided name for the subscription, set to Summary if missing
Stream string `json:"stream"` // the event stream this subscription is associated under
Signer string `json:"signer"`
FromBlock string `json:"fromBlock,omitempty"`
Filter persistedFilter `json:"filter"`
}

// GetID returns the ID (for sorting)
func (info *SubscriptionInfo) GetID() string {
return info.ID
}

type EventEntry struct {
ChaincodeId string `json:"chaincodeId"`
BlockNumber uint64 `json:"blockNumber"`
Expand All @@ -24,5 +64,4 @@ type EventEntry struct {
Payload []byte `json:"payload"`
Timestamp uint64 `json:"timestamp,omitempty"`
SubID string `json:"subId"`
Signature string `json:"signature"`
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package events
package api

// TimeSorted base structure for time sortable things
type TimeSorted struct {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package events
package api

import (
"sort"
Expand Down
10 changes: 5 additions & 5 deletions internal/events/eventstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (

"github.com/hyperledger-labs/firefly-fabconnect/internal/auth"
"github.com/hyperledger-labs/firefly-fabconnect/internal/errors"
"github.com/hyperledger-labs/firefly-fabconnect/internal/events/api"
eventsapi "github.com/hyperledger-labs/firefly-fabconnect/internal/events/api"
"github.com/hyperledger-labs/firefly-fabconnect/internal/ws"

log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -59,7 +59,7 @@ const (

// StreamInfo configures the stream to perform an action for each event
type StreamInfo struct {
TimeSorted
eventsapi.TimeSorted
ID string `json:"id"`
Name string `json:"name,omitempty"`
Path string `json:"path"`
Expand Down Expand Up @@ -111,7 +111,7 @@ type eventStream struct {
}

type eventStreamAction interface {
attemptBatch(batchNumber, attempt uint64, events []*api.EventEntry) error
attemptBatch(batchNumber, attempt uint64, events []*eventsapi.EventEntry) error
}

func validateWebSocket(w *webSocketActionInfo) error {
Expand Down Expand Up @@ -570,7 +570,7 @@ func (a *eventStream) processBatch(batchNumber uint64, events []*eventData) {
attempt++
log.Infof("%s: Batch %d initiated with %d events. FirstBlock=%d LastBlock=%d", a.spec.ID, batchNumber, len(events), events[0].event.BlockNumber, events[len(events)-1].event.BlockNumber)
a.updateWG.Add(1)
eventEntries := make([]*api.EventEntry, len(events))
eventEntries := make([]*eventsapi.EventEntry, len(events))
for i, entry := range events {
eventEntries[i] = entry.event
}
Expand Down Expand Up @@ -611,7 +611,7 @@ func (a *eventStream) processBatch(batchNumber uint64, events []*eventData) {

// performActionWithRetry performs an action, with exponential backoff retry up
// to a given threshold
func (a *eventStream) performActionWithRetry(batchNumber uint64, events []*api.EventEntry) (err error) {
func (a *eventStream) performActionWithRetry(batchNumber uint64, events []*eventsapi.EventEntry) (err error) {
startTime := time.Now()
endTime := startTime.Add(time.Duration(a.spec.RetryTimeoutSec) * time.Second)
delay := a.initialRetryDelay
Expand Down
5 changes: 3 additions & 2 deletions internal/events/evtprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,15 @@ func (ep *evtProcessor) initBlockHWM(intVal uint64) {
ep.hwmSync.Unlock()
}

func (ep *evtProcessor) processEventEntry(subInfo string, entry *api.EventEntry) (err error) {
func (ep *evtProcessor) processEventEntry(subID string, entry *api.EventEntry) (err error) {
entry.SubID = subID
result := eventData{
event: entry,
batchComplete: ep.batchComplete,
}

// Ok, now we have the full event in a friendly map output. Pass it down to the stream
log.Infof("%s: Dispatching event. BlockNumber=%d TxId=%s", subInfo, result.event.BlockNumber, result.event.TransactionId)
log.Infof("%s: Dispatching event. BlockNumber=%d TxId=%s", subID, result.event.BlockNumber, result.event.TransactionId)
ep.stream.handleEvent(&result)
return nil
}
29 changes: 18 additions & 11 deletions internal/events/submanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/hyperledger-labs/firefly-fabconnect/internal/conf"
"github.com/hyperledger-labs/firefly-fabconnect/internal/errors"
eventsapi "github.com/hyperledger-labs/firefly-fabconnect/internal/events/api"
"github.com/hyperledger-labs/firefly-fabconnect/internal/fabric"
"github.com/hyperledger-labs/firefly-fabconnect/internal/kvstore"
restutil "github.com/hyperledger-labs/firefly-fabconnect/internal/rest/utils"
Expand Down Expand Up @@ -59,9 +60,9 @@ type SubscriptionManager interface {
SuspendStream(res http.ResponseWriter, req *http.Request, params httprouter.Params) (*map[string]string, *restutil.RestError)
ResumeStream(res http.ResponseWriter, req *http.Request, params httprouter.Params) (*map[string]string, *restutil.RestError)
DeleteStream(res http.ResponseWriter, req *http.Request, params httprouter.Params) (*map[string]string, *restutil.RestError)
AddSubscription(res http.ResponseWriter, req *http.Request, params httprouter.Params) (*SubscriptionInfo, *restutil.RestError)
Subscriptions(res http.ResponseWriter, req *http.Request, params httprouter.Params) []*SubscriptionInfo
SubscriptionByID(res http.ResponseWriter, req *http.Request, params httprouter.Params) (*SubscriptionInfo, *restutil.RestError)
AddSubscription(res http.ResponseWriter, req *http.Request, params httprouter.Params) (*eventsapi.SubscriptionInfo, *restutil.RestError)
Subscriptions(res http.ResponseWriter, req *http.Request, params httprouter.Params) []*eventsapi.SubscriptionInfo
SubscriptionByID(res http.ResponseWriter, req *http.Request, params httprouter.Params) (*eventsapi.SubscriptionInfo, *restutil.RestError)
ResetSubscription(res http.ResponseWriter, req *http.Request, params httprouter.Params) (*map[string]string, *restutil.RestError)
DeleteSubscription(res http.ResponseWriter, req *http.Request, params httprouter.Params) (*map[string]string, *restutil.RestError)
Close()
Expand Down Expand Up @@ -102,7 +103,7 @@ func NewSubscriptionManager(config *conf.EventstreamConf, rpc fabric.RPCClient,
}

// SubscriptionByID used externally to get serializable details
func (s *subscriptionMGR) SubscriptionByID(res http.ResponseWriter, req *http.Request, params httprouter.Params) (*SubscriptionInfo, *restutil.RestError) {
func (s *subscriptionMGR) SubscriptionByID(res http.ResponseWriter, req *http.Request, params httprouter.Params) (*eventsapi.SubscriptionInfo, *restutil.RestError) {
id := params.ByName("subscriptionId")
sub, err := s.subscriptionByID(id)
if err != nil {
Expand All @@ -112,17 +113,17 @@ func (s *subscriptionMGR) SubscriptionByID(res http.ResponseWriter, req *http.Re
}

// Subscriptions used externally to get list subscriptions
func (s *subscriptionMGR) Subscriptions(res http.ResponseWriter, req *http.Request, params httprouter.Params) []*SubscriptionInfo {
l := make([]*SubscriptionInfo, 0, len(s.subscriptions))
func (s *subscriptionMGR) Subscriptions(res http.ResponseWriter, req *http.Request, params httprouter.Params) []*eventsapi.SubscriptionInfo {
l := make([]*eventsapi.SubscriptionInfo, 0, len(s.subscriptions))
for _, sub := range s.subscriptions {
l = append(l, sub.info)
}
return l
}

// AddSubscription adds a new subscription
func (s *subscriptionMGR) AddSubscription(res http.ResponseWriter, req *http.Request, params httprouter.Params) (*SubscriptionInfo, *restutil.RestError) {
var spec SubscriptionInfo
func (s *subscriptionMGR) AddSubscription(res http.ResponseWriter, req *http.Request, params httprouter.Params) (*eventsapi.SubscriptionInfo, *restutil.RestError) {
var spec eventsapi.SubscriptionInfo
if err := json.NewDecoder(req.Body).Decode(&spec); err != nil {
return nil, restutil.NewRestError(fmt.Sprintf(errors.RESTGatewaySubscriptionInvalid, err), 400)
}
Expand All @@ -132,7 +133,7 @@ func (s *subscriptionMGR) AddSubscription(res http.ResponseWriter, req *http.Req
if spec.Stream == "" {
return nil, restutil.NewRestError(`Missing required parameter "stream"`, 400)
}
spec.TimeSorted = TimeSorted{
spec.TimeSorted = eventsapi.TimeSorted{
CreatedISO8601: time.Now().UTC().Format(time.RFC3339),
}
spec.ID = subIDPrefix + utils.UUIDv4()
Expand All @@ -142,6 +143,12 @@ func (s *subscriptionMGR) AddSubscription(res http.ResponseWriter, req *http.Req
// user did not set an initial block, default to newest
spec.FromBlock = FromBlockNewest
}
if spec.Filter.BlockType == "" {
spec.Filter.BlockType = eventsapi.BlockType_TX
}
if spec.Filter.EventFilter == "" && spec.Filter.ChaincodeId != "" {
spec.Filter.EventFilter = ".*"
}
// Create it
sub, err := newSubscription(s, s.rpc, &spec)
if err != nil {
Expand Down Expand Up @@ -221,7 +228,7 @@ func (s *subscriptionMGR) deleteSubscription(sub *subscription) error {
return nil
}

func (s *subscriptionMGR) storeSubscription(info *SubscriptionInfo) (*SubscriptionInfo, error) {
func (s *subscriptionMGR) storeSubscription(info *eventsapi.SubscriptionInfo) (*eventsapi.SubscriptionInfo, error) {
infoBytes, _ := json.MarshalIndent(info, "", " ")
if err := s.db.Put(info.ID, infoBytes); err != nil {
return nil, errors.Errorf(errors.EventStreamsSubscribeStoreFailed, err)
Expand Down Expand Up @@ -472,7 +479,7 @@ func (s *subscriptionMGR) recoverSubscriptions() {
for iSub.Next() {
k := iSub.Key()
if strings.HasPrefix(k, subIDPrefix) {
var subInfo SubscriptionInfo
var subInfo eventsapi.SubscriptionInfo
err := json.Unmarshal(iSub.Value(), &subInfo)
if err != nil {
log.Errorf("Failed to recover subscription '%s': %s", string(iSub.Value()), err)
Expand Down
Loading