Skip to content

Commit

Permalink
Merge pull request resque#85 from PA to nevans
Browse files Browse the repository at this point in the history
  • Loading branch information
joshuaflanagan committed Nov 5, 2014
2 parents 71643f5 + 35e70e8 commit 7848c41
Show file tree
Hide file tree
Showing 5 changed files with 227 additions and 35 deletions.
33 changes: 32 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ SIGNALS

The pool manager responds to the following signals:

* `HUP` - reload the config file, reload logfiles, restart all workers.
* `HUP` - reset config loader (reload the config file), reload logfiles, restart all workers.
* `QUIT` - gracefully shut down workers (via `QUIT`) and shutdown the manager
after all workers are done.
* `INT` - gracefully shut down workers (via `QUIT`) and immediately shutdown manager
Expand All @@ -132,6 +132,37 @@ defined by Resque 1.22 and above. See http://hone.heroku.com/resque/2012/08/21/r
for details, overriding any command-line configuration for `TERM`. Setting `TERM_CHILD` tells
us you know what you're doing.

Custom Configuration Loader
---------------------------

If the static YAML file configuration approach does not meet you needs, you can
specify a custom configuration loader.

Set the `config_loader` class variable on Resque::Pool to an object that
responds to `#call` (which can simply be a lambda/Proc). The class attribute
needs to be set before starting the pool. This is usually accomplished by
in the `resque:pool:setup` rake task, as described above.

For example, if you wanted to vary the number of worker processes based on a
value stored in Redis, you could do something like:

```ruby
task resque:pool:setup do
Resque::Pool.config_loader = lambda {|env|
worker_count = Redis.current.get("pool_workers_#{env}").to_i
{"queueA,queueB" => worker_count }
}
end
```

The configuration loader's `#call` method will be invoked every time a worker
completes a job. This allows the configuration to constantly change, for example,
to scale the number of workers up/down based on different conditions.
If the response is generally static, the loader may want to cache the value it
returns. It can optionally implement a `#reset!` method, which will be invoked
when the HUP signal is received, allowing the loader to flush its cache, or
perform any other re-initialization.

Other Features
--------------

Expand Down
59 changes: 26 additions & 33 deletions lib/resque/pool.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
require 'resque/pool/version'
require 'resque/pool/logging'
require 'resque/pool/pooled_worker'
require 'resque/pool/file_or_hash_loader'
require 'erb'
require 'fcntl'
require 'yaml'
Expand All @@ -18,10 +19,11 @@ class Pool
include Logging
extend Logging
attr_reader :config
attr_reader :config_loader
attr_reader :workers

def initialize(config)
init_config(config)
def initialize(config_loader=nil)
init_config(config_loader)
@workers = Hash.new { |workers, queues| workers[queues] = {} }
procline "(initialized)"
end
Expand Down Expand Up @@ -56,10 +58,9 @@ def call_after_prefork!
end

# }}}
# Config: class methods to start up the pool using the default config {{{
# Config: class methods to start up the pool using the config loader {{{

@config_files = ["resque-pool.yml", "config/resque-pool.yml"]
class << self; attr_accessor :config_files, :app_name; end
class << self; attr_accessor :config_loader, :app_name; end

def self.app_name
@app_name ||= File.basename(Dir.pwd)
Expand All @@ -81,46 +82,37 @@ def self.single_process_group
)
end

def self.choose_config_file
if ENV["RESQUE_POOL_CONFIG"]
ENV["RESQUE_POOL_CONFIG"]
else
@config_files.detect { |f| File.exist?(f) }
end
end

def self.run
if GC.respond_to?(:copy_on_write_friendly=)
GC.copy_on_write_friendly = true
end
Resque::Pool.new(choose_config_file).start.join
create_configured.start.join
end

# }}}
# Config: load config and config file {{{

def config_file
@config_file || (!@config && ::Resque::Pool.choose_config_file)
def self.create_configured
Resque::Pool.new(config_loader)
end

def init_config(config)
case config
when String, nil
@config_file = config
# }}}
# Config: store loader and load config {{{

def init_config(loader)
case loader
when String, Hash, nil
@config_loader = FileOrHashLoader.new(loader)
else
@config = config.dup
@config_loader = loader
end
load_config
end

def load_config
if config_file
@config = YAML.load(ERB.new(IO.read(config_file)).result)
else
@config ||= {}
end
environment and @config[environment] and config.merge!(@config[environment])
config.delete_if {|key, value| value.is_a? Hash }
@config = config_loader.call(environment)
end

def reset_config
config_loader.reset! if config_loader.respond_to?(:reset!)
load_config
end

def environment
Expand Down Expand Up @@ -189,8 +181,8 @@ def handle_sig_queue!
log "#{signal}: sending to all workers"
signal_all_workers(signal)
when :HUP
log "HUP: reload config file and reload logfiles"
load_config
log "HUP: reset configuration and reload logfiles"
reset_config
Logging.reopen_logs!
log "HUP: gracefully shutdown old children (which have old logfiles open)"
if term_child
Expand Down Expand Up @@ -293,6 +285,7 @@ def join
break if handle_sig_queue! == :break
if sig_queue.empty?
master_sleep
load_config
maintain_worker_count
end
procline("managing #{all_pids.inspect}")
Expand Down
55 changes: 55 additions & 0 deletions lib/resque/pool/file_or_hash_loader.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
class FileOrHashLoader
def initialize(filename_or_hash=nil)
case filename_or_hash
when String, nil
@filename = filename_or_hash
when Hash
@static_config = filename_or_hash.dup
else
raise "#{self.class} cannot be initialized with #{filename_or_hash.inspect}"
end
end

def call(environment)
@config ||= load_config_from_file(environment)
end

def reset!
@config = nil
end

private

def load_config_from_file(environment)
if @static_config
new_config = @static_config
else
filename = config_filename
new_config = load_config filename
end
apply_environment new_config, environment
end

def apply_environment(config, environment)
environment and config[environment] and config.merge!(config[environment])
config.delete_if {|key, value| value.is_a? Hash }
end

def config_filename
@filename || choose_config_file
end

def load_config(filename)
return {} unless filename
YAML.load(ERB.new(IO.read(filename)).result)
end

CONFIG_FILES = ["resque-pool.yml", "config/resque-pool.yml"]
def choose_config_file
if ENV["RESQUE_POOL_CONFIG"]
ENV["RESQUE_POOL_CONFIG"]
else
CONFIG_FILES.detect { |f| File.exist?(f) }
end
end
end
102 changes: 101 additions & 1 deletion spec/resque_pool_spec.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
require 'spec_helper'

RSpec.configure do |config|
config.include PoolSpecHelpers
config.after {
Object.send(:remove_const, :RAILS_ENV) if defined? RAILS_ENV
ENV.delete 'RACK_ENV'
Expand Down Expand Up @@ -158,12 +159,111 @@ module Rails; end

context "when a custom file is specified" do
before { ENV["RESQUE_POOL_CONFIG"] = 'spec/resque-pool-custom.yml.erb' }
subject { Resque::Pool.new(Resque::Pool.choose_config_file) }
subject { Resque::Pool.new }
it "should find the right file, and parse the ERB" do
subject.config["foo"].should == 2
end
end

context "when the file changes" do
require 'tempfile'

let(:config_file_path) {
config_file = Tempfile.new("resque-pool-test")
config_file.write "orig: 1"
config_file.close
config_file.path
}

subject {
no_spawn(Resque::Pool.new(config_file_path))
}

it "should not automatically load the changes" do
subject.config.keys.should == ["orig"]

File.open(config_file_path, "w"){|f| f.write "changed: 1"}
subject.config.keys.should == ["orig"]
subject.load_config
subject.config.keys.should == ["orig"]
end

it "should reload the changes on HUP signal" do
subject.config.keys.should == ["orig"]

File.open(config_file_path, "w"){|f| f.write "changed: 1"}
subject.config.keys.should == ["orig"]
subject.load_config
subject.config.keys.should == ["orig"]

simulate_signal subject, :HUP

subject.config.keys.should == ["changed"]
end

end

end

describe Resque::Pool, "the pool configuration custom loader" do
it "should retrieve the config based on the environment" do
custom_loader = double(call: Hash.new)
RAILS_ENV = "env"

Resque::Pool.new(custom_loader)

custom_loader.should have_received(:call).with("env")
end

it "should reset the config loader on HUP" do
custom_loader = double(call: Hash.new, reset!: true)

pool = no_spawn(Resque::Pool.new(custom_loader))
custom_loader.should have_received(:call).once

pool.sig_queue.push :HUP
pool.handle_sig_queue!
custom_loader.should have_received(:reset!)
custom_loader.should have_received(:call).twice
end

it "can be a lambda" do
RAILS_ENV = "fake"
count = 1
pool = no_spawn(Resque::Pool.new(lambda {|env|
{env.reverse => count}
}))
pool.config.should == {"ekaf" => 1}

count = 3
pool.sig_queue.push :HUP
pool.handle_sig_queue!

pool.config.should == {"ekaf" => 3}
end
end

describe "the class-level .config_loader attribute" do
context "when not provided" do
subject { Resque::Pool.create_configured }

it "created pools use config file and hash loading logic" do
subject.config_loader.should be_instance_of FileOrHashLoader
end
end

context "when provided with a custom config loader" do
let(:custom_config_loader) {
double(call: Hash.new)
}
before(:each) { Resque::Pool.config_loader = custom_config_loader }
after(:each) { Resque::Pool.config_loader = nil }
subject { Resque::Pool.create_configured }

it "created pools use the specified config loader" do
subject.config_loader.should == custom_config_loader
end
end
end

describe Resque::Pool, "given after_prefork hook" do
Expand Down
13 changes: 13 additions & 0 deletions spec/spec_helper.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,16 @@
require 'rspec'
$LOAD_PATH << File.expand_path("../lib", File.dirname(__FILE__))
require 'resque/pool'

module PoolSpecHelpers
def no_spawn(pool)
pool.stub(:spawn_worker!) {}
pool
end

def simulate_signal(pool, signal)
pool.sig_queue.clear
pool.sig_queue.push signal
pool.handle_sig_queue!
end
end

0 comments on commit 7848c41

Please sign in to comment.