A Rails engine to durably deliver schematized events to Amazon Kinesis via ActiveJob.
More specifically, journaled
is composed of three opinionated pieces:
schema definition/validation via JSON Schema, transactional enqueueing
via ActiveJob (specifically, via a DB-backed queue adapter), and event
transmission via Amazon Kinesis. Our current use-cases include
transmitting audit events for durable storage in S3 and/or analytical
querying in Amazon Redshift.
Journaled provides an at-least-once event delivery guarantee assuming ActiveJob's queue adapter is not configured to delete jobs on failure.
Note: Do not use the journaled gem to build an event sourcing solution as it does not guarantee total ordering of events. It's possible we'll add scoped ordering capability at a future date (and would gladly entertain pull requests), but it is presently only designed to provide a durable, eventually consistent record that discrete events happened.
See upgrades below if you're upgrading from an older journaled
version!
-
If you haven't already, configure ActiveJob to use one of the following queue adapters:
:delayed_job
(viadelayed_job_active_record
):que
:good_job
:delayed
Ensure that your queue adapter is not configured to delete jobs on failure.
If you launch your application in production mode and the gem detects that
ActiveJob::Base.queue_adapter
is not in the above list, it will raise an exception and prevent your application from performing unsafe journaling. -
To integrate Journaled into your application, simply include the gem in your app's Gemfile.
gem 'journaled'
If you use rspec, add the following to your rails helper:
# spec/rails_helper.rb # ... your other requires require 'journaled/rspec'
-
You will need to set the following config in an initializer to allow Journaled to publish events to your AWS Kinesis event stream:
Journaled.default_stream_name = "my_app_#{Rails.env}_events"
You may also define a
#journaled_stream_name
method onJournaled::Event
instances:def journaled_stream_name "my_app_#{Rails.env}_alternate_events" end
-
You may also need to define environment variables to allow Journaled to publish events to your AWS Kinesis event stream:
You may optionally define the following ENV vars to specify AWS credentials outside of the locations that the AWS SDK normally looks:
RUBY_AWS_ACCESS_KEY_ID
RUBY_AWS_SECRET_ACCESS_KEY
You may also specify the region to target your AWS stream by setting
AWS_DEFAULT_REGION
. If you don't specify, Journaled will default tous-east-1
.You may also specify a role that the Kinesis AWS client can assume:
JOURNALED_IAM_ROLE_ARN
The AWS principal whose credentials are in the environment will need to be allowed to assume this role.
Journaling provides a number of different configuation options that can be set in Ruby using an initializer. Those values are:
This is described in the "Installation" section above, and is used to specify which stream name to use.
This can be used to configure what priority
the ActiveJobs are enqueued with. This will be applied to all the Journaled::DeliveryJob
s that are created by this application.
Ex: Journaled.job_priority = 14
Note that job priority is only supported on Rails 6.0+. Prior Rails versions will ignore this parameter and enqueue jobs with the underlying ActiveJob adapter's default priority.
The number of seconds a persistent connection is allowed to sit idle before it should no longer be used.
The number of seconds before the :http_handler should timeout while trying to open a new HTTP session.
The number of seconds before the :http_handler should timeout while waiting for a HTTP response.
Both model-level directives accept additional options to be passed into ActiveJob's set
method:
# For change journaling:
journal_changes_to :email, as: :identity_change, enqueue_with: { priority: 10 }
# For audit logging:
has_audit_log enqueue_with: { priority: 30 }
# Or for custom journaling:
journal_attributes :email, enqueue_with: { priority: 20, queue: 'journaled' }
Before using Journaled::Changes
or Journaled::AuditLog
, you will want to
set up automatic "actor" attribution (i.e. tracking the current user session).
To enable this feature, add the following to your controller base class for
attribution:
class ApplicationController < ActionController::Base
include Journaled::Actor
self.journaled_actor = :current_user # Or your authenticated entity
end
Your authenticated entity must respond to #to_global_id
, which ActiveRecords do by default.
This feature relies on ActiveSupport::CurrentAttributes
under the hood.
Out of the box, Journaled
provides an event type and ActiveRecord
mix-in for durably journaling changes to your model, implemented via
ActiveRecord hooks. Use it like so:
class User < ApplicationRecord
include Journaled::Changes
journal_changes_to :email, :first_name, :last_name, as: :identity_change
end
Every time any of the specified attributes is modified, or a User
record is created or destroyed, an event will be sent to Kinesis with the following attributes:
id
- a random event-specific UUIDevent_type
- the constant valuejournaled_change
created_at
- when the event was createdtable_name
- the table name backing the ActiveRecord (e.g.users
)record_id
- the primary key of the record, as a string (e.g."300"
)database_operation
- one ofcreate
,update
,delete
logical_operation
- whatever logical operation you specified in yourjournal_changes_to
declaration (e.g.identity_change
)changes
- a serialized JSON object representing the latest values of any new or changed attributes from the specified set (e.g.{"email":"mynewemail@example.com"}
). Upon destroy, all specified attributes will be serialized as they were last stored.actor
- a string (usually a rails global_id) representing who performed the action.
Callback-bypassing database methods like update_all
, delete_all
,
update_columns
and delete
are intercepted and will require an
additional force: true
argument if they would interfere with change
journaling. Note that the less-frequently-used methods toggle
,
increment*
, decrement*
, and update_counters
are not intercepted at
this time.
Journaled includes a feature for producing audit logs of changes to your model.
Unlike Journaled::Changes
, which will emit individual sets of changes as
"logical" events, Journaled::AuditLog
will log all changes in their entirety,
unless otherwise told to ignore changes to specific columns.
This behavior is similar to papertrail, audited, and logidze, except instead of storing changes/versions locally (in your application's database), it emits them to Kinesis (as Journaled events).
To enable audit logging for a given record, use the has_audit_log
directive:
class MyModel < ApplicationRecord
has_audit_log
# This class will now be audited,
# but will ignore changes to `created_at` and `updated_at`.
end
To ignore changes to additional columns, use the ignore
option:
class MyModel < ApplicationRecord
has_audit_log ignore: :last_synced_at
# This class will be audited,
# and will ignore changes to `created_at`, `updated_at`, and `last_synced_at`.
end
By default, changes to updated_at
and created_at
will be ignored (since
these generally change on every update), but this behavior can be reconfigured:
# change the defaults:
Journaled::AuditLog.default_ignored_columns = %i(createdAt updatedAt)
# or append new defaults:
Journaled::AuditLog.default_ignored_columns += %i(modified_at)
# or disable defaults entirely:
Journaled::AuditLog.default_ignored_columns = []
Subclasses will inherit audit log configs:
class MyModel < ApplicationRecord
has_audit_log ignore: :last_synced_at
end
class MySubclass < MyModel
# this class will be audited,
# and will ignore `created_at`, `updated_at`, and `last_synced_at`.
end
To disable audit logs on subclasses, use skip_audit_log
:
class MySubclass < MyModel
skip_audit_log
end
Subclasses may specify additional columns to ignore (which will be merged into the inherited list):
class MySubclass < MyModel
has_audit_log ignore: :another_field
# this class will ignore `another_field`, IN ADDITION TO `created_at`, `updated_at`,
# and any other fields specified by the parent class.
end
To temporarily disable audit logging globally, use the without_audit_logging
directive:
Journaled::AuditLog.without_audit_logging do
# Any operation in here will skip audit logging
end
Whenever an audited record is created, updated, or destroyed, a
journaled_audit_log
event is emitted. For example, calling
user.update!(name: 'Bart')
would result in an event that looks something like
this:
{
"id": "bc7cb6a6-88cf-4849-a4f0-a31b0b199c47",
"event_type": "journaled_audit_log",
"created_at": "2022-01-28T11:06:54.928-05:00",
"class_name": "User",
"table_name": "users",
"record_id": "123",
"database_operation": "update",
"changes": { "name": ["Homer", "Bart"] },
"snapshot": null,
"actor": "gid://app_name/AdminUser/456",
"tags": {}
}
The field breakdown is as follows:
id
: a randomly-generated ID for the event itselfevent_type
: the type of event (alwaysjournaled_audit_log
)created_at
: the time that the action occurred (should matchupdated_at
on the ActiveRecord)class_name
: the name of the ActiveRecord classtable_name
: the underlying table that the class interfaces withrecord_id
: the primary key of the ActiveRecorddatabase_operation
: the type of operation (insert
,update
, ordelete
)changes
: the changes to the record, in the form of"field_name": ["from_value", "to_value"]
snapshot
: an (optional) snapshot of all of the record's columns and their values (see below).actor
: the currentJournaled.actor
tags
: the currentJournaled.tags
When records are created, updated, and deleted, the changes
field is populated
with only the columns that changed. While this keeps event payload size down, it
may make it harder to reconstruct the state of the record at a given point in
time.
This is where the snapshot
field comes in! To produce a full snapshot of a
record as part of an update, set use the virtual _log_snapshot
attribute, like
so:
my_user.update!(name: 'Bart', _log_snapshot: true)
Or to produce snapshots for all records that change for a given operation,
wrap it a with_snapshots
block, like so:
Journaled::AuditLog.with_snapshots do
ComplicatedOperation.run!
end
Snapshots can also be enabled globally for all deletion operations. Since
changes
will be empty on deletion, you should consider using this if you care
about the contents of any records being deleted (and/or don't have a full audit
trail from their time of creation):
Journaled::AuditLog.snapshot_on_deletion = true
Events with snapshots will continue to populate the changes
field, but will
additionally contain a snapshot with the full state of the user:
{
"...": "...",
"changes": { "name": ["Homer", "Bart"] },
"snapshot": { "name": "Bart", "email": "simpson@example.com", "favorite_food": "pizza" },
"...": "..."
}
Both changes
and snapshot
will filter out sensitive fields, as defined by
your Rails.application.config.filter_parameters
list:
{
"...": "...",
"changes": { "ssn": ["[FILTERED]", "[FILTERED]"] },
"snapshot": { "ssn": "[FILTERED]" },
"...": "..."
}
They will also filter out any fields whose name ends in _crypt
or _hmac
, as
well as fields that rely on Active Record Encryption / encrypts
(introduced
in Rails 7).
This is done to avoid emitting values to locations where it is difficult or impossible to rotate encryption keys (or otherwise scrub values after the fact), and currently there is no built-in configuration to bypass this behavior. If you need to track changes to sensitive/encrypted fields, it is recommended that you store the values in a local history table (still encrypted, of course!).
Because Journaled events are not guaranteed to arrive in order, events emitted
by Journaled::AuditLog
must be sorted by their created_at
value, which
should correspond roughly to the time that the SQL statement was issued.
There is currently no other means of globally ordering audit log events,
making them susceptible to clock drift and race conditions.
These issues may be mitigated on a per-model basis via
ActiveRecord::Locking::Optimistic
(and its auto-incrementing lock_version
column), and/or by careful use of other locking mechanisms.
For every custom implementation of journaling in your application, define the JSON schema for the attributes in your event.
This schema file should live in your Rails application at the top level and should be named in snake case to match the
class being journaled.
E.g.: your_app/journaled_schemas/my_class.json)
In each class you intend to use Journaled, include the Journaled::Event
module and define the attributes you want
captured. After completing the above steps, you can call the journal!
method in the model code and the declared
attributes will be published to the Kinesis stream. Be sure to call
journal!
within the same transaction as any database side effects of
your business logic operation to ensure that the event will eventually
be delivered if-and-only-if your transaction commits.
Example:
// journaled_schemas/contract_acceptance_event.json
{
"type": "object",
"title": "contract_acceptance_event",
"required": [
"user_id",
"signature"
],
"properties": {
"user_id": {
"type": "integer"
},
"signature": {
"type": "string"
}
}
}
# app/models/contract_acceptance_event.rb
ContractAcceptanceEvent = Struct.new(:user_id, :signature) do
include Journaled::Event
journal_attributes :user_id, :signature
end
# app/models/contract_acceptance.rb
class ContractAcceptance
include ActiveModel::Model
attr_accessor :user_id, :signature
def user
@user ||= User.find(user_id)
end
def contract_acceptance_event
@contract_acceptance_event ||= ContractAcceptanceEvent.new(user_id, signature)
end
def save!
User.transaction do
user.update!(contract_accepted: true)
contract_acceptance_event.journal!
end
end
end
An event like the following will be journaled to kinesis:
{
"id": "bc7cb6a6-88cf-4849-a4f0-a31b0b199c47", // A random event ID for idempotency filtering
"event_type": "contract_acceptance_event",
"created_at": "2019-01-28T11:06:54.928-05:00",
"user_id": 123,
"signature": "Sarah T. User"
}
Events may be optionally marked as "tagged." This will add a tags
field, intended for tracing and
auditing purposes.
class MyEvent
include Journaled::Event
journal_attributes :attr_1, :attr_2, tagged: true
end
You may then use Journaled.tag!
and Journaled.tagged
inside of your
ApplicationController
and ApplicationJob
classes (or anywhere else!) to tag
all events with request and job metadata:
class ApplicationController < ActionController::Base
before_action do
Journaled.tag!(request_id: request.request_id, current_user_id: current_user&.id)
end
end
class ApplicationJob < ActiveJob::Base
around_perform do |job, perform|
Journaled.tagged(job_id: job.id) { perform.call }
end
end
This feature relies on ActiveSupport::CurrentAttributes
under the hood, so these tags are local to
the current thread, and will be cleared at the end of each request request/job.
Journaled provides a couple helper methods that may be useful in your custom events. You can add whichever you need your event types like this:
# my_event.rb
class MyEvent
include Journaled::Event
journal_attributes :commit_hash, :actor_uri # ... etc, etc
def commit_hash
Journaled.commit_hash
end
def actor_uri
Journaled.actor_uri
end
# ... etc, etc
end
If you choose to use it, you must provide a GIT_COMMIT
environment
variable. Journaled.commit_hash
will fail if it is undefined.
Returns one of the following in order of preference:
- The current controller-defined
journaled_actor
's GlobalID, if set - A string of the form
gid://[app_name]/[os_username]
if performed on the command line - a string of the form
gid://[app_name]
as a fallback
In order for this to be most useful, you must configure your controller as described in Attribution above.
If you use RSpec, you can test for journaling behaviors with the
journal_event(s)_including
and journal_changes_to
matchers. First, make
sure to require journaled/rspec
in your spec setup (e.g.
spec/rails_helper.rb
):
require 'journaled/rspec'
The journal_event_including
and journal_events_including
matchers allow you
to check for one or more matching event being journaled:
expect { my_code }
.to journal_event_including(name: 'foo')
expect { my_code }
.to journal_events_including({ name: 'foo', value: 1 }, { name: 'foo', value: 2 })
This will only perform matches on the specified fields (and will not match one way or the other against unspecified fields). These matchers will also ignore any extraneous events that are not positively matched (as they may be unrelated to behavior under test).
When writing tests, pairing every positive assertion with a negative assertion
is a good practice, and so negative matching is also supported (via both
.not_to
and .to not_
):
expect { my_code }
.not_to journal_events_including({ name: 'foo' }, { name: 'bar' })
expect { my_code }
.to raise_error(SomeError)
.and not_journal_event_including(name: 'foo') # the `not_` variant can chain off of `.and`
Several chainable modifiers are also available:
expect { my_code }.to journal_event_including(name: 'foo')
.with_schema_name('my_event_schema')
.with_partition_key(user.id)
.with_stream_name('my_stream_name')
.with_enqueue_opts(run_at: future_time)
.with_priority(999)
All of this can be chained together to test for multiple sets of events with multiple sets of options:
expect { subject.journal! }
.to journal_events_including({ name: 'event1', value: 300 }, { name: 'event2', value: 200 })
.with_priority(10)
.and journal_event_including(name: 'event3', value: 100)
.with_priority(20)
.and not_journal_event_including(name: 'other_event')
The journal_changes_to
matcher checks against the list of attributes specified
on the model. It does not actually test that an event is emitted within a given
codepath, and is instead intended to guard against accidental regressions that
may impact external consumers of these events:
it "journals exactly these things or there will be heck to pay" do
expect(User).to journal_changes_to(:email, :first_name, :last_name, as: :identity_change)
end
When an event is enqueued, an ActiveSupport::Notification
titled
journaled.event.enqueue
is emitted. Its payload will include the :event
and
its background job :priority
.
This can be forwarded along to your preferred monitoring solution via a Rails initializer:
ActiveSupport::Notifications.subscribe('journaled.event.enqueue') do |*args|
payload = ActiveSupport::Notifications::Event.new(*args).payload
journaled_event = payload[:event]
tags = { priority: payload[:priority], event_type: journaled_event.journaled_attributes[:event_type] }
Statsd.increment('journaled.event.enqueue', tags: tags.map { |k,v| "#{k.to_s[0..64]}:#{v.to_s[0..255]}" })
end
Since this gem relies on background jobs (which can remain in the queue across code releases), this gem generally aims to support jobs enqueued by the prior gem version.
As such, we always recommend upgrading only one major version at a time.
Versions of Journaled prior to 5.0 would enqueue events one at a time, but 5.0
introduces a new transaction-aware feature that will bundle up all events
emitted within a transaction and enqueue them all in a single "batch" job
directly before the SQL COMMIT
statement. This reduces the database impact of
emitting a large volume of events at once.
This feature can be disabled conditionally:
Journaled.transactional_batching_enabled = false
And can then be enabled via the following block:
Journaled.with_transactional_batching do
# your code
end
Backwards compatibility has been included for background jobs enqueued by version 4.0 and above, but has been dropped for jobs emitted by versions prior to 4.0. (Again, be sure to upgrade only one major version at a time.)
Versions of Journaled prior to 4.0 relied directly on environment variables for stream names, but now stream names are configured directly. When upgrading, you can use the following configuration to maintain the previous behavior:
Journaled.default_stream_name = ENV['JOURNALED_STREAM_NAME']
If you previously specified a Journaled.default_app_name
, you would have required a more precise environment variable name (substitute {{upcase_app_name}}
):
Journaled.default_stream_name = ENV["{{upcase_app_name}}_JOURNALED_STREAM_NAME"]
And if you had defined any journaled_app_name
methods on Journaled::Event
instances, you can replace them with the following:
def journaled_stream_name
ENV['{{upcase_app_name}}_JOURNALED_STREAM_NAME']
end
When upgrading from 3.1 or below, Journaled::DeliveryJob
will handle any jobs that remain in the queue by accepting an app_name
argument. This behavior will be removed in version 5.0, so it is recommended to upgrade one major version at a time.
Versions of Journaled prior to 3.0 relied direclty on delayed_job
and a "performable" class called Journaled::Delivery
.
In 3.0, this was superceded by an ActiveJob class called Journaled::DeliveryJob
, but the Journaled::Delivery
class was not removed until 4.0.
Therefore, when upgrading from 2.5.0 or below, it is recommended to first upgrade to 3.1.0 (to allow any Journaled::Delivery
jobs to finish working off) before upgrading to 4.0+.
The upgrade to 3.1.0 will require a working ActiveJob config. ActiveJob can be configured globally by setting ActiveJob::Base.queue_adapter
, or just for Journaled jobs by setting Journaled::DeliveryJob.queue_adapter
.
The :delayed_job
queue adapter will allow you to continue relying on delayed_job
. You may also consider switching your app(s) to delayed
and using the :delayed
queue adapter.
Suggestions for enhancements to this engine are currently being tracked via Github Issues. Please feel free to open an issue for a desired feature, as well as for any observed bugs.