From 5a963b8cd1db729b5a1f8e591f2b780dd4ce0f99 Mon Sep 17 00:00:00 2001 From: Jim Zhang Date: Wed, 25 Aug 2021 14:24:59 -0400 Subject: [PATCH 1/4] Event data and filtering enhancements Signed-off-by: Jim Zhang --- go.mod | 5 +- go.sum | 19 ++-- internal/events/api/event.go | 41 ++++++++- internal/events/{ => api}/timesorted.go | 2 +- internal/events/{ => api}/timesorted_test.go | 2 +- internal/events/eventstream.go | 10 +-- internal/events/evtprocessor.go | 5 +- internal/events/submanager.go | 26 +++--- internal/events/subscription.go | 93 +++++++++----------- internal/fabric/blockdecoder.go | 2 +- internal/fabric/rpc.go | 45 +++++++--- internal/fabric/tx.go | 10 ++- internal/rest/utils/params.go | 13 ++- 13 files changed, 173 insertions(+), 100 deletions(-) rename internal/events/{ => api}/timesorted.go (98%) rename internal/events/{ => api}/timesorted_test.go (99%) diff --git a/go.mod b/go.mod index f168530..2197edf 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.16 require ( github.com/Shopify/sarama v1.29.1 + github.com/cpuguy83/go-md2man/v2 v2.0.1 // indirect github.com/fatih/color v1.12.0 // indirect github.com/globalsign/mgo v0.0.0-20181015135952-eeefdecb41b8 github.com/golang/protobuf v1.5.2 @@ -16,6 +17,8 @@ require ( github.com/hyperledger/fabric-protos-go v0.0.0-20200707132912-fee30f3ccd23 github.com/hyperledger/fabric-sdk-go v1.0.1-0.20210729165856-3be4ed253dcf github.com/julienschmidt/httprouter v1.3.0 + github.com/mattn/go-isatty v0.0.13 // indirect + github.com/mattn/go-runewidth v0.0.13 // indirect github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d // indirect github.com/nu7hatch/gouuid v0.0.0-20131221200532-179d4d0c4d8d github.com/oklog/ulid/v2 v2.0.2 @@ -31,7 +34,7 @@ require ( github.com/sykesm/zap-logfmt v0.0.4 // indirect github.com/syndtr/goleveldb v1.0.0 github.com/x-cray/logrus-prefixed-formatter v0.5.2 - golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c // indirect + golang.org/x/sys v0.0.0-20210823070655-63515b42dcdf // indirect golang.org/x/tools v0.1.5 // indirect gopkg.in/yaml.v2 v2.4.0 ) diff --git a/go.sum b/go.sum index 2e6f409..7f8596b 100644 --- a/go.sum +++ b/go.sum @@ -123,8 +123,9 @@ github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfc github.com/cpuguy83/go-md2man v1.0.10 h1:BSKMNlYxDvnunlTymqtgONjNnaRV1sTpcovwwjF22jk= github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE= github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= -github.com/cpuguy83/go-md2man/v2 v2.0.0 h1:EoUDS0afbrsXAZ9YQ9jdu/mZ2sXgT1/2yyNng4PGlyM= github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= +github.com/cpuguy83/go-md2man/v2 v2.0.1 h1:r/myEWzV9lfsM1tFLgDyu0atFtJ1fXn261LKYj/3DxU= +github.com/cpuguy83/go-md2man/v2 v2.0.1/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/daaku/go.zipexe v1.0.0/go.mod h1:z8IiR6TsVLEYKwXAoE/I+8ys/sDkgTzSL0CLnGVd57E= @@ -416,13 +417,15 @@ github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNx github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4= github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= github.com/mattn/go-isatty v0.0.10/go.mod h1:qgIWMr58cqv1PHHyhnkY9lrL7etaEgOFcMEpPG5Rm84= -github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY= github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= +github.com/mattn/go-isatty v0.0.13 h1:qdl+GuBjcsKKDco5BsxPJlId98mSWNKqYA+Co0SC1yA= +github.com/mattn/go-isatty v0.0.13/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= github.com/mattn/go-runewidth v0.0.2/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU= github.com/mattn/go-runewidth v0.0.4/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU= github.com/mattn/go-runewidth v0.0.6/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI= -github.com/mattn/go-runewidth v0.0.9 h1:Lm995f3rfxdpd6TSmuVCHVb/QhupuXlYr8sCI/QdE+0= github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI= +github.com/mattn/go-runewidth v0.0.13 h1:lTGmDsbAYt5DmK6OnoV7EuIF1wEIFAcxld6ypU4OSgU= +github.com/mattn/go-runewidth v0.0.13/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= github.com/mattn/go-sqlite3 v1.10.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc= github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= @@ -555,19 +558,21 @@ github.com/pseudomuto/protokit v0.2.0/go.mod h1:2PdH30hxVHsup8KpBTOXTBeMVhJZVio3 github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM= github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= +github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY= +github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rs/cors v1.7.0/go.mod h1:gFx+x8UowdsKA9AchylcLynDq+nNFfI8FkUZdN/jGCU= github.com/russross/blackfriday v1.5.2 h1:HyvC0ARfnZBqnXwABFeSZHpKvJHJJfPz81GNueLj0oo= github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g= -github.com/russross/blackfriday/v2 v2.0.1 h1:lPqVAte+HuHNfhJ/0LC98ESWRz8afy9tM/0RK8m9o+Q= github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= +github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk= +github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E= github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc= github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM= -github.com/shurcooL/sanitized_anchor_name v1.0.0 h1:PdmoCO6wvbs+7yrJyMORt4/BmY5IYyJwS/kOiWx8mHo= github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.3.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= @@ -875,8 +880,8 @@ golang.org/x/sys v0.0.0-20210403161142-5e06dd20ab57/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c h1:F1jZWGFhYfh0Ci55sIpILtKKK8p3i2/krTr0H1rg74I= -golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20210823070655-63515b42dcdf h1:2ucpDCmfkl8Bd/FsLtiD653Wf96cW37s+iGx93zsu4k= +golang.org/x/sys v0.0.0-20210823070655-63515b42dcdf/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1 h1:v+OssWQX+hTHEmOBgwxdZxK4zHq3yOs8F9J7mk0PY8E= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= diff --git a/internal/events/api/event.go b/internal/events/api/event.go index 44ccb3f..acdf708 100644 --- a/internal/events/api/event.go +++ b/internal/events/api/event.go @@ -16,6 +16,46 @@ package api +const ( + BlockType_TX = "tx" // corresponds to blocks containing regular transactions + BlockType_Config = "config" // corresponds to blocks containing channel configurations and updates +) + +// persistedFilter is the part of the filter we record to storage +// BlockType: optional. only notify on blocks of a specific type +// types are defined in github.com/hyperledger/fabric-protos-go/common: +// "config": for HeaderType_CONFIG, HeaderType_CONFIG_UPDATE +// "tx": for HeaderType_ENDORSER_TRANSACTION +// ChaincodeId: optional, only notify on blocks containing events for chaincode Id +// Filter: optional. regexp applied to the event name. can be used independent of Chaincode ID +// FromBlock: optional. "newest", "oldest", a number. default is "newest" +type persistedFilter struct { + BlockType string `json:"blockType,omitempty"` + ChaincodeId string `json:"chaincodeId,omitempty"` + EventFilter string `json:"eventFilter,omitempty"` + Filter string `json:"filter,omitempty"` + ToBlock uint64 `json:"toBlock,omitempty"` +} + +// SubscriptionInfo is the persisted data for the subscription +type SubscriptionInfo struct { + TimeSorted + ID string `json:"id,omitempty"` + ChannelId string `json:"channel,omitempty"` + Path string `json:"path"` + Summary string `json:"-"` // System generated name for the subscription + Name string `json:"name"` // User provided name for the subscription, set to Summary if missing + Stream string `json:"stream"` // the event stream this subscription is associated under + Signer string `json:"signer"` + FromBlock string `json:"fromBlock,omitempty"` + Filter persistedFilter `json:"filter"` +} + +// GetID returns the ID (for sorting) +func (info *SubscriptionInfo) GetID() string { + return info.ID +} + type EventEntry struct { ChaincodeId string `json:"chaincodeId"` BlockNumber uint64 `json:"blockNumber"` @@ -24,5 +64,4 @@ type EventEntry struct { Payload []byte `json:"payload"` Timestamp uint64 `json:"timestamp,omitempty"` SubID string `json:"subId"` - Signature string `json:"signature"` } diff --git a/internal/events/timesorted.go b/internal/events/api/timesorted.go similarity index 98% rename from internal/events/timesorted.go rename to internal/events/api/timesorted.go index 08c961d..2a888a0 100644 --- a/internal/events/timesorted.go +++ b/internal/events/api/timesorted.go @@ -14,7 +14,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package events +package api // TimeSorted base structure for time sortable things type TimeSorted struct { diff --git a/internal/events/timesorted_test.go b/internal/events/api/timesorted_test.go similarity index 99% rename from internal/events/timesorted_test.go rename to internal/events/api/timesorted_test.go index f0f9e0c..71487ee 100644 --- a/internal/events/timesorted_test.go +++ b/internal/events/api/timesorted_test.go @@ -14,7 +14,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package events +package api import ( "sort" diff --git a/internal/events/eventstream.go b/internal/events/eventstream.go index d365f6c..4167393 100644 --- a/internal/events/eventstream.go +++ b/internal/events/eventstream.go @@ -27,7 +27,7 @@ import ( "github.com/hyperledger-labs/firefly-fabconnect/internal/auth" "github.com/hyperledger-labs/firefly-fabconnect/internal/errors" - "github.com/hyperledger-labs/firefly-fabconnect/internal/events/api" + eventsapi "github.com/hyperledger-labs/firefly-fabconnect/internal/events/api" "github.com/hyperledger-labs/firefly-fabconnect/internal/ws" log "github.com/sirupsen/logrus" @@ -59,7 +59,7 @@ const ( // StreamInfo configures the stream to perform an action for each event type StreamInfo struct { - TimeSorted + eventsapi.TimeSorted ID string `json:"id"` Name string `json:"name,omitempty"` Path string `json:"path"` @@ -111,7 +111,7 @@ type eventStream struct { } type eventStreamAction interface { - attemptBatch(batchNumber, attempt uint64, events []*api.EventEntry) error + attemptBatch(batchNumber, attempt uint64, events []*eventsapi.EventEntry) error } func validateWebSocket(w *webSocketActionInfo) error { @@ -570,7 +570,7 @@ func (a *eventStream) processBatch(batchNumber uint64, events []*eventData) { attempt++ log.Infof("%s: Batch %d initiated with %d events. FirstBlock=%d LastBlock=%d", a.spec.ID, batchNumber, len(events), events[0].event.BlockNumber, events[len(events)-1].event.BlockNumber) a.updateWG.Add(1) - eventEntries := make([]*api.EventEntry, len(events)) + eventEntries := make([]*eventsapi.EventEntry, len(events)) for i, entry := range events { eventEntries[i] = entry.event } @@ -611,7 +611,7 @@ func (a *eventStream) processBatch(batchNumber uint64, events []*eventData) { // performActionWithRetry performs an action, with exponential backoff retry up // to a given threshold -func (a *eventStream) performActionWithRetry(batchNumber uint64, events []*api.EventEntry) (err error) { +func (a *eventStream) performActionWithRetry(batchNumber uint64, events []*eventsapi.EventEntry) (err error) { startTime := time.Now() endTime := startTime.Add(time.Duration(a.spec.RetryTimeoutSec) * time.Second) delay := a.initialRetryDelay diff --git a/internal/events/evtprocessor.go b/internal/events/evtprocessor.go index ebcb629..35f26ef 100644 --- a/internal/events/evtprocessor.go +++ b/internal/events/evtprocessor.go @@ -63,14 +63,15 @@ func (ep *evtProcessor) initBlockHWM(intVal uint64) { ep.hwmSync.Unlock() } -func (ep *evtProcessor) processEventEntry(subInfo string, entry *api.EventEntry) (err error) { +func (ep *evtProcessor) processEventEntry(subID string, entry *api.EventEntry) (err error) { + entry.SubID = subID result := eventData{ event: entry, batchComplete: ep.batchComplete, } // Ok, now we have the full event in a friendly map output. Pass it down to the stream - log.Infof("%s: Dispatching event. BlockNumber=%d TxId=%s", subInfo, result.event.BlockNumber, result.event.TransactionId) + log.Infof("%s: Dispatching event. BlockNumber=%d TxId=%s", subID, result.event.BlockNumber, result.event.TransactionId) ep.stream.handleEvent(&result) return nil } diff --git a/internal/events/submanager.go b/internal/events/submanager.go index b1364d6..96480ad 100644 --- a/internal/events/submanager.go +++ b/internal/events/submanager.go @@ -25,6 +25,7 @@ import ( "github.com/hyperledger-labs/firefly-fabconnect/internal/conf" "github.com/hyperledger-labs/firefly-fabconnect/internal/errors" + eventsapi "github.com/hyperledger-labs/firefly-fabconnect/internal/events/api" "github.com/hyperledger-labs/firefly-fabconnect/internal/fabric" "github.com/hyperledger-labs/firefly-fabconnect/internal/kvstore" restutil "github.com/hyperledger-labs/firefly-fabconnect/internal/rest/utils" @@ -59,9 +60,9 @@ type SubscriptionManager interface { SuspendStream(res http.ResponseWriter, req *http.Request, params httprouter.Params) (*map[string]string, *restutil.RestError) ResumeStream(res http.ResponseWriter, req *http.Request, params httprouter.Params) (*map[string]string, *restutil.RestError) DeleteStream(res http.ResponseWriter, req *http.Request, params httprouter.Params) (*map[string]string, *restutil.RestError) - AddSubscription(res http.ResponseWriter, req *http.Request, params httprouter.Params) (*SubscriptionInfo, *restutil.RestError) - Subscriptions(res http.ResponseWriter, req *http.Request, params httprouter.Params) []*SubscriptionInfo - SubscriptionByID(res http.ResponseWriter, req *http.Request, params httprouter.Params) (*SubscriptionInfo, *restutil.RestError) + AddSubscription(res http.ResponseWriter, req *http.Request, params httprouter.Params) (*eventsapi.SubscriptionInfo, *restutil.RestError) + Subscriptions(res http.ResponseWriter, req *http.Request, params httprouter.Params) []*eventsapi.SubscriptionInfo + SubscriptionByID(res http.ResponseWriter, req *http.Request, params httprouter.Params) (*eventsapi.SubscriptionInfo, *restutil.RestError) ResetSubscription(res http.ResponseWriter, req *http.Request, params httprouter.Params) (*map[string]string, *restutil.RestError) DeleteSubscription(res http.ResponseWriter, req *http.Request, params httprouter.Params) (*map[string]string, *restutil.RestError) Close() @@ -102,7 +103,7 @@ func NewSubscriptionManager(config *conf.EventstreamConf, rpc fabric.RPCClient, } // SubscriptionByID used externally to get serializable details -func (s *subscriptionMGR) SubscriptionByID(res http.ResponseWriter, req *http.Request, params httprouter.Params) (*SubscriptionInfo, *restutil.RestError) { +func (s *subscriptionMGR) SubscriptionByID(res http.ResponseWriter, req *http.Request, params httprouter.Params) (*eventsapi.SubscriptionInfo, *restutil.RestError) { id := params.ByName("subscriptionId") sub, err := s.subscriptionByID(id) if err != nil { @@ -112,8 +113,8 @@ func (s *subscriptionMGR) SubscriptionByID(res http.ResponseWriter, req *http.Re } // Subscriptions used externally to get list subscriptions -func (s *subscriptionMGR) Subscriptions(res http.ResponseWriter, req *http.Request, params httprouter.Params) []*SubscriptionInfo { - l := make([]*SubscriptionInfo, 0, len(s.subscriptions)) +func (s *subscriptionMGR) Subscriptions(res http.ResponseWriter, req *http.Request, params httprouter.Params) []*eventsapi.SubscriptionInfo { + l := make([]*eventsapi.SubscriptionInfo, 0, len(s.subscriptions)) for _, sub := range s.subscriptions { l = append(l, sub.info) } @@ -121,8 +122,8 @@ func (s *subscriptionMGR) Subscriptions(res http.ResponseWriter, req *http.Reque } // AddSubscription adds a new subscription -func (s *subscriptionMGR) AddSubscription(res http.ResponseWriter, req *http.Request, params httprouter.Params) (*SubscriptionInfo, *restutil.RestError) { - var spec SubscriptionInfo +func (s *subscriptionMGR) AddSubscription(res http.ResponseWriter, req *http.Request, params httprouter.Params) (*eventsapi.SubscriptionInfo, *restutil.RestError) { + var spec eventsapi.SubscriptionInfo if err := json.NewDecoder(req.Body).Decode(&spec); err != nil { return nil, restutil.NewRestError(fmt.Sprintf(errors.RESTGatewaySubscriptionInvalid, err), 400) } @@ -132,7 +133,7 @@ func (s *subscriptionMGR) AddSubscription(res http.ResponseWriter, req *http.Req if spec.Stream == "" { return nil, restutil.NewRestError(`Missing required parameter "stream"`, 400) } - spec.TimeSorted = TimeSorted{ + spec.TimeSorted = eventsapi.TimeSorted{ CreatedISO8601: time.Now().UTC().Format(time.RFC3339), } spec.ID = subIDPrefix + utils.UUIDv4() @@ -142,6 +143,9 @@ func (s *subscriptionMGR) AddSubscription(res http.ResponseWriter, req *http.Req // user did not set an initial block, default to newest spec.FromBlock = FromBlockNewest } + if spec.Filter.BlockType == "" { + spec.Filter.BlockType = eventsapi.BlockType_TX + } // Create it sub, err := newSubscription(s, s.rpc, &spec) if err != nil { @@ -221,7 +225,7 @@ func (s *subscriptionMGR) deleteSubscription(sub *subscription) error { return nil } -func (s *subscriptionMGR) storeSubscription(info *SubscriptionInfo) (*SubscriptionInfo, error) { +func (s *subscriptionMGR) storeSubscription(info *eventsapi.SubscriptionInfo) (*eventsapi.SubscriptionInfo, error) { infoBytes, _ := json.MarshalIndent(info, "", " ") if err := s.db.Put(info.ID, infoBytes); err != nil { return nil, errors.Errorf(errors.EventStreamsSubscribeStoreFailed, err) @@ -472,7 +476,7 @@ func (s *subscriptionMGR) recoverSubscriptions() { for iSub.Next() { k := iSub.Key() if strings.HasPrefix(k, subIDPrefix) { - var subInfo SubscriptionInfo + var subInfo eventsapi.SubscriptionInfo err := json.Unmarshal(iSub.Value(), &subInfo) if err != nil { log.Errorf("Failed to recover subscription '%s': %s", string(iSub.Value()), err) diff --git a/internal/events/subscription.go b/internal/events/subscription.go index 8490c88..0fb72a9 100644 --- a/internal/events/subscription.go +++ b/internal/events/subscription.go @@ -22,54 +22,28 @@ import ( "strconv" "github.com/hyperledger-labs/firefly-fabconnect/internal/errors" + eventsapi "github.com/hyperledger-labs/firefly-fabconnect/internal/events/api" "github.com/hyperledger-labs/firefly-fabconnect/internal/fabric" "github.com/hyperledger/fabric-sdk-go/pkg/client/event" "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab" log "github.com/sirupsen/logrus" ) -// persistedFilter is the part of the filter we record to storage -// BlockType: optional. only notify on blocks of a specific type -// types are defined in github.com/hyperledger/fabric-protos-go/common: -// HeaderType_CONFIG, HeaderType_CONFIG_UPDATE, HeaderType_MESSAGE -// ChaincodeId: optional, only notify on blocks containing events for chaincode Id -// Filter: optional. regexp applied to the event name. can be used independent of Chaincode ID -// FromBlock: optional. "newest", "oldest", a number. default is "newest" -type persistedFilter struct { - BlockType string `json:"blockType,omitempty"` - ChaincodeId string `json:"chaincodeId,omitempty"` - Filter string `json:"filter,omitempty"` - ToBlock uint64 `json:"toBlock,omitempty"` -} - -// SubscriptionInfo is the persisted data for the subscription -type SubscriptionInfo struct { - TimeSorted - ID string `json:"id,omitempty"` - ChannelId string `json:"channel,omitempty"` - Path string `json:"path"` - Summary string `json:"-"` // System generated name for the subscription - Name string `json:"name"` // User provided name for the subscription, set to Summary if missing - Stream string `json:"stream"` // the event stream this subscription is associated under - Signer string `json:"signer"` - FromBlock string `json:"fromBlock,omitempty"` - Filter persistedFilter `json:"filter"` -} - // subscription is the runtime that manages the subscription type subscription struct { - info *SubscriptionInfo - client fabric.RPCClient - ep *evtProcessor - registration fab.Registration - notifier <-chan *fab.BlockEvent - eventClient *event.Client - filterStale bool - deleting bool - resetRequested bool + info *eventsapi.SubscriptionInfo + client fabric.RPCClient + ep *evtProcessor + registration fab.Registration + blockEventNotifier <-chan *fab.BlockEvent + ccEventNotifier <-chan *fab.CCEvent + eventClient *event.Client + filterStale bool + deleting bool + resetRequested bool } -func newSubscription(sm subscriptionManager, rpc fabric.RPCClient, i *SubscriptionInfo) (*subscription, error) { +func newSubscription(sm subscriptionManager, rpc fabric.RPCClient, i *eventsapi.SubscriptionInfo) (*subscription, error) { stream, err := sm.streamByID(i.Stream) if err != nil { return nil, err @@ -90,12 +64,7 @@ func newSubscription(sm subscriptionManager, rpc fabric.RPCClient, i *Subscripti return s, nil } -// GetID returns the ID (for sorting) -func (info *SubscriptionInfo) GetID() string { - return info.ID -} - -func restoreSubscription(sm subscriptionManager, rpc fabric.RPCClient, i *SubscriptionInfo) (*subscription, error) { +func restoreSubscription(sm subscriptionManager, rpc fabric.RPCClient, i *eventsapi.SubscriptionInfo) (*subscription, error) { if i.GetID() == "" { return nil, errors.Errorf(errors.EventStreamsNoID) } @@ -138,12 +107,13 @@ func (s *subscription) setCheckpointBlockHeight(i uint64) { } func (s *subscription) restartFilter(ctx context.Context, since uint64) error { - reg, notifier, eventClient, err := s.client.SubscribeEvent(s.info.ChannelId, s.info.Signer, since) + reg, blockEventNotifier, ccEventNotifier, eventClient, err := s.client.SubscribeEvent(s.info, since) if err != nil { return errors.Errorf(errors.RPCCallReturnedError, "SubscribeEvent", err) } s.registration = reg - s.notifier = notifier + s.blockEventNotifier = blockEventNotifier + s.ccEventNotifier = ccEventNotifier s.eventClient = eventClient s.markFilterStale(false) @@ -156,13 +126,30 @@ func (s *subscription) restartFilter(ctx context.Context, since uint64) error { func (s *subscription) processNewEvents() { for { - blockEvent, ok := <-s.notifier - if !ok { - log.Infof("%s: Event notifier channel closed", s.info.ID) - return - } - events := fabric.GetEvents(blockEvent.Block) - for _, event := range events { + select { + case blockEvent, ok := <-s.blockEventNotifier: + if !ok { + log.Infof("%s: Block event notifier channel closed", s.info.ID) + return + } + events := fabric.GetEvents(blockEvent.Block) + for _, event := range events { + if err := s.ep.processEventEntry(s.info.ID, event); err != nil { + log.Errorf("Failed to process event: %s", err) + } + } + case ccEvent, ok := <-s.ccEventNotifier: + if !ok { + log.Infof("%s: Chaincode event notifier channel closed", s.info.ID) + return + } + event := &eventsapi.EventEntry{ + ChaincodeId: ccEvent.ChaincodeID, + BlockNumber: ccEvent.BlockNumber, + TransactionId: ccEvent.TxID, + EventName: ccEvent.EventName, + Payload: ccEvent.Payload, + } if err := s.ep.processEventEntry(s.info.ID, event); err != nil { log.Errorf("Failed to process event: %s", err) } diff --git a/internal/fabric/blockdecoder.go b/internal/fabric/blockdecoder.go index 464cf70..eb01db9 100644 --- a/internal/fabric/blockdecoder.go +++ b/internal/fabric/blockdecoder.go @@ -37,7 +37,7 @@ func GetEvents(block *cb.Block) []*api.EventEntry { for _, ccAction := range ccActions { event := ccAction.GetChaincodeEvent() eventEntry := api.EventEntry{ - ChaincodeId: event.TxId, + ChaincodeId: event.ChaincodeId, BlockNumber: fb.Number, TransactionId: tx.Txid, EventName: event.EventName, diff --git a/internal/fabric/rpc.go b/internal/fabric/rpc.go index 354113e..4973443 100644 --- a/internal/fabric/rpc.go +++ b/internal/fabric/rpc.go @@ -19,6 +19,7 @@ package fabric import ( "github.com/hyperledger-labs/firefly-fabconnect/internal/conf" "github.com/hyperledger-labs/firefly-fabconnect/internal/errors" + eventsapi "github.com/hyperledger-labs/firefly-fabconnect/internal/events/api" "github.com/hyperledger-labs/firefly-fabconnect/internal/rest/identity" cb "github.com/hyperledger/fabric-protos-go/common" "github.com/hyperledger/fabric-sdk-go/pkg/client/channel" @@ -49,7 +50,7 @@ type RPCClient interface { Invoke(channelId, signer, chaincodeName, method string, args []string) (*TxReceipt, error) Query(channelId, signer, chaincodeName, method string, args []string) (*channel.Response, error) QueryChainInfo(channelId, signer string) (*fab.BlockchainInfoResponse, error) - SubscribeEvent(channelId, signer string, since uint64) (fab.Registration, <-chan *fab.BlockEvent, *event.Client, error) + SubscribeEvent(subInfo *eventsapi.SubscriptionInfo, since uint64) (fab.Registration, <-chan *fab.BlockEvent, <-chan *fab.CCEvent, *event.Client, error) Close() } @@ -249,10 +250,10 @@ func (w *rpcWrapper) QueryChainInfo(channelId, signer string) (*fab.BlockchainIn } // The returned registration must be closed when done -func (w *rpcWrapper) SubscribeEvent(channelId, signer string, since uint64) (fab.Registration, <-chan *fab.BlockEvent, *event.Client, error) { - client, err := w.getChannelClient(channelId, signer) +func (w *rpcWrapper) SubscribeEvent(subInfo *eventsapi.SubscriptionInfo, since uint64) (fab.Registration, <-chan *fab.BlockEvent, <-chan *fab.CCEvent, *event.Client, error) { + client, err := w.getChannelClient(subInfo.ChannelId, subInfo.Signer) if err != nil { - return nil, nil, nil, errors.Errorf("Failed to get channel client. %s", err) + return nil, nil, nil, nil, errors.Errorf("Failed to get channel client. %s", err) } eventOpts := []event.ClientOption{ @@ -262,17 +263,39 @@ func (w *rpcWrapper) SubscribeEvent(channelId, signer string, since uint64) (fab eventOpts = append(eventOpts, event.WithSeekType(seek.FromBlock), event.WithBlockNum(since)) } eventClient, err := event.New(client.channelProvider, eventOpts...) + log.Debugf("event service used: %+v", eventClient) if err != nil { log.Errorf("Failed to create event client. %s", err) - return nil, nil, nil, errors.Errorf("Failed to create event client. %s", err) + return nil, nil, nil, nil, errors.Errorf("Failed to create event client. %s", err) } - reg, notifier, err := eventClient.RegisterBlockEvent(headertypefilter.New(cb.HeaderType_ENDORSER_TRANSACTION)) - if err != nil { - return nil, nil, nil, errors.Errorf("Failed to subscribe to block events. %s", err) + if subInfo.Filter.ChaincodeId != "" { + if subInfo.Filter.EventFilter == "" { + subInfo.Filter.EventFilter = ".*" + } + reg, notifier, err := eventClient.RegisterChaincodeEvent(subInfo.Filter.ChaincodeId, subInfo.Filter.EventFilter) + if err != nil { + return nil, nil, nil, nil, errors.Errorf("Failed to subscribe to chaincode events. %s", err) + } + log.Infof("Subscribed to events in channel %s from block %d (0 means newest)", subInfo.ChannelId, since) + return reg, nil, notifier, eventClient, nil + } else { + blockType := subInfo.Filter.BlockType + var blockfilter fab.BlockFilter + if blockType == eventsapi.BlockType_TX { + blockfilter = headertypefilter.New(cb.HeaderType_ENDORSER_TRANSACTION) + } else if blockType == eventsapi.BlockType_Config { + blockfilter = headertypefilter.New(cb.HeaderType_CONFIG, cb.HeaderType_CONFIG_UPDATE) + } else { + blockfilter = headertypefilter.New(cb.HeaderType_ENDORSER_TRANSACTION) + } + + reg, notifier, err := eventClient.RegisterBlockEvent(blockfilter) + if err != nil { + return nil, nil, nil, nil, errors.Errorf("Failed to subscribe to block events. %s", err) + } + log.Infof("Subscribed to events in channel %s from block %d (0 means newest)", subInfo.ChannelId, since) + return reg, notifier, nil, eventClient, nil } - log.Infof("Subscribed to events in channel %s from block %d (0 means newest)", channelId, since) - log.Debugf("event service used: %+v", eventClient) - return reg, notifier, eventClient, nil } func (w *rpcWrapper) sendTransaction(channelId, signer, chaincodeName, method string, args []string, isInit bool) (*msp.IdentityIdentifier, *channel.Response, *fab.TxStatusEvent, error) { diff --git a/internal/fabric/tx.go b/internal/fabric/tx.go index c72d2f6..8bb2480 100644 --- a/internal/fabric/tx.go +++ b/internal/fabric/tx.go @@ -79,10 +79,16 @@ func (tx *Tx) GetTXReceipt(ctx context.Context, rpc RPCClient) (bool, error) { } // Send sends an individual transaction -func (tx *Tx) Send(ctx context.Context, rpc RPCClient) (err error) { +func (tx *Tx) Send(ctx context.Context, rpc RPCClient) error { start := time.Now().UTC() - receipt, err := rpc.Invoke(tx.ChannelID, tx.Signer, tx.ChaincodeName, tx.Function, tx.Args) + var receipt *TxReceipt + var err error + if tx.IsInit { + receipt, err = rpc.Init(tx.ChannelID, tx.Signer, tx.ChaincodeName, tx.Function, tx.Args) + } else { + receipt, err = rpc.Invoke(tx.ChannelID, tx.Signer, tx.ChaincodeName, tx.Function, tx.Args) + } tx.lock.Lock() tx.Receipt = receipt tx.lock.Unlock() diff --git a/internal/rest/utils/params.go b/internal/rest/utils/params.go index 1bd018d..8a7206e 100644 --- a/internal/rest/utils/params.go +++ b/internal/rest/utils/params.go @@ -104,11 +104,16 @@ func BuildTxMessage(res http.ResponseWriter, req *http.Request, params httproute msg.Headers.ChaincodeName = chaincode isInitVal := body["init"] if isInitVal != nil { - isInit, err := strconv.ParseBool(isInitVal.(string)) - if err != nil { - return nil, nil, NewRestError(err.Error(), 400) + strVal, ok := isInitVal.(string) + if ok { + isInit, err := strconv.ParseBool(strVal) + if err != nil { + return nil, nil, NewRestError(err.Error(), 400) + } + msg.IsInit = isInit + } else { + msg.IsInit = isInitVal.(bool) } - msg.IsInit = isInit } msg.Function = body["func"].(string) if msg.Function == "" { From 7f21d5e05cf4e324bae3c4593526a4bff41e24ec Mon Sep 17 00:00:00 2001 From: Jim Zhang Date: Thu, 26 Aug 2021 09:56:59 -0400 Subject: [PATCH 2/4] Added event filtering default if empty input Signed-off-by: Jim Zhang --- internal/events/submanager.go | 3 +++ internal/fabric/rpc.go | 3 --- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/internal/events/submanager.go b/internal/events/submanager.go index 96480ad..1b36641 100644 --- a/internal/events/submanager.go +++ b/internal/events/submanager.go @@ -146,6 +146,9 @@ func (s *subscriptionMGR) AddSubscription(res http.ResponseWriter, req *http.Req if spec.Filter.BlockType == "" { spec.Filter.BlockType = eventsapi.BlockType_TX } + if spec.Filter.EventFilter == "" && spec.Filter.ChaincodeId != "" { + spec.Filter.EventFilter = ".*" + } // Create it sub, err := newSubscription(s, s.rpc, &spec) if err != nil { diff --git a/internal/fabric/rpc.go b/internal/fabric/rpc.go index 4973443..50af80f 100644 --- a/internal/fabric/rpc.go +++ b/internal/fabric/rpc.go @@ -269,9 +269,6 @@ func (w *rpcWrapper) SubscribeEvent(subInfo *eventsapi.SubscriptionInfo, since u return nil, nil, nil, nil, errors.Errorf("Failed to create event client. %s", err) } if subInfo.Filter.ChaincodeId != "" { - if subInfo.Filter.EventFilter == "" { - subInfo.Filter.EventFilter = ".*" - } reg, notifier, err := eventClient.RegisterChaincodeEvent(subInfo.Filter.ChaincodeId, subInfo.Filter.EventFilter) if err != nil { return nil, nil, nil, nil, errors.Errorf("Failed to subscribe to chaincode events. %s", err) From 549008e250d68c4902ac0defaf19c5eba0e84efe Mon Sep 17 00:00:00 2001 From: Jim Zhang Date: Thu, 26 Aug 2021 16:19:55 -0400 Subject: [PATCH 3/4] Added support for user-supplied request ID in async handler Signed-off-by: Jim Zhang --- internal/rest/async/asyncdispatcher.go | 14 ++++++++------ internal/rest/utils/params.go | 2 ++ 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/internal/rest/async/asyncdispatcher.go b/internal/rest/async/asyncdispatcher.go index d4791e8..5b4d248 100644 --- a/internal/rest/async/asyncdispatcher.go +++ b/internal/rest/async/asyncdispatcher.go @@ -106,19 +106,21 @@ func (w *asyncDispatcher) processMsg(ctx context.Context, msg *messages.SendTran return nil, 400, errors.Errorf(errors.RequestHandlerInvalidMsgType, msg.Headers.MsgType) } - // We always generate the ID. It cannot be set by the user - msgID := utils.UUIDv4() - msg.Headers.ID = msgID + // Generate a message ID if not already set + if msg.Headers.ID == "" { + msgID := utils.UUIDv4() + msg.Headers.ID = msgID + } // Pass to the handler - log.Infof("Request handler accepted message. MsgID: %s Type: %s", msgID, msg.Headers.MsgType) - msgAck, status, err := w.handler.dispatchMsg(ctx, msg.Headers.ChannelID, msgID, msg, ack) + log.Infof("Request handler accepted message. MsgID: %s Type: %s", msg.Headers.ID, msg.Headers.MsgType) + msgAck, status, err := w.handler.dispatchMsg(ctx, msg.Headers.ChannelID, msg.Headers.ID, msg, ack) if err != nil { return nil, status, err } return &messages.AsyncSentMsg{ Sent: true, - Request: msgID, + Request: msg.Headers.ID, Msg: msgAck, }, 200, nil } diff --git a/internal/rest/utils/params.go b/internal/rest/utils/params.go index 8a7206e..91bf79b 100644 --- a/internal/rest/utils/params.go +++ b/internal/rest/utils/params.go @@ -84,6 +84,7 @@ func BuildTxMessage(res http.ResponseWriter, req *http.Request, params httproute return nil, nil, NewRestError(err.Error(), 400) } + msgId := getFlyParam("id", body, req) channel := getFlyParam("channel", body, req) if channel == "" { return nil, nil, NewRestError("Must specify the channel", 400) @@ -98,6 +99,7 @@ func BuildTxMessage(res http.ResponseWriter, req *http.Request, params httproute } msg := messages.SendTransaction{} + msg.Headers.ID = msgId // this could be empty msg.Headers.MsgType = messages.MsgTypeSendTransaction msg.Headers.ChannelID = channel msg.Headers.Signer = signer From 8e54e5f78a7f7358476a9d36bf44d64d2204c361 Mon Sep 17 00:00:00 2001 From: Jim Zhang Date: Mon, 30 Aug 2021 18:04:15 -0400 Subject: [PATCH 4/4] WithOrg() function takes the org name instead of MSP ID Signed-off-by: Jim Zhang --- internal/fabric/rpc.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/fabric/rpc.go b/internal/fabric/rpc.go index 50af80f..1ec7be5 100644 --- a/internal/fabric/rpc.go +++ b/internal/fabric/rpc.go @@ -163,7 +163,7 @@ func (w *rpcWrapper) getChannelClient(channelId string, signer string) (*clientW } clientOfUser := w.channelClients[channelId][id.Identifier().ID] if clientOfUser == nil { - channelProvider := w.sdk.ChannelContext(channelId, fabsdk.WithOrg(id.Identifier().MSPID), fabsdk.WithUser(id.Identifier().ID)) + channelProvider := w.sdk.ChannelContext(channelId, fabsdk.WithOrg(w.identityConfig.Client().Organization), fabsdk.WithUser(id.Identifier().ID)) cClient, err := channel.New(channelProvider) if err != nil { return nil, err