From 14a5c2478d59a1d18728f4fe54a4afdb1ac369f0 Mon Sep 17 00:00:00 2001 From: Ryan Brunner Date: Wed, 13 Jan 2016 08:56:27 -0500 Subject: [PATCH 1/9] Refactor payloads into a separate object --- lib/multiple_man.rb | 2 + lib/multiple_man/listeners/listener.rb | 28 +++++----- lib/multiple_man/model_populator.rb | 9 ++- lib/multiple_man/payload/v1.rb | 35 ++++++++++++ lib/multiple_man/subscribers/base.rb | 8 +-- .../subscribers/model_subscriber.rb | 15 ++--- spec/listeners/listener_spec.rb | 21 ++----- spec/model_populator_spec.rb | 14 +++-- spec/payload/v1_spec.rb | 55 +++++++++++++++++++ spec/subscribers/model_subscriber_spec.rb | 22 ++++---- spec/subscribers/registry_spec.rb | 6 +- 11 files changed, 147 insertions(+), 68 deletions(-) create mode 100644 lib/multiple_man/payload/v1.rb create mode 100644 spec/payload/v1_spec.rb diff --git a/lib/multiple_man.rb b/lib/multiple_man.rb index 1a01653..67f03ee 100644 --- a/lib/multiple_man.rb +++ b/lib/multiple_man.rb @@ -25,6 +25,8 @@ module MultipleMan require 'multiple_man/channel_maintenance/gc' require 'multiple_man/channel_maintenance/reaper' + require 'multiple_man/payload/v1' + def self.logger configuration.logger end diff --git a/lib/multiple_man/listeners/listener.rb b/lib/multiple_man/listeners/listener.rb index dd5c5dd..4818959 100644 --- a/lib/multiple_man/listeners/listener.rb +++ b/lib/multiple_man/listeners/listener.rb @@ -32,24 +32,24 @@ def init_connection attr_accessor :subscription, :connection def listen - MultipleMan.logger.info "Listening for #{subscription.klass} with routing key #{routing_key}." - queue.bind(connection.topic, routing_key: routing_key).subscribe(ack: true) do |delivery_info, _, payload| - process_message(delivery_info, payload) + queue.bind(connection.topic, routing_key: routing_key).subscribe(ack: true) do |delivery_info, properties, payload| + parsed_payload = MultipleMan::Payload::V1.new(delivery_info, properties, JSON.parse(payload).with_indifferent_access) + + begin + process_message(parsed_payload) + rescue Exception => ex + handle_error(ex, delivery_info) + else + MultipleMan.logger.debug " Successfully processed!" + queue.channel.acknowledge(delivery_info.delivery_tag, false) + end end end - def process_message(delivery_info, payload) - MultipleMan.logger.info "Processing message for #{delivery_info.routing_key}." - begin - payload = JSON.parse(payload).with_indifferent_access - subscription.send(operation(delivery_info, payload), payload) - rescue ex - handle_error(ex, delivery_info) - else - MultipleMan.logger.debug " Successfully processed!" - queue.channel.acknowledge(delivery_info.delivery_tag, false) - end + def process_message(payload) + MultipleMan.logger.info "Processing message for #{payload}." + subscription.send(payload.operation, payload) end def handle_error(ex, delivery_info) diff --git a/lib/multiple_man/model_populator.rb b/lib/multiple_man/model_populator.rb index f33d529..4a179e5 100644 --- a/lib/multiple_man/model_populator.rb +++ b/lib/multiple_man/model_populator.rb @@ -7,10 +7,9 @@ def initialize(record, fields) end def populate(payload) - data = payload[:id].merge(payload[:data]) - fields_for(data).each do |field| + fields_for(payload).each do |field| source, dest = field.is_a?(Array) ? field : [field, field] - populate_field(dest, data[source]) + populate_field(dest, payload[source]) end record end @@ -37,8 +36,8 @@ def populate_field(field, value) end end - def fields_for(data) - fields || data.keys + def fields_for(payload) + fields || payload.keys end end end diff --git a/lib/multiple_man/payload/v1.rb b/lib/multiple_man/payload/v1.rb new file mode 100644 index 0000000..d847e8f --- /dev/null +++ b/lib/multiple_man/payload/v1.rb @@ -0,0 +1,35 @@ +module MultipleMan::Payload + class V1 + def initialize(delivery_info, properties, payload) + self.payload = payload + self.delivery_info = delivery_info + end + + def keys + (payload['data'].keys + payload['id'].keys).uniq + end + + def [](value) + payload['data'][value.to_s] || payload['id'][value.to_s] + end + + def identify_by + if payload['id'].is_a?(Hash) + payload['id'] + else + {'multiple_man_identifier' => payload['id']} + end + end + + def operation + payload['operation'] || delivery_info.routing_key.split('.').last + end + + def to_s + delivery_info.routing_key + end + + private + attr_accessor :payload, :delivery_info + end +end diff --git a/lib/multiple_man/subscribers/base.rb b/lib/multiple_man/subscribers/base.rb index c6ff924..01c920c 100644 --- a/lib/multiple_man/subscribers/base.rb +++ b/lib/multiple_man/subscribers/base.rb @@ -7,19 +7,19 @@ def initialize(klass) attr_reader :klass - def create(_) + def create(payload) # noop end - def update(_) + def update(payload) # noop end - def destroy(_) + def destroy(payload) # noop end - def seed(_) + def seed(payload) # noop end diff --git a/lib/multiple_man/subscribers/model_subscriber.rb b/lib/multiple_man/subscribers/model_subscriber.rb index ca70f03..806e432 100644 --- a/lib/multiple_man/subscribers/model_subscriber.rb +++ b/lib/multiple_man/subscribers/model_subscriber.rb @@ -10,9 +10,8 @@ def initialize(klass, options) attr_accessor :options def create(payload) - id = payload[:id] - model = find_model(id) - MultipleMan::ModelPopulator.new(model, options[:fields]).populate(id: find_conditions(id), data: payload[:data]) + model = find_model(payload) + MultipleMan::ModelPopulator.new(model, options[:fields]).populate(payload) model.save! end @@ -20,18 +19,14 @@ def create(payload) alias_method :seed, :create def destroy(payload) - model = find_model(payload[:id]) + model = find_model(payload) model.destroy! end private - def find_model(id) - model_class.where(find_conditions(id)).first || model_class.new - end - - def find_conditions(id) - id.kind_of?(Hash) ? cleanse_id(id) : {multiple_man_identifier: id} + def find_model(payload) + model_class.where(cleanse_id(payload.identify_by)).first || model_class.new end def cleanse_id(hash) diff --git a/spec/listeners/listener_spec.rb b/spec/listeners/listener_spec.rb index 841dd9b..776aa51 100644 --- a/spec/listeners/listener_spec.rb +++ b/spec/listeners/listener_spec.rb @@ -44,9 +44,9 @@ class MockClass2; end subscriber = double(MultipleMan::Subscribers::ModelSubscriber, klass: MockClass1, routing_key: "MockClass1.#").as_null_object listener = described_class.new(subscriber) - connection_stub.should_receive(:acknowledge) - subscriber.should_receive(:create).with({"a" => 1, "b" => 2}) - listener.process_message(OpenStruct.new(routing_key: "app.MockClass1.create"), '{"a":1,"b":2}') + subscriber.should_receive(:create).with(instance_of(MultipleMan::Payload::V1)) + + listener.process_message(MultipleMan::Payload::V1.new(double(:delivery_info, routing_key: 'app.MockClass1.create'), nil, "data" => {'a' => 1, 'b' => 2})) end specify "process_message should use the payload to determine the operation if it's available" do @@ -55,21 +55,8 @@ class MockClass2; end subscriber = double(MultipleMan::Subscribers::ModelSubscriber, klass: MockClass1, routing_key: "MockClass1.#").as_null_object listener = described_class.new(subscriber) - connection_stub.should_receive(:acknowledge) subscriber.should_receive(:create) - listener.process_message(OpenStruct.new(routing_key: "some random routing key"), '{"operation":"create","data":{"a":1,"b":2}}') - end - - it "should nack on failure" do - connection_stub = double(MultipleMan::Connection).as_null_object - MultipleMan::Connection.stub(:new).and_return(connection_stub) - subscriber = double(MultipleMan::Subscribers::ModelSubscriber, klass: MockClass1, routing_key: "MockClass1.#").as_null_object - listener = described_class.new(subscriber) - - connection_stub.should_receive(:nack) - MultipleMan.should_receive(:error) - subscriber.should_receive(:create).with({"a" => 1, "b" => 2}).and_raise("fail!") - listener.process_message(OpenStruct.new(routing_key: "app.MockClass1.create"), '{"a":1,"b":2}') + listener.process_message(MultipleMan::Payload::V1.new(double(:delivery_info, routing_key: 'app.MockClass1, update'), nil, "operation" => "create", "data" => {'a' => 1, 'b' => 2})) end end diff --git a/spec/model_populator_spec.rb b/spec/model_populator_spec.rb index b1998e9..7335ab3 100644 --- a/spec/model_populator_spec.rb +++ b/spec/model_populator_spec.rb @@ -7,10 +7,14 @@ class MockModel describe "populate" do let(:model) { MockModel.new } - let(:data) { { a: 1, b: 2 } } - let(:id) { { multiple_man_identifier: 'app_1' }} + let(:payload) { MultipleMan::Payload::V1.new(nil, nil, { + 'id' => id, + 'data' => data + })} + let(:data) { { 'a' => 1, 'b' => 2 } } + let(:id) { { 'multiple_man_identifier' => 'app_1' }} let(:fields) { nil } - subject { described_class.new(model, fields).populate(id: id, data: data) } + subject { described_class.new(model, fields).populate(payload) } its(:multiple_man_identifier) { should == 'app_1' } @@ -38,7 +42,7 @@ class MockModel let(:model) { Class.new do attr_accessor :source_id, :id end.new } - let(:data) { { id: 1 }} + let(:data) { { 'id' => 1 }} its(:source_id) { should == 1 } its(:id) { should be_nil } @@ -47,7 +51,7 @@ class MockModel let(:model) { Class.new do attr_accessor :id end.new } - let(:data) { { id: 1 }} + let(:data) { { 'id' => 1 }} its(:id) { should == 1 } end diff --git a/spec/payload/v1_spec.rb b/spec/payload/v1_spec.rb new file mode 100644 index 0000000..9053978 --- /dev/null +++ b/spec/payload/v1_spec.rb @@ -0,0 +1,55 @@ +require 'spec_helper' + +describe MultipleMan::Payload::V1 do + let(:delivery_info) { + double(:delivery_info, routing_key: 'blah.blah.create') + } + + let(:payload) { + described_class.new(delivery_info, nil, { + 'id' => { + 'id' => 1, + 'database' => 'app' + }, + 'data' => { + 'id' => 1, + 'database' => 'app', + 'foo' => 'bar' + } + }) + } + + it "should return appropriate identify_by keys" do + expect(payload.identify_by).to eq({'id' => 1, 'database' => 'app'}) + end + + it "should return appropriate keys" do + expect(payload.keys).to eq(['id', 'database', 'foo']) + end + + it "should include keys from the id even if they're not in the data" do + payload = described_class.new(nil, nil, {'id' => {'id' => 1}, 'data' => { 'foo' => 'bar'}}) + expect(payload.keys).to include('id') + end + + + it "should construct a multiple man identifier id if none exists" do + payload = described_class.new(delivery_info, nil, {'id' => 1, 'data' => {'foo' => 'bar'}}) + expect(payload.identify_by).to eq({'multiple_man_identifier' => 1}) + end + + it 'should store data appropriately' do + expect(payload['id']).to eq(1) + expect(payload['database']).to eq('app') + expect(payload['foo']).to eq('bar') + end + + it "should have an operation" do + expect(payload.operation).to eq('create') + end + + it "should let payloads override the operation" do + payload = described_class.new(delivery_info, nil, { 'operation' => 'update' }) + expect(payload.operation).to eq('update') + end +end diff --git a/spec/subscribers/model_subscriber_spec.rb b/spec/subscribers/model_subscriber_spec.rb index 7fbce9a..0e8a131 100644 --- a/spec/subscribers/model_subscriber_spec.rb +++ b/spec/subscribers/model_subscriber_spec.rb @@ -5,6 +5,13 @@ class MockClass end + let(:payload) { + MultipleMan::Payload::V1.new(nil, nil, { + 'id' => {'id' => 5 }, + 'data' => {'a' => 1, 'b' => 2} + }) + } + describe "initialize" do it "should listen to the object passed in for to" do subscriber = described_class.new(MockClass, to: 'PublishedClass') @@ -18,23 +25,18 @@ class MockClass MockClass.stub(:where).and_return([mock_object]) mock_populator = double(MultipleMan::ModelPopulator) MultipleMan::ModelPopulator.should_receive(:new).and_return(mock_populator) - mock_populator.should_receive(:populate).with(id: {id:5}, data: {a: 1, b: 2}) + mock_populator.should_receive(:populate).with(payload) mock_object.should_receive(:save!) - described_class.new(MockClass, {}).create({id: {id: 5}, data:{a: 1, b: 2}}) + described_class.new(MockClass, {}).create(payload) end end describe "find_model" do - it "should find by multiple_man_identifier for a single field" do - mock_object = double(MockClass).as_null_object - MockClass.should_receive(:where).with(multiple_man_identifier: 5).and_return([mock_object]) - described_class.new(MockClass, {}).create({id: 5, data:{a: 1, b: 2}}) - end it "should find by the hash for multiple fields" do mock_object = double(MockClass).as_null_object - MockClass.should_receive(:where).with(foo: 'bar').and_return([mock_object]) - described_class.new(MockClass, {}).create({id: {foo: 'bar'}, data:{a: 1, b: 2}}) + MockClass.should_receive(:where).with('id' => 5).and_return([mock_object]) + described_class.new(MockClass, {}).create(payload) end end @@ -44,7 +46,7 @@ class MockClass MockClass.should_receive(:where).and_return([mock_object]) mock_object.should_receive(:destroy!) - described_class.new(MockClass, {}).destroy({id: 1}) + described_class.new(MockClass, {}).destroy(payload) end end end diff --git a/spec/subscribers/registry_spec.rb b/spec/subscribers/registry_spec.rb index 4b8e67a..21b4ef8 100644 --- a/spec/subscribers/registry_spec.rb +++ b/spec/subscribers/registry_spec.rb @@ -1,11 +1,11 @@ require 'spec_helper' -describe MultipleMan::Subscribers::Registry do - describe "register" do +describe MultipleMan::Subscribers::Registry do + describe "register" do it "should add a subscriber" do subscription = double(:subscriber) described_class.register(subscription) described_class.subscriptions[0].should == subscription end end -end \ No newline at end of file +end From a13302ac016ca2fcab3a2a78f8ad028a09b8f3ec Mon Sep 17 00:00:00 2001 From: Ryan Brunner Date: Wed, 10 Feb 2016 11:47:04 -0500 Subject: [PATCH 2/9] Payload should build the correct payload version. --- .gitignore | 1 + lib/multiple_man.rb | 1 + lib/multiple_man/payload/payload.rb | 10 ++++++ lib/multiple_man/payload/v1.rb | 51 ++++++++++++++--------------- spec/payload/payload_spec.rb | 25 ++++++++++++++ 5 files changed, 62 insertions(+), 26 deletions(-) create mode 100644 lib/multiple_man/payload/payload.rb create mode 100644 spec/payload/payload_spec.rb diff --git a/.gitignore b/.gitignore index d87d4be..16f974f 100644 --- a/.gitignore +++ b/.gitignore @@ -15,3 +15,4 @@ spec/reports test/tmp test/version_tmp tmp +node_modules/**/* diff --git a/lib/multiple_man.rb b/lib/multiple_man.rb index 67f03ee..52067b5 100644 --- a/lib/multiple_man.rb +++ b/lib/multiple_man.rb @@ -25,6 +25,7 @@ module MultipleMan require 'multiple_man/channel_maintenance/gc' require 'multiple_man/channel_maintenance/reaper' + require 'multiple_man/payload/payload' require 'multiple_man/payload/v1' def self.logger diff --git a/lib/multiple_man/payload/payload.rb b/lib/multiple_man/payload/payload.rb new file mode 100644 index 0000000..a37d091 --- /dev/null +++ b/lib/multiple_man/payload/payload.rb @@ -0,0 +1,10 @@ +class MultipleMan::Payload + def self.build(delivery_info, properties, data) + case properties.headers["version"] + when "1", nil + V1.new(delivery_info, properties, data) + else + raise "This version of MultipleMan does not support the payload version supplied (#{properties.headers["version"]}). Please upgrade to the latest version." + end + end +end diff --git a/lib/multiple_man/payload/v1.rb b/lib/multiple_man/payload/v1.rb index d847e8f..6ea3081 100644 --- a/lib/multiple_man/payload/v1.rb +++ b/lib/multiple_man/payload/v1.rb @@ -1,35 +1,34 @@ -module MultipleMan::Payload - class V1 - def initialize(delivery_info, properties, payload) - self.payload = payload - self.delivery_info = delivery_info - end - def keys - (payload['data'].keys + payload['id'].keys).uniq - end +class MultipleMan::Payload::V1 + def initialize(delivery_info, properties, payload) + self.payload = payload + self.delivery_info = delivery_info + end - def [](value) - payload['data'][value.to_s] || payload['id'][value.to_s] - end + def keys + (payload['data'].keys + payload['id'].keys).uniq + end - def identify_by - if payload['id'].is_a?(Hash) - payload['id'] - else - {'multiple_man_identifier' => payload['id']} - end - end + def [](value) + payload['data'][value.to_s] || payload['id'][value.to_s] + end - def operation - payload['operation'] || delivery_info.routing_key.split('.').last + def identify_by + if payload['id'].is_a?(Hash) + payload['id'] + else + {'multiple_man_identifier' => payload['id']} end + end - def to_s - delivery_info.routing_key - end + def operation + payload['operation'] || delivery_info.routing_key.split('.').last + end - private - attr_accessor :payload, :delivery_info + def to_s + delivery_info.routing_key end + +private + attr_accessor :payload, :delivery_info end diff --git a/spec/payload/payload_spec.rb b/spec/payload/payload_spec.rb new file mode 100644 index 0000000..c1b6615 --- /dev/null +++ b/spec/payload/payload_spec.rb @@ -0,0 +1,25 @@ +require 'spec_helper' + +describe MultipleMan::Payload do + let(:properties) { Class.new do + attr_accessor :headers + def initialize(version) + self.headers = {"version" => version} + end + end } + + describe "::build" do + it "should assume v1 for a nil version" do + + payload = described_class.build(nil, properties.new(nil), nil) + payload.should be_instance_of(MultipleMan::Payload::V1) + end + it "should support v1" do + payload = described_class.build(nil, properties.new("1"), nil) + payload.should be_instance_of(MultipleMan::Payload::V1) + end + it "should fail on an unknown version" do + expect{ described_class.build(nil, properties.new("3"), nil)}.to raise_exception + end + end +end From 0bf8c17589310018ed7dd7c0312e1e9f03123a50 Mon Sep 17 00:00:00 2001 From: Ryan Brunner Date: Wed, 10 Feb 2016 13:02:40 -0500 Subject: [PATCH 3/9] Version 1.0 --- README.md | 15 +++++++++++++++ lib/multiple_man/listeners/listener.rb | 2 +- lib/multiple_man/version.rb | 2 +- 3 files changed, 17 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 7090aec..4577eb3 100644 --- a/README.md +++ b/README.md @@ -184,6 +184,21 @@ MyModel.multiple_man_publish(:seed) 3. Stop the seeder rake task when all of your messages have been processed. You can check your RabbitMQ server +## Versioning + +Because you may have different versions of MultipleMan between publishers and subscribers, +MultipleMan attaches **versions** to every message sent. This ensures that updates to payloads, +metadata, etc. will not affect processing of your messages. + +In general, a subscriber will not be able to process messages published by a newer version of +MultipleMan. We use **minor versions** to indicate changes that may contain a breaking change +to payload formats. + +As a consequence, when upgrading MultipleMan, it's always safe to upgrade patch versions, but +when upgrading to a new major or minor version, ensure that you upgrade your subscribers +prior to upgrading your publishers (if two services both subscribe and publish, you'll need to +update them synchronously.) + ## Contributing 1. Fork it diff --git a/lib/multiple_man/listeners/listener.rb b/lib/multiple_man/listeners/listener.rb index 4818959..d661d2d 100644 --- a/lib/multiple_man/listeners/listener.rb +++ b/lib/multiple_man/listeners/listener.rb @@ -34,7 +34,7 @@ def init_connection def listen MultipleMan.logger.info "Listening for #{subscription.klass} with routing key #{routing_key}." queue.bind(connection.topic, routing_key: routing_key).subscribe(ack: true) do |delivery_info, properties, payload| - parsed_payload = MultipleMan::Payload::V1.new(delivery_info, properties, JSON.parse(payload).with_indifferent_access) + parsed_payload = MultipleMan::Payload.build(delivery_info, properties, JSON.parse(payload).with_indifferent_access) begin process_message(parsed_payload) diff --git a/lib/multiple_man/version.rb b/lib/multiple_man/version.rb index 14f1307..5a0633c 100644 --- a/lib/multiple_man/version.rb +++ b/lib/multiple_man/version.rb @@ -1,3 +1,3 @@ module MultipleMan - VERSION = "0.8.1" + VERSION = "1.0.0" end From d230926f4050192eef708dda2bb1d5e51d2efb4e Mon Sep 17 00:00:00 2001 From: Ryan Brunner Date: Wed, 10 Feb 2016 13:24:44 -0500 Subject: [PATCH 4/9] Registry spec fix --- spec/subscribers/registry_spec.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spec/subscribers/registry_spec.rb b/spec/subscribers/registry_spec.rb index 21b4ef8..b471a85 100644 --- a/spec/subscribers/registry_spec.rb +++ b/spec/subscribers/registry_spec.rb @@ -5,7 +5,7 @@ it "should add a subscriber" do subscription = double(:subscriber) described_class.register(subscription) - described_class.subscriptions[0].should == subscription + described_class.subscriptions.should include subscription end end end From 57699ee60c6396ceec0c5924d795f8a693b8c9c2 Mon Sep 17 00:00:00 2001 From: Ryan Brunner Date: Wed, 10 Feb 2016 15:19:27 -0500 Subject: [PATCH 5/9] Version 2 of payload (identify by moves to headers) --- lib/multiple_man.rb | 2 +- lib/multiple_man/identity.rb | 49 ------------------------- lib/multiple_man/model_publisher.rb | 2 +- lib/multiple_man/payload/v2.rb | 37 +++++++++++++++++++ lib/multiple_man/payload_generator.rb | 18 +++++---- lib/multiple_man/version.rb | 2 +- spec/identity_spec.rb | 43 ---------------------- spec/integration/ephermal_model_spec.rb | 11 ++---- spec/model_publisher_spec.rb | 4 +- spec/payload/v2_spec.rb | 40 ++++++++++++++++++++ spec/payload_generator_spec.rb | 29 ++++++++------- 11 files changed, 112 insertions(+), 125 deletions(-) delete mode 100644 lib/multiple_man/identity.rb create mode 100644 lib/multiple_man/payload/v2.rb delete mode 100644 spec/identity_spec.rb create mode 100644 spec/payload/v2_spec.rb diff --git a/lib/multiple_man.rb b/lib/multiple_man.rb index 52067b5..6a5afc6 100644 --- a/lib/multiple_man.rb +++ b/lib/multiple_man.rb @@ -19,7 +19,6 @@ module MultipleMan require 'multiple_man/listeners/listener' require 'multiple_man/listeners/seeder_listener' require 'multiple_man/model_populator' - require 'multiple_man/identity' require 'multiple_man/publish' require 'multiple_man/channel_maintenance/gc' @@ -27,6 +26,7 @@ module MultipleMan require 'multiple_man/payload/payload' require 'multiple_man/payload/v1' + require 'multiple_man/payload/v2' def self.logger configuration.logger diff --git a/lib/multiple_man/identity.rb b/lib/multiple_man/identity.rb deleted file mode 100644 index 8e6d95c..0000000 --- a/lib/multiple_man/identity.rb +++ /dev/null @@ -1,49 +0,0 @@ -module MultipleMan - class Identity - def self.build(record, options) - if options[:identifier].present? - SingleField.new(record, options[:identifier]) - else - MultipleField.new(record, options[:identify_by]) - end - end - - def initialize(record) - self.record = record - end - - attr_accessor :record - - class MultipleField < Identity - def initialize(record, identify_by) - self.identify_by = identify_by ? [*identify_by] : [:id] - super(record) - end - def value - Hash[identify_by.map do |field| - [field, record.send(field)] - end] - end - - attr_accessor :identify_by - end - - class SingleField < Identity - def initialize(record, identifier = :id) - MultipleMan.logger.warn("Using :identifier in publish is deprecated, please switch to identify_by.") - self.identifier = identifier || :id - super(record) - end - - def value - if identifier.class == Proc - identifier.call(record).to_s - else - record.send(identifier).to_s - end - end - - attr_accessor :identifier - end - end -end \ No newline at end of file diff --git a/lib/multiple_man/model_publisher.rb b/lib/multiple_man/model_publisher.rb index 921ba98..5a29a65 100644 --- a/lib/multiple_man/model_publisher.rb +++ b/lib/multiple_man/model_publisher.rb @@ -33,7 +33,7 @@ def push_record(connection, record, operation) MultipleMan.logger.debug(" Record Data: #{data} | Routing Key: #{routing_key}") - connection.topic.publish(data.payload, routing_key: routing_key) + connection.topic.publish(data.payload, routing_key: routing_key, headers: data.headers) end def all_records(records, &block) diff --git a/lib/multiple_man/payload/v2.rb b/lib/multiple_man/payload/v2.rb new file mode 100644 index 0000000..7054c1c --- /dev/null +++ b/lib/multiple_man/payload/v2.rb @@ -0,0 +1,37 @@ + +class MultipleMan::Payload::V2 + def initialize(delivery_info, properties, payload) + self.payload = payload + self.delivery_info = delivery_info + self.properties = properties + end + + def keys + payload.keys + end + + def [](value) + payload[value.to_s] + end + + def identify_by + Hash[identify_by_header.map do |key| + [key, payload[key]] + end] + end + + def operation + delivery_info.routing_key.split('.').last + end + + def to_s + delivery_info.routing_key + end + +private + attr_accessor :payload, :delivery_info, :properties + + def identify_by_header + JSON.parse(properties.headers['identify_by']) + end +end diff --git a/lib/multiple_man/payload_generator.rb b/lib/multiple_man/payload_generator.rb index dd7a62b..d63a953 100644 --- a/lib/multiple_man/payload_generator.rb +++ b/lib/multiple_man/payload_generator.rb @@ -7,20 +7,22 @@ def initialize(record, operation = :create, options = {}) end def payload - { - type: type, - operation: operation, - id: id, - data: data - }.to_json + data.to_json + end + + def headers + { + 'version' => '2', + 'identify_by' => identify_by.to_json + } end def type options[:as] || record.class.name end - def id - Identity.build(record, options).value + def identify_by + [* (options[:identify_by] || :id) ] end def data diff --git a/lib/multiple_man/version.rb b/lib/multiple_man/version.rb index 5a0633c..1c4c29e 100644 --- a/lib/multiple_man/version.rb +++ b/lib/multiple_man/version.rb @@ -1,3 +1,3 @@ module MultipleMan - VERSION = "1.0.0" + VERSION = "1.1.0" end diff --git a/spec/identity_spec.rb b/spec/identity_spec.rb deleted file mode 100644 index 5ba5cde..0000000 --- a/spec/identity_spec.rb +++ /dev/null @@ -1,43 +0,0 @@ -require 'spec_helper' - -describe MultipleMan::Identity do - let(:record) { double(:model, id: 1, foo: 'foo', bar: 'bar' )} - - context "with identifier" do - subject { described_class.build(record, identifier: identifier).value } - let(:identifier) { :id } - - context "proc identifier" do - let(:identifier) { lambda{|record| "#{record.foo}-#{record.id}" } } - it { should == "foo-1" } - end - context "symbol identifier" do - let(:identifier) { :foo } - it { should == "foo" } - end - context "id identifier" do - let(:identifier) { :id } - it { should == "1" } - end - it "should log a deprecation notice" do - MultipleMan.logger.should_receive(:warn) - subject - end - end - - context "with identify_by" do - subject { described_class.build(record, identify_by: identify_by).value } - context "single field" do - let(:identify_by) { :foo } - it { should == { foo: 'foo'} } - end - context "no identify_by" do - let(:identify_by) { nil } - it { should == { id: 1 } } - end - context "multiple_fields" do - let(:identify_by) { [:foo, :bar] } - it { should == { foo: 'foo', bar: 'bar' } } - end - end -end \ No newline at end of file diff --git a/spec/integration/ephermal_model_spec.rb b/spec/integration/ephermal_model_spec.rb index ca78622..927b5b4 100644 --- a/spec/integration/ephermal_model_spec.rb +++ b/spec/integration/ephermal_model_spec.rb @@ -23,15 +23,12 @@ def initialize(params) it "should publish properly" do obj = ephermal_class.new(id: 5, foo: 'foo', bar: 'bar', baz: 'baz') - payload = { - type: 'Ephermal', - operation: 'create', - id: { id: 5 }, - data: { foo: 'foo', bar: 'bar', baz: 'baz'} - }.to_json + payload = { foo: 'foo', bar: 'bar', baz: 'baz'}.to_json expect_any_instance_of(Bunny::Exchange).to receive(:publish) - .with(payload, routing_key: 'multiple_man.Ephermal.create') + .with(payload, + routing_key: 'multiple_man.Ephermal.create', + headers: { 'version' => '2', 'identify_by' => ['id'].to_json }) obj.multiple_man_publish(:create) end diff --git a/spec/model_publisher_spec.rb b/spec/model_publisher_spec.rb index 52fd763..ec79d66 100644 --- a/spec/model_publisher_spec.rb +++ b/spec/model_publisher_spec.rb @@ -30,7 +30,7 @@ def model_name describe "publish" do it "should send the jsonified version of the model to the correct routing key" do MultipleMan::AttributeExtractor.any_instance.should_receive(:as_json).and_return({foo: "bar"}) - topic_stub.should_receive(:publish).with('{"type":"MockObject","operation":"create","id":{"id":10},"data":{"foo":"bar"}}', routing_key: "app.MockObject.create") + topic_stub.should_receive(:publish).with('{"foo":"bar"}', routing_key: "app.MockObject.create", headers:{'version' => '2', 'identify_by' => ['id'].to_json}) described_class.new(fields: [:foo]).publish(MockObject.new) end @@ -55,7 +55,7 @@ def as_json it "should get its data from the serializer" do obj = MockObject.new - topic_stub.should_receive(:publish).with('{"type":"MockObject","operation":"create","id":{"id":10},"data":{"a":"yes"}}', routing_key: "app.MockObject.create") + topic_stub.should_receive(:publish).with('{"a":"yes"}', routing_key: "app.MockObject.create", headers: {'version' => '2', 'identify_by' => ['id'].to_json}) subject.publish(obj) end end diff --git a/spec/payload/v2_spec.rb b/spec/payload/v2_spec.rb new file mode 100644 index 0000000..fa6c21d --- /dev/null +++ b/spec/payload/v2_spec.rb @@ -0,0 +1,40 @@ +require 'spec_helper' + +describe MultipleMan::Payload::V2 do + let(:delivery_info) { + double(:delivery_info, routing_key: 'blah.blah.create') + } + + let(:properties) { + double(:properties, headers: { + 'version' => '2', + 'identify_by' => ['id', 'database'].to_json + }) + } + + let(:payload) { + described_class.new(delivery_info, properties, { + 'id' => 1, + 'database' => 'app', + 'foo' => 'bar' + }) + } + + it "should return appropriate identify_by keys" do + expect(payload.identify_by).to eq({'id' => 1, 'database' => 'app'}) + end + + it "should return appropriate keys" do + expect(payload.keys).to eq(['id', 'database', 'foo']) + end + + it 'should store data appropriately' do + expect(payload['id']).to eq(1) + expect(payload['database']).to eq('app') + expect(payload['foo']).to eq('bar') + end + + it "should have an operation" do + expect(payload.operation).to eq('create') + end +end \ No newline at end of file diff --git a/spec/payload_generator_spec.rb b/spec/payload_generator_spec.rb index 50ae2a2..1e4ad93 100644 --- a/spec/payload_generator_spec.rb +++ b/spec/payload_generator_spec.rb @@ -6,15 +6,6 @@ class PayloadMockClass < Struct.new(:foo, :bar) let(:mock_object) { PayloadMockClass.new(1,2) } - describe "operation" do - it "should be whatever was passed in" do - expect(described_class.new(mock_object, :update).operation).to eq('update') - end - it "should be create by default" do - expect(described_class.new(mock_object).operation).to eq('create') - end - end - describe "data" do context "with a serializer" do it "should return stuff from the serializer" do @@ -34,11 +25,23 @@ def as_json end end end + + describe "payload" do + it "should output data" do + MultipleMan::AttributeExtractor.any_instance.stub(:as_json).and_return({c: 3, d: 4}) + described_class.new(mock_object, :create, { fields: [:c, :d] }).payload.should == {c: 3, d: 4}.to_json + end + end - describe "id" do - it "should defer to identity" do - MultipleMan::Identity::MultipleField.any_instance.stub(:value).and_return("foo_1") - described_class.new(mock_object).id.should == "foo_1" + describe "headers" do + it "should output version 2" do + described_class.new(mock_object, :create, { fields: [:c, :d] }).headers['version'].should == '2' + end + it "should include identity_by headers" do + described_class.new(mock_object, :create, { fields: [:c, :d] }).headers['identify_by'].should == ['id'].to_json + end + it "should support custom identify by information" do + described_class.new(mock_object, :create, { identify_by: [:c, :d], fields: [:c, :d] }).headers['identify_by'].should == ['c', 'd'].to_json end end From 51bf2dcc1e5a699d76cb423a94e5e8379c777646 Mon Sep 17 00:00:00 2001 From: Ryan Brunner Date: Thu, 11 Feb 2016 09:58:30 -0500 Subject: [PATCH 6/9] Actually subscribe to V2. --- lib/multiple_man/payload/payload.rb | 2 ++ spec/payload/payload_spec.rb | 4 ++++ 2 files changed, 6 insertions(+) diff --git a/lib/multiple_man/payload/payload.rb b/lib/multiple_man/payload/payload.rb index a37d091..fba7b2c 100644 --- a/lib/multiple_man/payload/payload.rb +++ b/lib/multiple_man/payload/payload.rb @@ -3,6 +3,8 @@ def self.build(delivery_info, properties, data) case properties.headers["version"] when "1", nil V1.new(delivery_info, properties, data) + when "2" + V2.new(delivery_info, properties, data) else raise "This version of MultipleMan does not support the payload version supplied (#{properties.headers["version"]}). Please upgrade to the latest version." end diff --git a/spec/payload/payload_spec.rb b/spec/payload/payload_spec.rb index c1b6615..0b06c23 100644 --- a/spec/payload/payload_spec.rb +++ b/spec/payload/payload_spec.rb @@ -18,6 +18,10 @@ def initialize(version) payload = described_class.build(nil, properties.new("1"), nil) payload.should be_instance_of(MultipleMan::Payload::V1) end + it "should support v2" do + payload = described_class.build(nil, properties.new("2"), nil) + payload.should be_instance_of(MultipleMan::Payload::V2) + end it "should fail on an unknown version" do expect{ described_class.build(nil, properties.new("3"), nil)}.to raise_exception end From 4d2c59907114f72198384d7a0cc8ebc3cd2aaf58 Mon Sep 17 00:00:00 2001 From: Ryan Brunner Date: Thu, 11 Feb 2016 10:06:23 -0500 Subject: [PATCH 7/9] Upgrade specs to use v2 rather than v1. --- spec/listeners/listener_spec.rb | 46 +++++++++++++++-------- spec/model_populator_spec.rb | 13 +++---- spec/subscribers/model_subscriber_spec.rb | 9 +++-- 3 files changed, 41 insertions(+), 27 deletions(-) diff --git a/spec/listeners/listener_spec.rb b/spec/listeners/listener_spec.rb index 776aa51..451daf6 100644 --- a/spec/listeners/listener_spec.rb +++ b/spec/listeners/listener_spec.rb @@ -38,25 +38,41 @@ class MockClass2; end end end - specify "process_message should send the correct data" do - connection_stub = double(MultipleMan::Connection).as_null_object - MultipleMan::Connection.stub(:new).and_return(connection_stub) - subscriber = double(MultipleMan::Subscribers::ModelSubscriber, klass: MockClass1, routing_key: "MockClass1.#").as_null_object - listener = described_class.new(subscriber) + context "v1" do + specify "process_message should send the correct data" do + connection_stub = double(MultipleMan::Connection).as_null_object + MultipleMan::Connection.stub(:new).and_return(connection_stub) + subscriber = double(MultipleMan::Subscribers::ModelSubscriber, klass: MockClass1, routing_key: "MockClass1.#").as_null_object + listener = described_class.new(subscriber) - subscriber.should_receive(:create).with(instance_of(MultipleMan::Payload::V1)) + subscriber.should_receive(:create).with(instance_of(MultipleMan::Payload::V1)) - listener.process_message(MultipleMan::Payload::V1.new(double(:delivery_info, routing_key: 'app.MockClass1.create'), nil, "data" => {'a' => 1, 'b' => 2})) - end + listener.process_message(MultipleMan::Payload::V1.new(double(:delivery_info, routing_key: 'app.MockClass1.create'), nil, "data" => {'a' => 1, 'b' => 2})) + end - specify "process_message should use the payload to determine the operation if it's available" do - connection_stub = double(MultipleMan::Connection).as_null_object - MultipleMan::Connection.stub(:new).and_return(connection_stub) - subscriber = double(MultipleMan::Subscribers::ModelSubscriber, klass: MockClass1, routing_key: "MockClass1.#").as_null_object - listener = described_class.new(subscriber) + specify "process_message should use the payload to determine the operation if it's available" do + connection_stub = double(MultipleMan::Connection).as_null_object + MultipleMan::Connection.stub(:new).and_return(connection_stub) + subscriber = double(MultipleMan::Subscribers::ModelSubscriber, klass: MockClass1, routing_key: "MockClass1.#").as_null_object + listener = described_class.new(subscriber) - subscriber.should_receive(:create) + subscriber.should_receive(:create) - listener.process_message(MultipleMan::Payload::V1.new(double(:delivery_info, routing_key: 'app.MockClass1, update'), nil, "operation" => "create", "data" => {'a' => 1, 'b' => 2})) + listener.process_message(MultipleMan::Payload::V1.new(double(:delivery_info, routing_key: 'app.MockClass1, update'), nil, "operation" => "create", "data" => {'a' => 1, 'b' => 2})) + end + end + + context "v2" do + specify "process_message should send the correct data" do + connection_stub = double(MultipleMan::Connection).as_null_object + MultipleMan::Connection.stub(:new).and_return(connection_stub) + subscriber = double(MultipleMan::Subscribers::ModelSubscriber, klass: MockClass1, routing_key: "MockClass1.#").as_null_object + properties = double(:properties, headers: { 'version' => '2', 'identify_by' => ['id'].to_json }) + listener = described_class.new(subscriber) + + subscriber.should_receive(:create).with(instance_of(MultipleMan::Payload::V2)) + + listener.process_message(MultipleMan::Payload::V2.new(double(:delivery_info, routing_key: 'app.MockClass1.create'), properties, {'a' => 1, 'b' => 2})) + end end end diff --git a/spec/model_populator_spec.rb b/spec/model_populator_spec.rb index 7335ab3..2a5f2a9 100644 --- a/spec/model_populator_spec.rb +++ b/spec/model_populator_spec.rb @@ -2,21 +2,18 @@ describe MultipleMan::ModelPopulator do class MockModel - attr_accessor :a, :b, :multiple_man_identifier + attr_accessor :a, :b, :id end describe "populate" do let(:model) { MockModel.new } - let(:payload) { MultipleMan::Payload::V1.new(nil, nil, { - 'id' => id, - 'data' => data - })} - let(:data) { { 'a' => 1, 'b' => 2 } } - let(:id) { { 'multiple_man_identifier' => 'app_1' }} + let(:payload) { MultipleMan::Payload::V2.new(nil, properties, data)} + let(:data) { { 'a' => 1, 'b' => 2, 'id' => 1 } } + let(:properties) { double(:properties, headers: { 'identify_by' => ['id'].to_json })} let(:fields) { nil } subject { described_class.new(model, fields).populate(payload) } - its(:multiple_man_identifier) { should == 'app_1' } + its(:id) { should == 1 } context "with fields defined" do let(:fields) { [:a] } diff --git a/spec/subscribers/model_subscriber_spec.rb b/spec/subscribers/model_subscriber_spec.rb index 0e8a131..8b4be43 100644 --- a/spec/subscribers/model_subscriber_spec.rb +++ b/spec/subscribers/model_subscriber_spec.rb @@ -4,11 +4,12 @@ class MockClass end - + let(:properties) { + double(:properties, headers: {'identify_by' => ['id'].to_json }) + } let(:payload) { - MultipleMan::Payload::V1.new(nil, nil, { - 'id' => {'id' => 5 }, - 'data' => {'a' => 1, 'b' => 2} + MultipleMan::Payload::V2.new(nil, properties, { + 'a' => 1, 'b' => 2, 'id' => 5 }) } From 57183f9785be9118247df8aaee72d492604f0d6c Mon Sep 17 00:00:00 2001 From: Ryan Brunner Date: Thu, 11 Feb 2016 10:17:13 -0500 Subject: [PATCH 8/9] Version history in README.md --- README.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/README.md b/README.md index 4577eb3..be73bca 100644 --- a/README.md +++ b/README.md @@ -199,6 +199,11 @@ when upgrading to a new major or minor version, ensure that you upgrade your sub prior to upgrading your publishers (if two services both subscribe and publish, you'll need to update them synchronously.) +Currently, the following versions support the following payload versions: + +- **Payload v1** - 1.0.x +- **Payload v2** - 1.1.x + ## Contributing 1. Fork it From fb9b50893a2c3f6f6d3c641470ac3732cf1eaeb5 Mon Sep 17 00:00:00 2001 From: Ryan Brunner Date: Thu, 11 Feb 2016 11:15:18 -0500 Subject: [PATCH 9/9] Allow overriding of identify by fields. --- README.md | 21 +++++++++ .../subscribers/model_subscriber.rb | 44 ++++++++++++++----- spec/subscribers/model_subscriber_spec.rb | 12 +++++ 3 files changed, 67 insertions(+), 10 deletions(-) diff --git a/README.md b/README.md index be73bca..a150b41 100644 --- a/README.md +++ b/README.md @@ -136,6 +136,27 @@ You can pass the following options to the `subscribe` call: - `fields` - Specify which fields you want to receive from the publisher. If this is blank, then any field that is published where your subscriber model has a corresponding `field=` method will be subscribed to. +- `to` - If the model name on the publishing end differs from your model name, specify + the name of the model from the publisher's perspective. +- `identify_by` - If you want to identify this model by a different set of fields than + what the publisher specifies, you can specify it here. For instance, you may have an + `id` numeric field on the publisher, but the publisher also supplies a `uuid` field. You + can specify `identify_by: :uuid` to override which field is used. Accepts either a single + field or an array of fields. + +You can also remap fields within the subscriber, like so: + + class Widget < ActiveRecord::Base + include MultipleMan::Subscriber + subscribe fields: { + id: :id, + name: :remote_name + } + end + +The keys in the supplied hash indicate the field name on the payload (i.e. the publisher +side) and the values represent which local field you want to map them to. + By default, MultipleMan will attempt to identify which model on the subscriber matches a model sent by the publisher by id. However, if your publisher specifies an `identify_by` array, MultipleMan will locate your record by finding a record where all of those fields match. diff --git a/lib/multiple_man/subscribers/model_subscriber.rb b/lib/multiple_man/subscribers/model_subscriber.rb index 806e432..7dba2d1 100644 --- a/lib/multiple_man/subscribers/model_subscriber.rb +++ b/lib/multiple_man/subscribers/model_subscriber.rb @@ -26,20 +26,44 @@ def destroy(payload) private def find_model(payload) - model_class.where(cleanse_id(payload.identify_by)).first || model_class.new + ModelFinder.new(payload, options, model_class).model end + + attr_writer :klass + attr_accessor :model_class - def cleanse_id(hash) - if hash.keys.length > 1 && hash.keys.include?("id") - id = hash.delete("id") - hash.merge("source_id" => id) - else - hash + class ModelFinder + def initialize(payload, options, model_class) + self.payload = payload + self.options = options + self.model_class = model_class + end + + def model + model_class.where(conditions).first || model_class.new end - end - attr_writer :klass - attr_accessor :model_class + def conditions + hash = find_conditions + if hash.keys.length > 1 && hash.keys.include?("id") + id = hash.delete("id") + hash.merge("source_id" => id) + else + hash + end + end + + def find_conditions + if options[:identify_by] + Hash[[*options[:identify_by]].map{|k| [k, payload[k]]}] + else + payload.identify_by + end + end + + private + attr_accessor :payload, :options, :model_class + end end end diff --git a/spec/subscribers/model_subscriber_spec.rb b/spec/subscribers/model_subscriber_spec.rb index 8b4be43..1d9c926 100644 --- a/spec/subscribers/model_subscriber_spec.rb +++ b/spec/subscribers/model_subscriber_spec.rb @@ -39,6 +39,18 @@ class MockClass MockClass.should_receive(:where).with('id' => 5).and_return([mock_object]) described_class.new(MockClass, {}).create(payload) end + + it "should use overridden identify_by if available" do + mock_object = double(MockClass).as_null_object + MockClass.should_receive(:where).with(:a => 1).and_return([mock_object]) + described_class.new(MockClass, {identify_by: :a}).create(payload) + end + + it "should support an array of identify bys" do + mock_object = double(MockClass).as_null_object + MockClass.should_receive(:where).with(a: 1, b: 2).and_return([mock_object]) + described_class.new(MockClass, {identify_by: [:a, :b]}).create(payload) + end end describe "destroy" do