Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 20 additions & 23 deletions lib/cloud_controller/metrics/periodic_updater.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,36 +15,27 @@ def initialize(start_time, log_counter, logger, statsd_updater, prometheus_updat
end

# Registers one Concurrent::TimerTask per metric updater in @update_tasks and
# then starts every registered task.
#
# NOTE(review): this span is a diff rendering with its +/- markers stripped, so
# it appears to contain BOTH sides of the change: the inline
# `Concurrent::TimerTask.new` registrations and the `setup_task` registrations
# cover the same nine updaters at the same intervals. Presumably only one group
# (likely the `setup_task` one, together with dropping the `update!` call)
# exists in the merged file -- confirm against the real source before relying
# on this text.
def setup_updates
# Runs every updater once, synchronously, before any timer is created.
update!
@update_tasks = []
# Inline registrations: user count every 600 seconds, everything else every 30
# seconds; each updater call is wrapped in catch_error.
@update_tasks << Concurrent::TimerTask.new(execution_interval: 600) { catch_error { update_user_count } }
@update_tasks << Concurrent::TimerTask.new(execution_interval: 30) { catch_error { update_job_queue_length } }
@update_tasks << Concurrent::TimerTask.new(execution_interval: 30) { catch_error { update_job_queue_load } }
@update_tasks << Concurrent::TimerTask.new(execution_interval: 30) { catch_error { update_failed_job_count } }
@update_tasks << Concurrent::TimerTask.new(execution_interval: 30) { catch_error { update_vitals } }
@update_tasks << Concurrent::TimerTask.new(execution_interval: 30) { catch_error { update_log_counts } }
@update_tasks << Concurrent::TimerTask.new(execution_interval: 30) { catch_error { update_task_stats } }
@update_tasks << Concurrent::TimerTask.new(execution_interval: 30) { catch_error { update_deploying_count } }
@update_tasks << Concurrent::TimerTask.new(execution_interval: 30) { catch_error { update_webserver_stats } }
# Helper-based registrations: same updaters and intervals as above, passed by
# method name to setup_task, which builds the TimerTask and appends it.
setup_task(@update_tasks, 600, :update_user_count)
setup_task(@update_tasks, 30, :update_job_queue_length)
setup_task(@update_tasks, 30, :update_job_queue_load)
setup_task(@update_tasks, 30, :update_failed_job_count)
setup_task(@update_tasks, 30, :update_vitals)
setup_task(@update_tasks, 30, :update_log_counts)
setup_task(@update_tasks, 30, :update_task_stats)
setup_task(@update_tasks, 30, :update_deploying_count)
setup_task(@update_tasks, 30, :update_webserver_stats)
# Start every task that was registered above.
@update_tasks.each(&:execute)
end

# Stops the timer tasks held in @update_tasks.
#
# NOTE(review): this span is a diff rendering with its +/- markers stripped, so
# two versions of `stop_updates` and the body of `update!` are interleaved.
# Read literally, the first `end` below closes `stop_updates` right after the
# `shutdown` call, and the kill / wait_for_termination statements sit between
# the two method definitions. Presumably the kill / wait_for_termination lines
# and the trailing `all_tasks_terminated` form the current `stop_updates` body,
# while the `shutdown` variant and `update!` are the removed side -- confirm
# against the merged file.
def stop_updates
# Two guards for the same condition: nothing to stop when setup_updates has not
# populated @update_tasks. As written, the second guard can never fire because
# the first one returns (nil) on the same condition.
return unless @update_tasks
return true unless @update_tasks

@update_tasks.each(&:shutdown)
end
@update_tasks.each(&:kill) # in-progress tasks will be allowed to complete, enqueued tasks will be dismissed
# Flag starts true and is cleared as soon as any wait_for_termination call
# returns a falsy value.
all_tasks_terminated = true
@update_tasks.each { |task| task.wait_for_termination(1) || (all_tasks_terminated = false) } # wait up to 1 second for each task to terminate

# Runs each metric updater once, in the order listed.
def update!
update_user_count
update_job_queue_length
update_job_queue_load
update_failed_job_count
update_vitals
update_log_counts
update_task_stats
update_deploying_count
update_webserver_stats
all_tasks_terminated # true if all tasks terminated, false if any are still running
end

def catch_error
Expand Down Expand Up @@ -172,5 +163,11 @@ def update_webserver_stats
end
@prometheus_updater.update_webserver_stats_puma(worker_count, worker_stats)
end

private

# Builds a timer task for a single metric updater and appends it to the list.
#
# The task is created with `interval_type: :fixed_rate` and `run_now: true`
# (per concurrent-ruby's TimerTask documentation these schedule runs relative
# to the previous start time and trigger a first run as soon as the task is
# executed -- confirm against the pinned gem version). Each run dispatches to
# `method_name` on this object inside `catch_error`.
#
# @param update_tasks [Array] collection the new task is appended to
# @param interval [Numeric] value passed as `execution_interval`
# @param method_name [Symbol] name of the updater method to invoke via `send`
# @return [Array] `update_tasks`, as returned by `<<`
def setup_task(update_tasks, interval, method_name)
  timer = Concurrent::TimerTask.new(execution_interval: interval, interval_type: :fixed_rate, run_now: true) do
    catch_error { send(method_name) }
  end
  update_tasks << timer
end
end
end
16 changes: 12 additions & 4 deletions lib/cloud_controller/runners/puma_runner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,20 @@ def start!
private

def stop_periodic_updates
@periodic_updater&.stop_updates
@logger.info('Successfully stopped periodic updates in after_stopped')
return unless @periodic_updater

if @periodic_updater.stop_updates
@logger.info('Successfully stopped periodic updates in after_stopped')
else
@logger.warn('Failed to stop all periodic update tasks in after_stopped')
end
rescue ThreadError
at_exit do
@periodic_updater&.stop_updates
@logger.info('Successfully stopped periodic updates in at_exit')
if @periodic_updater.stop_updates
@logger.info('Successfully stopped periodic updates in at_exit')
else
@logger.warn('Failed to stop all periodic update tasks in at_exit')
end
end
rescue StandardError => e
@logger.error("Failed to stop periodic updates: #{e}\n#{e.backtrace&.join("\n")}")
Expand Down
32 changes: 8 additions & 24 deletions spec/request/internal/metrics_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
10.times do
VCAP::CloudController::User.make
end
CloudController::DependencyLocator.instance.periodic_updater.update!
CloudController::DependencyLocator.instance.periodic_updater.update_user_count
end

it 'reports the total number of users' do
Expand All @@ -54,7 +54,7 @@

context 'cc_vitals' do
it 'reports vitals' do
CloudController::DependencyLocator.instance.periodic_updater.update!
CloudController::DependencyLocator.instance.periodic_updater.update_vitals
get '/internal/v4/metrics', nil

expect(last_response.body).to match(/cc_vitals_num_cores [1-9][0-9]*\.\d+/)
Expand All @@ -71,7 +71,7 @@
Delayed::Job.enqueue(VCAP::CloudController::Jobs::Runtime::EventsCleanup.new(1), { queue: 'cc_api_0', run_at: Time.now + 1.day })
Delayed::Job.enqueue(VCAP::CloudController::Jobs::Runtime::EventsCleanup.new(1), { queue: 'cc_generic', run_at: Time.now + 1.day })

CloudController::DependencyLocator.instance.periodic_updater.update!
CloudController::DependencyLocator.instance.periodic_updater.update_job_queue_length
end

after do
Expand All @@ -91,27 +91,11 @@
Delayed::Job.enqueue(VCAP::CloudController::Jobs::Runtime::EventsCleanup.new(1), { queue: 'cc_api_0', run_at: Time.now })
Delayed::Job.enqueue(VCAP::CloudController::Jobs::Runtime::EventsCleanup.new(1), { queue: 'cc_generic', run_at: Time.now })

CloudController::DependencyLocator.instance.periodic_updater.update!
end

after do
Delayed::Job.dataset.delete
end

it 'includes job queue load metric labelled for each queue' do
get '/internal/v4/metrics', nil

expect(last_response.body).to match(/cc_job_queues_load_total{queue="cc_api_0"} 1\.0/)
expect(last_response.body).to match(/cc_job_queues_load_total{queue="cc_generic"} 1\.0/)
end
end

context 'cc_job_queue_load_not_ready_to_run_now' do
before do
# jobs with run_at in the future should not be counted towards load
Delayed::Job.enqueue(VCAP::CloudController::Jobs::Runtime::EventsCleanup.new(1), { queue: 'cc_api_0', run_at: Time.now + 1.minute })
Delayed::Job.enqueue(VCAP::CloudController::Jobs::Runtime::EventsCleanup.new(1), { queue: 'cc_generic', run_at: Time.now + 1.minute })

CloudController::DependencyLocator.instance.periodic_updater.update!
CloudController::DependencyLocator.instance.periodic_updater.update_job_queue_load
end

after do
Expand All @@ -121,8 +105,8 @@
it 'includes job queue load metric labelled for each queue' do
get '/internal/v4/metrics', nil

expect(last_response.body).to match(/cc_job_queues_load_total{queue="cc_api_0"} 0\.0/)
expect(last_response.body).to match(/cc_job_queues_load_total{queue="cc_generic"} 0\.0/)
expect(last_response.body).to match(/cc_job_queues_load_total{queue="cc_api_0"} 1\.0/)
expect(last_response.body).to match(/cc_job_queues_load_total{queue="cc_generic"} 1\.0/)
end
end

Expand All @@ -132,7 +116,7 @@
Delayed::Job.enqueue(VCAP::CloudController::Jobs::Runtime::EventsCleanup.new(1), { queue: 'cc_generic', run_at: Time.now + 1.day })
Delayed::Job.dataset.update(failed_at: Time.now.utc)

CloudController::DependencyLocator.instance.periodic_updater.update!
CloudController::DependencyLocator.instance.periodic_updater.update_failed_job_count
end

after do
Expand Down
138 changes: 70 additions & 68 deletions spec/unit/lib/cloud_controller/metrics/periodic_updater_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -80,62 +80,23 @@ module VCAP::CloudController::Metrics
allow(prometheus_updater).to receive(:update_job_queue_load)
allow(prometheus_updater).to receive(:update_failed_job_count)
allow(prometheus_updater).to receive(:update_vitals)
allow(prometheus_updater).to receive(:update_log_counts)
allow(prometheus_updater).to receive(:update_task_stats)
allow(prometheus_updater).to receive(:update_deploying_count)
allow(prometheus_updater).to receive(:update_webserver_stats_puma)
end

it 'bumps the number of users and sets periodic timer' do
expect(periodic_updater).to receive(:update_user_count).once
periodic_updater.setup_updates
end

it 'bumps the length of cc job queues and sets periodic timer' do
expect(periodic_updater).to receive(:update_job_queue_length).once
periodic_updater.setup_updates
end

it 'bumps the load of cc job queues and sets periodic timer' do
expect(periodic_updater).to receive(:update_job_queue_load).once
periodic_updater.setup_updates
end

it 'bumps the length of cc failed job queues and sets periodic timer' do
expect(periodic_updater).to receive(:update_failed_job_count).once
periodic_updater.setup_updates
end

it 'updates the vitals' do
expect(periodic_updater).to receive(:update_vitals).once
periodic_updater.setup_updates
end

it 'updates the log counts' do
expect(periodic_updater).to receive(:update_log_counts).once
periodic_updater.setup_updates
end

it 'updates the task stats' do
expect(periodic_updater).to receive(:update_task_stats).once
periodic_updater.setup_updates
end

it 'updates the deploying count' do
expect(periodic_updater).to receive(:update_deploying_count).once
periodic_updater.setup_updates
end

context 'when Concurrent::TimerTasks are run' do
before do
@periodic_timers = []

allow(Concurrent::TimerTask).to receive(:new) do |opts, &block|
@periodic_timers << {
interval: opts[:execution_interval],
type: opts[:interval_type],
now: opts[:run_now],
block: block
}
double('TimerTask', execute: nil, shutdown: nil, kill: nil, running?: false)
double('TimerTask', execute: nil)
end

periodic_updater.setup_updates
Expand All @@ -145,6 +106,8 @@ module VCAP::CloudController::Metrics
expect(periodic_updater).to receive(:catch_error).once.and_call_original
expect(periodic_updater).to receive(:update_user_count).once
expect(@periodic_timers[0][:interval]).to eq(600)
expect(@periodic_timers[0][:type]).to eq(:fixed_rate)
expect(@periodic_timers[0][:now]).to be(true)

@periodic_timers[0][:block].call
end
Expand All @@ -153,6 +116,8 @@ module VCAP::CloudController::Metrics
expect(periodic_updater).to receive(:catch_error).once.and_call_original
expect(periodic_updater).to receive(:update_job_queue_length).once
expect(@periodic_timers[1][:interval]).to eq(30)
expect(@periodic_timers[1][:type]).to eq(:fixed_rate)
expect(@periodic_timers[1][:now]).to be(true)

@periodic_timers[1][:block].call
end
Expand All @@ -161,6 +126,8 @@ module VCAP::CloudController::Metrics
expect(periodic_updater).to receive(:catch_error).once.and_call_original
expect(periodic_updater).to receive(:update_job_queue_load).once
expect(@periodic_timers[2][:interval]).to eq(30)
expect(@periodic_timers[2][:type]).to eq(:fixed_rate)
expect(@periodic_timers[2][:now]).to be(true)

@periodic_timers[2][:block].call
end
Expand All @@ -169,6 +136,8 @@ module VCAP::CloudController::Metrics
expect(periodic_updater).to receive(:catch_error).once.and_call_original
expect(periodic_updater).to receive(:update_failed_job_count).once
expect(@periodic_timers[3][:interval]).to eq(30)
expect(@periodic_timers[3][:type]).to eq(:fixed_rate)
expect(@periodic_timers[3][:now]).to be(true)

@periodic_timers[3][:block].call
end
Expand All @@ -177,6 +146,8 @@ module VCAP::CloudController::Metrics
expect(periodic_updater).to receive(:catch_error).once.and_call_original
expect(periodic_updater).to receive(:update_vitals).once
expect(@periodic_timers[4][:interval]).to eq(30)
expect(@periodic_timers[4][:type]).to eq(:fixed_rate)
expect(@periodic_timers[4][:now]).to be(true)

@periodic_timers[4][:block].call
end
Expand All @@ -185,6 +156,8 @@ module VCAP::CloudController::Metrics
expect(periodic_updater).to receive(:catch_error).once.and_call_original
expect(periodic_updater).to receive(:update_log_counts).once
expect(@periodic_timers[5][:interval]).to eq(30)
expect(@periodic_timers[5][:type]).to eq(:fixed_rate)
expect(@periodic_timers[5][:now]).to be(true)

@periodic_timers[5][:block].call
end
Expand All @@ -193,30 +166,75 @@ module VCAP::CloudController::Metrics
expect(periodic_updater).to receive(:catch_error).once.and_call_original
expect(periodic_updater).to receive(:update_task_stats).once
expect(@periodic_timers[6][:interval]).to eq(30)
expect(@periodic_timers[6][:type]).to eq(:fixed_rate)
expect(@periodic_timers[6][:now]).to be(true)

@periodic_timers[6][:block].call
end

it 'updates the deploying count' do
expect(periodic_updater).to receive(:catch_error).once.and_call_original
expect(periodic_updater).to receive(:update_deploying_count).once
expect(@periodic_timers[7][:interval]).to eq(30)
expect(@periodic_timers[7][:type]).to eq(:fixed_rate)
expect(@periodic_timers[7][:now]).to be(true)

@periodic_timers[7][:block].call
end

it 'updates the webserver stats' do
expect(periodic_updater).to receive(:catch_error).once.and_call_original
expect(periodic_updater).to receive(:update_webserver_stats).once
expect(@periodic_timers[8][:interval]).to eq(30)
expect(@periodic_timers[8][:type]).to eq(:fixed_rate)
expect(@periodic_timers[8][:now]).to be(true)

@periodic_timers[8][:block].call
end
end

context 'when PeriodicUpdater is shut down' do
it 'does nothing when updates have not been setup' do
expect { periodic_updater.stop_updates }.not_to raise_error
end

it 'shuts down all timer tasks after setup_updates' do
timer_doubles = []
allow(Concurrent::TimerTask).to receive(:new) do |*|
dbl = double('TimerTask', execute: nil, shutdown: nil)
timer_doubles << dbl
dbl
context 'when Concurrent::TimerTasks are stopped' do
let(:tasks) { [] }
let(:wait_for_termination_response) { nil }

before do
allow(Concurrent::TimerTask).to receive(:new) do |*|
dbl = double('TimerTask', execute: nil, kill: nil, wait_for_termination: wait_for_termination_response)
tasks << dbl
dbl
end

periodic_updater.setup_updates
end

periodic_updater.setup_updates
context 'when tasks are terminated in time' do
let(:wait_for_termination_response) { true }

expect(timer_doubles.size).to eq(9)
expect(timer_doubles).to all(receive(:shutdown).once)
it 'stops all tasks and returns true' do
expect(tasks.size).to eq(9)
expect(tasks).to all(receive(:kill).once)
expect(tasks).to all(receive(:wait_for_termination).with(1).once)

periodic_updater.stop_updates
expect(periodic_updater.stop_updates).to be(true)
end
end

context 'when tasks are not terminated in time' do
let(:wait_for_termination_response) { false }

it 'stops all tasks and returns false' do
expect(tasks.size).to eq(9)
expect(tasks).to all(receive(:kill).once)
expect(tasks).to all(receive(:wait_for_termination).with(1).once)

expect(periodic_updater.stop_updates).to be(false)
end
end
end
end
end
Expand Down Expand Up @@ -642,22 +660,6 @@ module VCAP::CloudController::Metrics
end
end

describe '#update!' do
it 'calls all update methods' do
expect(periodic_updater).to receive(:update_user_count).once
expect(periodic_updater).to receive(:update_job_queue_length).once
expect(periodic_updater).to receive(:update_job_queue_load).once
expect(periodic_updater).to receive(:update_failed_job_count).once
expect(periodic_updater).to receive(:update_vitals).once
expect(periodic_updater).to receive(:update_log_counts).once
expect(periodic_updater).to receive(:update_task_stats).once
expect(periodic_updater).to receive(:update_deploying_count).once
expect(periodic_updater).to receive(:update_webserver_stats).once

periodic_updater.update!
end
end

describe '#catch_error' do
it 'calls a block' do
was_called = false
Expand Down
Loading