Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/override identify by #15

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ spec/reports
test/tmp
test/version_tmp
tmp
node_modules/**/*
41 changes: 41 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,27 @@ You can pass the following options to the `subscribe` call:

- `fields` - Specify which fields you want to receive from
the publisher. If this is blank, then any field that is published where your subscriber model has a corresponding `field=` method will be subscribed to.
- `to` - If the model name on the publishing end differs from your model name, specify
the name of the model from the publisher's perspective.
- `identify_by` - If you want to identify this model by a different set of fields than
what the publisher specifies, you can specify it here. For instance, you may have an
`id` numeric field on the publisher, but the publisher also supplies a `uuid` field. You
can specify `identify_by: :uuid` to override which field is used. Accepts either a single
field or an array of fields.

You can also remap fields within the subscriber, like so:

class Widget < ActiveRecord::Base
include MultipleMan::Subscriber
subscribe fields: {
id: :id,
name: :remote_name
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this not be remote_name: name? If the incoming is the key and the remapped name is the value.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ya your docs suggest the hash format is {:from => :to}

which would yield {remote_name: :name}

}
end

The keys in the supplied hash indicate the field name on the payload (i.e. the publisher
side) and the values represent which local field you want to map them to.


By default, MultipleMan will attempt to identify which model on the subscriber matches a model sent by the publisher by id. However, if your publisher specifies an `identify_by` array, MultipleMan will locate your record by finding a record where all of those fields match.

Expand Down Expand Up @@ -184,6 +205,26 @@ MyModel.multiple_man_publish(:seed)

3. Stop the seeder rake task when all of your messages have been processed. You can check your RabbitMQ server

## Versioning

Because you may have different versions of MultipleMan between publishers and subscribers,
MultipleMan attaches **versions** to every message sent. This ensures that updates to payloads,
metadata, etc. will not affect processing of your messages.

In general, a subscriber will not be able to process messages published by a newer version of
MultipleMan. We use **minor versions** to indicate changes that may contain a breaking change
to payload formats.

As a consequence, when upgrading MultipleMan, it's always safe to upgrade patch versions, but
when upgrading to a new major or minor version, ensure that you upgrade your subscribers
prior to upgrading your publishers (if two services both subscribe and publish, you'll need to
update them synchronously.)

Currently, the following versions support the following payload versions:

- **Payload v1** - 1.0.x
- **Payload v2** - 1.1.x

## Contributing

1. Fork it
Expand Down
5 changes: 4 additions & 1 deletion lib/multiple_man.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,15 @@ module MultipleMan
require 'multiple_man/listeners/listener'
require 'multiple_man/listeners/seeder_listener'
require 'multiple_man/model_populator'
require 'multiple_man/identity'
require 'multiple_man/publish'

require 'multiple_man/channel_maintenance/gc'
require 'multiple_man/channel_maintenance/reaper'

require 'multiple_man/payload/payload'
require 'multiple_man/payload/v1'
require 'multiple_man/payload/v2'

def self.logger
configuration.logger
end
Expand Down
49 changes: 0 additions & 49 deletions lib/multiple_man/identity.rb

This file was deleted.

28 changes: 14 additions & 14 deletions lib/multiple_man/listeners/listener.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,24 +32,24 @@ def init_connection
attr_accessor :subscription, :connection

def listen

MultipleMan.logger.info "Listening for #{subscription.klass} with routing key #{routing_key}."
queue.bind(connection.topic, routing_key: routing_key).subscribe(ack: true) do |delivery_info, _, payload|
process_message(delivery_info, payload)
queue.bind(connection.topic, routing_key: routing_key).subscribe(ack: true) do |delivery_info, properties, payload|
parsed_payload = MultipleMan::Payload.build(delivery_info, properties, JSON.parse(payload).with_indifferent_access)

begin
process_message(parsed_payload)
rescue Exception => ex
handle_error(ex, delivery_info)
else
MultipleMan.logger.debug " Successfully processed!"
queue.channel.acknowledge(delivery_info.delivery_tag, false)
end
end
end

def process_message(delivery_info, payload)
MultipleMan.logger.info "Processing message for #{delivery_info.routing_key}."
begin
payload = JSON.parse(payload).with_indifferent_access
subscription.send(operation(delivery_info, payload), payload)
rescue ex
handle_error(ex, delivery_info)
else
MultipleMan.logger.debug " Successfully processed!"
queue.channel.acknowledge(delivery_info.delivery_tag, false)
end
def process_message(payload)
MultipleMan.logger.info "Processing message for #{payload}."
subscription.send(payload.operation, payload)
end

def handle_error(ex, delivery_info)
Expand Down
9 changes: 4 additions & 5 deletions lib/multiple_man/model_populator.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,9 @@ def initialize(record, fields)
end

def populate(payload)
data = payload[:id].merge(payload[:data])
fields_for(data).each do |field|
fields_for(payload).each do |field|
source, dest = field.is_a?(Array) ? field : [field, field]
populate_field(dest, data[source])
populate_field(dest, payload[source])
end
record
end
Expand All @@ -37,8 +36,8 @@ def populate_field(field, value)
end
end

def fields_for(data)
fields || data.keys
def fields_for(payload)
fields || payload.keys
end
end
end
2 changes: 1 addition & 1 deletion lib/multiple_man/model_publisher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def push_record(connection, record, operation)

MultipleMan.logger.debug(" Record Data: #{data} | Routing Key: #{routing_key}")

connection.topic.publish(data.payload, routing_key: routing_key)
connection.topic.publish(data.payload, routing_key: routing_key, headers: data.headers)
end

def all_records(records, &block)
Expand Down
12 changes: 12 additions & 0 deletions lib/multiple_man/payload/payload.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
class MultipleMan::Payload
def self.build(delivery_info, properties, data)
case properties.headers["version"]
when "1", nil
V1.new(delivery_info, properties, data)
when "2"
V2.new(delivery_info, properties, data)
else
raise "This version of MultipleMan does not support the payload version supplied (#{properties.headers["version"]}). Please upgrade to the latest version."
end
end
end
34 changes: 34 additions & 0 deletions lib/multiple_man/payload/v1.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@

class MultipleMan::Payload::V1
def initialize(delivery_info, properties, payload)
self.payload = payload
self.delivery_info = delivery_info
end

def keys
(payload['data'].keys + payload['id'].keys).uniq
end

def [](value)
payload['data'][value.to_s] || payload['id'][value.to_s]
end

def identify_by
if payload['id'].is_a?(Hash)
payload['id']
else
{'multiple_man_identifier' => payload['id']}
end
end

def operation
payload['operation'] || delivery_info.routing_key.split('.').last
end

def to_s
delivery_info.routing_key
end

private
attr_accessor :payload, :delivery_info
end
37 changes: 37 additions & 0 deletions lib/multiple_man/payload/v2.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@

class MultipleMan::Payload::V2
def initialize(delivery_info, properties, payload)
self.payload = payload
self.delivery_info = delivery_info
self.properties = properties
end

def keys
payload.keys
end

def [](value)
payload[value.to_s]
end

def identify_by
Hash[identify_by_header.map do |key|
[key, payload[key]]
end]
end

def operation
delivery_info.routing_key.split('.').last
end

def to_s
delivery_info.routing_key
end

private
attr_accessor :payload, :delivery_info, :properties

def identify_by_header
JSON.parse(properties.headers['identify_by'])
end
end
18 changes: 10 additions & 8 deletions lib/multiple_man/payload_generator.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,22 @@ def initialize(record, operation = :create, options = {})
end

def payload
{
type: type,
operation: operation,
id: id,
data: data
}.to_json
data.to_json
end

def headers
{
'version' => '2',
'identify_by' => identify_by.to_json
}
end

def type
options[:as] || record.class.name
end

def id
Identity.build(record, options).value
def identify_by
[* (options[:identify_by] || :id) ]
end

def data
Expand Down
8 changes: 4 additions & 4 deletions lib/multiple_man/subscribers/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,19 @@ def initialize(klass)

attr_reader :klass

def create(_)
def create(payload)
# noop
end

def update(_)
def update(payload)
# noop
end

def destroy(_)
def destroy(payload)
# noop
end

def seed(_)
def seed(payload)
# noop
end

Expand Down
Loading