Skip to content

Commit 9fcbac0

Browse files
authored
Remove EventMachine from Puma (#4661)
* preparation for Thin removal
* replace EventMachine with Concurrent::TimerTask
* remove "thread_info_event_machine" metrics
1 parent 5fdcb1e commit 9fcbac0

File tree

8 files changed

+23
-270
lines changed

8 files changed

+23
-270
lines changed

lib/cloud_controller/metrics/periodic_updater.rb

Lines changed: 9 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -16,23 +16,21 @@ def initialize(start_time, log_counter, logger, statsd_updater, prometheus_updat
1616

1717
def setup_updates
1818
update!
19-
EM.add_periodic_timer(600) { catch_error { update_user_count } }
20-
EM.add_periodic_timer(30) { catch_error { update_job_queue_length } }
21-
EM.add_periodic_timer(30) { catch_error { update_job_queue_load } }
22-
EM.add_periodic_timer(30) { catch_error { update_thread_info } }
23-
EM.add_periodic_timer(30) { catch_error { update_failed_job_count } }
24-
EM.add_periodic_timer(30) { catch_error { update_vitals } }
25-
EM.add_periodic_timer(30) { catch_error { update_log_counts } }
26-
EM.add_periodic_timer(30) { catch_error { update_task_stats } }
27-
EM.add_periodic_timer(30) { catch_error { update_deploying_count } }
28-
EM.add_periodic_timer(30) { catch_error { update_webserver_stats } }
19+
Concurrent::TimerTask.new(execution_interval: 600) { catch_error { update_user_count } }.execute
20+
Concurrent::TimerTask.new(execution_interval: 30) { catch_error { update_job_queue_length } }.execute
21+
Concurrent::TimerTask.new(execution_interval: 30) { catch_error { update_job_queue_load } }.execute
22+
Concurrent::TimerTask.new(execution_interval: 30) { catch_error { update_failed_job_count } }.execute
23+
Concurrent::TimerTask.new(execution_interval: 30) { catch_error { update_vitals } }.execute
24+
Concurrent::TimerTask.new(execution_interval: 30) { catch_error { update_log_counts } }.execute
25+
Concurrent::TimerTask.new(execution_interval: 30) { catch_error { update_task_stats } }.execute
26+
Concurrent::TimerTask.new(execution_interval: 30) { catch_error { update_deploying_count } }.execute
27+
Concurrent::TimerTask.new(execution_interval: 30) { catch_error { update_webserver_stats } }.execute
2928
end
3029

3130
def update!
3231
update_user_count
3332
update_job_queue_length
3433
update_job_queue_load
35-
update_thread_info
3634
update_failed_job_count
3735
update_vitals
3836
update_log_counts
@@ -110,13 +108,6 @@ def update_job_queue_load
110108
@prometheus_updater.update_job_queue_load(pending_job_load_by_queue)
111109
end
112110

113-
def update_thread_info
114-
return unless VCAP::CloudController::Config.config.get(:webserver) == 'thin'
115-
116-
local_thread_info = thread_info_thin
117-
[@statsd_updater, @prometheus_updater].each { |u| u.update_thread_info_thin(local_thread_info) }
118-
end
119-
120111
def update_failed_job_count
121112
jobs_by_queue_with_count = Delayed::Job.where(Sequel.lit('failed_at IS NOT NULL')).group_and_count(:queue)
122113

@@ -173,24 +164,5 @@ def update_webserver_stats
173164
end
174165
@prometheus_updater.update_webserver_stats_puma(worker_count, worker_stats)
175166
end
176-
177-
def thread_info_thin
178-
threadqueue = EM.instance_variable_get(:@threadqueue) || []
179-
resultqueue = EM.instance_variable_get(:@resultqueue) || []
180-
{
181-
thread_count: Thread.list.size,
182-
event_machine: {
183-
connection_count: EventMachine.connection_count,
184-
threadqueue: {
185-
size: threadqueue.size,
186-
num_waiting: threadqueue.is_a?(Array) ? 0 : threadqueue.num_waiting
187-
},
188-
resultqueue: {
189-
size: resultqueue.size,
190-
num_waiting: resultqueue.is_a?(Array) ? 0 : resultqueue.num_waiting
191-
}
192-
}
193-
}
194-
end
195167
end
196168
end

lib/cloud_controller/metrics/prometheus_updater.rb

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -41,15 +41,6 @@ def self.allow_pid_label
4141
{ type: :gauge, name: :cc_deployments_in_progress_total, docstring: 'Number of in progress deployments', aggregation: :most_recent }
4242
].freeze
4343

44-
THIN_METRICS = [
45-
{ type: :gauge, name: :cc_thread_info_thread_count, docstring: 'Thread count' },
46-
{ type: :gauge, name: :cc_thread_info_event_machine_connection_count, docstring: 'EventMachine connection count' },
47-
{ type: :gauge, name: :cc_thread_info_event_machine_threadqueue_size, docstring: 'EventMachine thread queue size' },
48-
{ type: :gauge, name: :cc_thread_info_event_machine_threadqueue_num_waiting, docstring: 'EventMachine num waiting in thread' },
49-
{ type: :gauge, name: :cc_thread_info_event_machine_resultqueue_size, docstring: 'EventMachine queue size' },
50-
{ type: :gauge, name: :cc_thread_info_event_machine_resultqueue_num_waiting, docstring: 'EventMachine requests waiting in queue' }
51-
].freeze
52-
5344
PUMA_METRICS = [
5445
{ type: :gauge, name: :cc_puma_worker_count, docstring: 'Puma worker count', aggregation: :most_recent },
5546
{ type: :gauge, name: :cc_puma_worker_started_at, docstring: 'Puma worker: started_at', labels: %i[index pid], aggregation: :most_recent },
@@ -88,7 +79,6 @@ def initialize(registry: Prometheus::Client.registry, cc_worker: false)
8879
return if cc_worker
8980

9081
METRICS.each { |metric| register(metric) }
91-
THIN_METRICS.each { |metric| register(metric) } if VCAP::CloudController::Config.config&.get(:webserver) == 'thin'
9282
PUMA_METRICS.each { |metric| register(metric) } if VCAP::CloudController::Config.config&.get(:webserver) == 'puma'
9383
end
9484

@@ -136,15 +126,6 @@ def update_job_queue_load(update_job_queue_load)
136126
end
137127
end
138128

139-
def update_thread_info_thin(thread_info)
140-
update_gauge_metric(:cc_thread_info_thread_count, thread_info[:thread_count])
141-
update_gauge_metric(:cc_thread_info_event_machine_connection_count, thread_info[:event_machine][:connection_count])
142-
update_gauge_metric(:cc_thread_info_event_machine_threadqueue_size, thread_info[:event_machine][:threadqueue][:size])
143-
update_gauge_metric(:cc_thread_info_event_machine_threadqueue_num_waiting, thread_info[:event_machine][:threadqueue][:num_waiting])
144-
update_gauge_metric(:cc_thread_info_event_machine_resultqueue_size, thread_info[:event_machine][:resultqueue][:size])
145-
update_gauge_metric(:cc_thread_info_event_machine_resultqueue_num_waiting, thread_info[:event_machine][:resultqueue][:num_waiting])
146-
end
147-
148129
def update_failed_job_count(failed_jobs_by_queue)
149130
failed_jobs_by_queue.each do |key, value|
150131
update_gauge_metric(:cc_failed_jobs_total, value, labels: { queue: key.to_s.underscore })

lib/cloud_controller/runners/puma_runner.rb

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -59,14 +59,8 @@ def initialize(config, app, logger, periodic_updater, request_logs)
5959
events = Puma::Events.new
6060
events.after_booted do
6161
prometheus_updater.update_gauge_metric(:cc_db_connection_pool_timeouts_total, 0, labels: { process_type: 'main' })
62-
Thread.new do
63-
EM.run { periodic_updater.setup_updates }
64-
end
65-
end
66-
events.after_stopped do
67-
EM.stop
62+
periodic_updater.setup_updates
6863
end
69-
7064
@puma_launcher = Puma::Launcher.new(puma_config, log_writer:, events:)
7165
end
7266

spec/request/internal/metrics_spec.rb

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -119,19 +119,6 @@
119119
end
120120
end
121121

122-
context 'cc_thread_info' do
123-
it 'reports thread info' do
124-
get '/internal/v4/metrics', nil
125-
126-
expect(last_response.body).to match(/cc_thread_info_thread_count [0-9][0-9]*\.\d+/)
127-
expect(last_response.body).to match(/cc_thread_info_event_machine_connection_count [0-9][0-9]*\.\d+/)
128-
expect(last_response.body).to match(/cc_thread_info_event_machine_threadqueue_size [0-9][0-9]*\.\d+/)
129-
expect(last_response.body).to match(/cc_thread_info_event_machine_threadqueue_num_waiting [0-9][0-9]*\.\d+/)
130-
expect(last_response.body).to match(/cc_thread_info_event_machine_resultqueue_size [0-9][0-9]*\.\d+/)
131-
expect(last_response.body).to match(/cc_thread_info_event_machine_resultqueue_num_waiting [0-9][0-9]*\.\d+/)
132-
end
133-
end
134-
135122
context 'cc_failed_job_count' do
136123
before do
137124
Delayed::Job.enqueue(VCAP::CloudController::Jobs::Runtime::EventsCleanup.new(1), { queue: 'cc_api_0', run_at: Time.now + 1.day })

spec/unit/lib/cloud_controller/metrics/periodic_updater_spec.rb

Lines changed: 13 additions & 116 deletions
Original file line numberDiff line numberDiff line change
@@ -6,27 +6,10 @@ module VCAP::CloudController::Metrics
66
let(:periodic_updater) { PeriodicUpdater.new(start_time, log_counter, logger, statsd_updater, prometheus_updater) }
77
let(:statsd_updater) { double(:statsd_updater) }
88
let(:prometheus_updater) { double(:prometheus_updater) }
9-
let(:threadqueue) { double(EventMachine::Queue, size: 20, num_waiting: 0) }
10-
let(:resultqueue) { double(EventMachine::Queue, size: 0, num_waiting: 1) }
119
let(:start_time) { Time.now.utc - 90 }
1210
let(:log_counter) { double(:log_counter, counts: {}) }
1311
let(:logger) { double(:logger) }
1412

15-
before do
16-
allow(EventMachine).to receive(:connection_count).and_return(123)
17-
18-
allow(EventMachine).to receive(:instance_variable_get) do |instance_var|
19-
case instance_var
20-
when :@threadqueue
21-
threadqueue
22-
when :@resultqueue
23-
resultqueue
24-
else
25-
raise "Unexpected call: #{instance_var}"
26-
end
27-
end
28-
end
29-
3013
describe 'task stats' do
3114
before do
3215
allow(statsd_updater).to receive(:update_task_stats)
@@ -73,7 +56,6 @@ module VCAP::CloudController::Metrics
7356
allow(statsd_updater).to receive(:update_user_count)
7457
allow(statsd_updater).to receive(:update_job_queue_length)
7558
allow(statsd_updater).to receive(:update_job_queue_load)
76-
allow(statsd_updater).to receive(:update_thread_info_thin)
7759
allow(statsd_updater).to receive(:update_failed_job_count)
7860
allow(statsd_updater).to receive(:update_vitals)
7961
allow(statsd_updater).to receive(:update_log_counts)
@@ -83,14 +65,11 @@ module VCAP::CloudController::Metrics
8365
allow(prometheus_updater).to receive(:update_user_count)
8466
allow(prometheus_updater).to receive(:update_job_queue_length)
8567
allow(prometheus_updater).to receive(:update_job_queue_load)
86-
allow(prometheus_updater).to receive(:update_thread_info_thin)
8768
allow(prometheus_updater).to receive(:update_failed_job_count)
8869
allow(prometheus_updater).to receive(:update_vitals)
8970
allow(prometheus_updater).to receive(:update_log_counts)
9071
allow(prometheus_updater).to receive(:update_task_stats)
9172
allow(prometheus_updater).to receive(:update_deploying_count)
92-
93-
allow(EventMachine).to receive(:add_periodic_timer)
9473
end
9574

9675
it 'bumps the number of users and sets periodic timer' do
@@ -113,11 +92,6 @@ module VCAP::CloudController::Metrics
11392
periodic_updater.setup_updates
11493
end
11594

116-
it 'updates thread count and event machine queues' do
117-
expect(periodic_updater).to receive(:update_thread_info).once
118-
periodic_updater.setup_updates
119-
end
120-
12195
it 'updates the vitals' do
12296
expect(periodic_updater).to receive(:update_vitals).once
12397
periodic_updater.setup_updates
@@ -138,15 +112,16 @@ module VCAP::CloudController::Metrics
138112
periodic_updater.setup_updates
139113
end
140114

141-
context 'when EventMachine periodic_timer tasks are run' do
115+
context 'when Concurrent::TimerTasks are run' do
142116
before do
143117
@periodic_timers = []
144118

145-
allow(EventMachine).to receive(:add_periodic_timer) do |interval, &block|
119+
allow(Concurrent::TimerTask).to receive(:new) do |opts, &block|
146120
@periodic_timers << {
147-
interval:,
148-
block:
121+
interval: opts[:execution_interval],
122+
block: block
149123
}
124+
double('TimerTask', execute: nil, shutdown: nil, kill: nil, running?: false)
150125
end
151126

152127
periodic_updater.setup_updates
@@ -176,44 +151,36 @@ module VCAP::CloudController::Metrics
176151
@periodic_timers[2][:block].call
177152
end
178153

179-
it 'updates thread count and event machine queues' do
180-
expect(periodic_updater).to receive(:catch_error).once.and_call_original
181-
expect(periodic_updater).to receive(:update_thread_info).once
182-
expect(@periodic_timers[3][:interval]).to eq(30)
183-
184-
@periodic_timers[3][:block].call
185-
end
186-
187154
it 'bumps the length of cc failed job queues and sets periodic timer' do
188155
expect(periodic_updater).to receive(:catch_error).once.and_call_original
189156
expect(periodic_updater).to receive(:update_failed_job_count).once
190-
expect(@periodic_timers[4][:interval]).to eq(30)
157+
expect(@periodic_timers[3][:interval]).to eq(30)
191158

192-
@periodic_timers[4][:block].call
159+
@periodic_timers[3][:block].call
193160
end
194161

195162
it 'updates the vitals' do
196163
expect(periodic_updater).to receive(:catch_error).once.and_call_original
197164
expect(periodic_updater).to receive(:update_vitals).once
198-
expect(@periodic_timers[5][:interval]).to eq(30)
165+
expect(@periodic_timers[4][:interval]).to eq(30)
199166

200-
@periodic_timers[5][:block].call
167+
@periodic_timers[4][:block].call
201168
end
202169

203170
it 'updates the log counts' do
204171
expect(periodic_updater).to receive(:catch_error).once.and_call_original
205172
expect(periodic_updater).to receive(:update_log_counts).once
206-
expect(@periodic_timers[6][:interval]).to eq(30)
173+
expect(@periodic_timers[5][:interval]).to eq(30)
207174

208-
@periodic_timers[6][:block].call
175+
@periodic_timers[5][:block].call
209176
end
210177

211178
it 'updates the task stats' do
212179
expect(periodic_updater).to receive(:catch_error).once.and_call_original
213180
expect(periodic_updater).to receive(:update_task_stats).once
214-
expect(@periodic_timers[7][:interval]).to eq(30)
181+
expect(@periodic_timers[6][:interval]).to eq(30)
215182

216-
@periodic_timers[7][:block].call
183+
@periodic_timers[6][:block].call
217184
end
218185
end
219186
end
@@ -538,75 +505,6 @@ module VCAP::CloudController::Metrics
538505
end
539506
end
540507

541-
describe '#update_thread_info' do
542-
before do
543-
allow(statsd_updater).to receive(:update_thread_info_thin)
544-
allow(prometheus_updater).to receive(:update_thread_info_thin)
545-
end
546-
547-
it 'contains EventMachine data and send it to all updaters' do
548-
expected_thread_info = {
549-
thread_count: Thread.list.size,
550-
event_machine: {
551-
connection_count: 123,
552-
threadqueue: {
553-
size: 20,
554-
num_waiting: 0
555-
},
556-
resultqueue: {
557-
size: 0,
558-
num_waiting: 1
559-
}
560-
}
561-
}
562-
563-
periodic_updater.update_thread_info
564-
565-
expect(statsd_updater).to have_received(:update_thread_info_thin).with(expected_thread_info)
566-
expect(prometheus_updater).to have_received(:update_thread_info_thin).with(expected_thread_info)
567-
end
568-
569-
context 'when resultqueue and/or threadqueue is not a queue' do
570-
let(:resultqueue) { [] }
571-
let(:threadqueue) { nil }
572-
573-
it 'does not blow up' do
574-
expected_thread_info = {
575-
thread_count: Thread.list.size,
576-
event_machine: {
577-
connection_count: 123,
578-
threadqueue: {
579-
size: 0,
580-
num_waiting: 0
581-
},
582-
resultqueue: {
583-
size: 0,
584-
num_waiting: 0
585-
}
586-
}
587-
}
588-
589-
periodic_updater.update_thread_info
590-
591-
expect(statsd_updater).to have_received(:update_thread_info_thin).with(expected_thread_info)
592-
expect(prometheus_updater).to have_received(:update_thread_info_thin).with(expected_thread_info)
593-
end
594-
end
595-
596-
context 'when Puma is configured as webserver' do
597-
before do
598-
TestConfig.override(webserver: 'puma')
599-
end
600-
601-
it 'does not send EventMachine data to updaters' do
602-
periodic_updater.update_thread_info
603-
604-
expect(statsd_updater).not_to have_received(:update_thread_info_thin)
605-
expect(prometheus_updater).not_to have_received(:update_thread_info_thin)
606-
end
607-
end
608-
end
609-
610508
describe '#update_vitals' do
611509
before do
612510
allow(statsd_updater).to receive(:update_vitals)
@@ -736,7 +634,6 @@ module VCAP::CloudController::Metrics
736634
expect(periodic_updater).to receive(:update_user_count).once
737635
expect(periodic_updater).to receive(:update_job_queue_length).once
738636
expect(periodic_updater).to receive(:update_job_queue_load).once
739-
expect(periodic_updater).to receive(:update_thread_info).once
740637
expect(periodic_updater).to receive(:update_failed_job_count).once
741638
expect(periodic_updater).to receive(:update_vitals).once
742639
expect(periodic_updater).to receive(:update_log_counts).once

0 commit comments

Comments
 (0)