diff --git a/pkg/collectconfig/executor/consumer_log_sub_analysis.go b/pkg/collectconfig/executor/consumer_log_sub_analysis.go index f613b29..50234df 100644 --- a/pkg/collectconfig/executor/consumer_log_sub_analysis.go +++ b/pkg/collectconfig/executor/consumer_log_sub_analysis.go @@ -139,8 +139,8 @@ func (c *logAnalysisSubConsumer) ProcessGroup(iw *inputWrapper, ctx *LogContext, func (c *logAnalysisSubConsumer) Emit(expectedTs int64) bool { var state *logAnalysisSubConsumerState - c.parent.timeline.View(func(timeline *storage.Timeline) { - shard := c.parent.timeline.GetShard(expectedTs) + c.parent.timeline.Update(func(timeline *storage.Timeline) { + shard := c.parent.timeline.GetShard(expectedTs, true) if shard == nil { return } diff --git a/pkg/collectconfig/executor/consumer_log_sub_stat.go b/pkg/collectconfig/executor/consumer_log_sub_stat.go index b483a08..dddbe2f 100644 --- a/pkg/collectconfig/executor/consumer_log_sub_stat.go +++ b/pkg/collectconfig/executor/consumer_log_sub_stat.go @@ -145,14 +145,13 @@ func (c *logStatSubConsumer) ProcessGroup(iw *inputWrapper, ctx *LogContext, max } func (c *logStatSubConsumer) Emit(expectedTs int64) bool { - // TODO 取走数据后给shard打一个标记, 表示已经取走数据了 // 下次如果还往该shard写数据, 这这些数据是旧的 // TODO 我们的case里是可以幂等写的!!! var datum []*model.DetailData - c.parent.timeline.View(func(timeline *storage.Timeline) { - shard := timeline.GetShard(expectedTs) + c.parent.timeline.Update(func(timeline *storage.Timeline) { + shard := timeline.GetShard(expectedTs, true) if shard == nil { logger.Infoz("[consumer] [log] emit nil", // zap.String("key", c.parent.key), // diff --git a/pkg/collectconfig/executor/storage/storage.go b/pkg/collectconfig/executor/storage/storage.go index 661b0d3..04c3940 100644 --- a/pkg/collectconfig/executor/storage/storage.go +++ b/pkg/collectconfig/executor/storage/storage.go @@ -78,6 +78,10 @@ func (s *Storage) GetTimeline(key string) *Timeline { return s.timelines[key] } +func (s *Storage) InternalGetTimeline() map[string]*Timeline { + return s.timelines +} + func (s *Storage) SetTimeline(key string, t *Timeline) { s.timelines[key] = t } @@ -154,7 +158,7 @@ func (t *Timeline) Unlock() { } func (t *Timeline) GetOrCreateShard(ts int64) *Shard { - shard := t.GetShard(ts) + shard := t.GetShard(ts, false) if shard == nil { shard = t.CreateShard(ts) } @@ -165,7 +169,7 @@ func (t *Timeline) InternalGetShard() []*Shard { return t.shards } -func (t *Timeline) GetShard(ts int64) *Shard { +func (t *Timeline) GetShard(ts int64, clear bool) *Shard { if t.shards == nil { return nil } @@ -178,6 +182,12 @@ func (t *Timeline) GetShard(ts int64) *Shard { if s.no != no { return nil } + // release memory quickly + if clear { + s.points = nil + s.Data = nil + s.Data2 = nil + } return s } diff --git a/pkg/pipeline/manager.go b/pkg/pipeline/manager.go index 3c40f97..f606b55 100644 --- a/pkg/pipeline/manager.go +++ b/pkg/pipeline/manager.go @@ -29,6 +29,7 @@ import ( "github.com/traas-stack/holoinsight-agent/pkg/util" "github.com/traas-stack/holoinsight-agent/pkg/util/recoverutils" "go.uber.org/zap" + "net/http" "runtime" "strings" "sync" @@ -110,6 +111,33 @@ func (m *Manager) Start() { } m.ctm.Listen(m.listener) + + http.HandleFunc("/api/storage/list", func(writer http.ResponseWriter, request *http.Request) { + m.s.View(func(s *storage.Storage) { + timelines := s.InternalGetTimeline() + for key, timeline := range timelines { + bytes := 0 + points := 0 + timeline.View(func(timeline *storage.Timeline) { + shards := timeline.InternalGetShard() + for _, shard := range shards { + if shard == nil { + continue + } + points2 := shard.InternalGetAllPoints() + points += len(points2) + for pkey, p := range points2 { + bytes += len(pkey) + for _, key := range p.Keys { + bytes += len(key) + } + } + } + }) + fmt.Fprintf(writer, "%s %d %d\n", key, points, bytes) + } + }) + }) } func (m *Manager) processTask(task *collecttask.CollectTask, add bool, init bool) { diff --git a/scripts/api/pprof/heap b/scripts/api/pprof/heap index 45e1ce1..8dcfdda 100644 --- a/scripts/api/pprof/heap +++ b/scripts/api/pprof/heap @@ -1 +1,6 @@ -curl localhost:9117/debug/pprof/heap > heap +curl localhost:9117/debug/pprof/heap > /tmp/heap +echo + +echo Use the following command to download the heap and view the flame graph. +echo kubectl -n $POD_NAMESPACE cp $POD_NAME:/tmp/heap /tmp/heap +echo go tool pprof -http=":$RANDOM" /tmp/heap diff --git a/scripts/api/pprof/profile b/scripts/api/pprof/profile index 5aaf71f..e8ed264 100644 --- a/scripts/api/pprof/profile +++ b/scripts/api/pprof/profile @@ -1,6 +1,6 @@ -curl localhost:9117/debug/pprof/profile?seconds=60 > profile +curl localhost:9117/debug/pprof/profile?seconds=60 > /tmp/profile echo echo Use the following command to download the profile and view the flame graph. -echo kubectl -n $POD_NAMESPACE cp $POD_NAME:$PWD/profile ./profile -echo go tool pprof -http=":$RANDOM" profile +echo kubectl -n $POD_NAMESPACE cp $POD_NAME:/tmp/profile /tmp/profile +echo go tool pprof -http=":$RANDOM" /tmp/profile