Logs HTTP files to an S3 bucket at a configurable frequency.
A delta is the place where a river merges into a larger body of water: in our case, it's where our various HTTP files are merged into our data lake.
Delta is built around a GenStage pipeline, taking data from Producers and writing it to Sinks.
The basic unit is a File, a chunk of possibly-encoded data.
Delta is configured by JSON. Example:
{
"producers": {
"polling_producer_name": {
"url": "https://cdn.mbta.com/realtime/VehiclePositions.pb",
},
"webhook_producer_name": {
"type": "webhook",
},
},
"sinks": {
"s3_sink_name": {
"type": "s3",
"bucket": "bucket_name",
"producers": [
"polling_producer_name",
"webhook_producer_name"
]
},
"log_sink_name": {
"producers": [
"polling_producer_name",
"webhook_producer_name"
]
}
}
}
Delta produces files from two sources: polled HTTP endpoints and webhooks.
Polling sources make an HTTP request for data at a configurable interval.
Configuration options:
url
: Required.frequency
: Default 60000.headers
: Default{}
. Example:{"content-type": "application/x-protobuf"}
.filters
: Default[]
. See Filters below.
Makes a request to S3, authenticated by IAM
Configuration options:
type
: Must be"s3"
.bucket
: Required string.path
: Required string.frequency
: Default 60000.filters
: Default[]
. See Filters below.
Webhooks accept an HTTP POST to /webhook/:webhook_producer_name
, and treat the body of
the request as the content of the file.
Configuration options:
type
: Must be"webhook"
.authorization
: A string. Defaultnull
. If present, checks that incoming requests have an"authorization"
header set to this value.filters
: Default[]
. See Filters below.
Both polling and webhook sources also accept filters, which can arbitrarily process the File into 0 or more Files. Some example filters:
- ensure the body is GZip-compressed (or not compressed)
- convert a JSON body into multiple Files based on an access path
- rename the File based on an access path
- set the
updated_at
value based on an access path
Each entry in the configuration is a list whose first element is the string name of a function in lib/delta/file.ex
, and any further elements are arguments to pass to that function.
Example:
"filters": [
["ensure_not_encoded"],
["json_updated_at", ["metadata", "timestamp"]],
]
All producers will always finish with ["ensure_content_type"]
and ["ensure_gzipped"]
.
Sinks take the generated Files and write them somewhere.
The most useful sink writes the Files to an Amazon S3 bucket.
Writes to the given bucket at the path {prefix}/{year}/{month}/{day}/{time}_{url}.gz
.
Configuration options:
type
: must be"s3"
.bucket
: Required string.prefix
: Default""
. Will prepend this to all file names it writes to s3.acl
: Default"public-read"
. Passed toS3.put_object
.producers
: Required list of string producer names.filename_rewrites
: Default[]
. Contains a list of maps like -%{pattern: "old_value", replacement: "new_value"}
- this will be applied to the resulting s3 filename. Note: the full collection gets applied to every resultant producer filename, so these configurations should be relatively specific.
Useful for debugging, the Log sink logs a message for each File.
Configuration options:
type
: must be"log"
.producers
: Required list of string producer names.
Delta uses the content-type
and content-encoding
headers. If the content-type
is missing, it can fall back to the known extensions listed in config/config.exs
, with a default of application/octet-stream
. The only supported content-encoding
is "gzip"
.
asdf install
mix deps.get
env DELTA_JSON="$(cat config.json)" mix phx.server
Delta loads configuration from the DELTA_JSON
environment variable.
With MIX_ENV=dev
, it will fall back to priv/default_configuration.json.