Skip to content

Commit

Permalink
troubleshooting memLeak
Browse files Browse the repository at this point in the history
  • Loading branch information
splichy committed Nov 9, 2020
1 parent 5247a54 commit 0985175
Show file tree
Hide file tree
Showing 8 changed files with 175 additions and 21 deletions.
91 changes: 83 additions & 8 deletions collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ type Table struct {
FlushInterval int
mu sync.Mutex
Sender Sender
TickerChan *chan struct{}
lastUpdate time.Time
// todo add Last Error
}

Expand All @@ -36,7 +38,10 @@ type Collector struct {
mu sync.RWMutex
Count int
FlushInterval int
CleanInterval int
RemoveQueryID bool
Sender Sender
TickerChan *chan struct{}
}

// NewTable - default table constructor
Expand All @@ -50,12 +55,17 @@ func NewTable(name string, sender Sender, count int, interval int) (t *Table) {
}

// NewCollector - default collector constructor
func NewCollector(sender Sender, count int, interval int) (c *Collector) {
func NewCollector(sender Sender, count int, interval int, cleanInterval int, removeQueryID bool) (c *Collector) {
c = new(Collector)
c.Sender = sender
c.Tables = make(map[string]*Table)
c.Count = count
c.FlushInterval = interval
c.CleanInterval = cleanInterval
c.RemoveQueryID = removeQueryID
if cleanInterval > 0 {
c.TickerChan = c.RunTimer()
}
return c
}

Expand Down Expand Up @@ -106,28 +116,76 @@ func (t *Table) GetCount() int {
}

// RunTimer - timer for periodical savings data
func (t *Table) RunTimer() {
ticker := time.NewTicker(time.Millisecond * time.Duration(t.FlushInterval))
func (t *Table) RunTimer() *chan struct{} {
done := make(chan struct{})
go func() {
for range ticker.C {
t.CheckFlush()
ticker := time.NewTicker(time.Millisecond * time.Duration(t.FlushInterval))
defer ticker.Stop()
for {
select {
case <-ticker.C:
t.CheckFlush()
case <-done:
return
}
}
}()
return &done
}

// Add - Adding query to table
func (t *Table) Add(text string) {
t.mu.Lock()
defer t.mu.Unlock()
t.count++
if t.Format != "RowBinary" {
if t.Format == "TabSeparated" {
t.Rows = append(t.Rows, strings.Split(text, "\n")...)
} else {
t.Rows = append(t.Rows, text)
}
if len(t.Rows) >= t.FlushCount {
t.Flush()
}
t.lastUpdate = time.Now()
}

// CleanTable - delete table from map
func (t *Table) CleanTable() {
t.mu.Lock()
close(*t.TickerChan)
t = nil
}

// CleanTables - clean unsused tables
func (c *Collector) CleanTables() {
c.mu.Lock()
defer c.mu.Unlock()
for k, t := range c.Tables {
if t.lastUpdate.Add(time.Duration(c.CleanInterval) * time.Millisecond).Before(time.Now()) {
// table was not updated for CleanInterval - delete that table - otherwise it can cause memLeak
t.CleanTable()
defer delete(c.Tables, k)
}

}
}

// RunTimer - timer for periodical cleaning unused tables
func (c *Collector) RunTimer() *chan struct{} {
done := make(chan struct{})
go func() {
ticker := time.NewTicker(time.Duration(c.CleanInterval) * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-ticker.C:
c.CleanTables()
case <-done:
return
}
}
}()
return &done
}

// Empty - check if all tables are empty
Expand Down Expand Up @@ -203,12 +261,29 @@ func (c *Collector) addTable(name string) *Table {
t.Params = params
t.Format = c.getFormat(query)
c.Tables[name] = t
t.RunTimer()
t.TickerChan = t.RunTimer()
t.lastUpdate = time.Now()
return t
}

// Push - adding query to collector with query params (with query) and rows
func (c *Collector) Push(params string, content string) {
func (c *Collector) Push(paramsIn string, content string) {
// as we are using all params a table key, we have to remove query_id
params := ""
if c.RemoveQueryID {
items := strings.Split(paramsIn, "&")
for _, p := range items {
if !HasPrefix(p, "query_id=") {
//params = strings.ReplaceAll(params, p, "")
params += "&" + p
}
}
if len(params) > 0 {
params = strings.TrimSpace(params[1:])
}
} else {
params = paramsIn
}
c.mu.RLock()
table, ok := c.Tables[params]
if ok {
Expand Down
16 changes: 8 additions & 8 deletions collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,30 +30,30 @@ var escSelect = url.QueryEscape(qSelect)
var escParamsAndSelect = qParams + "&query=" + escSelect

func BenchmarkCollector_Push(t *testing.B) {
c := NewCollector(&fakeSender{}, 1000, 1000)
c := NewCollector(&fakeSender{}, 1000, 1000, 0, true)
for i := 0; i < 30000; i++ {
c.Push(escTitle, qContent)
}
}

func TestCollector_Push(t *testing.T) {
c := NewCollector(&fakeSender{}, 1000, 1000)
c := NewCollector(&fakeSender{}, 1000, 1000, 0, true)
for i := 0; i < 10400; i++ {
c.Push(escTitle, qContent)
}
assert.Equal(t, 400, c.Tables[escTitle].GetCount())
}

func BenchmarkCollector_ParseQuery(b *testing.B) {
c := NewCollector(&fakeSender{}, 1000, 1000)
c := NewCollector(&fakeSender{}, 1000, 1000, 0, true)
c.ParseQuery("", qTitle+" "+qContent)
c.ParseQuery(qParams, qTitle+" "+qContent)
c.ParseQuery("query="+escTitle, qContent)
c.ParseQuery(qParams+"&query="+escTitle, qContent)
}

func TestCollector_ParseQuery(t *testing.T) {
c := NewCollector(&fakeSender{}, 1000, 1000)
c := NewCollector(&fakeSender{}, 1000, 1000, 0, true)
var params string
var content string
var insert bool
Expand Down Expand Up @@ -139,20 +139,20 @@ func TestCollector_ParseQuery(t *testing.T) {
}

func TestCollector_separateQuery(t *testing.T) {
c := NewCollector(&fakeSender{}, 1000, 1000)
c := NewCollector(&fakeSender{}, 1000, 1000, 0, true)
query, params := c.separateQuery(escParamsAndSelect)
assert.Equal(t, qSelect, query)
assert.Equal(t, qParams, params)
}

func TestTable_getFormat(t *testing.T) {
c := NewCollector(&fakeSender{}, 1000, 1000)
c := NewCollector(&fakeSender{}, 1000, 1000, 0, true)
f := c.getFormat(qTitle)
assert.Equal(t, "TabSeparated", f)
}

func TestTable_CheckFlush(t *testing.T) {
c := NewCollector(&fakeSender{}, 1000, 1000)
c := NewCollector(&fakeSender{}, 1000, 1000, 0, true)
c.Push(qTitle, qContent)
count := 0
for !c.Tables[qTitle].Empty() {
Expand All @@ -163,7 +163,7 @@ func TestTable_CheckFlush(t *testing.T) {
}

func TestCollector_FlushAll(t *testing.T) {
c := NewCollector(&fakeSender{}, 1000, 1000)
c := NewCollector(&fakeSender{}, 1000, 1000, 0, true)
c.Push(qTitle, qContent)
c.FlushAll()
}
4 changes: 3 additions & 1 deletion config.sample.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
"listen": ":8124",
"flush_count": 10000,
"flush_interval": 1000,
"clean_interval": 0,
"remove_query_id": true,
"dump_check_interval": 300,
"debug": false,
"dump_dir": "dumps",
Expand All @@ -12,4 +14,4 @@
"http://127.0.0.1:8123"
]
}
}
}
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@ require (
github.com/labstack/gommon v0.2.8 // indirect
github.com/mattn/go-colorable v0.1.0 // indirect
github.com/mattn/go-isatty v0.0.4 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.1 // indirect
github.com/nikepan/go-datastructures v1.0.32
github.com/prometheus/client_golang v1.1.0
github.com/stretchr/testify v1.3.0
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/valyala/fasttemplate v0.0.0-20170224212429-dcecefd839c4 // indirect
golang.org/x/tools v0.0.0-20201105220310-78b158585360 // indirect
)

go 1.13
24 changes: 24 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -67,19 +67,43 @@ github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6Kllzaw
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/valyala/fasttemplate v0.0.0-20170224212429-dcecefd839c4 h1:gKMu1Bf6QINDnvyZuTaACm9ofY+PRh+5vFz4oxBZeF8=
github.com/valyala/fasttemplate v0.0.0-20170224212429-dcecefd839c4/go.mod h1:50wTf68f99/Zt14pr046Tgt3Lp2vLyFZKzbFXTOabXw=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2 h1:VklqNMn3ovrHsnt90PveolxSbWFaJdECFbxSq0Mqo2M=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 h1:psW17arqaxU48Z5kZ0CQnkZWQJsqcURM6tKiBApRjXI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/mod v0.3.0 h1:RM4zey1++hCTbCVQfnWeKs9/IEsaBLA8vTkd0WVtmH4=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20201021035429-f5854403a974 h1:IX6qOQeG5uLjB/hjjwjedwfjND0hgjPMMyO1RoIXQNI=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190801041406-cbf593c0f2f3 h1:4y9KwBHBgBNwDbtu44R5o1fdOCQUEXhbk/P4A9WmJq0=
golang.org/x/sys v0.0.0-20190801041406-cbf593c0f2f3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20201105220310-78b158585360 h1:/9CzsU8hOpnSUCtem1vfWNgsVeCTgkMdx+VE5YIYxnU=
golang.org/x/tools v0.0.0-20201105220310-78b158585360/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
36 changes: 35 additions & 1 deletion server.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@ import (
"syscall"
"time"

// debug stuff
_ "net/http/pprof"
"runtime"
"runtime/debug"

"github.com/labstack/echo"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
Expand Down Expand Up @@ -65,6 +70,30 @@ func (server *Server) statusHandler(c echo.Context) error {
return c.JSON(200, Status{Status: "ok"})
}

func (server *Server) gcHandler(c echo.Context) error {
runtime.GC()
return c.JSON(200, Status{Status: "GC"})
}

func (server *Server) freeMemHandler(c echo.Context) error {
debug.FreeOSMemory()
return c.JSON(200, Status{Status: "freeMem"})
}

// manual trigger for cleaning tables
func (server *Server) tablesCleanHandler(c echo.Context) error {
log.Printf("DEBUG: clean tables:\n%+v", server.Collector.Tables)
for k, t := range server.Collector.Tables {
log.Printf("DEBUG: check if table is empty: %+v with key:%+v\n", t, k)
if ok := t.Empty(); ok {
log.Printf("DEBUG: delete empty table: %+v with key:%+v\n", t, k)
server.Collector.Tables[k].CleanTable()
defer delete(server.Collector.Tables, k)
}
}
return c.JSON(200, Status{Status: "cleaned empty tables"})
}

// Start - start http server
func (server *Server) Start() error {
return server.echo.Start(server.Listen)
Expand All @@ -81,6 +110,11 @@ func InitServer(listen string, collector *Collector, debug bool) *Server {
server.echo.POST("/", server.writeHandler)
server.echo.GET("/status", server.statusHandler)
server.echo.GET("/metrics", echo.WrapHandler(promhttp.Handler()))
// debug stuff
server.echo.GET("/debug/gc", server.gcHandler)
server.echo.GET("/debug/freemem", server.freeMemHandler)
server.echo.GET("/debug/pprof/*", echo.WrapHandler(http.DefaultServeMux))
server.echo.GET("/debug/tables-clean", server.tablesCleanHandler)

return server
}
Expand All @@ -107,7 +141,7 @@ func RunServer(cnf Config) {
sender.AddServer(url)
}

collect := NewCollector(sender, cnf.FlushCount, cnf.FlushInterval)
collect := NewCollector(sender, cnf.FlushCount, cnf.FlushInterval, cnf.CleanInterval, cnf.RemoveQueryID)

// send collected data on SIGTERM and exit
signals := make(chan os.Signal)
Expand Down
6 changes: 3 additions & 3 deletions server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
)

func TestRunServer(t *testing.T) {
collector := NewCollector(&fakeSender{}, 1000, 1000)
collector := NewCollector(&fakeSender{}, 1000, 1000, 0, true)
server := InitServer("", collector, false)
go server.Start()
server.echo.POST("/", server.writeHandler)
Expand Down Expand Up @@ -58,7 +58,7 @@ func TestRunServer(t *testing.T) {

func TestServer_SafeQuit(t *testing.T) {
sender := &fakeSender{}
collect := NewCollector(sender, 1000, 1000)
collect := NewCollector(sender, 1000, 1000, 0, true)
collect.AddTable("test")
collect.Push("sss", "sss")

Expand Down Expand Up @@ -96,7 +96,7 @@ func TestServer_MultiServer(t *testing.T) {
sender := NewClickhouse(10, 10)
sender.AddServer(s1.URL)
sender.AddServer(s2.URL)
collect := NewCollector(sender, 1000, 1000)
collect := NewCollector(sender, 1000, 1000, 0, true)
collect.AddTable("test")
collect.Push("eee", "eee")
collect.Push("fff", "fff")
Expand Down
Loading

0 comments on commit 0985175

Please sign in to comment.