
Logs HTTP files to an S3 bucket at a configurable frequency.


A delta is the place where a river merges into a larger body of water: in our case, it's where our various HTTP files are merged into our data lake.


Delta is built around a GenStage pipeline, taking data from Producers and writing it to Sinks.

The basic unit is a File, a chunk of possibly-encoded data.

Delta is configured by JSON. Example:

  {
    "producers": {
      "polling_producer_name": {
        "url": "https://cdn.mbta.com/realtime/VehiclePositions.pb"
      },
      "webhook_producer_name": {
        "type": "webhook"
      }
    },
    "sinks": {
      "s3_sink_name": {
        "type": "s3",
        "bucket": "bucket_name",
        "producers": ["polling_producer_name"]
      },
      "log_sink_name": {
        "type": "log",
        "producers": ["webhook_producer_name"]
      }
    }
  }


Delta produces files from two sources: polled HTTP endpoints and webhooks.

HTTP Polling

Polling sources make an HTTP request for data at a configurable interval.

Configuration options:

  • url: Required.
  • frequency: Default 60000.
  • headers: Default {}. Example: {"content-type": "application/x-protobuf"}.
  • filters: Default []. See Filters below.
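The defaults listed above could be applied to a decoded producer configuration roughly as follows. This is a minimal sketch: the module and function names are hypothetical, and it assumes the JSON has already been decoded into a map with string keys.

```elixir
defmodule PollingConfigSketch do
  @moduledoc "Hypothetical helper; not part of Delta."

  # Defaults as documented in the option list above.
  @defaults %{"frequency" => 60_000, "headers" => %{}, "filters" => []}

  @doc "Fills in defaults for a decoded producer config, requiring \"url\"."
  def with_defaults(%{"url" => url} = config) when is_binary(url) do
    # Values present in the config win over the defaults.
    {:ok, Map.merge(@defaults, config)}
  end

  def with_defaults(_config), do: {:error, :missing_url}
end
```

Calling `PollingConfigSketch.with_defaults/1` with only a `"url"` key returns a map that also carries the default frequency, headers, and filters.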

S3 Polling

S3 polling sources make a request to S3, authenticated by IAM, at a configurable interval.

Configuration options:

  • type: Must be "s3".
  • bucket: Required string.
  • path: Required string.
  • frequency: Default 60000.
  • filters: Default []. See Filters below.


Webhooks

Webhooks accept an HTTP POST to /webhook/:webhook_producer_name, and treat the body of the request as the content of the file.

Configuration options:

  • type: Must be "webhook".
  • authorization: A string. Default null. If present, checks that incoming requests have an "authorization" header set to this value.
  • filters: Default []. See Filters below.
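The authorization option amounts to a check like the one sketched below. The module is hypothetical and is not Delta's actual controller code; it assumes headers arrive as a list of `{name, value}` tuples with lowercase names, which is the shape Plug uses for `conn.req_headers`.

```elixir
defmodule WebhookAuthSketch do
  @moduledoc "Hypothetical helper; not part of Delta."

  @doc """
  Returns true when the request may proceed.

  `headers` is a list of `{name, value}` tuples with lowercase names.
  """
  # No "authorization" configured (the default, null): accept every request.
  def authorized?(nil, _headers), do: true

  # Otherwise the request must carry exactly the configured value.
  def authorized?(expected, headers) when is_binary(expected) do
    List.keyfind(headers, "authorization", 0) == {"authorization", expected}
  end
end
```

A production check would likely prefer a constant-time comparison over `==`, since the configured value acts as a shared secret.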


Filters

Both polling and webhook sources also accept filters, which can arbitrarily process the File into 0 or more Files. Some example filters:

  • ensure the body is GZip-compressed (or not compressed)
  • convert a JSON body into multiple Files based on an access path
  • rename the File based on an access path
  • set the updated_at value based on an access path

Each entry in the configuration is a list whose first element is the string name of a function in lib/delta/file.ex, and any further elements are arguments to pass to that function.


"filters": [
  ["json_updated_at", ["metadata", "timestamp"]]
]

Every producer's filter list always ends with ["ensure_content_type"] and ["ensure_gzipped"], whether or not they are configured.
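The dispatch described in this section (a function name followed by its arguments, each call yielding 0 or more Files) can be sketched as below. Everything here is illustrative: the module, the `rename` and `drop_if_empty` filters, and the use of plain maps as Files are assumptions, not the functions defined in lib/delta/file.ex.

```elixir
defmodule FilterSketch do
  @moduledoc "Hypothetical stand-in for the filter functions; not Delta's code."

  # Example filters. Each takes a File (a plain map here) plus the extra
  # arguments from the config entry and returns a list of 0 or more Files.
  def rename(file, new_name), do: [%{file | name: new_name}]
  def drop_if_empty(%{body: ""}), do: []
  def drop_if_empty(file), do: [file]

  @doc "Runs every filter entry in order over the list of files."
  def apply_filters(files, filters) do
    Enum.reduce(filters, files, fn [name | args], acc ->
      # The first element names the function; the rest are its arguments.
      fun = String.to_existing_atom(name)
      Enum.flat_map(acc, fn file -> apply(__MODULE__, fun, [file | args]) end)
    end)
  end
end
```

Using `String.to_existing_atom/1` rather than `String.to_atom/1` means a misspelled filter name raises instead of silently creating a new atom.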


Sinks take the generated Files and write them somewhere.


The most useful sink writes the Files to an Amazon S3 bucket.

The sink writes each File to the given bucket at the path {prefix}/{year}/{month}/{day}/{time}_{url}.gz.
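To make the path template concrete, here is one way such a key could be assembled. This is a sketch under several assumptions that the README does not confirm: zero-padded month and day, an ISO 8601 timestamp for {time}, and a URL made key-safe by replacing other characters with underscores. The module name is hypothetical.

```elixir
defmodule S3PathSketch do
  @moduledoc "Hypothetical helper; the time and URL formatting are assumptions."

  @doc "Builds {prefix}/{year}/{month}/{day}/{time}_{url}.gz for a File."
  def path(prefix, %DateTime{} = updated_at, url) do
    date = DateTime.to_date(updated_at)

    # Assumption: zero-padded month and day, so keys sort chronologically.
    month = String.pad_leading(Integer.to_string(date.month), 2, "0")
    day = String.pad_leading(Integer.to_string(date.day), 2, "0")

    # Assumption: ISO 8601 timestamp; URL made key-safe by replacing
    # anything other than letters, digits, "." and "-" with "_".
    time = DateTime.to_iso8601(updated_at)
    safe_url = String.replace(url, ~r/[^A-Za-z0-9.-]+/, "_")

    # Skip an empty prefix so the key does not start with "/".
    [prefix, "#{date.year}", month, day, "#{time}_#{safe_url}.gz"]
    |> Enum.reject(&(&1 == ""))
    |> Enum.join("/")
  end
end
```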

Configuration options:

  • type: Must be "s3".
  • bucket: Required string.
  • prefix: Default "". Prepended to every file name written to S3.
  • acl: Default "public-read". Passed to S3.put_object.
  • producers: Required list of string producer names.
  • filename_rewrites: Default []. A list of maps like %{pattern: "old_value", replacement: "new_value"}, each applied to the resulting S3 filename. Note: every rewrite in the list is applied to the filename of every producer feeding this sink, so patterns should be relatively specific.
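The filename_rewrites behavior can be sketched as a fold over the list. The module name is hypothetical, and the sketch assumes each pattern is a literal substring; whether Delta treats patterns as literals or regular expressions is not stated here.

```elixir
defmodule FilenameRewriteSketch do
  @moduledoc "Hypothetical helper; treats each pattern as a literal string."

  @doc "Applies every rewrite, in order, to the filename."
  def rewrite(filename, rewrites) do
    Enum.reduce(rewrites, filename, fn %{pattern: pattern, replacement: replacement}, acc ->
      # String.replace/3 replaces every occurrence of the pattern.
      String.replace(acc, pattern, replacement)
    end)
  end
end
```

Because every rewrite runs against every filename, a short pattern such as "a" would alter nearly all keys, which is why specific patterns are safer.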


Useful for debugging, the Log sink logs a message for each File.

Configuration options:

  • type: Must be "log".
  • producers: Required list of string producer names.


Delta uses the content-type and content-encoding headers. If the content-type is missing, it can fall back to the known extensions listed in config/config.exs, with a default of application/octet-stream. The only supported content-encoding is "gzip".
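The fallback order for the content type can be illustrated with a short sketch. The module name and the two-entry extension table are assumptions standing in for whatever config/config.exs actually lists; headers are assumed to be `{name, value}` tuples with lowercase names.

```elixir
defmodule ContentTypeSketch do
  @moduledoc "Hypothetical helper; the extension table below is an assumption."

  # Stand-in for the known extensions configured in config/config.exs.
  @known_extensions %{".json" => "application/json", ".pb" => "application/x-protobuf"}

  @doc "Picks a content type from the headers, then the extension, then the default."
  def content_type(headers, url) do
    case List.keyfind(headers, "content-type", 0) do
      {"content-type", value} -> value
      nil -> Map.get(@known_extensions, Path.extname(url), "application/octet-stream")
    end
  end
end
```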


asdf install
mix deps.get
env DELTA_JSON="$(cat config.json)" mix phx.server

Delta loads configuration from the DELTA_JSON environment variable. With MIX_ENV=dev, it will fall back to priv/default_configuration.json.
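The lookup order for configuration can be sketched as below. The module name is hypothetical, the environment is passed in explicitly rather than read from Mix, and treating an empty DELTA_JSON as missing is an assumption. The sketch returns the raw JSON string and leaves decoding to the caller.

```elixir
defmodule ConfigSourceSketch do
  @moduledoc "Hypothetical helper; returns the raw JSON string without decoding it."

  @default_path "priv/default_configuration.json"

  @doc "Reads DELTA_JSON, falling back to the default file only in the dev environment."
  def load(env) do
    case System.get_env("DELTA_JSON") do
      json when is_binary(json) and json != "" -> {:ok, json}
      # Only dev falls back to the bundled default configuration.
      _ when env == :dev -> File.read(@default_path)
      _ -> {:error, :missing_delta_json}
    end
  end
end
```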