A Rails engine to durably deliver schematized events to Amazon Kinesis via DelayedJob.
More specifically, journaled
is composed of three opinionated pieces:
schema definition/validation via JSON Schema, transactional enqueueing
via Delayed::Job (specifically delayed_job_active_record
), and event
transmission via Amazon Kinesis. Our current use-cases include
transmitting audit events for durable storage in S3 and/or analytical
querying in Amazon Redshift.
Journaled provides an at-least-once event delivery guarantee assuming Delayed::Job is configured not to delete jobs on failure.
Note: Do not use the journaled gem to build an event sourcing solution as it does not guarantee total ordering of events. It's possible we'll add scoped ordering capability at a future date (and would gladly entertain pull requests), but it is presently only designed to provide a durable, eventually consistent record that discrete events happened.
-
Install
delayed_job_active_record
if you haven't already. -
To integrate Journaled into your application, simply include the gem in your app's Gemfile.
gem 'journaled'
If you use rspec, add the following to your rails helper:
# spec/rails_helper.rb # ... your other requires require 'journaled/rspec'
-
You will also need to define the following environment variables to allow Journaled to publish events to your AWS Kinesis event stream:
JOURNALED_STREAM_NAME
Special case: if your
Journaled::Event
objects override the#journaled_app_name
method to a non-nil value e.g.my_app
, you will instead need to provide a corresponding[upcased_app_name]_JOURNALED_STREAM_NAME
variable for each distinct value, e.g.MY_APP_JOURNALED_STREAM_NAME
. You can provide a default value for allJournaled::Event
s in an initializer like this:Journaled.default_app_name = 'my_app'
You may optionally define the following ENV vars to specify AWS credentials outside of the locations that the AWS SDK normally looks:
RUBY_AWS_ACCESS_KEY_ID
RUBY_AWS_SECRET_ACCESS_KEY
You may also specify the region to target your AWS stream by setting
AWS_DEFAULT_REGION
. If you don't specify, Journaled will default tous-east-1
.You may also specify a role that the Kinesis AWS client can assume:
JOURNALED_IAM_ROLE_ARN
The AWS principal whose credentials are in the environment will need to be allowed to assume this role.
Journaling provides a number of different configuation options that can be set in Ruby using an initializer. Those values are:
This is described in the proceeding paragraph and is used to specify which app name to use, which corresponds to which Journaled Stream to send events too.
This is the default value for events that do NOT specify their own #journaled_app_name
. For events that define their own #journaled_app_name
method, that will take precedence over this default.
Ex: Journaled.default_app_name = 'my_app'
This can be used to configure what priority
the Delayed Jobs are enqueued with. This will be applied to all the Journaled::Devivery jobs that are created by this application.
Ex: Journaled.job_priority = 14
The number of seconds a persistent connection is allowed to sit idle before it should no longer be used.
The number of seconds before the :http_handler should timeout while trying to open a new HTTP session.
The number of seconds before the :http_handler should timeout while waiting for a HTTP response.
Both model-level directives accept additional options to be passed into DelayedJob's enqueue
method:
# For change journaling:
journal_changes_to :email, as: :identity_change, enqueue_with: { priority: 10 }
# Or for custom journaling:
journal_attributes :email, enqueue_with: { priority: 20, queue: 'journaled' }
Out of the box, Journaled
provides an event type and ActiveRecord
mix-in for durably journaling changes to your model, implemented via
ActiveRecord hooks. Use it like so:
class User < ApplicationRecord
include Journaled::Changes
journal_changes_to :email, :first_name, :last_name, as: :identity_change
end
Add the following to your controller base class for attribution:
class ApplicationController < ActionController::Base
include Journaled::Actor
self.journaled_actor = :current_user # Or your authenticated entity
end
Your authenticated entity must respond to #to_global_id
, which
ActiveRecords do by default.
Every time any of the specified attributes is modified, or a User
record is created or destroyed, an event will be sent to Kinesis with the following attributes:
id
- a random event-specific UUIDevent_type
- the constant valuejournaled_change
created_at
- when the event was createdtable_name
- the table name backing the ActiveRecord (e.g.users
)record_id
- the primary key of the record, as a string (e.g."300"
)database_operation
- one ofcreate
,update
,delete
logical_operation
- whatever logical operation you specified in yourjournal_changes_to
declaration (e.g.identity_change
)changes
- a serialized JSON object representing the latest values of any new or changed attributes from the specified set (e.g.{"email":"mynewemail@example.com"}
). Upon destroy, all specified attributes will be serialized as they were last stored.actor
- a string (usually a rails global_id) representing who performed the action.
Callback-bypassing database methods like update_all
, delete_all
,
update_columns
and delete
are intercepted and will require an
additional force: true
argument if they would interfere with change
journaling. Note that the less-frequently-used methods toggle
,
increment*
, decrement*
, and update_counters
are not intercepted at
this time.
If you use RSpec (and have required journaled/rspec
in your
spec/rails_helper.rb
), you can regression-protect important journaling
config with the journal_changes_to
matcher:
it "journals exactly these things or there will be heck to pay" do
expect(User).to journal_changes_to(:email, :first_name, :last_name, as: :identity_change)
end
For every custom implementation of journaling in your application, define the JSON schema for the attributes in your event.
This schema file should live in your Rails application at the top level and should be named in snake case to match the
class being journaled.
E.g.: your_app/journaled_schemas/my_class.json)
In each class you intend to use Journaled, include the Journaled::Event
module and define the attributes you want
captured. After completing the above steps, you can call the journal!
method in the model code and the declared
attributes will be published to the Kinesis stream. Be sure to call
journal!
within the same transaction as any database side effects of
your business logic operation to ensure that the event will eventually
be delivered if-and-only-if your transaction commits.
Example:
// journaled_schemas/contract_acceptance_event.json
{
"type": "object",
"title": "contract_acceptance_event",
"required": [
"user_id",
"signature"
],
"properties": {
"user_id": {
"type": "integer"
},
"signature": {
"type": "string"
}
}
}
# app/models/contract_acceptance_event.rb
ContractAcceptanceEvent = Struct.new(:user_id, :signature) do
include Journaled::Event
journal_attributes :user_id, :signature
end
# app/models/contract_acceptance.rb
class ContractAcceptance
include ActiveModel::Model
attr_accessor :user_id, :signature
def user
@user ||= User.find(user_id)
end
def contract_acceptance_event
@contract_acceptance_event ||= ContractAcceptanceEvent.new(user_id, signature)
end
def save!
User.transaction do
user.update!(contract_accepted: true)
contract_acceptance_event.journal!
end
end
end
An event like the following will be journaled to kinesis:
{
"id": "bc7cb6a6-88cf-4849-a4f0-a31b0b199c47", // A random event ID for idempotency filtering
"event_type": "contract_acceptance_event",
"created_at": "2019-01-28T11:06:54.928-05:00",
"user_id": 123,
"signature": "Sarah T. User"
}
Journaled provides a couple helper methods that may be useful in your custom events. You can add whichever you need your event types like this:
# my_event.rb
class MyEvent
include Journaled::Event
journal_attributes :commit_hash, :actor_uri # ... etc, etc
def commit_hash
Journaled.commit_hash
end
def actor_uri
Journaled.actor_uri
end
# ... etc, etc
end
If you choose to use it, you must provide a GIT_COMMIT
environment
variable. Journaled.commit_hash
will fail if it is undefined.
Returns one of the following in order of preference:
- The current controller-defined
journaled_actor
's GlobalID, if set - A string of the form
gid://[app_name]/[os_username]
if performed on the command line - a string of the form
gid://[app_name]
as a fallback
In order for this to be most useful, you must configure your controller as described in Change Journaling above.
Suggestions for enhancements to this engine are currently being tracked via Github Issues. Please feel free to open an issue for a desired feature, as well as for any observed bugs.