StreamKV is a streaming key-value store built on top of Apache Flink to be used within streaming applications requiring more complex stateful logic. StreamKV integrates seamlessly with the Flink fault-tolerance model providing exactly-once semantics.
Key-value operations (put, get, remove...) and their outputs are represented as native Flink DataStreams
and can be embedded in any Flink Streaming application abstracting away fault-tolerant state sharing between different components.
StreamKV also supports timestamped operations, which will be executed in an ordered manner using Flink's watermark mechanism.
Supported operations
Operation | Description | Output |
---|---|---|
Put | Put a stream of (Key,Value) pairs into the store | No output |
Update | Combine a stream of (Key, Value) pairs with their current value in the store | (Key, Value) Stream |
Update (KeySelector) | Combine a stream of (Object, Value) pairs with their current value in the store using a custom selector for extracting the keys. | (Object, Value) Stream |
Get | Get a stream of Keys from the store | (Key, Value) Stream |
Get (KeySelector) | Get a stream of Keys using a custom key selector for extracting the keys | (Object, Value) Stream |
Remove | Remove a stream of Keys from the store | (Key, Value) Stream |
MultiGet | Get multiple Keys from the store using a stream of Key arrays | (Key, Value) Array Stream |
MultiGet (KeySelector) | Get multiple Keys from the store using a stream of Object arrays and a key selector | (Object, Value) Array Stream |
Check out the Scala and Java example programs!
Learn more about Flink at http://flink.apache.org/
StreamKV currently offers APIs both for Scala and Java.
Scala API
// Create a store for account information (name, balance)
val store = KVStore[String, Double](ARRIVALTIME)
// Feed the balance stream into the store
val initialBalance : DataStream[(String,Double)] = …
store.put(initialBalance)
// At any time query the balance by name
val names : DataStream[String] = …
val balanceQ = store.get(names)
// At any time query the balance for multiple people
val nameArrays : DataStream[Array[String]] = …
val totalBalanceQ = store.multiGet(nameArrays)
// Transfer : (from, to, amount)
val transferStream: DataStream[(String, String, Double)] = …
// Apply transfer by subtracting from the sender and adding to the receiver
store.update(transferStream.flatMap(x => List((x._1, -1 * x._3), (x._2, x._3))))(_ + _)
// Print the query outputs
balanceQ.getOutput.print
totalBalanceQ.getOutput.addSink(x => println(x.mkString(",")))
Java API
// Create a new KV store
KVStore<String, Integer> store = KVStore.withOrdering(OperationOrdering.ARRIVALTIME);
// Create query streams
DataStream<Tuple2<String, Integer>> putStream = ...
DataStream<String> getStream = ...
DataStream<String[]> multiGetStream = ...
// Apply the query streams to the KV store
store.put(putStream);
Query<Tuple2<String, Integer>> q1 = store.get(getStream);
Query<Tuple2<String, Integer>[]> q2 = store.multiGet(multiGetStream);
// Get and print the result streams
q1.getOutput().print();
q2.getOutput().print();
If you have any questions don't hesitate to contact me!
List of people who have contributed to this project
- Gyula Fóra
- Márton Balassi
- Paris Carbone