-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathaggregation_mongo.rb
94 lines (84 loc) · 2.47 KB
/
aggregation_mongo.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
module Fluent
class AggregationMongo < AggregationBase
Plugin.register_output('aggregation_mongo', self)
# Runs the parent initializer, then loads the 'mongo' library.
# The require is performed here, at instantiation time, rather than when
# this file is loaded.
def initialize
  super
  require('mongo')
end
=begin
if RUBY_VERSION >= '1.9'
UTF8_ENCODING = Encoding.find('utf-8')
BINARY_ENCODING = Encoding.find('binary')
def to_utf8_binary(str)
begin
str.unpack("U*")
rescue => ex
raise InvalidStringEncoding, "String not valid utf-8: #{str.inspect}"
end
str.encode(UTF8_ENCODING).force_encoding(BINARY_ENCODING)
end
else
def to_utf8_binary(str)
begin
str.unpack("U*")
rescue => ex
raise InvalidStringEncoding, "String not valid utf-8: #{str.inspect}"
end
str
end
end
=end
# Reads and validates this output's settings from +conf+.
#
# Keys looked up in +conf+:
#   'database'   - required; stored in @database
#   'collection' - required; stored in @collection
#   'host'       - optional; @host defaults to 'localhost'
#   'port'       - optional; @port defaults to 27017 (a configured value is
#                  stored as given, without conversion)
#   'capped'     - optional; the mere presence of the key turns on the
#                  capped-collection options below
#   'cap_size'   - optional, only read when 'capped' is present; converted
#                  with Config.size_value, defaults to '1000m'
#   'cap_max'    - optional, only read when 'capped' is present; converted
#                  with Config.size_value
#
# Raises ConfigError when 'database' or 'collection' is missing.
def configure(conf)
  super

  @database = conf['database']
  raise ConfigError, "'database' parameter is required on MongoDump output" unless @database

  @collection = conf['collection']
  raise ConfigError, "'collection' parameter is required on MongoDump output" unless @collection

  @host = conf['host'] || 'localhost'
  @port = conf['port'] || 27017

  # Always a Hash (never nil), because get_collection hands it straight to
  # create_collection even when no capped options were configured.
  @collection_opts = {}
  if conf.has_key?('capped')
    cap_size = conf.has_key?('cap_size') ? conf['cap_size'] : '1000m'
    @collection_opts[:capped] = true
    @collection_opts[:size] = Config.size_value(cap_size)
    @collection_opts[:max] = Config.size_value(conf['cap_max']) if conf.has_key?('cap_max')
  end
end
# Resolves the configured collection through get_collection, keeps the
# handle in @coll, and then runs the parent's start.
def start
  handle = get_collection(@collection)
  @coll = handle
  super
end
# Runs the parent's shutdown first, then closes the connection reached
# through @coll (collection -> db -> connection).
def shutdown
  super
  connection = @coll.db.connection
  connection.close
end
# Issues one upsert against @coll for every entry of @pre_aggregation.
#
# For each (uid, record) pair, the document selected by _id == uid receives:
#   "$inc" - :count from record["count"], plus a "value.<key>" entry for
#            every pair in record["value"] (when present); keys have
#            surrounding whitespace stripped
#   "$set" - :name, :key and :partition copied from the record
#
# @pre_aggregation is not assigned in this method; presumably the parent's
# write (invoked through super) derives it from +chunk+ -- confirm in
# AggregationBase.
def write(chunk)
  super

  @pre_aggregation.each do |uid, record|
    increments = { :count => record["count"] }
    values = record["value"]
    if values
      values.each do |field, amount|
        increments["value." + field.strip] = amount
      end
    end

    # The alternative selector `:_id => to_utf8_binary(uid)` is currently disabled.
    selector = { :_id => uid }
    changes = {
      "$inc" => increments,
      "$set" => {
        :name => record["name"],
        :key => record["key"],
        :partition => record["partition"]
      }
    }
    @coll.update(selector, changes, { :safe => true, :upsert => true })
  end
end
# Opens a Mongo::Connection to @host/@port, selects @database, and returns
# the collection named +collection+. An existing collection is returned
# as-is; otherwise it is created with @collection_opts.
def get_collection(collection)
  database = Mongo::Connection.new(@host, @port).db(@database)
  already_there = database.collection_names.include?(collection)
  return database.collection(collection) if already_there

  database.create_collection(collection, @collection_opts)
end
end
end