Reduce your data. A unix filter for algebird-powered aggregation.

#simmer

Avi Bryant

Simmer is a streaming aggregation tool. It can be used in several contexts to incrementally and efficiently summarize large volumes of data using a fixed amount of memory. Some of the ways it can be used include:

  • As a filter in a unix pipeline processing logs or other text files
  • As a combiner and reducer in Hadoop streaming jobs
  • As a statsd-style metrics service over UDP, optionally backed by Redis

Some of the aggregations it supports include:

  • counts of unique values
  • exponentially decaying values
  • top k most frequent values
  • percentiles
  • min-hash signatures

Simmer's aggregations are commutative and associative, which is to say that you can always use simmer to combine simmer's output.

It was inspired in part by Hadoop streaming's Aggregate package, but uses the probabilistic aggregation algorithms from Twitter's Algebird.

###To build:

rake

###To run:

bin/simmer < /path/to/data.tsv

###To run listening on UDP and writing to Redis on every 10 updates to a key:

target/simmer -u 8000 -r localhost:6379 -f 10

###Input format

The simmer command takes tab-delimited key-value input and combines all of the values for each key. Here's a very simple sample input:

sum:x	1
min:y	3
min:y	4
sum:x	2
min:y	3

And here are the keys and values of the output:

sum:x	3
min:y	3

simmer has taken the two values for the key "sum:x", 1 and 2, and produced their sum, 3; it has also taken the three values for the key "min:y", 3, 4 and 3, and produced their minimum, 3.

The prefix of each key, before the colon, determines how its values will be combined; in this case, the values for "sum:x" are summed, and the values for "min:y" are combined by taking the minimum. As in this example, you can freely mix different types of aggregation in the same input stream.
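To make the prefix rule concrete, here is a minimal Python sketch of the same idea. It is not simmer's implementation: the `COMBINERS` table and the `aggregate` function are hypothetical names, and only the "sum" and "min" prefixes are covered.

```python
from typing import Callable, Dict, Iterable

# Hypothetical combiner table covering two prefixes only; simmer supports many more.
COMBINERS: Dict[str, Callable[[float, float], float]] = {
    "sum": lambda a, b: a + b,
    "min": min,
}


def aggregate(lines: Iterable[str]) -> Dict[str, float]:
    """Combine tab-delimited key/value rows, picking the combiner from the key's prefix."""
    totals: Dict[str, float] = {}
    for line in lines:
        key, value = line.rstrip("\n").split("\t")[:2]
        prefix = key.split(":", 1)[0]  # text before the first colon
        combine = COMBINERS[prefix]
        number = float(value)
        # The whole key (prefix included) identifies the aggregate.
        totals[key] = combine(totals[key], number) if key in totals else number
    return totals


rows = ["sum:x\t1", "min:y\t3", "min:y\t4", "sum:x\t2", "min:y\t3"]
result = aggregate(rows)
# result == {"sum:x": 3.0, "min:y": 3.0}
```

Because the dictionary is keyed on the full key, "min:x" and "max:x" would be tracked as separate entries, as described above.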

Note that the prefix is treated not just as an annotation, but as an integral part of the key. It's often useful to aggregate the same set of values in multiple ways; since, for example, "min:x" and "max:x" are different keys, there's no problem including both and aggregating them separately.

Many of the aggregations can be parameterized by including an integer in the prefix. For example, the percentile aggregator might appear as the prefix "pct95" (to compute the 95th percentile) or the prefix "pct50" to compute the median. A full list of the supported aggregations, and their parameterizations, is below.
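One way to picture the parameterized prefix is as a name followed by an optional integer. The Python sketch below splits a key that way; the regular expression and the `parse_prefix` name are assumptions for illustration, and simmer's actual parsing may differ.

```python
import re
from typing import Optional, Tuple

# Assumed shape of a prefix: lowercase letters, then an optional integer parameter.
PREFIX_PATTERN = re.compile(r"^([a-z]+)(\d+)?$")


def parse_prefix(key: str) -> Tuple[str, Optional[int], str]:
    """Split a key into (aggregation name, integer parameter or None, rest of key)."""
    prefix, _, rest = key.partition(":")
    match = PREFIX_PATTERN.match(prefix)
    if match is None:
        raise ValueError(f"unrecognized prefix: {prefix!r}")
    name, param = match.groups()
    return name, (int(param) if param is not None else None), rest


# parse_prefix("pct95:x")    -> ("pct", 95, "x")
# parse_prefix("sum:x")      -> ("sum", None, "x")
# parse_prefix("top3:sum:x") -> ("top", 3, "sum:x")
```

When the parameter is `None`, a real implementation would fall back to the default listed for that aggregation.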

###Output format

The output, like the input, is a tab-separated key-value stream. It is designed to be easy for humans to read, while at the same time allowing multiple outputs to be combined and fed back into simmer for further aggregation.

As a simple example of how these goals conflict, consider an aggregation producing the average of all of the values for a key. The human-readable output is just a single number, the average. To properly combine multiple averages, however, you have to know how many values originally went into each one, so that you can weight them properly.

simmer solves this by producing two values for each key: one possibly opaque, machine-readable value that is suitable for further aggregation, and one human-readable version of the value. Often, it's convenient to filter simmer's output through "cut -f 1,3" to see only the human-readable versions.
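The averaging example can be worked through in a few lines of Python. This is only an illustration of why a mergeable representation is needed; the (count, sum) pair and the function names are assumptions, not simmer's encoding.

```python
from typing import Tuple

# Mergeable form of an average: (count of values, sum of values).
Average = Tuple[int, float]


def merge(a: Average, b: Average) -> Average:
    """Combine two partial averages without losing the weighting."""
    return (a[0] + b[0], a[1] + b[1])


def human(avg: Average) -> float:
    """Human-readable form: the plain average."""
    count, total = avg
    return total / count


first = (3, 30.0)   # three values averaging 10.0
second = (1, 50.0)  # one value averaging 50.0

combined = human(merge(first, second))    # 80.0 / 4 == 20.0
naive = (human(first) + human(second)) / 2  # 30.0, which ignores the weights
```

Averaging the two human-readable numbers gives 30.0, while merging the machine-readable pairs gives the correct 20.0.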

For simple cases like sum, the human-readable and machine-readable formats are identical, so the output looks like this:

sum:x	3	3

For other aggregations, it might look more like this:

dcy:x	%%%AQBjb20udHdpdHRlci5hbGdlYmlyZC5EZWNheWVkVmFsdeUBQMVkIdW357VAWQAAAAAAAA==	8.752114744797748

Simmer will ignore the human-readable values if it's given its own output to consume, because it only looks at the first two columns of input. It will also distinguish properly between new single values and previously aggregated output for the same key, and will happily combine these with each other. This means, for example, that you can take the aggregated output of yesterday's logs, cat it with the raw input for today's logs, and get the combined output of both.

###Flushing

The simmer command takes two optional integer arguments. The first argument is --capacity, or -c: how many keys it should hold in memory at once. Whenever adding a new key would exceed this capacity, the current aggregate value for the least recently used key is flushed. In general these will be infrequent keys that may never recur, but if they do, you may see multiple outputs for the same key; these need to be aggregated in turn (perhaps by feeding the output back through simmer) to get the complete result.
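The capacity behavior resembles a least-recently-used cache that emits an entry when it evicts it. The Python sketch below shows that idea for sums only; the `LruSummer` class is hypothetical and is not how simmer is implemented internally.

```python
from collections import OrderedDict
from typing import List, Tuple


class LruSummer:
    """Sum values per key, holding at most `capacity` keys in memory."""

    def __init__(self, capacity: int) -> None:
        self.capacity = capacity
        self.totals: "OrderedDict[str, float]" = OrderedDict()
        self.flushed: List[Tuple[str, float]] = []

    def update(self, key: str, value: float) -> None:
        if key in self.totals:
            self.totals[key] += value
            self.totals.move_to_end(key)  # mark as most recently used
            return
        if len(self.totals) >= self.capacity:
            # Evict the least recently used key and emit its partial total.
            self.flushed.append(self.totals.popitem(last=False))
        self.totals[key] = value

    def finish(self) -> List[Tuple[str, float]]:
        """Return everything flushed so far plus whatever is still in memory."""
        return self.flushed + list(self.totals.items())


summer = LruSummer(capacity=2)
for key, value in [("sum:a", 1), ("sum:b", 1), ("sum:c", 1), ("sum:a", 2)]:
    summer.update(key, value)
output = summer.finish()
# "sum:a" appears twice in output (1 and 2); summing those gives the complete result, 3.
```

Feeding `output` back through the same kind of aggregation is what recovers a single value per key.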

The second argument, --flush or -f, controls the maximum number of values to aggregate for any one key before flushing. If this is set to 0, there is no maximum and frequently seen keys will only be output when there is no more input. However, if you have an infinite stream of input, you will want to set this to some non-zero value to get intermediate results out. Again, this means there may be multiple values for a single key that need to be combined after the fact.

The defaults are equivalent to:

bin/simmer -c 5000 -f 0
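The effect of --flush can be sketched in the same spirit. The Python function below is an assumption-laden illustration for sums only (the name `sum_with_flush` is made up): it emits a key's partial total once that key has absorbed `flush` values, and treats 0 as "no maximum".

```python
from typing import Dict, Iterable, List, Tuple


def sum_with_flush(rows: Iterable[Tuple[str, float]], flush: int) -> List[Tuple[str, float]]:
    """Sum values per key, emitting a partial total after every `flush` values for a key."""
    totals: Dict[str, float] = {}
    counts: Dict[str, int] = {}
    output: List[Tuple[str, float]] = []
    for key, value in rows:
        totals[key] = totals.get(key, 0) + value
        counts[key] = counts.get(key, 0) + 1
        if flush > 0 and counts[key] >= flush:
            output.append((key, totals.pop(key)))
            del counts[key]
    # End of input: emit whatever is still held in memory.
    output.extend(totals.items())
    return output


rows = [("sum:x", 1)] * 5
flushed = sum_with_flush(rows, flush=2)    # [("sum:x", 2), ("sum:x", 2), ("sum:x", 1)]
unflushed = sum_with_flush(rows, flush=0)  # [("sum:x", 5)]
```

The three partial rows in `flushed` add up to the single row in `unflushed`, which is why flushed output can be combined after the fact.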

###UDP

If you start simmer with --udp or -u, followed by a port number, it will listen on that UDP port instead of on stdin for rows of data, one UDP packet per row.
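A client for this mode only needs to put one tab-delimited row in each datagram. The Python sketch below assumes UTF-8 text with no trailing newline, which the description above does not specify; `format_row` and `send_row` are hypothetical helper names, and port 8000 is taken from the earlier example command.

```python
import socket


def format_row(key: str, value: object) -> bytes:
    """Build one tab-delimited row (assumed: UTF-8, no trailing newline)."""
    return f"{key}\t{value}".encode("utf-8")


def send_row(key: str, value: object, host: str = "localhost", port: int = 8000) -> None:
    """Send a single row as a single UDP packet."""
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.sendto(format_row(key, value), (host, port))
```

With simmer started using -u 8000, a call such as `send_row("sum:x", 1)` would deliver one row. UDP gives no delivery guarantee, so this suits metrics where an occasional lost packet is acceptable.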

###Redis

If you start simmer with --redis or -r, followed by host:port, it will write to Redis instead of stdout; the first column of output (the key) will be used as the Redis key, and the remaining two columns, tab-separated, will be used as a Redis string value. Any existing data stored in Redis at that key will be merged with the output data whenever simmer flushes.

###Numeric Aggregations

The human-readable output of these is always a single number for each key.

| Prefix | Description | Parameter | Default | Sample input | Sample output |
|--------|-------------|-----------|---------|--------------|---------------|
| sum | Sum | n/a | n/a | sum:x 1<br>sum:x 2 | sum:x 3 |
| min | Minimum | n/a | n/a | min:x 1<br>min:x 2 | min:x 1 |
| max | Maximum | n/a | n/a | max:x 1<br>max:x 2 | max:x 2 |
| uv | Unique values (estimated using the HyperLogLog algorithm) | number of hash bits; memory use is 2^n | uv12 | uv:x a<br>uv:x b<br>uv:x a | uv:x 2 |
| pct | Percentile | which percentile to output | pct50 (i.e. median) | pct50:x 2<br>pct50:x 4<br>pct50:x 4<br>pct50:x 100 | The output will be an upper bound on the estimated percentile, expressed as a double.<br>pct50:x 4.0000152587890625 |
| dcy | Exponentially decayed sum | half-life of a value, in seconds | dcy86400 (i.e. half-life of one day) | Data should be in the format timestamp:value.<br>dcy:x 1365187171:100<br>dcy:x 1365100771:100<br>dcy:x 1365014371:100 | Human-readable output will be the decayed value as of the end of the current day.<br>dcy:x 122.3 |
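To show how an exponentially decayed sum can stay mergeable, here is a small Python sketch. It is a textbook formulation under stated assumptions, not Algebird's DecayedValue: each partial result is a hypothetical (timestamp, value) pair, and two partials are merged by decaying both to the later timestamp before adding.

```python
from typing import Tuple

# (timestamp in seconds, value as of that timestamp)
Decayed = Tuple[int, float]


def decay_to(d: Decayed, timestamp: int, half_life: float) -> Decayed:
    """Re-express a decayed value as of a later timestamp."""
    t, v = d
    return (timestamp, v * 0.5 ** ((timestamp - t) / half_life))


def merge(a: Decayed, b: Decayed, half_life: float) -> Decayed:
    """Decay both values to the later of the two timestamps, then add them."""
    latest = max(a[0], b[0])
    return (latest, decay_to(a, latest, half_life)[1] + decay_to(b, latest, half_life)[1])


HALF_LIFE = 86400  # one day, matching the dcy86400 default
rows = [(1365187171, 100.0), (1365100771, 100.0), (1365014371, 100.0)]

total = rows[0]
for row in rows[1:]:
    total = merge(total, row, HALF_LIFE)
# total == (1365187171, 175.0): 100 + 100/2 + 100/4
```

The sample timestamps are exactly one day apart, so as of the latest timestamp the sum is 175.0. This sketch does not model the end-of-day adjustment described for the human-readable output, so it is not expected to reproduce the sample output value.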

###Other Aggregations

These are more specialized than, or build in some way on, the numeric aggregations.

| Prefix | Description | Parameter | Default | Sample input | Sample output |
|--------|-------------|-----------|---------|--------------|---------------|
| top | Top K (by any numeric aggregation) | how many top values to retain (also requires a secondary prefix; see example) | top10 | This will find the top 3 items by the sum of their values, assuming an item:value format.<br>top3:sum:x a:1<br>top3:sum:x b:2<br>top3:sum:x a:2<br>top3:sum:x c:1<br>top3:sum:x d:3<br>However, top10:uv:x, or top5:pct95:x, or top + any other numeric aggregation, would also be valid keys. | top3:sum:x a:3,d:3,b:2 |
| bot | Bottom K (by any numeric aggregation) | works just like top | bot10 | bot3:sum:x a:1<br>bot3:sum:x b:2<br>bot3:sum:x a:2<br>bot3:sum:x c:1<br>bot3:sum:x d:3 | bot3:sum:x c:1,b:2,d:3 |
| mh | Min-Hash Signature (used for estimating set similarity) | number of hashes to use | mh64 | Each value should be a single element of the set represented by the key.<br>mh:x a<br>mh:x b<br>mh:x c | Hex representation of n 16-bit hashes. If two sets have k matching hash values, their Jaccard similarity = k/n.<br>mh:x 0FCC:2E1F:0DD7:0049:3BF3:10D4:6460:75D4:392B:07AF:2064:27F0:6931:6717:3A0A:16D9:122E:51C6:8632:64BD:0CAE:0D15:8357:39A5:2008:4ED7:5733:44F8:1F70:02F7:23D5:59AE:0ECB:8EE0:4E1C:0249:9804:610B:0DBD:0316 |
| fh | Feature Hashing (projects any number of features into a fixed-size vector) | number of hash bits to use (output vector size will be 2^n) | fh10 | Values can either be just a token, for categorical features, or token:number for continuous features.<br>fh4:x hello<br>fh4:x world<br>fh4:x temp:32 | fh4:x 0.0,0.0,-1.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,-32.0,0.0 |
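The min-hash rule "k matching hashes out of n gives similarity k/n" can be sketched in Python as follows. The hash functions here (SHA-256 of a seed plus the element, truncated to 16 bits) are an assumption chosen for illustration, so the signatures will not match simmer's output; all function names are hypothetical.

```python
import hashlib
from typing import Iterable, List


def hash16(seed: int, element: str) -> int:
    """One of n seeded hash functions, truncated to 16 bits (illustrative choice)."""
    digest = hashlib.sha256(f"{seed}:{element}".encode("utf-8")).digest()
    return int.from_bytes(digest[:2], "big")


def signature(elements: Iterable[str], n: int = 64) -> List[int]:
    """For each of the n hash functions, keep the minimum hash over the set."""
    items = list(elements)
    return [min(hash16(seed, e) for e in items) for seed in range(n)]


def merge(a: List[int], b: List[int]) -> List[int]:
    """Signature of the union of two sets: the element-wise minimum."""
    return [min(x, y) for x, y in zip(a, b)]


def similarity(a: List[int], b: List[int]) -> float:
    """Estimated Jaccard similarity: k matching positions out of n."""
    k = sum(1 for x, y in zip(a, b) if x == y)
    return k / len(a)


def to_hex(sig: List[int]) -> str:
    """Colon-separated hex rendering, one 4-digit group per 16-bit hash."""
    return ":".join(f"{h:04X}" for h in sig)


abc = signature(["a", "b", "c"])
bcd = signature(["b", "c", "d"])
estimate = similarity(abc, bcd)  # an estimate of the true Jaccard similarity, 2/4
```

Because merging is an element-wise minimum, signatures built from separate chunks of a set combine into the signature of the whole set, which is what lets this aggregation be fed back through further rounds of combining.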

###TODO

See https://github.com/avibryant/simmer/issues