👨‍🚒 firefighter

Amazon Kinesis Data Firehose configurable queue supporting arbitrary adapters.

Motivation

When integrating with Amazon Kinesis Data Firehose, you will most likely want to batch your requests to avoid hitting Amazon's limits. Ideally, you'd have an abstraction that lets you push data, buffers it automatically, and periodically pumps it to a given delivery stream. This is what firefighter does.
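To make the buffering idea concrete, here is a minimal sketch of size-based batching with a separate flush for the periodic case. It is an illustration under assumptions, not firefighter's actual implementation: the `BufferSketch` module and its `new/1`, `push/2`, and `flush/1` functions are hypothetical names invented for this example.

```elixir
defmodule BufferSketch do
  # Hypothetical illustration only; this is not firefighter's implementation.

  # A buffer is a plain map holding pending records (newest first)
  # and the size threshold that triggers a flush.
  def new(batch_size), do: %{records: [], batch_size: batch_size}

  # Add a record. Once the buffer reaches batch_size, return the full
  # batch (oldest record first) together with an emptied buffer.
  def push(%{records: records, batch_size: batch_size} = buffer, record) do
    records = [record | records]

    if length(records) >= batch_size do
      {:flush, Enum.reverse(records), %{buffer | records: []}}
    else
      {:ok, %{buffer | records: records}}
    end
  end

  # Timer-driven flush: emit whatever is buffered, even if the batch
  # is not full yet, and return an emptied buffer.
  def flush(%{records: records} = buffer) do
    {Enum.reverse(records), %{buffer | records: []}}
  end
end
```

In a real queue, a process would call something like `flush/1` on a timer and hand each batch to an adapter, so records never wait longer than one interval.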

You can configure different options (e.g., :batch_size, :interval, :delimiter, :flush_grace_period) which should be tuned to your specific usage. Defaults are as follows:

  • :batch_size: 40
  • :interval: 2_000 (milliseconds, i.e., 2 seconds)
  • :delimiter: "" (i.e., the empty string)
  • :flush_grace_period: 30_000 (milliseconds, i.e., 30 seconds)
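As a rough sketch, overriding these defaults might look like the following. It assumes that :interval, :delimiter, and :flush_grace_period are passed in the same keyword list as :name, :delivery_stream_name, and :batch_size; the specific values are illustrative, not recommendations.

```elixir
# Hypothetical tuning. Assumes every option is accepted in the same
# keyword list that is handed to the Firefighter child spec.
children = [
  {Firefighter,
   [
     name: :my_firefighter,
     delivery_stream_name: "s3-firehose",
     # flush once 100 records are buffered
     batch_size: 100,
     # or every 5 seconds, whichever comes first
     interval: 5_000,
     # illustrative delimiter value: a newline
     delimiter: "\n",
     # allow up to 10 seconds for the final flush
     flush_grace_period: 10_000
   ]}
]
```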

Installation

The package can be installed by adding firefighter to your list of dependencies in mix.exs:

def deps do
  [
    {:firefighter, "~> 0.1.2"} # check most recent version in this project's mix.exs
  ]
end

Usage

You should configure firefighter in, e.g., config/config.exs and select your specific adapter.

# config/config.exs

config :firefighter, :adapter, Firefighter.Adapters.ExAws

Adapters provide implementations for the underlying libraries you may use to pump data to Firehose. By default, we provide a logger adapter that just logs data. We also provide an adapter for ex_aws out of the box. It should be easy enough to expand on this to provide more adapters (e.g., a new adapter for aws-elixir).

Example

# config/config.exs

config :firefighter, :adapter, Firefighter.Adapters.Logger

# config/prod.exs

config :firefighter, :adapter, Firefighter.Adapters.ExAws

# lib/example/application.ex

defmodule Example.Application do
  use Application

  @impl true
  def start(_type, _args) do
    children = [
      {Firefighter, [name: :my_firefighter, delivery_stream_name: "s3-firehose", batch_size: 10]}
    ]

    opts = [strategy: :one_for_one, name: Example.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

# lib/example.ex

defmodule Example do
  def run do
    pid = Process.whereis(:my_firefighter)
    for i <- 0..30, do: Firefighter.push(pid, "sample-data-#{i}")
    pid
  end
end

Alternative Usage

Alternatively, you can use the Firefighter.Execution abstraction, provided out of the box:

alias Firefighter.Execution

Execution.start(%{user_id: "user-1", post_id: "post-123"})
|> Execution.record(%{age: 29})
|> Execution.push(:my_firefighter)

You can also just hose it directly, without ceremony, if you're pumping a simple record that needs no composition:

Execution.hose(:my_firefighter, %{user_id: "user-1", post_id: "post-123", age: 29})

You can also use pids instead of the process name (:my_firefighter in the example above).

For a detailed example project using firefighter, check the example/ directory.

License

Copyright © 2020-present Daniel Serrano <danieljdserrano at protonmail>

This work is free. You can redistribute it and/or modify it under the
terms of the MIT License. See the LICENSE file for more details.

Made in Portugal 🇵🇹 by dnlserrano