/resque-bus

Use resque as a message bus!

Primary LanguageRubyMIT LicenseMIT

Resque Bus

This gem uses Redis and Resque to allow simple asynchronous communication between apps.

Example

Application A can publish an event

# config
ResqueBus.redis = "192.168.1.1:6379"

# business logic
ResqueBus.publish("user_created", "id" => 42, "first_name" => "John", "last_name" => "Smith")

# or do it later
ResqueBus.publish_at(1.hour.from_now, "user_created", "id" => 42, "first_name" => "John", "last_name" => "Smith")

Application B is subscribed to events

# config
ResqueBus.redis = "192.168.1.1:6379"

# initializer
ResqueBus.dispatch("app_b") do
  # processes event on app_b_default queue
  # subscribe is short-hand to subscribe to your 'default' queue and this block with process events with the name "user_created"
  subscribe "user_created" do |attributes|
    NameCount.find_or_create_by_name(attributes["last_name"]).increment!
  end
  
  # processes event on app_b_critical queue
  # critical is short-hand to subscribe to your 'critical' queue and this block with process events with the name "user_paid"
  critical "user_paid" do |attributes|
    CreditCard.charge!(attributes)
  end

  # you can pass any queue name you would like to process from as well IE: `banana "peeled" do |attributes|`
  
  # and regexes work as well. note that with the above configuration along with this regex,
  # the following as well as the corresponding block above would both be executed
  subscribe /^user_/ do |attributes|
    Metrics.record_user_action(attributes["bus_event_type"], attributes["id"])
  end
  
  # the above all filter on just the event_type, but you can filter on anything
  # this would be _any_ event that has a user_id and the page value of homepage regardless of bus_event_type
  subscribe "my_key", { "user_id" => :present, "page" => "homepage"} do
    Mixpanel.homepage_action!(attributes["action"])
  end
end

Applications can also subscribe within classes using the provided Subscriber module.

class SimpleSubscriber
  include ResqueBus::Subscriber
  subscribe :my_method

  def my_method(attributes)
    # heavy lifting
  end
end

The following is equivalent to the original initializer and shows more options:

class OtherSubscriber
  include ResqueBus::Subscriber
  application :app_b

  subscribe :user_created
  subscribe_queue :app_b_critical, :user_paid
  subscribe_queue :app_b_default, :user_action, :bus_event_type => /^user_/
  subscribe :homepage_method, :user_id => :present, :page => "homepage"

  def user_created(attributes)
    NameCount.find_or_create_by_name(attributes["last_name"]).increment!
  end

  def user_paid(attributes)
    CreditCard.charge!(attributes)
  end

  def user_action(attributes)
    Metrics.record_user_action(attributes["bus_event_type"], attributes["id"])
  end

  def homepage_method
    Mixpanel.homepage_action!(attributes["action"])
  end
end

Note: This subscribes when this class is loaded, so it needs to be in your load or otherwise referenced/required during app initialization to work properly.

Commands

Each app needs to tell Redis about its subscriptions:

$ rake resquebus:subscribe

The subscription block is run inside a Resque worker which needs to be started for each app.

$ rake resquebus:setup resque:work

The incoming queue also needs to be processed on a dedicated or all the app servers.

$ rake resquebus:driver resque:work

If you want retry to work for subscribing apps, you should run resque-scheduler

$ rake resque:scheduler

Heartbeat

We've found it useful to have the bus act like cron, triggering timed jobs throughout the system. Resque Bus calls this a heartbeat. It uses resque-scheduler to trigger the events. You can enable it in your Rakefile.

# resque.rake
namespace :resque do
  task :setup => [:environment] do
    ResqueBus.heartbeat!
  end
end

Or add it to your schedule.yml directly

resquebus_heartbeat:
  cron: "* * * * *"
  class: "::ResqueBus::Heartbeat"
  queue: resquebus_incoming
  description: "I publish a heartbeat_minutes event every minute"

It is the equivalent of doing this every minute

seconds = minutes * (60)
hours   = minutes / (60)
days    = minutes / (60*24)

now     = Time.at(seconds)

attributes = {}

now = Time.now
seconds = now.to_i
ResqueBus.publish("hearbeat_minutes", {
  "epoch_seconds" => seconds,
  "epoch_minutes" => seconds / 1.minute,
  "epoch_hours"   => seconds / 1.hour,
  "epoch_days"    => seconds / 1.day,
  "minute"        => now.min
  "hour"          => now.hour
  "day"           => now.day
  "month"         => now.month
  "year"          => now.year
  "yday"          => now.yday
  "wday"          => now.wday
})

This allows you do something like this:

ResqueBus.dispatch("app_c") do
  # runs at 10:20, 11:20, etc
  subscribe "once_an_hour", 'bus_event_type' => 'heartbeat_minutes', 'minute' => 20 do |attributes|
    Sitemap.generate!
  end
  
  # runs every five minutes
  subscribe "every_five_minutes", 'bus_event_type' => 'heartbeat_minutes' do |attributes|
    next unless attributes["epoch_minutes"] % 5 == 0
    HealthCheck.run!
  end
  
  # runs at 8am on the first of every month
  subscribe "new_month_morning", 'bus_event_type' => 'heartbeat_minutes', 'day' => 1, hour' => 8, 'minute' => 0,  do |attributes|
    next unless attributes["epoch_minutes"] % 5 == 0
    Token.old.expire!
  end
end

Compatibility

ResqueBus can live along side another instance of Resque that points at a different Redis server.

# config
Resque.redis = "192.168.1.0:6379"
ResqueBus.redis = "192.168.1.1:6379"

If no Redis instance is given specifically, ResqueBus will use the Resque one.

# config
Resque.redis = "192.168.1.0:6379"

That will use the default (resque) namespace which can be helpful for using the tooling. Conflict with queue names are unlikely. You can change the namespace if you like though.

# config
Resque.redis = "192.168.1.0:6379"
ResqusBus.redis.namespace = :get_on_the_bus

Local Mode

For development, a local mode is also provided and is specified in the configuration.

# config
ResqueBus.local_mode = :standalone
or 
ResqueBus.local_mode = :inline

Standalone mode does not require a separate resquebus:driver task to be running to process the incoming queue. Simply publishing to the bus will distribute the incoming events to the appropriate application specific queue. A separate resquebus:work task does still need to be run to process these events

Inline mode skips queue processing entirely and directly dispatches the event to the appropriate code block.

TODO

  • There are a few spots in the code with TODO notes
  • Make this not freak out in development without Redis or when Redis is down
  • We might not actually need to publish in tests
  • Add some rspec helpers for the apps to use: should_ post an event_publish or something along those lines
  • Allow calling resquebus:setup and resquebus:driver together (append to ENV['QUEUES'], don't replace it)

Copyright (c) 2011 Brian Leonard, released under the MIT license