Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DO NOT MERGE. Intro serializer lib with json and protobuf. #107

Open
wants to merge 16 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.17

require (
github.com/go-redis/redis/v8 v8.11.5
github.com/gogo/protobuf v1.3.2
github.com/google/uuid v1.1.1
github.com/gorilla/mux v1.8.0
github.com/stretchr/testify v1.7.1
Expand Down
31 changes: 31 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,14 @@ github.com/go-logr/logr v1.2.0 h1:QK40JKJyMdUDz+h+xvCsru/bJhvG0UxvePV0ufL/AcE=
github.com/go-logr/logr v1.2.0/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI=
github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI=
github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
Expand All @@ -28,12 +32,39 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd h1:O7DYs+zxREGLKzKoMQrtrEacpb0ZVXA5rIwylE2Xchk=
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e h1:fLOSk5Q00efkSvAm+4xcoXD+RRmLmmulPn5I3Y9F2EM=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk=
golang.org/x/time v0.0.0-20220609170525-579cf78fd858 h1:Dpdu/EMxGMFgq0CeYMh4fazTD2vtlZRYE7wyynxJb9U=
golang.org/x/time v0.0.0-20220609170525-579cf78fd858/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU=
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
4 changes: 2 additions & 2 deletions resource-management/cmds/service-api/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (

"global-resource-service/resource-management/pkg/aggregrator"
localMetrics "global-resource-service/resource-management/pkg/common-lib/metrics"
"global-resource-service/resource-management/pkg/common-lib/types/event"
"global-resource-service/resource-management/pkg/common-lib/types"
"global-resource-service/resource-management/pkg/distributor"
"global-resource-service/resource-management/pkg/service-api/endpoints"
"global-resource-service/resource-management/pkg/store/redis"
Expand Down Expand Up @@ -88,7 +88,7 @@ func Run(c *Config) error {
defer wg.Done()
for {
time.Sleep(c.EventMetricsDumpFrequency)
event.PrintLatencyReport()
types.PrintLatencyReport()
}
}()
}
Expand Down
52 changes: 25 additions & 27 deletions resource-management/pkg/aggregrator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@ import (

distributor "global-resource-service/resource-management/pkg/common-lib/interfaces/distributor"
"global-resource-service/resource-management/pkg/common-lib/metrics"
"global-resource-service/resource-management/pkg/common-lib/serializer"
"global-resource-service/resource-management/pkg/common-lib/serializer/protobuf"
"global-resource-service/resource-management/pkg/common-lib/types"
"global-resource-service/resource-management/pkg/common-lib/types/event"
)

type Aggregator struct {
urls []string
EventProcessor distributor.Interface
serializer serializer.Serializer
}

// To be client of Resource Region Manager
Expand All @@ -28,14 +30,6 @@ type ClientOfRRM struct {
HTTPClient *http.Client
}

// RRM: Resource Region Manager
//
type ResponseFromRRM struct {
RegionNodeEvents [][]*event.NodeEvent
RvMap types.TransitResourceVersionMap
Length uint64
}

// RRM: Resource Region Manager
//
type PullDataFromRRM struct {
Expand All @@ -55,6 +49,7 @@ func NewAggregator(urls []string, EventProcessor distributor.Interface) *Aggrega
return &Aggregator{
urls: urls,
EventProcessor: EventProcessor,
serializer: protobuf.NewSerializer("foo"),
}
}

Expand All @@ -77,7 +72,7 @@ func (a *Aggregator) Run() (err error) {
}()

var crv types.TransitResourceVersionMap
var regionNodeEvents [][]*event.NodeEvent
var regionNodeEvents []types.RpNodeEvents
var length uint64
var eventProcess bool

Expand All @@ -86,20 +81,20 @@ func (a *Aggregator) Run() (err error) {

klog.V(3).Infof("Starting loop pulling nodes from region: %v", a.urls[i])
for {
time.Sleep(100 * time.Millisecond)

// Call the Pull methods
// when composite RV is nil, the method initPull is called;
// otherwise the method subsequentPull is called.
// To simplify the codes, we use one method initPullOrSubsequentPull instead
pullStarts := time.Now()
regionNodeEvents, length = a.initPullOrSubsequentPull(c, DefaultBatchLength, crv)
if length != 0 {
klog.V(4).Infof("Total (%v) region node events are pulled successfully in (%v) RPs", length, len(regionNodeEvents))
pullEnds := time.Now()
klog.V(4).Infof("Total (%v) region node events are pulled successfully in (%v) RPs. pull duration %v", length, len(regionNodeEvents), pullEnds.Sub(pullStarts))

// Convert 2D array to 1D array
minRecordNodeEvents := make([]*event.NodeEvent, 0, length)
minRecordNodeEvents := make([]*types.NodeEvent, 0, length)
for j := 0; j < len(regionNodeEvents); j++ {
minRecordNodeEvents = append(minRecordNodeEvents, regionNodeEvents[j]...)
minRecordNodeEvents = append(minRecordNodeEvents, regionNodeEvents[j].NodeEvents...)
}
klog.V(6).Infof("Total (%v) mini node events are converted successfully with length (%v)", len(minRecordNodeEvents), length)

Expand All @@ -118,6 +113,9 @@ func (a *Aggregator) Run() (err error) {
if eventProcess {
a.postCRV(c, crv)
}
} else {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typically, there will be a comparison between actual data size and batch size. If data size == batch size, no need to wait for subsequent pull. Wait a bit if data size < batch size. Without waiting, even a single event can cause immediate subsequent pull. This seems a bit misaligned with 100ms for empty pull.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point. will fix.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

currently the batch size is not used, so comparing with expected ( batch size ) and the actually got data ( the length ) won't help here for now.

since the goal is to avoid waitless pull()s from the aggregator to avoid busy cpu spins, we will check the durations of pull() and/or the processEvent() and make adjustment of wait time here.

// only wait for empty pulls
time.Sleep(100 * time.Millisecond)
}
}
}(i)
Expand All @@ -142,7 +140,7 @@ func (a *Aggregator) createClient(url string) *ClientOfRRM {
// or
// Call the resource region manager's SubsequentPull method {url}/resources/subsequentpull when crv is not nil
//
func (a *Aggregator) initPullOrSubsequentPull(c *ClientOfRRM, batchLength uint64, crv types.TransitResourceVersionMap) ([][]*event.NodeEvent, uint64) {
func (a *Aggregator) initPullOrSubsequentPull(c *ClientOfRRM, batchLength uint64, crv types.TransitResourceVersionMap) ([]types.RpNodeEvents, uint64) {
var path string

if len(crv) == 0 {
Expand All @@ -152,7 +150,7 @@ func (a *Aggregator) initPullOrSubsequentPull(c *ClientOfRRM, batchLength uint64
}

bytes, _ := json.Marshal(PullDataFromRRM{BatchLength: batchLength, CRV: crv.Copy()})
req, err := http.NewRequest(http.MethodGet, path, strings.NewReader((string(bytes))))
req, err := http.NewRequest(http.MethodGet, path, strings.NewReader(string(bytes)))
if err != nil {
klog.Errorf(err.Error())
}
Expand All @@ -172,21 +170,21 @@ func (a *Aggregator) initPullOrSubsequentPull(c *ClientOfRRM, batchLength uint64
return nil, 0
}

var ResponseObject ResponseFromRRM
err = json.Unmarshal(bodyBytes, &ResponseObject)
if err != nil {
klog.Errorf("Error from JSON Unmarshal:", err)
var ResponseObject types.ResponseFromRRM
_, err1 := a.serializer.Decode(bodyBytes, &ResponseObject)
if err1 != nil {
klog.Errorf("Error decode response body:", err)
return nil, 0
}

// log out node ids for debugging some prolonged node transitions
if klog.V(9).Enabled() {
for rp, rpNodes := range ResponseObject.RegionNodeEvents {
if len(rpNodes) == 0 {
if len(rpNodes.NodeEvents) == 0 {
continue
}
buf := make([]string, len(rpNodes))
for i, node := range rpNodes {
buf := make([]string, len(rpNodes.NodeEvents))
for i, node := range rpNodes.NodeEvents {
buf[i] = node.Node.Id
}

Expand All @@ -196,9 +194,9 @@ func (a *Aggregator) initPullOrSubsequentPull(c *ClientOfRRM, batchLength uint64

if metrics.ResourceManagementMeasurement_Enabled {
for i := 0; i < len(ResponseObject.RegionNodeEvents); i++ {
for j := 0; j < len(ResponseObject.RegionNodeEvents[i]); j++ {
if ResponseObject.RegionNodeEvents[i][j] != nil {
ResponseObject.RegionNodeEvents[i][j].SetCheckpoint(metrics.Aggregator_Received)
for j := 0; j < len(ResponseObject.RegionNodeEvents[i].NodeEvents); j++ {
if ResponseObject.RegionNodeEvents[i].NodeEvents[j] != nil {
ResponseObject.RegionNodeEvents[i].NodeEvents[j].SetCheckpoint(metrics.Aggregator_Received)
}
}
}
Expand Down
7 changes: 3 additions & 4 deletions resource-management/pkg/clientSdk/rest/watch/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"fmt"

"global-resource-service/resource-management/pkg/common-lib/types"
"global-resource-service/resource-management/pkg/common-lib/types/event"
)

// Decoder implements the watch.Decoder interface for io.ReadClosers that
Expand All @@ -41,15 +40,15 @@ func NewDecoder(decoder *json.Decoder) *Decoder {

// Decode blocks until it can return the next object in the reader. Returns an error
// if the reader is closed or an object can't be decoded.
func (d *Decoder) Decode() (event.EventType, *types.LogicalNode, error) {
var got event.NodeEvent
func (d *Decoder) Decode() (types.EventType, *types.LogicalNode, error) {
var got types.NodeEvent
err := d.decoder.Decode(&got)
if err != nil {
return "", nil, err
}

switch got.Type {
case event.Added, event.Modified, event.Deleted, event.Error, event.Bookmark:
case types.Added, types.Modified, types.Deleted, types.Error, types.Bookmark:
default:
return "", nil, fmt.Errorf("got invalid watch event type: %v", got.Type)
}
Expand Down
2 changes: 1 addition & 1 deletion resource-management/pkg/clientSdk/rmsclient/rmsClient.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 7 additions & 8 deletions resource-management/pkg/clientSdk/watch/streamwatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package watch

import (
"fmt"
"global-resource-service/resource-management/pkg/common-lib/types/event"
"io"
"k8s.io/klog/v2"
"sync"
Expand All @@ -34,7 +33,7 @@ type Decoder interface {
// Decode should return the type of event, the decoded object, or an error.
// An error will cause StreamWatcher to call Close(). Decode should block until
// it has data or an error occurs.
Decode() (action event.EventType, object *types.LogicalNode, err error)
Decode() (action types.EventType, object *types.LogicalNode, err error)

// Close should close the underlying io.Reader, signalling to the source of
// the stream that it is no longer being watched. Close() must cause any
Expand All @@ -55,7 +54,7 @@ type StreamWatcher struct {
sync.Mutex
source Decoder
reporter Reporter
result chan event.NodeEvent
result chan types.NodeEvent
stopped bool
}

Expand All @@ -67,14 +66,14 @@ func NewStreamWatcher(d Decoder, r Reporter) *StreamWatcher {
// It's easy for a consumer to add buffering via an extra
// goroutine/channel, but impossible for them to remove it,
// so nonbuffered is better.
result: make(chan event.NodeEvent),
result: make(chan types.NodeEvent),
}
go sw.receive()
return sw
}

// ResultChan implements Interface.
func (sw *StreamWatcher) ResultChan() <-chan event.NodeEvent {
func (sw *StreamWatcher) ResultChan() <-chan types.NodeEvent {
return sw.result
}

Expand Down Expand Up @@ -117,15 +116,15 @@ func (sw *StreamWatcher) receive() {
if net.IsProbableEOF(err) || net.IsTimeout(err) {
klog.V(5).Infof("Unable to decode an event from the watch stream: %v", err)
} else {
sw.result <- event.NodeEvent{
Type: event.Error,
sw.result <- types.NodeEvent{
Type: types.Error,
Node: sw.reporter.AsObject(fmt.Errorf("unable to decode an event from the watch stream: %v", err)),
}
}
}
return
}
sw.result <- event.NodeEvent{
sw.result <- types.NodeEvent{
Type: action,
Node: obj,
}
Expand Down
12 changes: 6 additions & 6 deletions resource-management/pkg/clientSdk/watch/watch.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package watch

import (
"global-resource-service/resource-management/pkg/common-lib/types/event"
"global-resource-service/resource-management/pkg/common-lib/types"
)

// Interface can be implemented by anything that knows how to watch and report changes.
Expand All @@ -13,16 +13,16 @@ type Interface interface {
// ResultChan returns a chan which will receive all the events. If an error occurs
// or Stop() is called, the implementation will close this channel and
// release any resources used by the watch.
ResultChan() <-chan event.NodeEvent
ResultChan() <-chan types.NodeEvent
}

type emptyWatch chan event.NodeEvent
type emptyWatch chan types.NodeEvent

// NewEmptyWatch returns a watch interface that returns no results and is closed.
// May be used in certain error conditions where no information is available but
// an error is not warranted.
func NewEmptyWatch() Interface {
ch := make(chan event.NodeEvent)
ch := make(chan types.NodeEvent)
close(ch)
return emptyWatch(ch)
}
Expand All @@ -32,6 +32,6 @@ func (w emptyWatch) Stop() {
}

// ResultChan implements Interface
func (w emptyWatch) ResultChan() <-chan event.NodeEvent {
return chan event.NodeEvent(w)
func (w emptyWatch) ResultChan() <-chan types.NodeEvent {
return chan types.NodeEvent(w)
}
Loading