orchestracities/ngsi-timeseries-api

Improve WQ response stream format

NEC-Vishal opened this issue · 3 comments

will reopen the issue soon.

@pooja1pathak @NEC-Vishal hi guys, this is one of the most cryptic TODOs in our code, my fault, I was the one who wrote it and I don't think it can possibly make sense to anyone other than me. Sorry about that! But I'm going to explain what I had in mind here so you guys can decide what to do next. So here goes.

Stream processing techniques in Work Queue

First off, a little digression about Work Queue performance. We designed WQ hoping to get an implementation with decent performance---well, for Python. Our approach was to put streams at the core of the design so we could then perform most computations in linear time and constant space. This is key to getting decent performance, b/c WQ could potentially handle very large datasets, so sucking any such beast into RAM or iterating over it multiple times isn't an option. (I know we do exactly that all over the show in the rest of the code base, but eventually we should rewrite the rest of the code to adopt the same stream-based approach.)

In particular, queries stream data directly from the DB to the client in constant space. As the DB streams each row of the result set, WQ processes it, converts it to JSON and writes that JSON right away as a text line to the HTTP response stream as shown below---r[n] is a DB row, j[n] is the corresponding JSON object.

DB              WQ            client
---             ---           ------
 |---- r1 -----> |               |
 |               |---- j1 -----> |
 |---- r2 -----> |               |
 |               |---- j2 -----> |
 | ...           | ...           |

The JSON response is actually an array where each entry contains the JSON corresponding to a DB row. But how can we write a JSON array in the HTTP response body without holding all its entries in memory? One entry at a time? Sorry for the silly humour, but that's actually close to what happens. We start writing the response with a line containing the JSON array start delimiter [, then as each row comes in we convert it to JSON, append a comma and write the line to the response stream---e.g. {"id": 31, "speed": 76.9},\n. After streaming all the rows in the result set, we write the last line containing the JSON array end delimiter ].

JSON array streaming algo

Let's flesh out the actual algorithm to convert rows to JSON and stream them to the client as a JSON array. I'm going to use Haskell as a spec language b/c it is very close to maths notation and has the advantage that you can actually run the code---yes, we're going to put together an executable spec!

Conceptually we'll think of a stream as a sequence of items---sequence as in maths, i.e. a function from the natural numbers to the set of items to stream. This maps cleanly to the Haskell list algebraic data type---think potentially infinite array or Python "iterable", etc. The items we want to stream are DB rows; let's call R the set of all possible rows known to man. We'll assume there's a given function json : R --> String that converts a row to a JSON object.

Now we're ready to give an exact definition of the JSON array streaming process we sketched out earlier. The key ingredient is the function arr : [R] --> [String] which takes a stream of rows and turns it into a stream of strings in such a way that if you concatenate all the strings in the stream you get a valid JSON array.

arr rs = "[" : go rs
    where
        go []       = [ "]" ]
        go [r]      = [ json r, "]" ]
        go (r:rest) = (json r ++ ",") : go rest

With that under our belt, all we need is a write function to send the client each item arr outputs as a text line. This kind of function you typically get from your Web framework, but we'll implement a stub here to be able to run the Haskell spec.

write [] = pure ()
write (line:rest) = do
    send line
    write rest

send is a function to write data to a socket, but for demo purposes we'll just make it an alias for putStrLn, the Haskell function that writes a line to the stdout stream. (Also take json = show so plain numbers can stand in for rows.) With all that in place, we can easily evaluate expressions in the REPL

> write (arr [1, 2, 3])
[
1,
2,
3
]

Notice that b/c Haskell evaluates expressions lazily, the argument to arr is consumed one item at a time as shown below

write (arr [1, 2, 3]) =
write ("[" : go [1, 2, 3]) =
send "["
write (go [1, 2, 3]) =
write ((json 1 ++ ",") : go [2, 3]) =
write ("1," : go [2, 3]) =
send "1,"
write (go [2, 3]) = ...

So if the example list [1, 2, 3] was actually a sequence of rows streamed from the DB, all would still work as expected.

Python implementation

Now you'd think you'd have an easy time translating the above executable spec into Python. In fact, Python 3 comes with iterators and generators that were pretty much modelled after Haskell lists. But I couldn't come up with a quick way of doing that, so I gave up and hacked together something similar: it's still based on iterators & generators, but it outputs an extra JSON null value as the last element of the streamed array. So for example, the Python implementation would produce

[
1,
2,
3,
null
]

when given an iterable containing 1, 2, 3.
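For reference, here's a minimal Python sketch of that null-terminated approach---this is just an illustration of the idea, not the actual WQ code. The trailing null means every real row can be written with a comma, so the generator never needs to detect the last element.

```python
import json

def arr(rows):
    """Turn a stream of rows into text lines that, concatenated,
    form a valid JSON array---with an extra trailing null."""
    yield "["
    for r in rows:
        # Safe to always append a comma: null follows the last row.
        yield json.dumps(r) + ","
    yield "null"
    yield "]"

def write(lines, send=print):
    """Send each line to the client; print stands in for the real send."""
    for line in lines:
        send(line)
```

Running write(arr([1, 2, 3])) prints exactly the six lines shown above, and the concatenated lines still parse as valid JSON (the array just has that bogus null at the end).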

@pooja1pathak @NEC-Vishal, the TODO in the code asks whether there's an easy way to get rid of that extra JSON null value in the Python implementation, ideally something that takes no more than a couple of lines of code to write.
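For what it's worth, one candidate approach is a one-item lookahead: buffer the previous row and only emit it once you know whether another row follows, so the last row gets no trailing comma. This is just a sketch of the idea, not a drop-in patch for the actual WQ code:

```python
import json

def arr(rows):
    """Stream a JSON array without the trailing null, using lookahead."""
    yield "["
    it = iter(rows)
    try:
        prev = next(it)
    except StopIteration:
        # Empty stream: the array is just "[]".
        yield "]"
        return
    for r in it:
        # prev is not the last row, so it needs a comma.
        yield json.dumps(prev) + ","
        prev = r
    # prev is the last row: no comma.
    yield json.dumps(prev)
    yield "]"
```

It still consumes the iterable one item at a time in constant space; the cost is holding one row in memory instead of zero extra.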

Also, another thing we can look into is streamlining the JSON response format, maybe adopting one of the streaming formats Wikipedia mentions.