/annoyED

ANN for Data Streams

Primary LanguageJavaApache License 2.0Apache-2.0

annoyED: Approximate nearest neighbors on your endless datastreams

AnnoyED is a streamified adoption of the Annoy algorithm [1, 2] by Erik Bernhardsson to retrieve approximate nearest neighbors. It continuously builds Annoy Indices on (possibly endless) data streams instead of on self-contained datasets only. The built indices can be queried during runtime (independent from the index building); it is so guaranteed that query results always contain the most current data. This project is implemented in Java, uses Kafka Streams, and was developed in the master’s seminar “Machine Learning for Data Streams” at Hasso Plattner Institute in 2019/20.

Installation and Setup

Make sure to install the following:

Then:

  1. Clone repository via git clone git@github.com:jonashering/annoyED.git.
  2. Start the Docker daemon.
  3. Run docker-compose up. This will start and setup Zookeeper and Kafka.
  4. Execute ./gradlew run to start the annoyED application.

Usage

REST Service

On startup, the application exposes three REST endpoints on localhost:5000. The relevant one, in the beginning, is /params:

  • /params: To configure the application, make a POST request with the request body <Number of Trees>;<Expected Dataset Size>;<Distance Metric ID> to this endpoint. The store creates the trees and allocates the space accordingly (use 0 if size is unknown). We currently support 2 distance metrics: Euclidean (ID: 0) and Angular (ID: 1). An example body would be 10;0;0. We would get 10 trees, no preallocated memory and the euclidean distance metric. Be aware that every POST request resets the whole storage. This means that all data that was loaded into the store will be discarded.

Our Kafka Streams application uses two topics: source-topic for input and sink-topic for output. JSON serialized messages are written and read by our app. Messages to the source-topic should have the following format:

{   'datapointID': 0,
    'vector':      [0.3, 0.7, 255, ...],
    'persist':     true,
    'write':       false,
    'k':           10                       }

Use the flags persist=True to add a point to the index and write=True to retrieve its nearest neighbors and write them to the sink-topic as explained in detail here. The application then outputs JSON serialized NearestNeighbors object to the sink-topic in the following format; list contains the datapointIDs of the k nearest neighbors sorted by distance:

{  'list': [1, 5, 4, 3]  }
  • /query: After having added some points to your index, start querying nearest neighbors for a new point by making a POST request with a JSON serialized Datapoint as request body (as described above) to this endpoint. The result will be a JSON serialized NearestNeighbors object with the ids of the k nearest neighbor points.

  • /trees: To inspect the built indices, send a GET request to this endpoint. The response will be a JSON serialized IndexTree. Be aware that the returned object can be very large. We only recommend using this endpoint for testing purposes with few data.

Usage with ann-benchmarks

Erik Bernhardsson, the creator of Annoy, also developed a benchmarking framework called ann-benchmarks [3], which we use to evaluate our implementation. To run the benchmarks on AnnoyED, use our fork of ann-benchmarks found here which includes a custom AnnoyED wrapper, i.e. datapoint producer, as well as the required configuration files. Then, follow the README there. In short this means:

git clone git@github.com:MariusDanner/ann-benchmarks.git  # clone repository
cd ann-benchmarks
pip install -r requirements.txt
python run.py  --local --dataset [dataset_id]# run the benchmark tool
python plot.py  # show and plot the results

Why Approximate Nearest Neighbor Search for Data streams?

Finding similar records in datasets can facilitate solutions for many interesting use-cases. For example, recommendation engines for music services: We can recommend new interesting songs to listeners based on what other similar listeners previously listened to. Since both listeners are somewhat similar, it is not unlikely that they will perceive these recommendations as reasonably customized and meaningful.

K-NN formalizes searching similar points in a dataset as follows: Given a dataset of points in a vector space (e.g. a three-dimensional space), an arbitrary distance function (e.g. Manhattan distance), for each point p in the dataset we want to retrieve the k points that are closest to p based on the selected distance function, i.e. the k nearest neighbors [8]. Usually, these neighbors are then used for classification or regression tasks.

A naive solution on a self-contained dataset is to compute distances between all pairs of points in the dataset and infer the k nearest neighbors from these results. This naive approach has quadratic time complexity wrt. the number of points in the dataset. While smart optimizations to this naive solution exist, the time complexity of k nearest neighbor search algorithms remains a bottleneck [7].

However for many use-cases, determining the exact k nearest neighbors is not a crucial requirement. For example when using k-NN for recommendation engines, it is usually reasonably good to compromise on quality, i.e. allow leaving out some true nearest neighbors, if it leads to a faster query performance. This is where approximate k-NN comes into place. Approximate nearest neighbor search is a relaxed definition of k-NN. Given a relative error of a point p is considered an approximate nearest neighbor of point q, if , where p* is a true nearest neighbor of q. Thus, p is within relative error of the true nearest neighbor [7].

Approximate k-NN approaches were firstly developed for static datasets. When the underlying data changes, this requires however recalculating the entire model. This is costly and since models are then trained on snapshots of the dataset, queries might be answered based on deprecated data. To tackle this issue, we adopted the popular approximate k-NN approach Annoy [1, 2] and transformed it into a streamified approach.

Related Work: Annoy, a solution for static datasets

Annoy [1, 2] uses a spatial indexing strategy with binary trees to store and retrieve points efficiently. Generally, the root node as well as every inner node in this tree represents a split, while every leaf node is a sink that holds a fixed number of points. Datenfluss

Building Annoy Index: An index tree is built in a top-down procedure: Consider a set of points, i.e. your dataset, two points are randomly picked from this set. Between these two points, a bisecting hyperplane is constructed that is equidistant to both points. This split constitutes the root node in our index tree. It has a left and a right child node. Each point in the dataset is assigned to either one of the children nodes based on which side of the drawn hyperplane it is located. This process of splitting the dataset into smaller and smaller chunks is recursively applied to every new child node until we have an index tree in which each leaf node holds at most a fixed number of points. The result of this process can be seen in the illustration above. In result, the constructed index trees with its split nodes and hyperplanes partition the space in which our points are located into small subspaces that are roughly equally populated.

Querying Annoy Index: Consider a query point for which you want to determine its k approximate nearest neighbors. We start with the root node of our previously built index tree and check on which side of its hyperplane our query point is located. Based on that result, we proceed to traverse the tree with either the left or right child node. We traverse the tree in this fashion until we reach a leaf node. All points that this leaf node holds are considered nearest neighbor candidates. We proceed by calculating the distance to these points and the query point. The k points with the lowest distance are the resulting k nearest neighbors.

Using a forest instead of a single tree: Our simplified explanation makes it easy to spot a major flaw in this concept: If we are unfortunate, splits could easily separate close points into different partitions, e.g. if they are very close to a hyperplane. For that reason, instead of constructing a single index tree, we construct a forest of n index trees that are all constructed randomly, i.e. each is different from one another. With every query, we search all index trees in this forest. By this, the problem of missing nearest neighbors due to unfortunate splits is reduced.

Our approach: Streamifying Annoy

In the original implementation of Annoy, the built index is immutable. This results in it being impractical for an approach where the data originates from a continuous data stream rather than a static dataset. However, since the index trees constructed when building the index are simply binary trees, i.e. only leaves hold data and they do not need to be balanced, we can make them mutable and thus suitable for our approach with few modifications:

While the querying procedure remains unchanged, we add the following modification to constructing the index trees: Points can be added to an existing index tree at any time, the correct leaf node to hold the new point is found by traversing the index tree similarly to the querying process, i.e. until the corresponding subspace is found. Each leaf node has a fixed capacity. Until this capacity is reached, we can add new points to it. When it is reached, a new spit is introduced by inserting a new hyperplane in its space and partitioning the points into two leaves accordingly.

Step 1: Traverse IndexTree to the corresponding leaf

Step 2: Insert new point at that leaf, split leaf if it reaches capacity (in this example: capacity=8)

Architecture and Components of annoyED

Apart from the Kafka-relevant components, we defined the following classes to store and retrieve approximate nearest neighbors:

Datapoints have an id which uniquely identifies them and is used as a key to store the Datapoint in a global lookup table. The data vector holds the position of the datapoint in the vector space. The persist flag tells the NeighborProcessor whether or not the datapoint should be written into the index. The write flag tells the NeighborProcessor if the nearest neighbors should be retrieved and written to the sink-topic. The k variable is only used if the write flag is set and tells the IndexStore how many nearest neighbors should be retrieved.

Dataflow

A Datapoint is written to the Kafka Topic source-topic through our Producer or an external application, i.e. the evaluation framework, to which the NeighborProcessor is subscribed. Depending on if the Datapoint should be written to the index and/or its nearest neighbors should be retrieved, it is proceeded as follows:

Datenfluss

Adding a Datapoint to the Tree Index: The NeighborProcessor calls the write method of the IndexStore. If the distance metric is angular, the vector of the Datapoint is converted to a unit vector as a preprocessing step. After this, the Datapoint gets inserted into a lookup table (HashMap) which maps the unique datapointID to the Datapoint. The Datapoint is now being inserted into each one of the IndexTrees. If the Datapoint is the first point to be inserted into the tree, the parameter _ k, which determines the maximum capacity of a leaf, is set to the number of dimensions of the datapoint rounded up to the next hundred. Now, the correct IndexNode of each IndexTree is found by traversing the nodes of the tree starting by its root node and continuing with either its leftChild or rightChild based on which side of the IndexSplit the Datapoint is located. We stop when we find a leaf, i.e. a node without children. The datapointID is then added to the data list of the node. If the node data contains less than _ k elements now, the insert operation is finished. Otherwise, a left and a right child node and an IndexSplit are created. First, two Datapoints are selected to create the split randomly (or alternatively by an approximate 2-Means procedure). Then, a hyperplane (equidistant to both points) is constructed and persisted in the IndexSplit. After this, all Datapoints of the node are moved either to the leftChild or the rightChild depending on which side of the hyperplane they are located.

Querying the Nearest Neighbors of a Datapoint: To query the k neighbors of the Datapoint, candidates from all trees are selected. First, a HashMap<Integer, Double> which maps the datapointID to the distance to the query point is created. It acts as a cache so that distances between the same pairs of Datapoints do not need to be calculated redundantly. In each IndexTree, the correct leaf node is determined in the procedure as on insert. Then, for each Datapoint in the leaf node, the distance to the query point is determined. This is done by looking up the datapointID in the previously created cache or calculating the distance and caching it. Then, all Datapoints of the leaf node are sorted by distance to the query point and the closest k Datapoints are returned. In the last step, all returned Datapoints (from all IndexTrees) are merged in a custom merge-sort-like procedure until the top k nearest neighbor Datapoints have been determined. These are then returned and written to the sink-topic as NeighborCandidates.

Experiments and their Results

The ann-benchmarks framework [3] provides access to multiple datasets and quality measures to evaluate approximate nearest neighbor approaches. We used it with a custom annoyED wrapper and measured recall of the k nearest neighbor result as (i.e. recall is equivalent to precision in this use case) as well as the number of processed queries per second as QPS (this excludes model training) on these reference datasets:

Dataset Dimensions Train size Test size Neighbors Distance
MNIST [4] 784 60,000 10,000 100 Euclidean
Fashion-MNIST [6] 784 60,000 10,000 100 Euclidean
NYTimes (Bag of Words) [5] 256 290,000 10,000 100 Angular

Optimizations

In the process of developing annoyED, we started with a straightforward implementation of our approach and later implemented a number of small optimizations to increase both quality and throughput. We created benchmarks for these changes with the MNIST dataset [4], Euclidean distance and k = 10.

Baseline: Already with our baseline (No Caching + Random), where we select split candidates randomly and otherwise proceed as described in the previous section but leabing out the aforementioned query caching, we achieve a precision of up to 0.87 at 31 QPS with 5 index trees. As expected, with more trees our model can answer queries with higher recall, i.e. unfavorable partitions in one index tree are compensated by the other trees, but however at the cost of slower throughput due to the increased complexity.

2-Means Split Candidate Selection: Randomly selecting the points between which the splitting hyperplane is constructed in a split can lead to unbalanced splits, e.g. if a selected point is an outlier. This will then lead to an unbalanced and unnecessarily deep tree structure in our index. To approach this problem, we added a k-means approximation with two centroids to create an optimally splitting hyperplane (look here for how it works in detail). With this optimization, we achieve a higher precision of up to 0.92 while at the same time maintaining the throughput of 31 QPS with 5 trees. We observe an increased QPS for less trees as well. Splitting the points in a more balanced way and thus creating more quality index trees is therefore a meaningful addition.

However: When later running the experiments on the NYTimes dataset, we discovered that the 2-means algorithm might decrease the performance. Since the capacity of the index tree leaves depends on the dimensionality of points, low-dimensional datasets trigger splitting more often. The costly 2-means, as well as the large dataset size of NYTimes, makes the index building slower, we thus made it optional.

Caching intermediate distance results: A quick recap of our implementation: When searching nearest neighbors, each index tree is traversed to the correct leaf node and from its set of points, the k closest points are forwarded as neighbor candidates where they are then merged to the true k results. Even though every index tree randomly partitions its points differently, it is likely that there is a common subset of neighbor candidates, i.e. the points closest, among all index trees. In our baseline implementation, this led to distances being calculated redundantly for every index tree. As an optimization, we cache the distances and so avoid duplicate computation. This does not lead to quality improvement, however, it improves the throughput to 40 QPS with 5 trees. In fact, we can now train more trees and while maintaining the same throughput we previously had with fewer trees, e.g. instead of 4 trees we now can have 5 at 40 QPS increasing the recall by 0.04.

Parallelization of Distance Calculation: We also experimented with parallelizing the distance calculation when retrieving points from the index. Our implementation collected the set of neighbor candidates from all index trees and then distributed the distance calculation to all available cores using the Java interface list.parallelStream().map(_ -> _). This did, unfortunately, decrease the speed. However, our testing setup is a machine with 4 (virtual) cores, so on a better-powered machine this might change.

10, 25, 50, 100 Nearest Neighbors

Additionally, we tested how well our implementation works with different settings of k. We observe, that the throughput remains relatively equal for different k. We expected this behavior, since sorting all leaf candidates is equally complex for every k, only the later merge step benefits from smaller k; however, its duration is insignificant. Also, that lower k result in higher recall results is not surprising; due to the partitioning process very close points likely end up in the same partition, points further distant are more likely to be cut off to other partitions.

MNIST, Fashion-MNIST and NYTimes Datasets

With our implementation working reasonably well on MNIST with euclidean distance, we also tested it on the similar Fashion-MNIST dataset (euclidean distance) and on the NYTimes dataset (angular distance) with k = 10. We generally observe that with a higher number of index trees, the recall increases while the QPS decreases.

The NYTimes dataset has 5 times the points as MNIST but only 256 dimensions. This results in a lower _k (than MNIST) and thus having less candidates per tree. This means that every tree holds a smaller fraction of available points (maximum 300/290000 = 0.001 (while for MNIST, each tree holds a maximum fraction of 0.013) so retrieving nearest neighbors is more difficult which is also confirmed by the recall results [3] of other algorithms. Fashion-MNIST on the other hand performs very similar to MNIST. The reason for this is it being equal to MNIST in terms of dimensionality, size and the used distance measure. The insignificant differences are likely due to chance at runtime.

AnnoyED vs. Annoy

Our project uses the core ideas from Annoy [1, 2], so we were interested in how well the different implementations compare on MNIST. Both implementations achieve similar high recall values; with 15 trees annoyED achieves a recall of 0.97, however at the price of few QPS of 14.6. Annoy on the other hand achieves an recall of 0.99 with 83.3 QPS. Another example: to get a recall of 0.79 annoyED has 30.2 QPS while Annoy achieves 3940 QPS.

To consider is the following: Annoy is a highly optimized library written in C++, it uses AVX [9] where possible to accelerate for-loops using SIMD operations. Also, Annoy does not allow modifying built indices which allows further optimizations. Plus, Annoy is directly accessed through a C++/Python interface while annoyED uses Kafka topics which leads to networking overhead.

Discussion and Future Work

We implemented an approximate nearest neighbor search prototype for data streams with Kafka Streams. Our solution is based on the Annoy algorithm [1, 2] and adopts its key ideas. As opposed to Annoy, annoyED does not implement separate model training and inference phases but its indices can be extended and queried continuously. Therefore, it is especially suitable when nearest neighbor search is realized on volatile data, e.g. data streams. AnnoyED works reasonably well on a number of different datasets and supports multiple distance measures. However, its performance is not on par with the highly optimized Annoy library.

Implementing annoyED, Kafka Streams turned out to be a suitable framework. While we needed to develop a custom state stores and processor, the Kafka Streams API allowed a flexible architecture according to our requirements. However, we expect adding a reasonable windowing strategy, which is not yet included, to be an elaborate effort due to its complexity.

Current limitations to consider are the following: We only ran and evaluated annoyED in a non-distributed setting, thus adding a layer of parallelization by e.g. multiplying the number of index stores and running them on separate instances is a task for future work. Also, a custom windowing strategy to allow for old data to be forgotten and thus account for e.g. concept drifts is an important step before running annoyED in a production setting.

References and Further Reading

[1]: Bernhardsson, E. (2015). “Nearest Neighbor Methods And Vector Models: Part 1&2” (Blogpost). https://erikbern.com/2015/09/24/nearest-neighbor-methods-vector-models-part-1, https://erikbern.com/2015/10/01/nearest-neighbors-and-vector-models-part-2-how-to-search-in-high-dimensional-spaces.html.

[2]: Spotify Inc. (since 2013). “Annoy: Approximate Nearest Neighbors Oh Yeah” (Repository). https://github.com/spotify/annoy.

[3]: Aumüller, M.; Bernhardsson, E.; Faithfull, A.J. (2018). “ANN-Benchmarks: A Benchmarking Tool for Approximate Nearest Neighbor Algorithms” (PDF). CoRR, abs/1807.05614. Also: http://ann-benchmarks.com/, https://github.com/erikbern/ann-benchmarks/.

[4]: LeCun, Y.; Cortes, C; Burges, CJ (2010). “MNIST handwritten digit database” (Online). ATT Labs. http://yann.lecun.com/exdb/mnist/.

[5]: Newman, D. (2008). “Bag of Words Data Set: NYTimes” (Online). University of California, Irvine. https://archive.ics.uci.edu/ml/datasets/bag+of+words.

[6]: Xiao, H.; Rasul, K; Vollgraf, R. (2017). “Fashion-MNIST: a Novel Image Dataset for Benchmarking Machine Learning Algorithms” (PDF). CoRR, abs/1708.07747. Also: https://github.com/zalandoresearch/fashion-mnist.

[7]: Arya, S.; Mount, D. M.; Netanyahu, N. S.; Silverman, R.; Wu, A. (1998). "An optimal algorithm for approximate nearest neighbor searching" (PDF). Journal of the ACM. 45 (6): 891–923.

[8]: Dasarathy, Belur V., ed. (1991). Nearest Neighbor (NN) Norms: NN Pattern Classification Techniques. ISBN 978-0-8186-8930-7.

[9]: Intel Corp. (2020). "Intel® Advanced Vector Extensions 512" (Online). https://www.intel.com/content/www/us/en/architecture-and-technology/avx-512-overview.html.