Skip to content

Commit

Permalink
remove todos, enable gzip
Browse files Browse the repository at this point in the history
  • Loading branch information
baelter committed Feb 7, 2017
1 parent ae84c50 commit 3552cc2
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 24 deletions.
3 changes: 0 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,3 @@ end
TestActor.push 0
TestActor.output # => 1
```

## TODO
* Add AMQP as backend
9 changes: 4 additions & 5 deletions lib/amqp_actors/actor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ def die
self.class.die
end

# @TODO these methods should not be part of DSL
def push(msg)
instance_exec(msg, &self.class.act_block)
end
Expand All @@ -23,15 +22,14 @@ def act(&block)
@act_block = block
end

def push(msg, block: false)
def push(msg)
unless valid_types?(msg)
raise ArgumentError, "Illegal message type, expected #{@message_type}"
end
raise NotConfigured, 'you must provide an act block' unless @act_block
@backend_instance&.push(msg, block: block) unless @backend_instance&.closed? && @running
@backend_instance&.push(msg) unless @backend_instance&.closed? && @running
end

# @TODO these should be private to the module
def backend(clazz, &blk)
raise ArgumentError, "Backend must implement :start and :stop" unless valid_backend? clazz
@backend = clazz
Expand Down Expand Up @@ -79,7 +77,8 @@ def helpers(&block)
class_eval(&block) if block_given?
end

# @TODO these methods should not be part of DSL
private

def valid_types?(type)
return true if @message_type.nil?
if type.is_a?(Enumerable)
Expand Down
37 changes: 23 additions & 14 deletions lib/amqp_actors/backend/amqp.rb
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def closed?
@inbox.closed?
end

def push(msg, block: false)
def push(msg)
@inbox.push_to(@inbox.name, msg)
end

Expand Down Expand Up @@ -146,11 +146,11 @@ def publish(rks, msg)
rks.each { |rk| publish(rk, msg) }
return
end
@content_handler = ContentHandler.resolve_content_handler(msg, @cfg[:content_type])
@exchange.publish @content_handler.encode(msg), {
content_handler = ContentHandler.resolve_content_handler(msg, @cfg[:content_type])
@exchange.publish content_handler.encode(msg), {
routing_key: rks,
persistent: true,
content_type: @content_handler.encoding,
content_type: content_handler.encoding,
}
success = @chan.wait_for_confirms
raise "[ERROR] error=publish reason=not-confirmed" unless success
Expand All @@ -175,7 +175,6 @@ def initialize(type, conn, cfg)
@type = type
@qname = "AmqpActor::#{cfg[:queue_name] || snake_case(type.to_s)}"
@exchange_type = cfg[:exchange] || 'amq.topic'
@routing_keys = (cfg[:routing_keys] || []) << @qname
@cfg = cfg
end

Expand All @@ -185,18 +184,21 @@ def create_sub_channel
ch
end

def subscribe
@chan = create_sub_channel
def subscribe(qname = nil)
@chan ||= create_sub_channel
qname ||= @qname
x = @chan.topic(@exchange_type, durable: true)
@q = @chan.queue @qname, durable: true
@routing_keys.each { |rk| @q.bind(x, routing_key: rk) }
@q = @chan.queue qname, durable: true
routing_keys = (@cfg[:routing_keys] || []) << qname
routing_keys.each { |rk| @q.bind(x, routing_key: rk) }
@q.subscribe(manual_ack: true, block: false, exclusive: true) do |delivery, headers, body|
begin
raw_data = decode(headers.content_encoding, body)
content_handler = ContentHandler
.resolve_content_handler(raw_data, @cfg[:content_type] || headers.content_type)
msg = content_handler.decode(raw_data)
@type.new.push msg
data = parse(body, headers)
if block_given?
yield delivery, headers, data
else
@type.new.push(parse(body, headers))
end
@chan.acknowledge(delivery.delivery_tag, false)
rescue => e
print "[ERROR] #{e.inspect} #{e.message}\n #{e.backtrace.join("\n ")}\n"
Expand Down Expand Up @@ -252,6 +254,13 @@ def decode(content_encoding, body)
body
end
end

def parse(body, headers)
raw_data = decode(headers.content_encoding, body)
content_handler = ContentHandler
.resolve_content_handler(raw_data, @cfg[:content_type] || headers.content_type)
content_handler.decode(raw_data)
end
end

module ContentHandler
Expand Down
3 changes: 1 addition & 2 deletions lib/amqp_actors/backend/memory.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ def initialize(type, &_blk)
@type = type
end

# @TODO remove dead threads from the array periodically
def start
@threads = Array.new(@type.thread_count) do
Thread.new do
Expand All @@ -24,7 +23,7 @@ def start
self
end

def push(msg, block: false)
def push(msg)
@inbox.push(msg)
end

Expand Down

0 comments on commit 3552cc2

Please sign in to comment.