Skip to content

feat(collect-agent ): UT for collector-agent #536

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 31 additions & 1 deletion collector-agent/agent/AgentRouter.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ type TSpan struct {
ServerType int32 `json:"stp,string"`
TransactionId string `json:"tid"`
Uri string `json:"uri"`
UT string `json:"UT,omitempty"`
EndPoint string `json:"server"`
RemoteAddr string `json:"client"`
AcceptorHost string `json:"Ah"`
Expand All @@ -100,6 +101,35 @@ type TSpan struct {
ApacheHeader string `json:"AP,omitempty"`
}

func (span *TSpan) IsFailed() bool {
if span.ErrorInfo != nil || len(span.ExceptionInfo) > 0 {
return true
}
return false
}

//note
// FindHistogramLevel must come with histogramSize
func (span *TSpan) FindHistogramLevel() int {
if span.GetElapsedTime() <= 100 {
return 0
} else if span.GetElapsedTime() <= 300 {
return 1
} else if span.GetElapsedTime() <= 500 {
return 2
} else if span.GetElapsedTime() <= 1000 {
return 3
} else if span.GetElapsedTime() <= 3000 {
return 4
} else if span.GetElapsedTime() <= 5000 {
return 5
} else if span.GetElapsedTime() <= 8000 {
return 6
} else {
return 7
}
}

func (span *TSpan) GetAppServerType() int32 {
if span.AppServerTypeV2 != 0 {
return span.AppServerTypeV2
Expand Down Expand Up @@ -237,7 +267,7 @@ func (manager *AgentRouter) DispatchPacket(packet *RawPacket) error {
log.Debug("Read-lock is release")
}

agent.CheckValid(appname, serverType) // CA just checking the name and ft
agent.CheckValid(span)
agent.SendSpan(span)
return nil

Expand Down
67 changes: 51 additions & 16 deletions collector-agent/agent/GrpcAgent.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type GrpcAgent struct {
spanSender SpanSender
AgentOnLine bool
requestCounter RequestProfiler
utReport *UrlTemplateReport
tasksGroup sync.WaitGroup
tSpanCh chan *TSpan
ExitCh chan bool
Expand Down Expand Up @@ -64,6 +65,9 @@ func (agent *GrpcAgent) Interceptor(_ *TSpan) bool {
if !agent.AgentOnLine {
agent.log.Debugf("span dropped,as agent offline")
}

//note log url templated

return agent.AgentOnLine
}

Expand Down Expand Up @@ -171,6 +175,9 @@ func (agent *GrpcAgent) registerFilter() {
agent.log.Debug("register requestCounter filter")
agent.AddFilter(&agent.requestCounter)

// req UrlTemplateReport
agent.log.Debug("register UrlTemplate Report filter")
agent.AddFilter(agent.utReport)
// send span
agent.log.Debug("register spanSender filter")
agent.AddFilter(&agent.spanSender)
Expand Down Expand Up @@ -206,20 +213,45 @@ func (agent *GrpcAgent) sendStat() {
return
}

for {
msg := CollectPStateMessage(agent.requestCounter.GetMaxAvg, agent.requestCounter.GetReqTimeProfiler)
// todo send agentstat
var wg sync.WaitGroup
wg.Add(1)
go func() {
for {
msg := CollectPStateMessage(agent.requestCounter.GetMaxAvg, agent.requestCounter.GetReqTimeProfiler)

agent.log.Infof("%v", msg)
if err := stream.Send(msg); err != nil {
agent.log.Warn(err)
return
}
//config.StatInterval
if common.WaitChannelEvent(agent.ExitCh, 0) == common.E_AGENT_STOPPING {
return
agent.log.Debugf("%v", msg)
if err := stream.Send(msg); err != nil {
agent.log.Warn(err)
break
}
//config.StatInterval
if common.WaitChannelEvent(agent.ExitCh, 0) == common.E_AGENT_STOPPING {
break
}
}
}
wg.Done()
}()
// wg.Add(1)
// todo send uri templated
wg.Add(1)
go func() {
for {
msg := agent.utReport.MoveUtReprot()

agent.log.Debugf("%v", msg)
if err := stream.Send(msg); err != nil {
agent.log.Warn(err)
break
}
//config.StatInterval
if common.WaitChannelEvent(agent.ExitCh, 30) == common.E_AGENT_STOPPING {
break
}
}
wg.Done()
}()
wg.Wait()
}

func (agent *GrpcAgent) uploadStatInfo() {
Expand Down Expand Up @@ -258,6 +290,8 @@ func (agent *GrpcAgent) Init(id, _name string, _type int32, StartTime string) {
"socketid": pingIdStr,
})

agent.utReport = CreateUrlTemplateReport()

config := common.GetConfig()

agent.tSpanCh = make(chan *TSpan, config.AgentChannelSize)
Expand Down Expand Up @@ -308,15 +342,15 @@ func (agent *GrpcAgent) collectorActiveThreadCount(conn *grpc.ClientConn, respon
res.TimeStamp = time.Now().Unix()
res.HistogramSchemaType = 2

agent.log.Infof("try to send PCmdActiveThreadCountRes:%v", res)
agent.log.Debugf("try to send PCmdActiveThreadCountRes:%v", res)

if err := activeThreadCountClient.Send(&res); err != nil {
agent.log.Infof("collectorActiveThreadCount:responseId:%d end with:%s", responseId, err)
agent.log.Warnf("collectorActiveThreadCount:responseId:%d end with:%s", responseId, err)
break
}

if common.WaitChannelEvent(agent.ExitCh, interval) == common.E_AGENT_STOPPING {
agent.log.Info("catch exit during send collectorActiveThreadCount")
agent.log.Warnf("catch exit during send collectorActiveThreadCount")
break
}
}
Expand Down Expand Up @@ -351,6 +385,7 @@ func (agent *GrpcAgent) handleCommand(conn *grpc.ClientConn, wg *sync.WaitGroup)
//config.AgentReTryTimeout
ctx, _ := common.BuildPinpointCtx(-1, agent.pingMd)

//todo update HandleCommand to HandleCommandV2
commandClient, err := client.HandleCommand(ctx)

if err != nil {
Expand Down Expand Up @@ -427,8 +462,8 @@ func (agent *GrpcAgent) consumeJsonSpan() {
}
}

func (agent *GrpcAgent) CheckValid(name string, ft int32) bool {
if name != agent.agentName || ft != agent.agentType {
func (agent *GrpcAgent) CheckValid(span *TSpan) bool {
if span.GetAppname() != agent.agentName || span.GetAppServerType() != agent.agentType {
agent.log.Warn("name or FT not equal")
return false
} else {
Expand Down
5 changes: 3 additions & 2 deletions collector-agent/agent/Stat.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ import (
type GetMaxAvg func() (max, avg uint32)
type GetReqTimeCounter func() [4]uint16

func CollectPStateMessage(getMacAvr GetMaxAvg, getReqTimeCounter GetReqTimeCounter) *v1.PStatMessage {
func CollectPStateMessage(getMaxAvr GetMaxAvg, getReqTimeCounter GetReqTimeCounter) *v1.PStatMessage {
config := common.GetConfig()
max, avg := getMacAvr()
max, avg := getMaxAvr()
responseTime := v1.PResponseTime{
Max: int64(max),
Avg: int64(avg),
Expand All @@ -35,6 +35,7 @@ func CollectPStateMessage(getMacAvr GetMaxAvg, getReqTimeCounter GetReqTimeCount
JvmGcOldTime: 0,
JvmGcDetailed: &v1.PJvmGcDetailed{},
}
// cpu.Percent calcuate cpu in config.StatInterval
totalPer, _ := cpu.Percent(config.StatInterval*time.Second, false)
totalCpuUsage := totalPer[0] / 100
cpuload := v1.PCpuLoad{
Expand Down
111 changes: 111 additions & 0 deletions collector-agent/agent/UrlTemplate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package agent

import (
"sync"
"time"

v1 "github.com/pinpoint-apm/pinpoint-c-agent/collector-agent/pinpoint-grpc-idl/proto/v1"
)

const bucketVersion = 0
const histogramSize = 8

type uriStatHistogram struct {
Total int64
Max int64
TimestampHistogram [histogramSize]int32
}

func (ust *uriStatHistogram) Update(span *TSpan) {
elapseTime := span.GetElapsedTime()
ust.Total += int64(elapseTime)

if int64(elapseTime) > ust.Max {
ust.Max = int64(elapseTime)
}
ust.TimestampHistogram[span.FindHistogramLevel()] += 1
}

func (ust *uriStatHistogram) ToUriHistogrm() *v1.PUriHistogram {
pbUriHistogram := &v1.PUriHistogram{
Total: ust.Total,
Max: ust.Max,
Histogram: ust.TimestampHistogram[:],
}
return pbUriHistogram
}

type statHisograms struct {
TotalHistogram uriStatHistogram
FailedHistogram uriStatHistogram
}

func (st *statHisograms) Update(span *TSpan) {
st.TotalHistogram.Update(span)
if span.IsFailed() {
st.FailedHistogram.Update(span)
}
}

type UrlTemplateReport struct {
uriMap map[string]*statHisograms
BucketVersion int32
mu sync.Mutex
}

func (utr *UrlTemplateReport) Interceptor(span *TSpan) bool {
if len(span.UT) > 0 {
// found uri templated
utr.updateUriSnapshot(span)
}
return true
}

func (utr *UrlTemplateReport) updateUriSnapshot(span *TSpan) {
utr.mu.Lock()
defer utr.mu.Unlock()
ut := span.UT
var st *statHisograms
var ok bool
if st, ok = utr.uriMap[ut]; !ok {
st = &statHisograms{}
utr.uriMap[ut] = st
}
st.Update(span)
}

func (utr *UrlTemplateReport) MoveUtReprot() *v1.PStatMessage {
utr.mu.Lock()
defer utr.mu.Unlock()

agentUriStat := &v1.PAgentUriStat{
BucketVersion: int32(utr.BucketVersion),
}

for url, st := range utr.uriMap {
eachUriStat := &v1.PEachUriStat{
Uri: url,
TotalHistogram: st.TotalHistogram.ToUriHistogrm(),
FailedHistogram: st.FailedHistogram.ToUriHistogrm(),
Timestamp: time.Now().UnixMilli(),
}
agentUriStat.EachUriStat = append(agentUriStat.EachUriStat, eachUriStat)
}
//note: create a new one
utr.uriMap = make(map[string]*statHisograms)
pbStat := &v1.PStatMessage{
Field: &v1.PStatMessage_AgentUriStat{
AgentUriStat: agentUriStat,
},
}

return pbStat
}

func CreateUrlTemplateReport() *UrlTemplateReport {
ut := &UrlTemplateReport{
uriMap: make(map[string]*statHisograms),
BucketVersion: bucketVersion,
}
return ut
}
62 changes: 62 additions & 0 deletions collector-agent/agent/UrlTemplate_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package agent

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestUrlTemplateReport(t *testing.T) {
spans := []TSpan{
{
UT: "/hello",
Uri: "/hello",
ElapsedTime: 32,
},
{
UT: "/hello",
Uri: "/hello",
ElapsedTime: 320,
},
{
UT: "/hello",
Uri: "/hello",
ElapsedTime: 3200,
},
{
UT: "/hello_exp",
Uri: "/hello",
ElapsedTime: 32000,
ExceptionInfo: "exp",
},
}

ut := CreateUrlTemplateReport()
for _, span := range spans {
ut.Interceptor(&span)
}

if len(ut.uriMap) < 2 {
t.Log(len(ut.uriMap))
}

pbStatMessage := ut.MoveUtReprot()
t.Log(pbStatMessage)
assert.NotEqual(t, pbStatMessage.GetAgentUriStat(), nil, "GetAgentUriStat")

pbUriStat := pbStatMessage.GetAgentUriStat()

assert.Equal(t, pbUriStat.GetBucketVersion(), int32(0), "GetBucketVersion")

eachUriStat := pbUriStat.GetEachUriStat()

assert.Equal(t, len(eachUriStat), 2, "len(eachUriStat)")

assert.NotEqual(t, eachUriStat[0].GetFailedHistogram(), nil, "GetFailedHistogram")
assert.NotEqual(t, eachUriStat[0].GetTotalHistogram(), nil, "GetTotalHistogram")
totalHis := eachUriStat[0].GetTotalHistogram()
assert.Equal(t, len(totalHis.GetHistogram()), histogramSize, "len(totalHis.GetHistogram())")

assert.Equal(t, totalHis.Max, int64(3200), "totalHis.Max")

}
1 change: 1 addition & 0 deletions collector-agent/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ require (
github.com/shirou/gopsutil v3.21.2+incompatible
github.com/shirou/gopsutil/v3 v3.21.2
github.com/sirupsen/logrus v1.8.1
github.com/stretchr/testify v1.6.1
github.com/x-cray/logrus-prefixed-formatter v0.5.2
google.golang.org/genproto v0.0.0-20200806141610-86f49bd18e98 // indirect
google.golang.org/grpc v1.36.0
Expand Down
2 changes: 2 additions & 0 deletions collector-agent/start-collector-agent.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,6 @@ export PP_COLLECTOR_AGENT_ISDOCKER=false
# export PP_LOG_DIR=/tmp/
export PP_Log_Level=DEBUG
export PP_ADDRESS=0.0.0.0@9999
export GO_PATH=~/go
export PATH=$PATH:$GO_PATH/bin
make && ./collector-agent
Loading