Skip to content
This repository has been archived by the owner on Aug 31, 2022. It is now read-only.

telemetry client updated to stream structured events #111

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
23 changes: 22 additions & 1 deletion gnmi_server/client_subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,31 @@ type Client struct {
// Wait for all sub go routine to finish
w sync.WaitGroup
fatal bool
logLevel int
}

// Syslog level for error
const logLevelError int = 3
const logLevelDebug int = 7
const logLevelMax int = logLevelDebug

// NewClient returns a new initialized client.
func NewClient(addr net.Addr) *Client {
pq := queue.NewPriorityQueue(1, false)
return &Client{
addr: addr,
q: pq,
logLevel: logLevelError,
}
}

func (c *Client) setLogLevel(lvl int) {
if (lvl >= 0) {
if lvl < logLevelMax {
c.logLevel = lvl
} else {
c.logLevel = logLevelMax
}
}
}

Expand Down Expand Up @@ -121,8 +138,12 @@ func (c *Client) Run(stream gnmipb.GNMI_SubscribeServer) (err error) {
}
var dc sdc.Client

mode := c.subscribe.GetMode()

if target == "OTHERS" {
dc, err = sdc.NewNonDbClient(paths, prefix)
} else if ((target == "EVENTS") && (mode == gnmipb.SubscriptionList_STREAM)) {
dc, err = sdc.NewEventClient(paths, prefix, c.logLevel)
} else if _, ok, _, _ := sdc.IsTargetDb(target); ok {
dc, err = sdc.NewDbClient(paths, prefix)
} else {
Expand All @@ -134,7 +155,7 @@ func (c *Client) Run(stream gnmipb.GNMI_SubscribeServer) (err error) {
return grpc.Errorf(codes.NotFound, "%v", err)
}

switch mode := c.subscribe.GetMode(); mode {
switch mode {
case gnmipb.SubscriptionList_STREAM:
c.stop = make(chan struct{}, 1)
c.w.Add(1)
Expand Down
3 changes: 3 additions & 0 deletions gnmi_server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type Config struct {
// Port for the Server to listen on. If 0 or unset the Server will pick a port
// for this Server.
Port int64
LogLevel int
UserAuth AuthTypes
}

Expand Down Expand Up @@ -233,6 +234,8 @@ func (s *Server) Subscribe(stream gnmipb.GNMI_SubscribeServer) error {

c := NewClient(pr.Addr)

c.setLogLevel(s.config.LogLevel)

s.cMu.Lock()
if oc, ok := s.clients[c.String()]; ok {
log.V(2).Infof("Delete duplicate client %s", oc)
Expand Down
195 changes: 195 additions & 0 deletions sonic_data_client/events_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
package client

/*
#cgo CFLAGS: -g -Wall -I/sonic/src/sonic-swss-common/common -Wformat -Werror=format-security -fPIE
#cgo LDFLAGS: -lpthread -lboost_thread -lboost_system -lzmq -lboost_serialization -luuid -lswsscommon
#include <stdlib.h>
#include "events_wrap.h"
renukamanavalan marked this conversation as resolved.
Show resolved Hide resolved
*/
import "C"

import (
"encoding/json"
"fmt"
"sync"
"time"
"unsafe"

spb "github.com/Azure/sonic-telemetry/proto"
"github.com/Workiva/go-datastructures/queue"
log "github.com/golang/glog"
gnmipb "github.com/openconfig/gnmi/proto/gnmi"
)

type EventClient struct {

prefix *gnmipb.Path
path *gnmipb.Path

q *queue.PriorityQueue
channel chan struct{}

wg *sync.WaitGroup // wait for all sub go routines to finish

subs_handle unsafe.Pointer

stopped int
}

const SUBSCRIBER_TIMEOUT = (2 * 1000) // 2 seconds
const HEARTBEAT_TIMEOUT = 2
const EVENT_BUFFSZ = 4096
const MISSED_BUFFSZ = 16

func NewEventClient(paths []*gnmipb.Path, prefix *gnmipb.Path, logLevel int) (Client, error) {
var evtc EventClient
evtc.prefix = prefix
for _, path := range paths {
// Only one path is expected. Take the last if many
evtc.path = path
}
C.swssSetLogPriority(C.int(logLevel))

/* Init subscriber with 2 seconds time out */
subs_data := make(map[string]interface{})
subs_data["recv_timeout"] = SUBSCRIBER_TIMEOUT
j, err := json.Marshal(subs_data)
if err != nil {
log.V(3).Infof("events_init_subscriber: Failed to marshal")
return nil, err
}
js := string(j)
evtc.subs_handle = C.events_init_subscriber_wrap(C.CString(js))
evtc.stopped = 0

log.V(7).Infof("NewEventClient constructed. logLevel=%d", logLevel)

return &evtc, nil
}

// String returns the target the client is querying.
func (evtc *EventClient) String() string {
return fmt.Sprintf("EventClient Prefix %v", evtc.prefix.GetTarget())
}


func get_events(evtc *EventClient, updateChannel chan string) {

evt_ptr := C.malloc(C.sizeof_char * EVENT_BUFFSZ)
missed_ptr := C.malloc(C.sizeof_char * MISSED_BUFFSZ)

defer C.free(unsafe.Pointer(evt_ptr))
defer C.free(unsafe.Pointer(missed_ptr))

c_eptr := (*C.char)(unsafe.Pointer(evt_ptr))
c_mptr := (*C.char)(unsafe.Pointer(missed_ptr))

for {
rc := C.event_receive_wrap(evtc.subs_handle, c_eptr, EVENT_BUFFSZ, c_mptr, MISSED_BUFFSZ)
log.V(7).Infof("C.event_receive_wrap rc=%d evt:%s", rc, (*C.char)(evt_ptr))

if rc != 0 {
updateChannel <- C.GoString((*C.char)(evt_ptr))
}
if evtc.stopped == 1 {
log.V(1).Infof("%v stop channel closed, exiting get_events routine", evtc)
C.events_deinit_subscriber_wrap(evtc.subs_handle)
evtc.subs_handle = nil
return
}
// TODO: Record missed count in stats table.
// intVar, err := strconv.Atoi(C.GoString((*C.char)(c_mptr)))
}
}


func send_event(evtc *EventClient, tv *gnmipb.TypedValue) error {
spbv := &spb.Value{
Prefix: evtc.prefix,
Path: evtc.path,
Timestamp: time.Now().UnixNano(),
Val: tv,
}

log.V(7).Infof("Sending spbv")
if err := evtc.q.Put(Value{spbv}); err != nil {
log.V(3).Infof("Queue error: %v", err)
return err
}
log.V(7).Infof("send_event done")
return nil
}

func (evtc *EventClient) StreamRun(q *queue.PriorityQueue, stop chan struct{}, wg *sync.WaitGroup, subscribe *gnmipb.SubscriptionList) {
hbData := make(map[string]interface{})
hbData["heart"] = "beat"
hbVal, _ := json.Marshal(hbData)

hbTv := &gnmipb.TypedValue {
Value: &gnmipb.TypedValue_JsonIetfVal{
JsonIetfVal: hbVal,
}}


evtc.wg = wg
defer evtc.wg.Done()

evtc.q = q
evtc.channel = stop

updateChannel := make(chan string)
go get_events(evtc, updateChannel)

for {
select {
case nextEvent := <-updateChannel:
log.V(7).Infof("update received: %v", nextEvent)
evtTv := &gnmipb.TypedValue {
Value: &gnmipb.TypedValue_StringVal {
StringVal: nextEvent,
}}
if err := send_event(evtc, evtTv); err != nil {
return
}

case <-IntervalTicker(time.Second * HEARTBEAT_TIMEOUT):
log.V(7).Infof("Ticker received")
if err := send_event(evtc, hbTv); err != nil {
return
}
case <-evtc.channel:
evtc.stopped = 1
log.V(3).Infof("Channel closed by client")
return
}
}
log.V(3).Infof("Event exiting streamrun")
}


func (evtc *EventClient) Get(wg *sync.WaitGroup) ([]*spb.Value, error) {
return nil, nil
}

func (evtc *EventClient) OnceRun(q *queue.PriorityQueue, once chan struct{}, wg *sync.WaitGroup, subscribe *gnmipb.SubscriptionList) {
return
}

func (evtc *EventClient) PollRun(q *queue.PriorityQueue, poll chan struct{}, wg *sync.WaitGroup, subscribe *gnmipb.SubscriptionList) {
return
}


func (evtc *EventClient) Close() error {
return nil
}

func (evtc *EventClient) Set(delete []*gnmipb.Path, replace []*gnmipb.Update, update []*gnmipb.Update) error {
return nil
}
func (evtc *EventClient) Capabilities() []gnmipb.ModelData {
return nil
}

// cgo LDFLAGS: -L/sonic/target/files/bullseye -lxswsscommon -lpthread -lboost_thread -lboost_system -lzmq -lboost_serialization -luuid -lxxeventxx -Wl,-rpath,/sonic/target/files/bullseye

59 changes: 38 additions & 21 deletions telemetry/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"crypto/x509"
"flag"
"io/ioutil"
"strconv"
"time"

log "github.com/golang/glog"
Expand All @@ -16,7 +17,7 @@ import (
)

var (
userAuth = gnmi.AuthTypes{"password": false, "cert": false, "jwt": false}
userAuth = gnmi.AuthTypes{"password": false, "cert": false, "jwt": false}
port = flag.Int("port", -1, "port to listen on")
// Certificate files.
caCert = flag.String("ca_crt", "", "CA certificate for client certificate validation. Optional.")
Expand All @@ -41,12 +42,12 @@ func main() {
defUserAuth = gnmi.AuthTypes{"jwt": false, "password": false, "cert": false}
}

if isFlagPassed("client_auth") {
log.V(1).Infof("client_auth provided")
}else {
log.V(1).Infof("client_auth not provided, using defaults.")
userAuth = defUserAuth
}
if isFlagPassed("client_auth") {
log.V(1).Infof("client_auth provided")
}else {
log.V(1).Infof("client_auth not provided, using defaults.")
userAuth = defUserAuth
}

switch {
case *port <= 0:
Expand All @@ -58,8 +59,14 @@ func main() {

cfg := &gnmi.Config{}
cfg.Port = int64(*port)
cfg.LogLevel = 3
var opts []grpc.ServerOption

if val, err := strconv.Atoi(getflag("v")); err == nil {
cfg.LogLevel = val
log.Errorf("flag: log level %v", cfg.LogLevel)
}

if !*noTLS {
var certificate tls.Certificate
var err error
Expand All @@ -69,13 +76,13 @@ func main() {
log.Exitf("could not load server key pair: %s", err)
}
} else {
switch {
case *serverCert == "":
log.Errorf("serverCert must be set.")
return
case *serverKey == "":
log.Errorf("serverKey must be set.")
return
switch {
case *serverCert == "":
log.Errorf("serverCert must be set.")
return
case *serverKey == "":
log.Errorf("serverKey must be set.")
return
}
certificate, err = tls.LoadX509KeyPair(*serverCert, *serverKey)
if err != nil {
Expand Down Expand Up @@ -144,11 +151,21 @@ func main() {
}

func isFlagPassed(name string) bool {
found := false
flag.Visit(func(f *flag.Flag) {
if f.Name == name {
found = true
}
})
return found
found := false
flag.Visit(func(f *flag.Flag) {
if f.Name == name {
found = true
}
})
return found
}

func getflag(name string) string {
val := ""
flag.VisitAll(func(f *flag.Flag) {
if f.Name == name {
val = f.Value.String()
}
})
return val
}