forked from sehanko/fluent-plugin-clickhouse-json
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathout_clickhousejson.rb
104 lines (91 loc) · 3.63 KB
/
out_clickhousejson.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
require 'fluent/plugin/output'
require 'fluent/config/error'
require 'net/http'
require 'date'
require 'yajl'
module Fluent
  module Plugin
    # Fluentd output plugin that sends buffered events to a ClickHouse table
    # through the ClickHouse HTTP interface, one JSON object per line
    # (INSERT ... FORMAT JSONEachRow).
    class ClickhousejsonOutput < Fluent::Plugin::Output
      Fluent::Plugin.register_output("clickhousejson", self)

      helpers :compat_parameters

      # One day, in seconds. Not referenced inside this class; kept so that any
      # external reference to the constant keeps resolving.
      DEFAULT_TIMEKEY = 60 * 60 * 24

      desc "IP or fqdn of ClickHouse node"
      config_param :host, :string
      desc "Port of ClickHouse HTTP interface"
      config_param :port, :integer, default: 8123
      desc "Database to use"
      config_param :database, :string, default: "default"
      desc "Table to use"
      config_param :table, :string
      desc "User of Clickhouse database"
      config_param :user, :string, default: "default"
      desc "Password of Clickhouse database"
      config_param :password, :string, default: ""
      desc "Offset in minutes, could be useful to substract timestamps because of timezones"
      config_param :tz_offset, :integer, default: 0
      desc "Name of internal fluentd time field (if need to use)"
      config_param :datetime_name, :string, default: nil

      # Buffer defaults: in-memory buffer, flushed every second, with chunks
      # capped at 1MB / 500 records and the whole buffer capped at 1GB.
      config_section :buffer do
        config_set_default :@type, "memory"
        config_set_default :flush_mode, :interval
        config_set_default :flush_interval, 1
        config_set_default :flush_thread_interval, 0.05
        config_set_default :flush_thread_burst_interval, 0.05
        config_set_default :chunk_limit_size, 1 * 1024 ** 2 # 1MB
        config_set_default :total_limit_size, 1 * 1024 ** 3 # 1GB
        config_set_default :chunk_limit_records, 500
      end

      # Declares that this plugin may run under multiple Fluentd workers.
      #
      # @return [Boolean] always true
      def multi_workers_ready?
        true
      end

      # Reads the plugin settings from the configuration element, builds the
      # base URI and query parameters, and probes the server once.
      #
      # @param conf [#[]] configuration element, indexed by string keys
      # @raise [Fluent::ConfigError] when the connectivity probe fails
      def configure(conf)
        super
        @uri, @uri_params = make_uri(conf)
        # These values are taken from the raw configuration element.
        @table = conf["table"]
        @tz_offset = conf["tz_offset"].to_i
        @datetime_name = conf["datetime_name"]
        test_connection(conf)
      end

      # Issues a "SHOW TABLES" query against the configured server to verify
      # that it is reachable and accepts the configured database/credentials.
      #
      # @param conf [Object] unused; kept for interface compatibility
      # @raise [Fluent::ConfigError] on any connection-level failure or on a
      #   non-200 HTTP response
      def test_connection(conf)
        probe = @uri.clone
        probe.query = URI.encode_www_form(@uri_params.merge({"query" => "SHOW TABLES"}))

        response =
          begin
            Net::HTTP.get_response(probe)
          rescue Errno::ECONNREFUSED
            raise Fluent::ConfigError, "Couldn't connect to ClickHouse at #{ @uri } - connection refused"
          rescue SocketError, SystemCallError, Timeout::Error, IOError => e
            # DNS failures, unreachable hosts, timeouts and dropped connections
            # are reported as configuration errors too, instead of escaping as
            # raw network exceptions.
            raise Fluent::ConfigError, "Couldn't connect to ClickHouse at #{ @uri } - #{ e.class }: #{ e.message }"
          end

        return if response.code == "200"

        raise Fluent::ConfigError, "ClickHouse server responded non-200 code: #{ response.body }"
      end

      # Builds the base URI of the ClickHouse HTTP endpoint and the query
      # parameters that are sent with every request.
      #
      # NOTE(review): the password is sent as a URL query parameter; consider
      # another authentication mechanism if request URLs may end up in logs.
      #
      # @param conf [#[]] configuration element, indexed by string keys
      # @return [Array(URI, Hash)] the base URI and the shared query parameters
      def make_uri(conf)
        uri = URI("http://#{ conf["host"] }:#{ conf["port"] || 8123 }/")
        params = {
          "database" => conf["database"] || "default",
          "user" => conf["user"] || "default",
          "password" => conf["password"] || "",
          "input_format_skip_unknown_fields" => 1
        }
        [uri, params]
      end

      # Serializes one event as a single JSON line.
      #
      # When datetime_name is configured, the event time plus tz_offset
      # (configured in minutes, hence the multiplication by 60) is stored under
      # that key. The value is written into a copy of the record, so the hash
      # handed in by the caller is never modified; it may be shared with other
      # outputs (presumably e.g. when events are fanned out without copying).
      #
      # @param tag [String] event tag (unused)
      # @param timestamp [#+] event time
      # @param record [Hash] event record
      # @return [String] JSON document followed by a newline
      def format(tag, timestamp, record)
        if @datetime_name
          record = record.merge(@datetime_name => timestamp + @tz_offset * 60)
        end
        Yajl.dump(record) + "\n"
      end

      # Sends one buffer chunk to ClickHouse as an
      # "INSERT INTO <table> FORMAT JSONEachRow" request whose body is the
      # chunk content.
      #
      # @param chunk [#read] buffer chunk holding the formatted JSON lines
      # @raise [RuntimeError] when the server answers with a non-200 status
      def write(chunk)
        uri = @uri.clone
        query = {"query" => "INSERT INTO #{@table} FORMAT JSONEachRow"}
        uri.query = URI.encode_www_form(@uri_params.merge(query))

        req = Net::HTTP::Post.new(uri)
        req.body = chunk.read

        # The block form opens the connection for this request and closes it
        # when the block returns.
        resp = Net::HTTP.start(uri.hostname, uri.port) { |http| http.request(req) }
        if resp.code != "200"
          raise "Clickhouse responded: #{resp.body}"
        end
      end
    end
  end
end