Hermes - a messenger of the gods, delivering messages via RabbitMQ with a little help from Hutch.
Add this line to your application's Gemfile:
gem 'hermes-rb'
And then execute:
$ bundle install
Or install it yourself as:
$ gem install hermes-rb
First, define an initializer, for example config/initializers/hermes.rb:
Rails.application.config.to_prepare do
  event_handler = Hermes::EventHandler.new

  Hermes.configure do |config|
    config.adapter = Rails.application.config.async_messaging_adapter
    config.application_prefix = "my_app"
    config.background_processor = HermesHandlerJob
    config.enqueue_method = :perform_async
    config.event_handler = event_handler
    config.clock = Time.zone
    config.instrumenter = Instrumenter
    config.configure_hutch do |hutch|
      hutch.uri = ENV.fetch("HUTCH_URI")
    end
    config.distributed_tracing_database_uri = ENV.fetch("DISTRIBUTED_TRACING_DATABASE_URI")
  end

  event_handler.handle_events do
    handle Events::Example::Happened, with: Example::HappenedHandler
    handle Events::Example::SyncCallHappened, with: Example::SyncCallHappenedHandler, async: false
  end

  # if you care about distributed tracing
  Hermes::DistributedTrace.establish_connection(Hermes.configuration.distributed_tracing_database_uri)
end

Hutch::Logging.logger = Rails.logger if !Rails.env.test? && !Rails.env.development?
Note that not all options are required, for example when the application is only a producer or only a consumer.
- adapter - messages can be delivered either via RabbitMQ or via an in-memory adapter (useful for testing). Most likely you will want to choose it based on the environment, which is why it's advisable to use Rails.application.config.async_messaging_adapter and define async_messaging_adapter on the config object in the development.rb, test.rb and production.rb files. The recommended setup is to assign config.async_messaging_adapter = :in_memory for the test ENV and config.async_messaging_adapter = :hutch for the production and development ENVs.
- application_prefix - identifier for this application. ABSOLUTELY NECESSARY unless you want to have competing queues with different applications (hint: most likely you don't want that).
- background_processor and enqueue_method - by design, Hermes is supposed to use Hutch workers to fetch the messages from RabbitMQ and process them in some background jobs framework. background_processor refers to the name of the class for the job and enqueue_method is the method name that will be called when enqueuing the job. This method must accept three arguments: event_class, body and headers. Here is an example for Sidekiq:
class HermesHandlerJob
  include Sidekiq::Worker

  sidekiq_options queue: :critical

  def perform(event_class, body, headers)
    Hermes::EventProcessor.call(event_class, body, headers)
  end
end
If you know what you are doing, you don't necessarily have to process things in the background. As long as the class implements the expected interface, you can do anything you want.
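As a minimal sketch of the "anything you want" case, the class below handles the message immediately instead of enqueuing a job. The class name InlineHermesProcessor, the method name process_now and the injectable processor are all hypothetical; the only parts taken from the text above are the three-argument interface and the delegation to Hermes::EventProcessor.call.

```ruby
# Hypothetical processor that handles the event right away instead of
# enqueuing a background job.
class InlineHermesProcessor
  class << self
    # Injectable so the sketch can run on its own; by default it delegates
    # to Hermes::EventProcessor, like the Sidekiq job shown above.
    attr_writer :processor

    def processor
      @processor ||= lambda do |event_class, body, headers|
        Hermes::EventProcessor.call(event_class, body, headers)
      end
    end

    # Matches the expected interface: event_class, body and headers.
    def process_now(event_class, body, headers)
      processor.call(event_class, body, headers)
    end
  end
end
```

With such a class, the configuration would presumably use config.background_processor = InlineHermesProcessor and config.enqueue_method = :process_now.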
- event_handler - an instance of event handler/storage, just use what is shown in the example.
- clock - a clock object that is time-zone aware, implementing the now method.
- configure_hutch - a way to specify the Hutch URI, basically the URI for RabbitMQ.
- event_handler.handle_events - that's how you declare events and their handlers. The event handler is an object that responds to the call method and takes event as an argument. All events should ideally be subclasses of Hermes::BaseEvent. This class inherits from Dry::Struct, so getting familiar with the dry-struct gem would be beneficial. Here is an example event:
class Payment::MarkedAsPaid < Hermes::BaseEvent
  attribute :payment_id, Types::Strict::Integer
  attribute :cents, Types::Strict::Integer
  attribute :currency, Types::Strict::String
end
To keep things clean, you might want to prefix the namespace with Events:
class Events::Payment::MarkedAsPaid < Hermes::BaseEvent
  attribute :payment_id, Types::Strict::Integer
  attribute :cents, Types::Strict::Integer
  attribute :currency, Types::Strict::String
end
In both cases, the routing key will be the same (the Events prefix is dropped) and will resolve to payment.marked_as_paid.
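To make the naming rule concrete, here is a small sketch that derives a routing key from a class name in the way described above. It is not Hermes's own implementation, and the helper name routing_key_for is hypothetical; it only illustrates dropping a leading Events namespace, underscoring each part and joining with dots.

```ruby
# Illustration of the naming rule only, not the code Hermes uses internally.
def routing_key_for(event_class_name)
  event_class_name
    .sub(/\AEvents::/, "")                                   # drop the optional Events prefix
    .split("::")                                             # one segment per namespace
    .map { |part| part.gsub(/([a-z\d])([A-Z])/, '\1_\2').downcase } # CamelCase to snake_case
    .join(".")
end

routing_key_for("Payment::MarkedAsPaid")
routing_key_for("Events::Payment::MarkedAsPaid")
```

Both calls return the same key, which is why the Events prefix can be added or removed without changing the bindings.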
To avoid unexpected problems, don't use restricted names for attributes, such as meta, routing_key, origin_headers, origin_body, trace_context or version.
You can also specify whether the event should be processed asynchronously using background_processor (default behavior) or synchronously. If you want the event to be processed synchronously, e.g. when doing RPC, use the async: false option.
- rpc_call_timeout - a timeout for RPC calls, defaults to 10 seconds. It can also be customized per instance of the RPC client (covered later).
- instrumenter - an instrumenter object responding to the instrument method, which takes one string argument, one optional hash argument and a block.
For example:
module Instrumenter
  extend ::NewRelic::Agent::MethodTracer

  def self.instrument(name, payload = {})
    ActiveSupport::Notifications.instrument(name, payload) do
      self.class.trace_execution_scoped([name]) do
        yield if block_given?
      end
    end
  end
end
If you don't care about it, you can leave it empty.
- distributed_tracing_database_uri - if you want to enable distributed tracing, specify the Postgres database URI.
- distributed_tracing_database_table - table name for storing traces, by default it's hermes_distributed_traces.
- distributes_tracing_mapper - an object responding to the call method, taking one argument (a hash of attributes) and returning a hash as well. This hash will be used for assigning attributes when creating Hermes::DistributedTrace. The default mapper just returns the original hash. You can use it if you want to remove, for example, some sensitive info from the event's body.
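A mapper that strips sensitive data could look roughly like the sketch below. The class name RedactingTraceMapper, the :event_body key and the list of sensitive keys are assumptions made for illustration; check the attributes hash that Hermes actually passes before relying on these names.

```ruby
# Hypothetical mapper: takes a hash of attributes and returns a hash.
# The :event_body key and the SENSITIVE_KEYS list are assumptions.
class RedactingTraceMapper
  SENSITIVE_KEYS = %w[password token].freeze

  def self.call(attributes)
    body = attributes[:event_body]
    # Nothing to redact when the body is missing or not a hash.
    return attributes unless body.is_a?(Hash)

    redacted = body.reject { |key, _value| SENSITIVE_KEYS.include?(key.to_s) }
    # merge returns a new hash, so the original attributes stay untouched.
    attributes.merge(event_body: redacted)
  end
end
```

Returning a new hash instead of mutating the argument keeps the mapper free of side effects on the event that is being published.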
If you want to handle an RPC call, you need to add the rpc: true flag. Keep in mind that RPC requires synchronous processing and a response, so you also need to set async: false. The routing key and correlation ID will be resolved based on the message that is published by the client. The payload that is sent back will be whatever the event handler returns, so it might be a good idea to just return a hash so that you can operate on JSON easily.
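A handler for such an RPC event might be sketched as follows. The handler body, the name attribute on the event and the keys of the returned hash are hypothetical; the registration line in the comment simply combines the handle call from the initializer with the two flags described above.

```ruby
# Registered inside event_handler.handle_events, for example:
#   handle Events::Example::SyncCallHappened,
#          with: Example::SyncCallHappenedHandler, async: false, rpc: true
module Example
  class SyncCallHappenedHandler
    # The return value becomes the RPC response payload, so a plain hash
    # with string keys maps cleanly onto JSON.
    def self.call(event)
      { "status" => "ok", "received_name" => event.name }
    end
  end
end
```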
To publish an async event, call Hermes::EventProducer:
Hermes::EventProducer.publish(event)
event is an instance of a subclass of Hermes::BaseEvent.
If you want to perform a synchronous RPC call, use Hermes::RpcClient:
parsed_response_hash = Hermes::RpcClient.call(event)
You can also use an explicit initializer and provide a custom rpc_call_timeout:
parsed_response_hash = Hermes::RpcClient.new(rpc_call_timeout: 10).call(event)
If the request times out, Hermes::RpcClient::RpcTimeoutError will be raised.
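One way to deal with the timeout is to rescue the error and return a fallback value. The helper below is a sketch: rpc_call_with_fallback and its keyword arguments are hypothetical, and the stand-in module exists only so that the snippet runs outside an application (the real superclass of RpcTimeoutError may differ).

```ruby
# Stand-in so the sketch runs on its own; hermes-rb provides the real classes.
unless defined?(Hermes)
  module Hermes
    class RpcClient
      RpcTimeoutError = Class.new(StandardError)
    end
  end
end

# Hypothetical helper: performs the RPC call and returns `fallback`
# when the call times out.
def rpc_call_with_fallback(event, client: Hermes::RpcClient, fallback: nil)
  client.call(event)
rescue Hermes::RpcClient::RpcTimeoutError
  fallback
end
```

Whether a fallback is acceptable depends on the call; for anything that must not be silently skipped, letting the error propagate is the safer default.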
If you want to take advantage of distributed tracing, you need to specify distributed_tracing_database_uri in the config. In many cases that will be enough, although some cases require extra code to use it properly.
If you have a "standard" flow, which means producing events, consuming them in the jobs specified by background_processor and publishing other events from the same class, then you don't need to do anything extra as things will be handled out of the box. In such a scenario, at least two Hermes::DistributedTrace records will be created (one for the producer and one for each consumer, plus further traces if a consumer also publishes some events).
However, if you enqueue some job inside the job specified by background_processor, you will need to do something extra:
- You need to pass origin_headers as an argument to the job to have the headers available. You can extract them inside the handler from the event by calling event.origin_headers.
- When processing the job, you will need to assign these headers to Hermes:
Hermes.origin_headers = origin_headers
These origin_headers will be stored in RequestStore.store (it uses request_store).
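The two steps above can be sketched as follows. FollowUpJob and PaymentHandler are hypothetical names, the job is a plain Ruby class here (in a Sidekiq application it would include Sidekiq::Worker and be enqueued with perform_async), and the Hermes stand-in exists only so that the snippet runs on its own.

```ruby
# Stand-in so the sketch runs on its own; in an application hermes-rb
# provides Hermes.origin_headers=.
unless defined?(Hermes)
  module Hermes
    class << self
      attr_accessor :origin_headers
    end
  end
end

# Hypothetical follow-up job that is enqueued from inside an event handler.
class FollowUpJob
  def perform(payment_id, origin_headers)
    # Assign the headers first so that events published below join the same trace.
    Hermes.origin_headers = origin_headers
    # ... do the work and publish further events here ...
    payment_id
  end
end

# Hypothetical handler: extracts the headers from the event and passes them on.
class PaymentHandler
  def self.call(event, job: FollowUpJob.new)
    job.perform(event.payment_id, event.origin_headers)
  end
end
```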
Traces are also stored for RPC calls. For a single RPC, there will be three traces:
- Client (the actual RPC call)
- Server (processing the request)
- Client (processing the response) - that one uses a special internal event to keep the consistency: ResponseEvent, which stores response_body as a hash.
You will also need to create an appropriate database table:
create_table(:hermes_distributed_traces) do |t|
  t.string "trace", null: false
  t.string "span", null: false
  t.string "parent_span"
  t.string "service", null: false
  t.text "event_class", null: false
  t.text "routing_key", null: false
  t.jsonb "event_body", null: false, default: []
  t.jsonb "event_headers", null: false, default: []
  t.datetime "created_at", precision: 6, null: false
  t.datetime "updated_at", precision: 6, null: false
  t.index ["created_at"], name: "index_hermes_distributed_traces_on_created_at", using: :brin
  t.index ["trace"], name: "index_hermes_distributed_traces_on_trace"
  t.index ["span"], name: "index_hermes_distributed_traces_on_span"
  t.index ["service"], name: "index_hermes_distributed_traces_on_service"
  t.index ["event_class"], name: "index_hermes_distributed_traces_on_event_class"
  t.index ["routing_key"], name: "index_hermes_distributed_traces_on_routing_key"
end
Some important attributes to understand, which will be useful during debugging:
- trace - ID of the trace. All events from the same saga will have the same value (which is why it's important to deal with origin_headers properly).
- span - ID of the operation.
- parent_span - span value of the previous operation from the previous service.
- service - name of the service where the given event occurred, based on application_prefix.
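When debugging, it can help to arrange the traces of one saga by following parent_span. The sketch below works on plain hashes standing in for Hermes::DistributedTrace records; the helper name order_by_span is hypothetical, and it assumes that the first trace of a saga has no parent_span.

```ruby
# Returns [depth, trace] pairs, parents before their children.
# `traces` is an array of hashes with :span and :parent_span keys.
def order_by_span(traces)
  children = traces.group_by { |trace| trace[:parent_span] }

  walk = lambda do |parent_span, depth|
    children.fetch(parent_span, []).flat_map do |trace|
      [[depth, trace]] + walk.call(trace[:span], depth + 1)
    end
  end

  # Assumption: root traces are the ones without a parent_span.
  walk.call(nil, 0)
end

traces = [
  { span: "c", parent_span: "b", service: "billing" },
  { span: "a", parent_span: nil, service: "my_app" },
  { span: "b", parent_span: "a", service: "payments" }
]

order_by_span(traces).each do |depth, trace|
  puts "#{'  ' * depth}#{trace[:service]} (span #{trace[:span]})"
end
```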
It is highly recommended to use a shared database for storing traces. It's not ideal, but the benefits of storing traces in a single DB shared by the applications outweigh the disadvantages in many cases.
Put this inside rails_helper. Note that it requires webmock and sidekiq.
def execute_jobs_inline
  original_active_job_adapter = ActiveJob::Base.queue_adapter
  ActiveJob::Base.queue_adapter = :inline
  Sidekiq::Testing.inline! do
    yield
  end
ensure
  # restore the original adapter even if the example raises
  ActiveJob::Base.queue_adapter = original_active_job_adapter
end
config.around(:example, :inline_jobs) do |example|
  execute_jobs_inline { example.run }
end

class ActiveRecord::Base
  mattr_accessor :shared_connection

  def self.connection
    shared_connection.presence || retrieve_connection
  end
end

config.after(:each) do
  Hermes::Publisher.instance.reset
end

config.before(:each, :with_rabbit_mq) do
  ActiveRecord::Base.shared_connection = ActiveRecord::Base.connection
  stub_request(:get, "http://127.0.0.1:15672/api/exchanges")
  stub_request(:get, "http://127.0.0.1:15672/api/bindings")
  hutch_publisher = Hermes::Publisher::HutchAdapter.new
  Hermes::Publisher.instance.current_adapter = hutch_publisher
  @worker_thread = Thread.new do
    Hutch.connect
    worker = Hutch::Worker.new(Hutch.broker, Hutch.consumers, Hutch::Config.setup_procs)
    worker.run
  end
  sleep 0.2
end

config.after(:each, :with_rabbit_mq) do |example|
  @worker_thread.kill
end
To run integration specs (with a real RabbitMQ process), use the inline_jobs and with_rabbit_mq meta flags.
require "rails_helper"

RSpec.describe "Example Event Test", :with_rabbit_mq, :inline_jobs do
  describe "when Events::Example::Happened is published" do
    subject(:publish_event) { Hermes::EventProducer.publish(event) }

    let(:event) { Events::Example::Happened.new(event_params) }
    let(:event_params) do
      {
        name: name
      }
    end
    let(:name) { "hermes" }

    it "calls Example::HappenedHandler" do
      expect(Example::HappenedHandler).to receive(:call)
        .with(instance_of(Events::Example::Happened)).and_call_original

      publish_event

      # Since this is an async action, some delay is required. A plain `sleep`
      # is the simple way; you may want something more complex to avoid it.
      sleep 0.2
    end
  end
end
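As one possible "something more complex", the fixed sleep can be replaced by polling for an observable side effect until a deadline. The helper below is a generic sketch and not part of Hermes; the name wait_until and its defaults are hypothetical.

```ruby
# Polls the block until it returns a truthy value or the timeout elapses.
def wait_until(timeout: 2.0, interval: 0.05)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  loop do
    return true if yield

    if Process.clock_gettime(Process::CLOCK_MONOTONIC) >= deadline
      raise "condition not met within #{timeout} seconds"
    end

    sleep interval
  end
end
```

In a spec, this would be used with a condition that the handler makes true, for example a record being created, so the example finishes as soon as the message has been processed.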
To assert that a message was published, require the matcher, e.g. in spec/supports/matchers/publish_async_message:
require "hermes/support/matchers/publish_async_message"
And then use it in the following way:
expect {
  call
}.to publish_async_message(routing_key_of_the_expected_event).with_event_payload(expected_event_payload)
Note that expected_event_payload does not contain the extra meta key that is added by the Hermes publisher; it's just a symbolized hash with the result of the serialization of the event.
require "rails_helper"

RSpec.describe HermesHandlerJob do
  it { is_expected.to be_processed_in :critical }

  describe "#perform" do
    subject(:perform) { described_class.new.perform(EventClassForTestingHermesHandlerJob.to_s, payload, headers) }

    let(:configuration) { Hermes.configuration }
    let(:event_handler) { Hermes::EventHandler.new }
    let(:payload) do
      {
        "bookingsync" => "hermes"
      }
    end
    let(:headers) do
      {}
    end

    class EventClassForTestingHermesHandlerJob < Hermes::BaseEvent
      attribute :bookingsync, Types::Strict::String
    end

    class HandlerForEventClassForTestingHermesHandlerJob
      def self.event
        @event
      end

      def self.call(event)
        @event = event
      end
    end

    before do
      event_handler.handle_events do
        handle EventClassForTestingHermesHandlerJob, with: HandlerForEventClassForTestingHermesHandlerJob
      end
    end

    around do |example|
      original_event_handler = configuration.event_handler

      Hermes.configure do |config|
        config.event_handler = event_handler
      end

      example.run

      Hermes.configure do |config|
        config.event_handler = original_event_handler
      end
    end

    it "calls proper handler with a given event" do
      perform

      expect(HandlerForEventClassForTestingHermesHandlerJob.event).to be_a(EventClassForTestingHermesHandlerJob)
      expect(HandlerForEventClassForTestingHermesHandlerJob.event.bookingsync).to eq "hermes"
    end
  end
end
Hermes is just an extra layer on top of Hutch. Refer to Hutch's docs for more info about dealing with the workers and deployment.
Use - image: brandembassy/rabbitmq:latest
After checking out the repo, run bin/setup to install dependencies. Then, run rake spec to run the tests. You can also run bin/console for an interactive prompt that will allow you to experiment.
To install this gem onto your local machine, run bundle exec rake install. To release a new version, update the version number in version.rb, and then run bundle exec rake release, which will create a git tag for the version, push git commits and tags, and push the .gem file to rubygems.org.
Bug reports and pull requests are welcome on GitHub at https://github.com/BookingSync/hermes-rb.
The gem is available as open source under the terms of the MIT License.