Skip to content

Commit

Permalink
Merge pull request #35 from splichy/memDebug
Browse files Browse the repository at this point in the history
Multiline inserts, fix memory leak caused by query_id, fix stack trace when no live server
  • Loading branch information
nikepan authored Nov 16, 2020
2 parents 1286592 + dcc2f86 commit 2a6d515
Show file tree
Hide file tree
Showing 12 changed files with 204 additions and 35 deletions.
19 changes: 13 additions & 6 deletions clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,11 @@ type Clickhouse struct {

// ClickhouseRequest - request struct for queue
type ClickhouseRequest struct {
Params string
Query string
Content string
Count int
Params string
Query string
Content string
Count int
isInsert bool
}

// ErrServerIsDown - signals about server is down
Expand Down Expand Up @@ -179,11 +180,17 @@ func (srv *ClickhouseServer) SendQuery(r *ClickhouseRequest) (response string, s
if r.Params != "" {
url += "?" + r.Params
}
log.Printf("INFO: send %+v rows to %+v of %+v\n", r.Count, srv.URL, r.Query)
if r.isInsert {
log.Printf("INFO: sending %+v rows to %+v of %+v\n", r.Count, srv.URL, r.Query)
}
resp, err := srv.Client.Post(url, "", strings.NewReader(r.Content))
if err != nil {
srv.Bad = true
return err.Error(), http.StatusBadGateway, ErrServerIsDown
} else {
if r.isInsert {
log.Printf("INFO: sent %+v rows to %+v of %+v\n", r.Count, srv.URL, r.Query)
}
}
buf, _ := ioutil.ReadAll(resp.Body)
s := string(buf)
Expand Down Expand Up @@ -211,6 +218,6 @@ func (c *Clickhouse) SendQuery(r *ClickhouseRequest) (response string, status in
}
return response, status, err
}
return response, status, ErrNoServers
return "", http.StatusServiceUnavailable, ErrNoServers
}
}
2 changes: 1 addition & 1 deletion clickhouse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func TestClickhouse_SendQuery(t *testing.T) {
c.GetNextServer()
c.Servers[0].Bad = true
_, status, err := c.SendQuery(&ClickhouseRequest{})
assert.Equal(t, 0, status)
assert.Equal(t, 503, status)
assert.True(t, errors.Is(err, ErrNoServers))
}

Expand Down
104 changes: 92 additions & 12 deletions collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ type Table struct {
FlushInterval int
mu sync.Mutex
Sender Sender
TickerChan *chan struct{}
lastUpdate time.Time
// todo add Last Error
}

Expand All @@ -36,7 +38,10 @@ type Collector struct {
mu sync.RWMutex
Count int
FlushInterval int
CleanInterval int
RemoveQueryID bool
Sender Sender
TickerChan *chan struct{}
}

// NewTable - default table constructor
Expand All @@ -50,12 +55,17 @@ func NewTable(name string, sender Sender, count int, interval int) (t *Table) {
}

// NewCollector - default collector constructor
func NewCollector(sender Sender, count int, interval int) (c *Collector) {
func NewCollector(sender Sender, count int, interval int, cleanInterval int, removeQueryID bool) (c *Collector) {
c = new(Collector)
c.Sender = sender
c.Tables = make(map[string]*Table)
c.Count = count
c.FlushInterval = interval
c.CleanInterval = cleanInterval
c.RemoveQueryID = removeQueryID
if cleanInterval > 0 {
c.TickerChan = c.RunTimer()
}
return c
}

Expand All @@ -71,10 +81,11 @@ func (t *Table) Content() string {
// Flush - sends collected data in table to clickhouse
func (t *Table) Flush() {
req := ClickhouseRequest{
Params: t.Params,
Query: t.Query,
Content: t.Content(),
Count: t.count,
Params: t.Params,
Query: t.Query,
Content: t.Content(),
Count: len(t.Rows),
isInsert: true,
}
t.Sender.Send(&req)
t.Rows = make([]string, 0, t.FlushCount)
Expand Down Expand Up @@ -105,24 +116,76 @@ func (t *Table) GetCount() int {
}

// RunTimer - timer for periodical savings data
func (t *Table) RunTimer() {
ticker := time.NewTicker(time.Millisecond * time.Duration(t.FlushInterval))
func (t *Table) RunTimer() *chan struct{} {
done := make(chan struct{})
go func() {
for range ticker.C {
t.CheckFlush()
ticker := time.NewTicker(time.Millisecond * time.Duration(t.FlushInterval))
defer ticker.Stop()
for {
select {
case <-ticker.C:
t.CheckFlush()
case <-done:
return
}
}
}()
return &done
}

// Add - Adding query to table
func (t *Table) Add(text string) {
t.mu.Lock()
defer t.mu.Unlock()
t.count++
t.Rows = append(t.Rows, text)
if t.Format == "TabSeparated" {
t.Rows = append(t.Rows, strings.Split(text, "\n")...)
} else {
t.Rows = append(t.Rows, text)
}
if len(t.Rows) >= t.FlushCount {
t.Flush()
}
t.lastUpdate = time.Now()
}

// CleanTable - delete table from map
func (t *Table) CleanTable() {
t.mu.Lock()
close(*t.TickerChan)
t = nil
}

// CleanTables - clean unsused tables
func (c *Collector) CleanTables() {
c.mu.Lock()
defer c.mu.Unlock()
for k, t := range c.Tables {
if t.lastUpdate.Add(time.Duration(c.CleanInterval) * time.Millisecond).Before(time.Now()) {
// table was not updated for CleanInterval - delete that table - otherwise it can cause memLeak
t.CleanTable()
defer delete(c.Tables, k)
}

}
}

// RunTimer - timer for periodical cleaning unused tables
func (c *Collector) RunTimer() *chan struct{} {
done := make(chan struct{})
go func() {
ticker := time.NewTicker(time.Duration(c.CleanInterval) * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-ticker.C:
c.CleanTables()
case <-done:
return
}
}
}()
return &done
}

// Empty - check if all tables are empty
Expand Down Expand Up @@ -198,12 +261,29 @@ func (c *Collector) addTable(name string) *Table {
t.Params = params
t.Format = c.getFormat(query)
c.Tables[name] = t
t.RunTimer()
t.TickerChan = t.RunTimer()
t.lastUpdate = time.Now()
return t
}

// Push - adding query to collector with query params (with query) and rows
func (c *Collector) Push(params string, content string) {
func (c *Collector) Push(paramsIn string, content string) {
// as we are using all params as a table key, we have to remove query_id
// otherwise every query will be threated as unique thus it will consume more memory
params := ""
if c.RemoveQueryID {
items := strings.Split(paramsIn, "&")
for _, p := range items {
if !HasPrefix(p, "query_id=") {
params += "&" + p
}
}
if len(params) > 0 {
params = strings.TrimSpace(params[1:])
}
} else {
params = paramsIn
}
c.mu.RLock()
table, ok := c.Tables[params]
if ok {
Expand Down
16 changes: 8 additions & 8 deletions collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,30 +30,30 @@ var escSelect = url.QueryEscape(qSelect)
var escParamsAndSelect = qParams + "&query=" + escSelect

func BenchmarkCollector_Push(t *testing.B) {
c := NewCollector(&fakeSender{}, 1000, 1000)
c := NewCollector(&fakeSender{}, 1000, 1000, 0, true)
for i := 0; i < 30000; i++ {
c.Push(escTitle, qContent)
}
}

func TestCollector_Push(t *testing.T) {
c := NewCollector(&fakeSender{}, 1000, 1000)
c := NewCollector(&fakeSender{}, 1000, 1000, 0, true)
for i := 0; i < 10400; i++ {
c.Push(escTitle, qContent)
}
assert.Equal(t, 400, c.Tables[escTitle].GetCount())
}

func BenchmarkCollector_ParseQuery(b *testing.B) {
c := NewCollector(&fakeSender{}, 1000, 1000)
c := NewCollector(&fakeSender{}, 1000, 1000, 0, true)
c.ParseQuery("", qTitle+" "+qContent)
c.ParseQuery(qParams, qTitle+" "+qContent)
c.ParseQuery("query="+escTitle, qContent)
c.ParseQuery(qParams+"&query="+escTitle, qContent)
}

func TestCollector_ParseQuery(t *testing.T) {
c := NewCollector(&fakeSender{}, 1000, 1000)
c := NewCollector(&fakeSender{}, 1000, 1000, 0, true)
var params string
var content string
var insert bool
Expand Down Expand Up @@ -139,20 +139,20 @@ func TestCollector_ParseQuery(t *testing.T) {
}

func TestCollector_separateQuery(t *testing.T) {
c := NewCollector(&fakeSender{}, 1000, 1000)
c := NewCollector(&fakeSender{}, 1000, 1000, 0, true)
query, params := c.separateQuery(escParamsAndSelect)
assert.Equal(t, qSelect, query)
assert.Equal(t, qParams, params)
}

func TestTable_getFormat(t *testing.T) {
c := NewCollector(&fakeSender{}, 1000, 1000)
c := NewCollector(&fakeSender{}, 1000, 1000, 0, true)
f := c.getFormat(qTitle)
assert.Equal(t, "TabSeparated", f)
}

func TestTable_CheckFlush(t *testing.T) {
c := NewCollector(&fakeSender{}, 1000, 1000)
c := NewCollector(&fakeSender{}, 1000, 1000, 0, true)
c.Push(qTitle, qContent)
count := 0
for !c.Tables[qTitle].Empty() {
Expand All @@ -163,7 +163,7 @@ func TestTable_CheckFlush(t *testing.T) {
}

func TestCollector_FlushAll(t *testing.T) {
c := NewCollector(&fakeSender{}, 1000, 1000)
c := NewCollector(&fakeSender{}, 1000, 1000, 0, true)
c.Push(qTitle, qContent)
c.FlushAll()
}
4 changes: 3 additions & 1 deletion config.sample.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
"listen": ":8124",
"flush_count": 10000,
"flush_interval": 1000,
"clean_interval": 0,
"remove_query_id": true,
"dump_check_interval": 300,
"debug": false,
"dump_dir": "dumps",
Expand All @@ -12,4 +14,4 @@
"http://127.0.0.1:8123"
]
}
}
}
6 changes: 4 additions & 2 deletions dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,16 +154,18 @@ func (d *FileDumper) ProcessNextDump(sender Sender) error {
}
if data != "" {
params := ""
query := ""
lines := strings.Split(data, "\n")
if !HasPrefix(lines[0], "insert") {
params = lines[0]
query = lines[1]
data = strings.Join(lines[1:], "\n")
}
_, status, err := sender.SendQuery(&ClickhouseRequest{Params: params, Content: data})
_, status, err := sender.SendQuery(&ClickhouseRequest{Params: params, Content: data, Query: query, Count: len(lines[2:]), isInsert: true})
if err != nil {
return fmt.Errorf("server error (%+v) %+v", status, err)
}
log.Printf("INFO: dump sended: %+v\n", f)
log.Printf("INFO: dump sent: %+v\n", f)
}
err = d.DeleteDump(f)
if err != nil {
Expand Down
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@ require (
github.com/labstack/gommon v0.2.8 // indirect
github.com/mattn/go-colorable v0.1.0 // indirect
github.com/mattn/go-isatty v0.0.4 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.1 // indirect
github.com/nikepan/go-datastructures v1.0.32
github.com/prometheus/client_golang v1.1.0
github.com/stretchr/testify v1.3.0
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/valyala/fasttemplate v0.0.0-20170224212429-dcecefd839c4 // indirect
golang.org/x/tools v0.0.0-20201105220310-78b158585360 // indirect
)

go 1.13
24 changes: 24 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -67,19 +67,43 @@ github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6Kllzaw
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/valyala/fasttemplate v0.0.0-20170224212429-dcecefd839c4 h1:gKMu1Bf6QINDnvyZuTaACm9ofY+PRh+5vFz4oxBZeF8=
github.com/valyala/fasttemplate v0.0.0-20170224212429-dcecefd839c4/go.mod h1:50wTf68f99/Zt14pr046Tgt3Lp2vLyFZKzbFXTOabXw=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2 h1:VklqNMn3ovrHsnt90PveolxSbWFaJdECFbxSq0Mqo2M=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 h1:psW17arqaxU48Z5kZ0CQnkZWQJsqcURM6tKiBApRjXI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/mod v0.3.0 h1:RM4zey1++hCTbCVQfnWeKs9/IEsaBLA8vTkd0WVtmH4=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20201021035429-f5854403a974 h1:IX6qOQeG5uLjB/hjjwjedwfjND0hgjPMMyO1RoIXQNI=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190801041406-cbf593c0f2f3 h1:4y9KwBHBgBNwDbtu44R5o1fdOCQUEXhbk/P4A9WmJq0=
golang.org/x/sys v0.0.0-20190801041406-cbf593c0f2f3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20201105220310-78b158585360 h1:/9CzsU8hOpnSUCtem1vfWNgsVeCTgkMdx+VE5YIYxnU=
golang.org/x/tools v0.0.0-20201105220310-78b158585360/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
1 change: 1 addition & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ var date = "unknown"
func main() {

log.SetOutput(os.Stdout)
log.SetFlags(log.LstdFlags | log.Lmicroseconds)

configFile := flag.String("config", "config.json", "config file (json)")

Expand Down
Loading

0 comments on commit 2a6d515

Please sign in to comment.