From 3552cc2ce59066e9dc073e8c44e816618cdb589d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Anders=20B=C3=A4lter?= Date: Tue, 7 Feb 2017 11:09:05 +0100 Subject: [PATCH] remove todos, enable gzip --- README.md | 3 --- lib/amqp_actors/actor.rb | 9 ++++---- lib/amqp_actors/backend/amqp.rb | 37 +++++++++++++++++++------------ lib/amqp_actors/backend/memory.rb | 3 +-- 4 files changed, 28 insertions(+), 24 deletions(-) diff --git a/README.md b/README.md index 2f7fea9..6c12207 100644 --- a/README.md +++ b/README.md @@ -76,6 +76,3 @@ end TestActor.push 0 TestActor.output # => 1 ``` - -## TODO -* Add AMQP as backend diff --git a/lib/amqp_actors/actor.rb b/lib/amqp_actors/actor.rb index 175dec7..6d551bb 100644 --- a/lib/amqp_actors/actor.rb +++ b/lib/amqp_actors/actor.rb @@ -5,7 +5,6 @@ def die self.class.die end - # @TODO these methods should not be part of DSL def push(msg) instance_exec(msg, &self.class.act_block) end @@ -23,15 +22,14 @@ def act(&block) @act_block = block end - def push(msg, block: false) + def push(msg) unless valid_types?(msg) raise ArgumentError, "Illegal message type, expected #{@message_type}" end raise NotConfigured, 'you must provide an act block' unless @act_block - @backend_instance&.push(msg, block: block) unless @backend_instance&.closed? && @running + @backend_instance&.push(msg) unless @backend_instance&.closed? && @running end - # @TODO these should be private to the module def backend(clazz, &blk) raise ArgumentError, "Backend must implement :start and :stop" unless valid_backend? clazz @backend = clazz @@ -79,7 +77,8 @@ def helpers(&block) class_eval(&block) if block_given? end - # @TODO these methods should not be part of DSL + private + def valid_types?(type) return true if @message_type.nil? if type.is_a?(Enumerable) diff --git a/lib/amqp_actors/backend/amqp.rb b/lib/amqp_actors/backend/amqp.rb index 404ae49..123bf89 100644 --- a/lib/amqp_actors/backend/amqp.rb +++ b/lib/amqp_actors/backend/amqp.rb @@ -74,7 +74,7 @@ def closed? @inbox.closed? end - def push(msg, block: false) + def push(msg) @inbox.push_to(@inbox.name, msg) end @@ -146,11 +146,11 @@ def publish(rks, msg) rks.each { |rk| publish(rk, msg) } return end - @content_handler = ContentHandler.resolve_content_handler(msg, @cfg[:content_type]) - @exchange.publish @content_handler.encode(msg), { + content_handler = ContentHandler.resolve_content_handler(msg, @cfg[:content_type]) + @exchange.publish content_handler.encode(msg), { routing_key: rks, persistent: true, - content_type: @content_handler.encoding, + content_type: content_handler.encoding, } success = @chan.wait_for_confirms raise "[ERROR] error=publish reason=not-confirmed" unless success @@ -175,7 +175,6 @@ def initialize(type, conn, cfg) @type = type @qname = "AmqpActor::#{cfg[:queue_name] || snake_case(type.to_s)}" @exchange_type = cfg[:exchange] || 'amq.topic' - @routing_keys = (cfg[:routing_keys] || []) << @qname @cfg = cfg end @@ -185,18 +184,21 @@ def create_sub_channel ch end - def subscribe - @chan = create_sub_channel + def subscribe(qname = nil) + @chan ||= create_sub_channel + qname ||= @qname x = @chan.topic(@exchange_type, durable: true) - @q = @chan.queue @qname, durable: true - @routing_keys.each { |rk| @q.bind(x, routing_key: rk) } + @q = @chan.queue qname, durable: true + routing_keys = (@cfg[:routing_keys] || []) << qname + routing_keys.each { |rk| @q.bind(x, routing_key: rk) } @q.subscribe(manual_ack: true, block: false, exclusive: true) do |delivery, headers, body| begin - raw_data = decode(headers.content_encoding, body) - content_handler = ContentHandler - .resolve_content_handler(raw_data, @cfg[:content_type] || headers.content_type) - msg = content_handler.decode(raw_data) - @type.new.push msg + data = parse(body, headers) + if block_given? + yield delivery, headers, data + else + @type.new.push(parse(body, headers)) + end @chan.acknowledge(delivery.delivery_tag, false) rescue => e print "[ERROR] #{e.inspect} #{e.message}\n #{e.backtrace.join("\n ")}\n" @@ -252,6 +254,13 @@ def decode(content_encoding, body) body end end + + def parse(body, headers) + raw_data = decode(headers.content_encoding, body) + content_handler = ContentHandler + .resolve_content_handler(raw_data, @cfg[:content_type] || headers.content_type) + content_handler.decode(raw_data) + end end module ContentHandler diff --git a/lib/amqp_actors/backend/memory.rb b/lib/amqp_actors/backend/memory.rb index 396bdbb..078b842 100644 --- a/lib/amqp_actors/backend/memory.rb +++ b/lib/amqp_actors/backend/memory.rb @@ -5,7 +5,6 @@ def initialize(type, &_blk) @type = type end - # @TODO remove dead threads from the array periodically def start @threads = Array.new(@type.thread_count) do Thread.new do @@ -24,7 +23,7 @@ def start self end - def push(msg, block: false) + def push(msg) @inbox.push(msg) end