/mingle_events

Simple means of processing events from Mingle's Events API in a pipes and filters style

Primary LanguageRubyApache License 2.0Apache-2.0

Mingle Events

Overview

Mingle 3.3 introduced a new Events API in the form of an Atom feed. The Mingle team and ThoughtWorks Studios are big believers in the use of Atom for exposing events. Atom is a widely used standard, and this event API style puts the issue of robust event delivery in the hands of the consumer, where it belongs. In fact, we’d argue this is the only feasible means of robust, scalable event delivery, short of spending hundreds of thousands or millions of dollars on enterprise buses and such. Atom-delivered events are cheap, scalable, standards-based, and robust.

However, we do accept that asking integrators wishing to consume events to implement polling is not ideal. Writing polling consumers can be tedious. And this tedium gets in the way of writing sweet Mingle integrations. We are addressing this by publishing libraries such as this, which if effective, fully hide the mechanics of event polling from the consumer. The consumer only need worry about the processing of events.

The library supports both basic event analysis as well as polling for purposes of filtering and processing, e.g., re-publishing. The polling and processing portion is modeled in the style of ‘pipes and filters.’

Installation

gem install mingle_events

Source

Hosted on github

git clone git://github.com/ThoughtWorksStudios/mingle_events.git

Quick examples

Get the latest events, from time zero

The event fetcher will manage it’s own state, so fetch_latest can be called repeatedly to fetch only new events. The first fetch will crawl all the way back to the very first event in the project’s history.

mingle_access = MingleEvents::MingleBasicAuthAccess.new('https://localhost:7071', 'david', 'p')
event_fetcher = MingleEvents::ProjectEventFetcher.new('my_project', mingle_access)
latest_events = event_fetcher.fetch_latest

Get the latest events, from now

Similar to the previous example, but a call to reset_to_now will tell the fetcher only to pull new stuff, starting now. If reset_now has been previously called, it will be ignored, and events will be fetched to the last seen event. If you have called reset_now previously, but really do wish to reset to now, you can call reset followed by reset_to_now.

mingle_access = MingleEvents::MingleBasicAuthAccess.new('https://localhost:7071', 'david', 'p')
event_fetcher = MingleEvents::ProjectEventFetcher.new('my_project', mingle_access)
latest_events = event_fetcher.reset_to_now
latest_events = event_fetcher.fetch_latest

Historical analysis, via event playback

One of the great usages of event playback is historical analysis. E.g., if you wanted to see the number of stories added each day to your project, you could write a processor that counts new stories and takes a snapshot at the end of each day, where the end of the day is the first time you see an event for the next day. If you are doing this sort of analysis you will not want to poll the entire project’s event history every time as that is quite time consuming.

The assumption is that you’ve previously fetched the latest events. Or at least you’ve fetched the events in which you are interested. Use all_fetched_entries to load previously fetched events, they’re all cached on disk so you don’t pay the prices of retrieving from Mingle.

event_fetcher = MingleEvents::ProjectEventFetcher.new('my_project', mingle_access)
event_fetcher.all_fetched_entries.each do |e|
  # do something interesting with each event (see examples folder for real analysis examples)
end  

Polling and processing events via a pipeline

You can poll Mingle for new events at regular intervals and process those events to do things such as filter to events you are interested in and post them to other systems with which you wish to integrate Mingle. This example posts all new comments to an HTTP end point. You’d need to use cron or a similar scheduler to run it at regular intervals.

post_comments_to_another_service = MingleEvents::Processors::Pipeline.new([
  MingleEvents::Processors::CategoryFilter.new([MingleEvents::Feed::Category::COMMENT_ADDITION]),
  MingleEvents::Processors::HttpPostPublisher.new('http://localhost:4567/')
])
MingleEvents::Poller.new(mingle_access, {'test_project' => [post_comments_to_another_service]}).run_once  

A more detailed dive into the polling and processing design follows.

Polling and processing

High level design

The Poller class can pump the stream of a Mingle project’s events through a pipeline of processors that you specify. The processors can do things such as “filter out any events that are not sourced from a Story” or “post an event to an HTTP end-point.”

As stated in the opening paragraph, the aim of this library is to hide the mechanics of event polling, making the user’s focus solely the definition of the processing pipeline. This library supplies fundamental event processors, such as card type filters, atom category filters, and http publishers. This library should also make it easy for you to write custom processors.

Processors, filters, and pipelines

Processors, filters, and pipelines are all processors with the same interface. The fundamental model for pipes and filters, or pipelining, is that there is a single, common interface for processing input and returning output. In this context of Mingle event processing, the interface is basically “events in, events out” where “events in” is the list of unprocessed events and “events out” are the processed events. Processed events might be enriched, filtered, untouched but emailed, etc.

This library ships the following processors:

  • MingleEvents::Processors::CardData — loads data for each card that sourced an event in the current stream. This processor requires some special handling (see next section).
  • MingleEvents::Processors::CardTypeFilter — filters events to those sourced by cards of specific card type(s)
  • MingleEvents::Processors::CategoryFilter — filters events to those with specified Atom categories. Mingle’s Atom Categories are specified in MingleEvents::Category
  • MingleEvents::Processors::CustomPropertyFilter — filters events to those with the specified value of a single project-level custom property
  • MingleEvents::Processors::HttpPostPublisher — posts event’s raw XML to an HTTP endpoint
  • MingleEvents::Processors::Pipeline — manages to processing of events by a sequence of processors

Card Data

CardData is a special processor in that it implements a second interface, beyond event processing. This interface is one that allows the lookup of data for the card that sourced the event (if the event was actually sourced by a card). As looking up card data requires accessing additional Mingle server resources, you want to take special care that you don’t make repeated requests for the same resources. If you have multiple processors requiring CardData, be sure to use a single instance of CardData across your entire pipeline.

card_data = MingleEvents::Processors::CardData.new(mingle_access, 'test_project')
    
post_commenting_on_high_priority_bugs_and_stories = MingleEvents::Processors::Pipeline.new([
   MingleEvents::Processors::CardTypeFilter.new(['story', 'bug'], card_data),
   MingleEvents::Processors::CustomPropertyFilter.new('Priority', 'High', card_data),
   MingleEvents::Processors::CategoryFilter.new([MingleEvents::Category::COMMENT_ADDITION]),
   MingleEvents::Processors::HttpPostPublisher.new('http://otherissuetracker.example.com/comments')
])

Note that CardData will provide data for the version of the card that was created by the event you are processing and not the current version of the card.

Writing your own processor

In ruby code, the processing interface is a single method named ‘process_events’ that has a single parameter, the list of unprocessed events’ and returns a list of the processed events.

Here’s a processor that simply logs events:

class MyPutsProcessor
  def process_events(events)
    events.each do |event|
      puts event
    end
  end
end

Here’s a filtering processor that removes any event without an Atom category term of ‘foo’:

class MyFooFilter
  def process_events(events)
    events.select do |event|
      event.categories.any?{|category| category.term == 'foo'}
    end
  end
end

Be absolutely sure that any processor you write returns a list of events. If you fail to do this, any pipeline using this processor will not function correctly.

Each event that is passed to the processor is an instance of type MingleEvents::Entry which is a Ruby wrapper around an Atom event. The Entry class makes it easy to access information such as author, Atom categories, whether the event was sourced by a card, etc. As the model is not yet complete, the Entry class also exposes the raw XML of the entry.

Retry & Error handling

As of now, retry is not implemented. If an error occurs during event processing, the error will be logged and processing will stop. The next run will re-start at the point of the last error.

Events and entries

You might get confused looking at the source code, documentation, etc. as to what’s an Atom entry and what’s a Mingle event. We’re still trying to clean that up a bit, but for all intents and purposes, they are the same thing. The Atom feed represents Mingle events in the form of Atom entries. For the most part we try to use the word ‘entry’ in the context of the feed and ‘event’ in the context of processing, but there’s still cleanup to be done.