Skip to content

Commit

Permalink
fix: release memory quickly (#91)
Browse files Browse the repository at this point in the history
  • Loading branch information
xzchaoo authored Jan 23, 2024
1 parent 4553f6a commit 794c214
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 11 deletions.
4 changes: 2 additions & 2 deletions pkg/collectconfig/executor/consumer_log_sub_analysis.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,8 @@ func (c *logAnalysisSubConsumer) ProcessGroup(iw *inputWrapper, ctx *LogContext,

func (c *logAnalysisSubConsumer) Emit(expectedTs int64) bool {
var state *logAnalysisSubConsumerState
c.parent.timeline.View(func(timeline *storage.Timeline) {
shard := c.parent.timeline.GetShard(expectedTs)
c.parent.timeline.Update(func(timeline *storage.Timeline) {
shard := c.parent.timeline.GetShard(expectedTs, true)
if shard == nil {
return
}
Expand Down
5 changes: 2 additions & 3 deletions pkg/collectconfig/executor/consumer_log_sub_stat.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,14 +145,13 @@ func (c *logStatSubConsumer) ProcessGroup(iw *inputWrapper, ctx *LogContext, max
}

func (c *logStatSubConsumer) Emit(expectedTs int64) bool {

// TODO 取走数据后给shard打一个标记, 表示已经取走数据了
// 下次如果还往该shard写数据, 这这些数据是旧的
// TODO 我们的case里是可以幂等写的!!!

var datum []*model.DetailData
c.parent.timeline.View(func(timeline *storage.Timeline) {
shard := timeline.GetShard(expectedTs)
c.parent.timeline.Update(func(timeline *storage.Timeline) {
shard := timeline.GetShard(expectedTs, true)
if shard == nil {
logger.Infoz("[consumer] [log] emit nil", //
zap.String("key", c.parent.key), //
Expand Down
14 changes: 12 additions & 2 deletions pkg/collectconfig/executor/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ func (s *Storage) GetTimeline(key string) *Timeline {
return s.timelines[key]
}

func (s *Storage) InternalGetTimeline() map[string]*Timeline {
return s.timelines
}

func (s *Storage) SetTimeline(key string, t *Timeline) {
s.timelines[key] = t
}
Expand Down Expand Up @@ -154,7 +158,7 @@ func (t *Timeline) Unlock() {
}

func (t *Timeline) GetOrCreateShard(ts int64) *Shard {
shard := t.GetShard(ts)
shard := t.GetShard(ts, false)
if shard == nil {
shard = t.CreateShard(ts)
}
Expand All @@ -165,7 +169,7 @@ func (t *Timeline) InternalGetShard() []*Shard {
return t.shards
}

func (t *Timeline) GetShard(ts int64) *Shard {
func (t *Timeline) GetShard(ts int64, clear bool) *Shard {
if t.shards == nil {
return nil
}
Expand All @@ -178,6 +182,12 @@ func (t *Timeline) GetShard(ts int64) *Shard {
if s.no != no {
return nil
}
// release memory quickly
if clear {
s.points = nil
s.Data = nil
s.Data2 = nil
}
return s
}

Expand Down
28 changes: 28 additions & 0 deletions pkg/pipeline/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/traas-stack/holoinsight-agent/pkg/util"
"github.com/traas-stack/holoinsight-agent/pkg/util/recoverutils"
"go.uber.org/zap"
"net/http"
"runtime"
"strings"
"sync"
Expand Down Expand Up @@ -110,6 +111,33 @@ func (m *Manager) Start() {
}

m.ctm.Listen(m.listener)

http.HandleFunc("/api/storage/list", func(writer http.ResponseWriter, request *http.Request) {
m.s.View(func(s *storage.Storage) {
timelines := s.InternalGetTimeline()
for key, timeline := range timelines {
bytes := 0
points := 0
timeline.View(func(timeline *storage.Timeline) {
shards := timeline.InternalGetShard()
for _, shard := range shards {
if shard == nil {
continue
}
points2 := shard.InternalGetAllPoints()
points += len(points2)
for pkey, p := range points2 {
bytes += len(pkey)
for _, key := range p.Keys {
bytes += len(key)
}
}
}
})
fmt.Fprintf(writer, "%s %d %d\n", key, points, bytes)
}
})
})
}

func (m *Manager) processTask(task *collecttask.CollectTask, add bool, init bool) {
Expand Down
7 changes: 6 additions & 1 deletion scripts/api/pprof/heap
Original file line number Diff line number Diff line change
@@ -1 +1,6 @@
curl localhost:9117/debug/pprof/heap > heap
curl localhost:9117/debug/pprof/heap > /tmp/heap
echo

echo Use the following command to download the heap and view the flame graph.
echo kubectl -n $POD_NAMESPACE cp $POD_NAME:/tmp/heap /tmp/heap
echo go tool pprof -http=":$RANDOM" /tmp/heap
6 changes: 3 additions & 3 deletions scripts/api/pprof/profile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
curl localhost:9117/debug/pprof/profile?seconds=60 > profile
curl localhost:9117/debug/pprof/profile?seconds=60 > /tmp/profile
echo

echo Use the following command to download the profile and view the flame graph.
echo kubectl -n $POD_NAMESPACE cp $POD_NAME:$PWD/profile ./profile
echo go tool pprof -http=":$RANDOM" profile
echo kubectl -n $POD_NAMESPACE cp $POD_NAME:/tmp/profile /tmp/profile
echo go tool pprof -http=":$RANDOM" /tmp/profile

0 comments on commit 794c214

Please sign in to comment.