Skip to content

Commit

Permalink
Merge pull request #16 from nikepan/prometheus
Browse files Browse the repository at this point in the history
Prometheus & dump sender & fixes & refactoring
  • Loading branch information
nikepan authored Oct 22, 2019
2 parents 2478ca2 + 5d9ffdd commit 0139d07
Show file tree
Hide file tree
Showing 20 changed files with 788 additions and 243 deletions.
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ _testmain.go
*.exe
*.test
*.prof
dumptest
dumps

.vscode
.idea/
clickhouse-bulk
debug
check.py
4 changes: 2 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
language: go
sudo: false
go:
- 1.12.4
- 1.13.1
- tip

env:
Expand All @@ -28,4 +28,4 @@ deploy:
on:
tags: true
repo: nikepan/clickhouse-bulk
condition: $TRAVIS_GO_VERSION =~ ^1\.12\.[0-9]+$
condition: $TRAVIS_GO_VERSION =~ ^1\.13\.[0-9]+$
3 changes: 2 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM golang:1.12.4 as builder
FROM golang:1.13.1 as builder

ARG GOPROXY
ENV GOOS=linux \
Expand All @@ -18,6 +18,7 @@ RUN go build -v
FROM alpine:latest
RUN apk add ca-certificates
WORKDIR /app
RUN mkdir /app/dumps
COPY --from=builder /go/src/github.com/nikepan/clickhouse-bulk/config.sample.json .
COPY --from=builder /go/src/github.com/nikepan/clickhouse-bulk/clickhouse-bulk .
EXPOSE 8123
Expand Down
16 changes: 14 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ or
[Use docker image](https://hub.docker.com/r/nikepan/clickhouse-bulk/)


or from sources (Go 1.11+):
or from sources (Go 1.13+):

```text
git clone https://github.com/nikepan/clickhouse-bulk
Expand Down Expand Up @@ -58,10 +58,12 @@ INSERT INTO table3 (c1, c2, c3) VALUES ('v1', 'v2', 'v3')('v4', 'v5', 'v6')
"listen": ":8124",
"flush_count": 10000, // check by \n char
"flush_interval": 1000, // milliseconds
"dump_check_interval": 300, // interval for try to send dumps (seconds); -1 to disable
"debug": false, // log incoming requests
"dump_dir": "dumps", // directory for dump unsended data (if clickhouse errors)
"clickhouse": {
"down_timeout": 300, // wait if server in down (seconds)
"down_timeout": 60, // wait if server in down (seconds)
"connect_timeout": 10, // wait for server connect (seconds)
"servers": [
"http://127.0.0.1:8123"
]
Expand All @@ -80,6 +82,16 @@ INSERT INTO table3 (c1, c2, c3) VALUES ('v1', 'v2', 'v3')('v4', 'v5', 'v6')
`./clickhouse-bulk`
and send queries to :8124

### Metrics
manual check main metrics
`curl -s http://127.0.0.1:8124/metrics | grep "^ch_"`
* `ch_bad_servers 0` - actual count of bad servers
* `ch_dump_count 0` - dumps saved from launch
* `ch_queued_dumps 0` - actual dump files id directory
* `ch_good_servers 1` - actual good servers count
* `ch_received_count 40` - received requests count from launch
* `ch_sent_count 1` - sent request count from launch


### Tips

Expand Down
113 changes: 81 additions & 32 deletions clickhouse.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package main

import (
"errors"
"fmt"
"io/ioutil"
"log"
"net/http"
Expand All @@ -21,24 +23,37 @@ type ClickhouseServer struct {

// Clickhouse - main clickhouse sender object
type Clickhouse struct {
Servers []*ClickhouseServer
Queue *queue.Queue
mu sync.Mutex
DownTimeout int
Dumper Dumper
wg sync.WaitGroup
Servers []*ClickhouseServer
Queue *queue.Queue
mu sync.Mutex
DownTimeout int
ConnectTimeout int
Dumper Dumper
wg sync.WaitGroup
}

// ClickhouseRequest - request struct for queue
type ClickhouseRequest struct {
Params string
Query string
Content string
Count int
}

// ErrServerIsDown - signals about server is down
var ErrServerIsDown = errors.New("server is down")

// ErrNoServers - signals about no working servers
var ErrNoServers = errors.New("No working clickhouse servers")

// NewClickhouse - get clickhouse object
func NewClickhouse(downTimeout int) (c *Clickhouse) {
func NewClickhouse(downTimeout int, connectTimeout int) (c *Clickhouse) {
c = new(Clickhouse)
c.DownTimeout = downTimeout
c.ConnectTimeout = connectTimeout
if c.ConnectTimeout > 0 {
c.ConnectTimeout = 10
}
c.Servers = make([]*ClickhouseServer, 0)
c.Queue = queue.New(1000)
go c.Run()
Expand All @@ -49,7 +64,26 @@ func NewClickhouse(downTimeout int) (c *Clickhouse) {
func (c *Clickhouse) AddServer(url string) {
c.mu.Lock()
defer c.mu.Unlock()
c.Servers = append(c.Servers, &ClickhouseServer{URL: url, Client: &http.Client{}})
c.Servers = append(c.Servers, &ClickhouseServer{URL: url, Client: &http.Client{
Timeout: time.Second * time.Duration(c.ConnectTimeout),
}})
}

// DumpServers - dump servers state to prometheus
func (c *Clickhouse) DumpServers() {
c.mu.Lock()
defer c.mu.Unlock()
good := 0
bad := 0
for _, s := range c.Servers {
if s.Bad {
bad++
} else {
good++
}
}
goodServers.Set(float64(good))
badServers.Set(float64(bad))
}

// GetNextServer - getting next server for request
Expand Down Expand Up @@ -81,18 +115,18 @@ func (c *Clickhouse) GetNextServer() (srv *ClickhouseServer) {
}

// Send - send request to next server
func (c *Clickhouse) Send(queryString string, data string) {
req := ClickhouseRequest{queryString, data}
func (c *Clickhouse) Send(r *ClickhouseRequest) {
c.wg.Add(1)
c.Queue.Put(req)
c.Queue.Put(r)
}

// Dump - save query to file
func (c *Clickhouse) Dump(params string, data string) error {
func (c *Clickhouse) Dump(params string, content string, response string, prefix string, status int) error {
dumpCounter.Inc()
if c.Dumper != nil {
c.mu.Lock()
defer c.mu.Unlock()
return c.Dumper.Dump(params, data)
return c.Dumper.Dump(params, content, response, prefix, status)
}
return nil
}
Expand All @@ -114,12 +148,19 @@ func (c *Clickhouse) Run() {
for {
datas, err = c.Queue.Poll(1, time.Second*5)
if err == nil {
data := datas[0].(ClickhouseRequest)
resp, status := c.SendQuery(data.Params, data.Content)
if status != http.StatusOK {
log.Printf("Send ERROR %+v: %+v\n", status, resp)
c.Dump(data.Params, data.Content)
data := datas[0].(*ClickhouseRequest)
resp, status, err := c.SendQuery(data)
if err != nil {
log.Printf("ERROR: Send (%+v) %+v; response %+v\n", status, err, resp)
prefix := "1"
if status >= 400 && status < 502 {
prefix = "2"
}
c.Dump(data.Params, data.Content, resp, prefix, status)
} else {
sentCounter.Inc()
}
c.DumpServers()
c.wg.Done()
}
}
Expand All @@ -132,36 +173,44 @@ func (c *Clickhouse) WaitFlush() (err error) {
}

// SendQuery - sends query to server and return result
func (srv *ClickhouseServer) SendQuery(queryString string, data string) (response string, status int) {
func (srv *ClickhouseServer) SendQuery(r *ClickhouseRequest) (response string, status int, err error) {
if srv.URL != "" {

log.Printf("send %+v rows to %+v of %+v\n", strings.Count(data, "\n")+1, srv.URL, queryString)

resp, err := srv.Client.Post(srv.URL+"?"+queryString, "", strings.NewReader(data))
url := srv.URL
if r.Params != "" {
url += "?" + r.Params
}
log.Printf("INFO: send %+v rows to %+v of %+v\n", r.Count, url, r.Query)
resp, err := srv.Client.Post(url, "", strings.NewReader(r.Content))
if err != nil {
srv.Bad = true
return err.Error(), http.StatusBadGateway
return err.Error(), http.StatusBadGateway, ErrServerIsDown
}
buf, _ := ioutil.ReadAll(resp.Body)
s := string(buf)
return s, resp.StatusCode
if resp.StatusCode >= 502 {
srv.Bad = true
err = ErrServerIsDown
} else if resp.StatusCode >= 400 {
err = fmt.Errorf("Wrong server status %+v:\nresponse: %+v\nrequest: %#v", resp.StatusCode, s, r.Content)
}
return s, resp.StatusCode, err
}

return "", http.StatusOK
return "", http.StatusOK, err
}

// SendQuery - sends query to server and return result (with server cycle)
func (c *Clickhouse) SendQuery(queryString string, data string) (response string, status int) {
func (c *Clickhouse) SendQuery(r *ClickhouseRequest) (response string, status int, err error) {
for {
s := c.GetNextServer()
if s != nil {
r, status := s.SendQuery(queryString, data)
if status == http.StatusBadGateway {
response, status, err = s.SendQuery(r)
if errors.Is(err, ErrServerIsDown) {
log.Printf("ERROR: server down (%+v): %+v\n", status, response)
continue
}
return r, status
return response, status, err
}
c.Dump(queryString, data)
return "No working clickhouse servers", http.StatusBadGateway
return response, status, ErrNoServers
}
}
41 changes: 15 additions & 26 deletions clickhouse_test.go
Original file line number Diff line number Diff line change
@@ -1,68 +1,57 @@
package main

import (
"github.com/stretchr/testify/assert"
"io/ioutil"
"errors"
"net/http"
"os"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestClickhouse_GetNextServer(t *testing.T) {
c := NewClickhouse(300)
c := NewClickhouse(300, 10)
c.AddServer("")
c.AddServer("http://127.0.0.1:8124")
c.AddServer("http://127.0.0.1:8125")
c.AddServer("http://127.0.0.1:8123")
s := c.GetNextServer()
assert.Equal(t, "", s.URL)
s.SendQuery("", "")
s.SendQuery(&ClickhouseRequest{})
s = c.GetNextServer()
assert.Equal(t, "http://127.0.0.1:8124", s.URL)
resp, status := s.SendQuery("", "")
resp, status, err := s.SendQuery(&ClickhouseRequest{})
assert.NotEqual(t, "", resp)
assert.Equal(t, http.StatusBadGateway, status)
assert.True(t, errors.Is(err, ErrServerIsDown))
assert.Equal(t, true, s.Bad)
c.SendQuery("", "")
c.SendQuery(&ClickhouseRequest{})
}

func TestClickhouse_Send(t *testing.T) {
c := NewClickhouse(300)
c := NewClickhouse(300, 10)
c.AddServer("")
c.Send("", "")
c.Send(&ClickhouseRequest{})
for !c.Queue.Empty() {
time.Sleep(10)
}
}

func TestClickhouse_SendQuery(t *testing.T) {
c := NewClickhouse(300)
c := NewClickhouse(300, 10)
c.AddServer("")
c.GetNextServer()
c.Servers[0].Bad = true
_, status := c.SendQuery("", "")
assert.Equal(t, http.StatusBadGateway, status)
_, status, err := c.SendQuery(&ClickhouseRequest{})
assert.Equal(t, 0, status)
assert.True(t, errors.Is(err, ErrNoServers))
}

func TestClickhouse_SendQuery1(t *testing.T) {
c := NewClickhouse(-1)
c := NewClickhouse(-1, 10)
c.AddServer("")
c.GetNextServer()
c.Servers[0].Bad = true
s := c.GetNextServer()
assert.Equal(t, false, s.Bad)
}

func TestClickhouse_Dump(t *testing.T) {
const dumpName = "dump1.dmp"
c := NewClickhouse(-1)
c.Dumper = new(FileDumper)
c.AddServer("")
c.Dump("eee", "eee")
assert.True(t, c.Empty())
buf, err := ioutil.ReadFile(dumpName)
assert.Nil(t, err)
assert.Equal(t, "eee\neee", string(buf))
os.Remove(dumpName)
}
Loading

0 comments on commit 0139d07

Please sign in to comment.