diff --git a/collector.go b/collector.go index 4c05f9a..1c3d7d0 100644 --- a/collector.go +++ b/collector.go @@ -27,6 +27,8 @@ type Table struct { FlushInterval int mu sync.Mutex Sender Sender + TickerChan *chan struct{} + lastUpdate time.Time // todo add Last Error } @@ -36,7 +38,10 @@ type Collector struct { mu sync.RWMutex Count int FlushInterval int + CleanInterval int + RemoveQueryID bool Sender Sender + TickerChan *chan struct{} } // NewTable - default table constructor @@ -50,12 +55,17 @@ func NewTable(name string, sender Sender, count int, interval int) (t *Table) { } // NewCollector - default collector constructor -func NewCollector(sender Sender, count int, interval int) (c *Collector) { +func NewCollector(sender Sender, count int, interval int, cleanInterval int, removeQueryID bool) (c *Collector) { c = new(Collector) c.Sender = sender c.Tables = make(map[string]*Table) c.Count = count c.FlushInterval = interval + c.CleanInterval = cleanInterval + c.RemoveQueryID = removeQueryID + if cleanInterval > 0 { + c.TickerChan = c.RunTimer() + } return c } @@ -106,13 +116,21 @@ func (t *Table) GetCount() int { } // RunTimer - timer for periodical savings data -func (t *Table) RunTimer() { - ticker := time.NewTicker(time.Millisecond * time.Duration(t.FlushInterval)) +func (t *Table) RunTimer() *chan struct{} { + done := make(chan struct{}) go func() { - for range ticker.C { - t.CheckFlush() + ticker := time.NewTicker(time.Millisecond * time.Duration(t.FlushInterval)) + defer ticker.Stop() + for { + select { + case <-ticker.C: + t.CheckFlush() + case <-done: + return + } } }() + return &done } // Add - Adding query to table @@ -120,7 +138,7 @@ func (t *Table) Add(text string) { t.mu.Lock() defer t.mu.Unlock() t.count++ - if t.Format != "RowBinary" { + if t.Format == "TabSeparated" { t.Rows = append(t.Rows, strings.Split(text, "\n")...) 
} else { t.Rows = append(t.Rows, text) @@ -128,6 +146,46 @@ func (t *Table) Add(text string) { if len(t.Rows) >= t.FlushCount { t.Flush() } + t.lastUpdate = time.Now() +} + +// CleanTable - delete table from map +func (t *Table) CleanTable() { + t.mu.Lock() + close(*t.TickerChan) + t = nil +} + +// CleanTables - clean unused tables +func (c *Collector) CleanTables() { + c.mu.Lock() + defer c.mu.Unlock() + for k, t := range c.Tables { + if t.lastUpdate.Add(time.Duration(c.CleanInterval) * time.Millisecond).Before(time.Now()) { + // table was not updated for CleanInterval - delete that table - otherwise it can cause memLeak + t.CleanTable() + defer delete(c.Tables, k) + } + + } +} + +// RunTimer - timer for periodical cleaning unused tables +func (c *Collector) RunTimer() *chan struct{} { + done := make(chan struct{}) + go func() { + ticker := time.NewTicker(time.Duration(c.CleanInterval) * time.Millisecond) + defer ticker.Stop() + for { + select { + case <-ticker.C: + c.CleanTables() + case <-done: + return + } + } + }() + return &done } // Empty - check if all tables are empty @@ -203,12 +261,29 @@ func (c *Collector) addTable(name string) *Table { t.Params = params t.Format = c.getFormat(query) c.Tables[name] = t - t.RunTimer() + t.TickerChan = t.RunTimer() + t.lastUpdate = time.Now() return t } // Push - adding query to collector with query params (with query) and rows -func (c *Collector) Push(params string, content string) { +func (c *Collector) Push(paramsIn string, content string) { + // as we are using all params as a table key, we have to remove query_id + params := "" + if c.RemoveQueryID { + items := strings.Split(paramsIn, "&") + for _, p := range items { + if !HasPrefix(p, "query_id=") { + //params = strings.ReplaceAll(params, p, "") + params += "&" + p + } + } + if len(params) > 0 { + params = strings.TrimSpace(params[1:]) + } + } else { + params = paramsIn + } c.mu.RLock() table, ok := c.Tables[params] if ok { diff --git a/collector_test.go 
b/collector_test.go index c19200a..2910558 100644 --- a/collector_test.go +++ b/collector_test.go @@ -30,14 +30,14 @@ var escSelect = url.QueryEscape(qSelect) var escParamsAndSelect = qParams + "&query=" + escSelect func BenchmarkCollector_Push(t *testing.B) { - c := NewCollector(&fakeSender{}, 1000, 1000) + c := NewCollector(&fakeSender{}, 1000, 1000, 0, true) for i := 0; i < 30000; i++ { c.Push(escTitle, qContent) } } func TestCollector_Push(t *testing.T) { - c := NewCollector(&fakeSender{}, 1000, 1000) + c := NewCollector(&fakeSender{}, 1000, 1000, 0, true) for i := 0; i < 10400; i++ { c.Push(escTitle, qContent) } @@ -45,7 +45,7 @@ func TestCollector_Push(t *testing.T) { } func BenchmarkCollector_ParseQuery(b *testing.B) { - c := NewCollector(&fakeSender{}, 1000, 1000) + c := NewCollector(&fakeSender{}, 1000, 1000, 0, true) c.ParseQuery("", qTitle+" "+qContent) c.ParseQuery(qParams, qTitle+" "+qContent) c.ParseQuery("query="+escTitle, qContent) @@ -53,7 +53,7 @@ func BenchmarkCollector_ParseQuery(b *testing.B) { } func TestCollector_ParseQuery(t *testing.T) { - c := NewCollector(&fakeSender{}, 1000, 1000) + c := NewCollector(&fakeSender{}, 1000, 1000, 0, true) var params string var content string var insert bool @@ -139,20 +139,20 @@ func TestCollector_ParseQuery(t *testing.T) { } func TestCollector_separateQuery(t *testing.T) { - c := NewCollector(&fakeSender{}, 1000, 1000) + c := NewCollector(&fakeSender{}, 1000, 1000, 0, true) query, params := c.separateQuery(escParamsAndSelect) assert.Equal(t, qSelect, query) assert.Equal(t, qParams, params) } func TestTable_getFormat(t *testing.T) { - c := NewCollector(&fakeSender{}, 1000, 1000) + c := NewCollector(&fakeSender{}, 1000, 1000, 0, true) f := c.getFormat(qTitle) assert.Equal(t, "TabSeparated", f) } func TestTable_CheckFlush(t *testing.T) { - c := NewCollector(&fakeSender{}, 1000, 1000) + c := NewCollector(&fakeSender{}, 1000, 1000, 0, true) c.Push(qTitle, qContent) count := 0 for !c.Tables[qTitle].Empty() { @@ 
-163,7 +163,7 @@ func TestTable_CheckFlush(t *testing.T) { } func TestCollector_FlushAll(t *testing.T) { - c := NewCollector(&fakeSender{}, 1000, 1000) + c := NewCollector(&fakeSender{}, 1000, 1000, 0, true) c.Push(qTitle, qContent) c.FlushAll() } diff --git a/config.sample.json b/config.sample.json index 518167e..9974419 100644 --- a/config.sample.json +++ b/config.sample.json @@ -2,6 +2,8 @@ "listen": ":8124", "flush_count": 10000, "flush_interval": 1000, + "clean_interval": 0, + "remove_query_id": true, "dump_check_interval": 300, "debug": false, "dump_dir": "dumps", @@ -12,4 +14,4 @@ "http://127.0.0.1:8123" ] } -} \ No newline at end of file +} diff --git a/go.mod b/go.mod index 513ad30..a0cce61 100644 --- a/go.mod +++ b/go.mod @@ -5,11 +5,14 @@ require ( github.com/labstack/gommon v0.2.8 // indirect github.com/mattn/go-colorable v0.1.0 // indirect github.com/mattn/go-isatty v0.0.4 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.1 // indirect github.com/nikepan/go-datastructures v1.0.32 github.com/prometheus/client_golang v1.1.0 github.com/stretchr/testify v1.3.0 github.com/valyala/bytebufferpool v1.0.0 // indirect github.com/valyala/fasttemplate v0.0.0-20170224212429-dcecefd839c4 // indirect + golang.org/x/tools v0.0.0-20201105220310-78b158585360 // indirect ) go 1.13 diff --git a/go.sum b/go.sum index 8306b0d..77d9690 100644 --- a/go.sum +++ b/go.sum @@ -67,19 +67,43 @@ github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6Kllzaw github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= github.com/valyala/fasttemplate v0.0.0-20170224212429-dcecefd839c4 h1:gKMu1Bf6QINDnvyZuTaACm9ofY+PRh+5vFz4oxBZeF8= github.com/valyala/fasttemplate v0.0.0-20170224212429-dcecefd839c4/go.mod h1:50wTf68f99/Zt14pr046Tgt3Lp2vLyFZKzbFXTOabXw= +github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= 
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2 h1:VklqNMn3ovrHsnt90PveolxSbWFaJdECFbxSq0Mqo2M= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 h1:psW17arqaxU48Z5kZ0CQnkZWQJsqcURM6tKiBApRjXI= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/mod v0.3.0 h1:RM4zey1++hCTbCVQfnWeKs9/IEsaBLA8vTkd0WVtmH4= +golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20201021035429-f5854403a974 h1:IX6qOQeG5uLjB/hjjwjedwfjND0hgjPMMyO1RoIXQNI= +golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys 
v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190801041406-cbf593c0f2f3 h1:4y9KwBHBgBNwDbtu44R5o1fdOCQUEXhbk/P4A9WmJq0= golang.org/x/sys v0.0.0-20190801041406-cbf593c0f2f3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20201105220310-78b158585360 h1:/9CzsU8hOpnSUCtem1vfWNgsVeCTgkMdx+VE5YIYxnU= +golang.org/x/tools v0.0.0-20201105220310-78b158585360/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v2 v2.2.1/go.mod 
h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/server.go b/server.go index cbc3679..b821c92 100644 --- a/server.go +++ b/server.go @@ -10,6 +10,11 @@ import ( "syscall" "time" + // debug stuff + _ "net/http/pprof" + "runtime" + "runtime/debug" + "github.com/labstack/echo" "github.com/prometheus/client_golang/prometheus/promhttp" ) @@ -65,6 +70,30 @@ func (server *Server) statusHandler(c echo.Context) error { return c.JSON(200, Status{Status: "ok"}) } +func (server *Server) gcHandler(c echo.Context) error { + runtime.GC() + return c.JSON(200, Status{Status: "GC"}) +} + +func (server *Server) freeMemHandler(c echo.Context) error { + debug.FreeOSMemory() + return c.JSON(200, Status{Status: "freeMem"}) +} + +// manual trigger for cleaning tables +func (server *Server) tablesCleanHandler(c echo.Context) error { + log.Printf("DEBUG: clean tables:\n%+v", server.Collector.Tables) + for k, t := range server.Collector.Tables { + log.Printf("DEBUG: check if table is empty: %+v with key:%+v\n", t, k) + if ok := t.Empty(); ok { + log.Printf("DEBUG: delete empty table: %+v with key:%+v\n", t, k) + server.Collector.Tables[k].CleanTable() + defer delete(server.Collector.Tables, k) + } + } + return c.JSON(200, Status{Status: "cleaned empty tables"}) +} + // Start - start http server func (server *Server) Start() error { return server.echo.Start(server.Listen) @@ -81,6 +110,11 @@ func InitServer(listen string, collector *Collector, debug bool) *Server { server.echo.POST("/", server.writeHandler) server.echo.GET("/status", server.statusHandler) server.echo.GET("/metrics", echo.WrapHandler(promhttp.Handler())) + // debug stuff + server.echo.GET("/debug/gc", server.gcHandler) + server.echo.GET("/debug/freemem", server.freeMemHandler) + server.echo.GET("/debug/pprof/*", echo.WrapHandler(http.DefaultServeMux)) + server.echo.GET("/debug/tables-clean", server.tablesCleanHandler) return server } @@ -107,7 +141,7 @@ func RunServer(cnf Config) { sender.AddServer(url) } - collect 
:= NewCollector(sender, cnf.FlushCount, cnf.FlushInterval) + collect := NewCollector(sender, cnf.FlushCount, cnf.FlushInterval, cnf.CleanInterval, cnf.RemoveQueryID) // send collected data on SIGTERM and exit signals := make(chan os.Signal) diff --git a/server_test.go b/server_test.go index 120ba53..ea941c1 100644 --- a/server_test.go +++ b/server_test.go @@ -18,7 +18,7 @@ import ( ) func TestRunServer(t *testing.T) { - collector := NewCollector(&fakeSender{}, 1000, 1000) + collector := NewCollector(&fakeSender{}, 1000, 1000, 0, true) server := InitServer("", collector, false) go server.Start() server.echo.POST("/", server.writeHandler) @@ -58,7 +58,7 @@ func TestRunServer(t *testing.T) { func TestServer_SafeQuit(t *testing.T) { sender := &fakeSender{} - collect := NewCollector(sender, 1000, 1000) + collect := NewCollector(sender, 1000, 1000, 0, true) collect.AddTable("test") collect.Push("sss", "sss") @@ -96,7 +96,7 @@ func TestServer_MultiServer(t *testing.T) { sender := NewClickhouse(10, 10) sender.AddServer(s1.URL) sender.AddServer(s2.URL) - collect := NewCollector(sender, 1000, 1000) + collect := NewCollector(sender, 1000, 1000, 0, true) collect.AddTable("test") collect.Push("eee", "eee") collect.Push("fff", "fff") diff --git a/utils.go b/utils.go index bfc0028..9de428c 100644 --- a/utils.go +++ b/utils.go @@ -22,6 +22,8 @@ type Config struct { Clickhouse clickhouseConfig `json:"clickhouse"` FlushCount int `json:"flush_count"` FlushInterval int `json:"flush_interval"` + CleanInterval int `json:"clean_interval"` + RemoveQueryID bool `json:"remove_query_id"` DumpCheckInterval int `json:"dump_check_interval"` DumpDir string `json:"dump_dir"` Debug bool `json:"debug"` @@ -54,6 +56,17 @@ func readEnvInt(name string, value *int) { } } +func readEnvBool(name string, value *bool) { + s := os.Getenv(name) + if s != "" { + v, err := strconv.ParseBool(s) + if err != nil { + log.Printf("ERROR: Wrong %+v env: %+v\n", name, err) + } + *value = v + } +} + // ReadConfig init 
config data func ReadConfig(configFile string) (Config, error) { cnf := Config{} @@ -66,8 +79,11 @@ func ReadConfig(configFile string) (Config, error) { } } + readEnvBool("CLICKHOUSE_BULK_DEBUG", &cnf.Debug) readEnvInt("CLICKHOUSE_FLUSH_COUNT", &cnf.FlushCount) readEnvInt("CLICKHOUSE_FLUSH_INTERVAL", &cnf.FlushInterval) + readEnvInt("CLICKHOUSE_CLEAN_INTERVAL", &cnf.CleanInterval) + readEnvBool("CLICKHOUSE_REMOVE_QUERY_ID", &cnf.RemoveQueryID) readEnvInt("DUMP_CHECK_INTERVAL", &cnf.DumpCheckInterval) readEnvInt("CLICKHOUSE_DOWN_TIMEOUT", &cnf.Clickhouse.DownTimeout) readEnvInt("CLICKHOUSE_CONNECT_TIMEOUT", &cnf.Clickhouse.ConnectTimeout)