Skip to content

Commit

Permalink
refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
baelter committed Feb 2, 2017
1 parent f0e71c6 commit 08ae182
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 30 deletions.
7 changes: 4 additions & 3 deletions lib/amqp_actors/actor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,13 @@ def backend(clazz, &blk)
end

def start_backend(default_backend)
@backend_instance = (@backend || default_backend).new(&@backend_block)
@backend_instance.start_actor(self)
@backend_instance = (@backend || default_backend).new(self, &@backend_block)
@inbox = @backend_instance.start
@running = true
end

def running_threads
@backend_instance.running_threads(self)
@backend_instance.running_threads
end

def message_type(type)
Expand Down
8 changes: 4 additions & 4 deletions lib/amqp_actors/backend/amqp.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ def configure(cfg)
end

# Instance methods
def initialize(&blk)
def initialize(type, &blk)
instance_eval(&blk) if block_given?
@type = type
@pub_url ||= self.class.pub_url
@sub_url ||= self.class.sub_url
self.class.connections ||= {}
Expand Down Expand Up @@ -53,16 +54,15 @@ def exchange(x)
@exchange = x
end

def start_actor(type)
def start
@pub_conn.start
@sub_conn.start
cfg = {
exchange: @exchange,
routing_keys: @routing_keys,
queue_name: @queue_name,
}
type.inbox = Channel.new(@pub_conn, @sub_conn, type, cfg)
type.running = true
@inbox = Channel.new(@pub_conn, @sub_conn, @type, cfg)
end

def stop
Expand Down
26 changes: 11 additions & 15 deletions lib/amqp_actors/backend/memory.rb
Original file line number Diff line number Diff line change
@@ -1,38 +1,34 @@
module AmqpActors
class MemoryQueues
def initialize
@threads = {}
def initialize(type, &_blk)
@threads = []
@type = type
end

# @TODO remove dead threads from the array periodically
def start_actor(type)
type.inbox = Queue.new
type.running = true
@threads[type] = Array.new(type.thread_count) do
def start
@threads = Array.new(@type.thread_count) do
Thread.new do
loop do
break unless System.running? && type.running
break unless System.running? && @type.running
begin
msg = type.inbox.pop
type.new.push(msg)
msg = @type.inbox.pop
@type.new.push(msg)
rescue => e
print "[ERROR] \n #{e.backtrace.join("\n ")}\n"
end
end
end
end
Queue.new
end

def stop
# noop
end

def running_threads(type = nil)
if type
@threads[type].select(&:alive?).count
else
@threads.values.flatten.select(&:alive?).count
end
def running_threads
@threads.select(&:alive?).count
end
end
end
8 changes: 0 additions & 8 deletions lib/amqp_actors/system.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,6 @@ def self.configure(&blk)
instance_eval(&blk)
end

def self.amqp_url(url = nil)
if url
@amqp_url = url
else
@amqp_url
end
end

def self.add(actor)
@actors.add(actor)
actor.start_backend(@default_backend) if @running
Expand Down

0 comments on commit 08ae182

Please sign in to comment.