A rolling window implementation for Go.
```shell
go get github.com/kevinconway/rolling/v3
```
Rolling time windows represent a view of data from the current point in time to some configurable duration in the past.
```go
// 3,000 buckets of 1ms each cover the most recent 3 seconds.
var p = rolling.NewTimePolicy[int](rolling.NewWindow[int](3000), time.Millisecond)
var start = time.Now()
for range time.Tick(time.Millisecond) {
	if time.Since(start) > 3*time.Second {
		break
	}
	p.Append(1)
}
```
The above creates a time window that contains 3,000 buckets where each bucket contains, at most, 1ms of recorded data. New values are always recorded in the most recent bucket and the oldest bucket is dropped in intervals of the bucket duration. In the above, for example, the oldest bucket is dropped every millisecond and a new, most recent bucket is created to collect values.
This type of window policy is most useful for collecting real-time values such as request rates, error rates, and latency of operations.
The `NewTimePolicy` constructor returns an implementation that uses no locks and is therefore not safe for concurrent use. Use `NewTimePolicyConcurrent` if you need to manage concurrent readers and writers.
The time-based rolling window defaults to using `time.Now()` for all timestamp generation, but you can provide your own timestamps using the `AppendWithTimestamp` and `ReduceWithTimestamp` variants:
```go
var p = rolling.NewTimePolicy[int](rolling.NewWindow[int](3000), time.Millisecond)
var start = time.Now()
for now := range time.Tick(time.Millisecond) {
	if now.Sub(start) > 3*time.Second {
		break
	}
	// Record the value at the tick's own timestamp instead of time.Now().
	p.AppendWithTimestamp(1, now)
}
```
Using custom timestamps is an advanced use case and requires care. The target use case is populating the window with a series of data that provides its own timestamps, such as a strictly ordered event stream where each event has an embedded timestamp. You should not use both the standard and `WithTimestamp` method variants for the same window.
The reason is that the time-based policy is driven by the largest timestamp it has been given, so timestamps are generally expected to only increase in value. Mixing custom and default timestamps can result in unexpected behavior or a corrupt internal state of the policy.
The risk of feeding out-of-order timestamps to a time-based policy may be greater when using custom timestamps, but there is still a risk when using the default timestamp generation. The current implementation uses wall clock values to determine the passage of time and to route values to window buckets, and wall clock time can run backwards in several ways depending on system configuration and load. Here's how the time policy handles different ordering conditions:
- If the timestamp is older than the oldest bucket in the window then it is discarded.
- If the timestamp is within the current window then it is added to the appropriate bucket.
- If the timestamp is in the future compared to the current window then the future timestamp becomes the new leading edge of the window and any existing buckets that are now outside the window are discarded.
In effect, time windows can only move forward and never backwards. If time runs backwards substantially then it is likely that all data points will be discarded until the timestamp values return to the range within the window. If time runs forward substantially then it will both discard all current values in the window and set a new anchor point. The window never adjusts backwards so a future timestamp effectively invalidates the window until the source of timestamps begins producing values within the window established by the future timestamp.
While these timing conditions are possible, a substantial forward or backwards move in wall clock time is uncommon and would likely have other negative effects on a system beyond this library.
Rolling point windows represent a view of the most recently added data points.
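```go
ctx := context.Background()
var p = rolling.NewPointPolicy[int](rolling.NewWindow[int](5))
for x := 0; x < 5; x++ {
	p.Append(1)
}
p.Reduce(ctx, func(ctx context.Context, w rolling.Window[int]) int {
	fmt.Println(w) // [ [1] [1] [1] [1] [1] ]
	return 0
})
p.Append(5) // overwrites the oldest value
p.Reduce(ctx, func(ctx context.Context, w rolling.Window[int]) int {
	fmt.Println(w) // [ [5] [1] [1] [1] [1] ]
	return 0
})
p.Append(6) // overwrites the next-oldest value
p.Reduce(ctx, func(ctx context.Context, w rolling.Window[int]) int {
	fmt.Println(w) // [ [5] [6] [1] [1] [1] ]
	return 0
})
```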
The above creates a window that always contains 5 data points. When the next value is appended it will overwrite the first value. The window continuously overwrites the oldest value with the newest to preserve the specified value count. This type of window is similar to a circular buffer and is useful for collecting data that have a known interval on which they are captured or for tracking data where time is not a factor.
The `NewPointPolicy` constructor returns an implementation that uses no locks and is therefore not safe for concurrent use. Use `NewPointPolicyConcurrent` if you need to manage concurrent readers and writers.
Each window policy exposes a method with the signature `Reduce(context.Context, func(context.Context, Window[T]) T) T` that can be used to aggregate the data stored within the window. The method takes a function that reduces the contents of the `Window` to a single value. For convenience, this package provides some common reductions:
```go
ctx := context.Background()
p.Reduce(ctx, rolling.Count[int])
p.Reduce(ctx, rolling.Avg[int])
p.Reduce(ctx, rolling.Min[int])
p.Reduce(ctx, rolling.Max[int])
p.Reduce(ctx, rolling.Sum[int])
p.Reduce(ctx, rolling.Percentile[int](99.9))
p.Reduce(ctx, rolling.FastPercentile[int](99.9))
```
The `Count`, `Avg`, `Min`, `Max`, and `Sum` reductions each perform their expected computation. The `Percentile` reduction is constructed with a target percentile.
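An exact percentile can be sketched by flattening and sorting the window. This is an illustration under assumptions: `Numeric` and `Window` are local stand-ins for `rolling.Numeric` and `rolling.Window[T]`, and `nearestRank` is a hypothetical function using the nearest-rank definition, which may differ from the definition the library's `Percentile` uses. Note that it copies every value into a new slice before sorting, so unlike the simpler reductions it allocates.

```go
package main

import (
	"fmt"
	"math"
	"sort"
)

// Numeric and Window are local stand-ins for rolling.Numeric and
// rolling.Window[T] so this sketch compiles on its own.
type Numeric interface {
	~int | ~int8 | ~int16 | ~int32 | ~int64 |
		~uint | ~uint8 | ~uint16 | ~uint32 | ~uint64 |
		~float32 | ~float64
}

type Window[T Numeric] [][]T

// nearestRank is a hypothetical exact percentile (nearest-rank method):
// copy every value, sort, and return the value at rank ceil(perc/100 * n).
// It returns the zero value for an empty window.
func nearestRank[T Numeric](w Window[T], perc float64) T {
	var values []T
	for _, bucket := range w {
		values = append(values, bucket...)
	}
	if len(values) == 0 {
		var zero T
		return zero
	}
	sort.Slice(values, func(i, j int) bool { return values[i] < values[j] })
	rank := int(math.Ceil(perc / 100 * float64(len(values))))
	if rank < 1 {
		rank = 1
	}
	if rank > len(values) {
		rank = len(values)
	}
	return values[rank-1]
}

func main() {
	w := Window[int]{{5, 1}, {3}, {}, {4, 2}}
	fmt.Println(nearestRank(w, 50))   // 3
	fmt.Println(nearestRank(w, 99.9)) // 5
}
```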
For very large datasets, `FastPercentile` can be used as a replacement for the standard percentile calculation. This alternative uses the p-squared algorithm, which estimates the percentile by processing one value at a time, in any order. The results are quite accurate but can vary from the actual percentile by a small amount; it trades accuracy for speed when calculating percentiles from large datasets. For more on the p-squared algorithm see: http://www.cs.wustl.edu/~jain/papers/ftp/psqr.pdf.
Any function that matches the form `func[T rolling.Numeric](context.Context, rolling.Window[T]) T` may be given to the `Reduce` method of any window policy. The `Window[T]` type is a named version of `[][]T` where `T` may be any integer or float type. Calling `len(window)` returns the number of buckets. Each bucket is, itself, a slice of `T` where `len(bucket)` is the number of values measured within that bucket.
Most aggregates will take the form of:
```go
func MyAggregate[T rolling.Numeric](ctx context.Context, w rolling.Window[T]) T {
	var result T
	for _, bucket := range w {
		for _, value := range bucket {
			// Replace with your own aggregation; summing is shown here.
			result += value
		}
	}
	return result
}
```
Generally, the window policies use fixed-size windows. For example, point policies contain at most one data point per bucket, which means memory usage is constant and adding any number of data points to a point policy causes no allocations.
Time-based policies, however, can record an unbounded number of data points per bucket even though they maintain a fixed number of buckets in the window. To amortize the cost of allocating new space within buckets to hold more data, time-based policies always reuse previous allocations rather than returning them to the garbage collector. A time policy in use for extended periods tends to average down to zero allocations (as demonstrated in the benchmarks included with the project). To further reduce runtime allocations, you can use `NewPreallocatedWindow(buckets, bucketSize)` to generate a window where each bucket has preallocated space for `bucketSize` values. If you match this value to your peak number of data points per bucket duration, then you can achieve zero-allocation time policy usage.
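The reuse idea can be illustrated with plain slices. In this sketch, `newPreallocatedBuckets` and `resetBucket` are hypothetical helpers, not the library's `NewPreallocatedWindow`; the assumption is that a bucket is emptied by reslicing it to zero length, which keeps its backing array for the next round of values.

```go
package main

import "fmt"

// newPreallocatedBuckets is a hypothetical helper mimicking the idea behind
// NewPreallocatedWindow(buckets, bucketSize): every bucket starts empty but
// with capacity for bucketSize values.
func newPreallocatedBuckets(buckets, bucketSize int) [][]int {
	w := make([][]int, buckets)
	for i := range w {
		w[i] = make([]int, 0, bucketSize)
	}
	return w
}

// resetBucket empties a bucket while keeping its backing array, so the next
// appends reuse the same memory instead of allocating.
func resetBucket(b []int) []int { return b[:0] }

func main() {
	w := newPreallocatedBuckets(3, 4)
	for v := 1; v <= 4; v++ {
		w[0] = append(w[0], v) // fits within capacity: no new allocation
	}
	before := cap(w[0])
	w[0] = resetBucket(w[0])
	w[0] = append(w[0], 42)
	fmt.Println(len(w[0]), cap(w[0]) == before) // 1 true
}
```

Appending more than `bucketSize` values to one bucket would still grow it, which is why matching the size to peak ingestion per bucket matters.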
Most of the provided reductions (`Sum`, `Avg`, `Min`, etc.) run in O(n) time, where n is the number of data points in the window, and make zero allocations. The `Count` reduction also runs in O(n), but with n equal to the number of buckets. The `Percentile` reduction, however, calculates a perfectly accurate percentile, which requires sorting all values in the window. The sort's complexity is documented in the `sort.Stable` documentation (O(n·log(n)) comparisons and O(n·log(n)·log(n)) swaps). The `FastPercentile` reduction is O(n) and makes zero allocations, but it uses an estimation algorithm called p-squared that produces smaller errors when given larger datasets. See the p-squared paper for more details.
Another performance factor to consider is that concurrency-safe window policies use a mutex to guard all reads and writes of the enclosed window. This means that a reduction holds the lock for its entire run. A more complex than usual reduction, or even a simple O(n) reduction over a very large dataset, can block other callers for that time. If reductions begin to cause a performance issue, then the options under the current design are limited to reducing the complexity of the reduction, reducing the size of the dataset being operated on, and amortizing the reduction cost by, for example, only recomputing it at fixed time intervals or every fixed number of calls and returning a cached result in between.
In the past, I was part of the original team that built and maintained github.com/asecurityteam/rolling which was used in service reliability and performance tooling. Since then, the team's priorities and tooling have changed.
I have new use cases for this library so I'm maintaining this fork. This version includes support for multiple numeric types, a small number of bug fixes, and a few small performance optimizations.
For convenience, I have created a `v2.2.1` tag that matches the last published release of github.com/asecurityteam/rolling. The only difference is that I have updated the module path to `github.com/kevinconway/rolling/v2`. You should be able to replace `github.com/asecurityteam/rolling` with `github.com/kevinconway/rolling/v2` either in your source code or in your `go.mod` using a `replace` directive to pull from here instead.
There is no particular advantage to doing this today unless it's part of a gradual migration to v3 (the version documented in this file). If someone finds a severe enough bug, then I may release an additional v2 patch.
This fork of the project has a nearly identical interface to the original except that this version uses Go generics to allow all numeric types in a window rather than only `float64`. To update, you should only need to add a type parameter to constructor methods and reductions, and pass a `context.Context` to `Reduce`. For example:

```go
p := rolling.NewTimePolicy(rolling.NewWindow(3000), time.Millisecond)
p.Reduce(rolling.Sum)
```

becomes:

```go
p := rolling.NewTimePolicy[int](rolling.NewWindow[int](3000), time.Millisecond)
p.Reduce(ctx, rolling.Sum[int])
```
This project has no hard dependencies on any build tools other than Go. You should be able to run `go test` for any changes and see the results.
If you prefer, the project includes a Makefile with the following rules:

- `update` - Update dependencies in the go.mod file.
- `bin` - Download all optional build and test tools.
- `fmt` - Run `goimports` on all Go source files.
- `test` - Run all tests and create a test coverage record.
- `coverage` - Generate a series of coverage reports from test records.
For bugs or performance improvements, I welcome pull requests, issues, or comments. If you make a pull request, then please be sure to add tests and run `make fmt`.
For new features, please start a discussion first by creating an issue and explaining the intended change.
This project is licensed under the Apache 2.0 license. See LICENSE.txt or http://www.apache.org/licenses/LICENSE-2.0 for the full terms.
This project is forked from https://github.com/asecurityteam/rolling. The original project's copyright attribution and license terms are:
Copyright @ 2017 Atlassian Pty Ltd
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
All files are marked with SPDX tags that both attribute the original copyright as well as identify the author(s) of any significant changes to those files.