Implements Oz-style dataflow concurrency through dataflow (single assignment) variables and streams as well as lightweight (event-based) processes/threads.
Currently implemented on top of Scala Actors, but it could make use of the new delimited continuations support in the upcoming Scala 2.8.
The best way to learn how to program with dataflow variables is to read the fantastic book Concepts, Techniques, and Models of Computer Programming by Peter Van Roy and Seif Haridi.
TODO: Unification of dataflow variables.
A Dataflow Variable supports three operations:
1. Define a Dataflow Variable
val x = new DataFlowVariable[Int]
2. Wait for Dataflow Variable to be bound
x()
3. Bind Dataflow Variable
x << 3
A Dataflow Variable can only be bound once. Subsequent attempts to bind the variable will throw an exception.
You can also shut down a dataflow variable like this:
x.shutdown
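The single-assignment rule can be tried out without this library. The following is a minimal sketch that uses scala.concurrent.Promise from the standard library (Scala 2.10 or later) as a stand-in for DataFlowVariable. It is an analogy, not how DataFlowVariable is implemented; the object name SingleAssignmentSketch and the method demo are made up for illustration.

```scala
import scala.concurrent.{Await, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

// Hypothetical stand-in: Promise plays the role of a dataflow variable.
object SingleAssignmentSketch {
  // Returns (value seen by the waiting reader, whether the second bind was rejected).
  def demo(): (Int, Boolean) = {
    // Analogous to `new DataFlowVariable[Int]`: an unbound single-assignment cell.
    val x = Promise[Int]()

    // Analogous to a thread reading `x()`: this runs only once x is bound.
    val reader = x.future.map(_ + 1)

    // Analogous to `x << 3`: the first bind succeeds.
    x.success(3)

    // A second bind is rejected: Promise.success throws IllegalStateException.
    val secondBindRejected = Try(x.success(4)).isFailure

    (Await.result(reader, 5.seconds), secondBindRejected)
  }

  def main(args: Array[String]): Unit = println(demo()) // prints (4,true)
}
```

The reader is registered before the value is bound, which mirrors the way a dataflow thread can be started before its inputs exist.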
Dataflow streams work like a BlockingQueue in Java. You can add an item to the tail and take one from the head. A thread that reads from an empty stream blocks until data is available.
1. Define a Dataflow Stream
val producer = new DataFlowStream[Int]
2. Wait for an element in the stream to be available
producer()
3. Add an element to the stream
producer <<< 3
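For comparison, the blocking behaviour described above can be sketched with java.util.concurrent.LinkedBlockingQueue. This is only an illustration of the queue analogy, not the DataFlowStream implementation; the names BlockingStreamSketch and produceAndConsume are hypothetical.

```scala
import java.util.concurrent.LinkedBlockingQueue

// Hypothetical stand-in: a blocking queue plays the role of a dataflow stream.
object BlockingStreamSketch {
  // Produces 0 until n on one thread and consumes them on the calling thread.
  def produceAndConsume(n: Int): List[Int] = {
    val stream = new LinkedBlockingQueue[Int]()

    // Analogous to `producer <<< i`: add elements to the tail.
    val producer = new Thread(new Runnable {
      def run(): Unit = (0 until n).foreach(i => stream.put(i))
    })
    producer.start()

    // Analogous to `producer()`: take() blocks until an element is available.
    val received = List.fill(n)(stream.take())
    producer.join()
    received
  }

  def main(args: Array[String]): Unit =
    println(produceAndConsume(3)) // prints List(0, 1, 2)
}
```

Because there is a single producer and the queue is FIFO, the consumer sees the elements in the order they were added.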
You can easily create millions of lightweight (event-driven) threads on a regular workstation.
thread { ... }
You can also assign the thread to a reference so that you can control its life cycle:
val t = thread { ... }
... // time passes
t ! 'exit // shut down the thread
Most of these examples are taken from the Oz Wikipedia page.
To run these examples:
1. Compile:
cd src
scalac -cp . DataFlow.scala
2. Start REPL
scala -cp .
Welcome to Scala version 2.7.3.final (Java HotSpot(TM) Client VM, Java 1.6.0_06).
Type in expressions to have them evaluated.
Type :help for more information.

scala>
3. Paste the examples (below) into the Scala REPL.
Note: Do not try to run the Oz version; it is only there for reference.
4. Have fun.
This example is from the Oz Wikipedia page: http://en.wikipedia.org/wiki/Oz_(programming_language).
Sort of the “Hello World” of dataflow concurrency.
thread
   Z = X+Y     % will wait until both X and Y are bound to a value.
   {Browse Z}  % shows the value of Z.
end
thread X = 40 end
thread Y = 2 end

import DataFlow._

val x, y, z = new DataFlowVariable[Int]

thread {
  z << x() + y()
  println("z = " + z())
}
thread { x << 40 }
thread { y << 2 }
Using DataFlowVariable and recursion to calculate sum.
fun {Ints N Max}
   if N == Max then nil
   else
      {Delay 1000}
      N|{Ints N+1 Max}
   end
end
fun {Sum S Stream}
   case Stream
   of nil then S
   [] H|T then S|{Sum H+S T}
   end
end
local X Y in
   thread X = {Ints 0 1000} end
   thread Y = {Sum 0 X} end
   {Browse Y}
end

import DataFlow._

def ints(n: Int, max: Int): List[Int] =
  if (n == max) Nil else n :: ints(n + 1, max)

def sum(s: Int, stream: List[Int]): List[Int] = stream match {
  case Nil => s :: Nil
  case h :: t => s :: sum(h + s, t)
}

val x = new DataFlowVariable[List[Int]]
val y = new DataFlowVariable[List[Int]]

thread { x << ints(0, 1000) }
thread { y << sum(0, x()) }
thread { println("List of sums: " + y()) }
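To see what the two helper functions produce, here is a small worked run on plain lists, with no dataflow variables or threads involved. The functions ints and sum are copied from the example; the wrapper object RunningSumSketch is only there to make the sketch self-contained.

```scala
// Hypothetical wrapper object around the two helpers from the example.
object RunningSumSketch {
  // Builds the list n, n + 1, ..., max - 1.
  def ints(n: Int, max: Int): List[Int] =
    if (n == max) Nil else n :: ints(n + 1, max)

  // Emits the accumulator before each element and once more at the end,
  // so the result has one more element than the input.
  def sum(s: Int, stream: List[Int]): List[Int] = stream match {
    case Nil    => s :: Nil
    case h :: t => s :: sum(h + s, t)
  }

  def main(args: Array[String]): Unit = {
    println(ints(0, 4))         // prints List(0, 1, 2, 3)
    println(sum(0, ints(0, 4))) // prints List(0, 0, 1, 3, 6)
  }
}
```

The last element of the result is the total of the input, and the earlier elements are the partial sums that led to it.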
Using DataFlowStream and higher-order functions to calculate a sum of squares.
fun {Ints N Max}
   if N == Max then nil
   else
      {Delay 1000}
      N|{Ints N+1 Max}
   end
end
fun {Sum S Stream}
   case Stream
   of nil then S
   [] H|T then S|{Sum H+S T}
   end
end
local X Y in
   thread X = {Ints 0 1000} end
   thread Y = {Sum 0 X} end
   {Browse Y}
end

import DataFlow._

def ints(n: Int, max: Int, stream: DataFlowStream[Int]): Unit =
  if (n != max) {
    println("Generating int: " + n)
    stream <<< n
    ints(n + 1, max, stream)
  }

def sum(s: Int, in: DataFlowStream[Int], out: DataFlowStream[Int]): Unit = {
  println("Calculating: " + s)
  out <<< s
  sum(in() + s, in, out)
}

def printSum(stream: DataFlowStream[Int]): Unit = {
  println("Result: " + stream())
  printSum(stream)
}

val producer = new DataFlowStream[Int]
val consumer = new DataFlowStream[Int]

thread { ints(0, 1000, producer) }
thread {
  Thread.sleep(1000)
  println("Sum: " + producer.map(x => x * x).foldLeft(0)(_ + _))
}
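The map and foldLeft pipeline in this example squares each element before adding it up. The same pipeline can be sketched on an ordinary Range to see the value it computes when every element is present. The names SumOfSquaresSketch and sumOfSquares are hypothetical, and the sketch does not model the timing of the stream version, where the result depends on which elements have been produced when the fold runs.

```scala
// Hypothetical sketch: the example's map/foldLeft pipeline on a plain Range.
object SumOfSquaresSketch {
  // Squares every element of 0 until max, then adds the squares together.
  def sumOfSquares(max: Int): Int =
    (0 until max).map(x => x * x).foldLeft(0)(_ + _)

  def main(args: Array[String]): Unit = {
    println(sumOfSquares(4))    // prints 14 (0 + 1 + 4 + 9)
    println(sumOfSquares(1000)) // prints 332833500
  }
}
```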
Using DataFlowStream and recursive functions to calculate sum.
fun {Ints N Max}
   if N == Max then nil
   else
      {Delay 1000}
      N|{Ints N+1 Max}
   end
end
fun {Sum S Stream}
   case Stream
   of nil then S
   [] H|T then S|{Sum H+S T}
   end
end
local X Y in
   thread X = {Ints 0 1000} end
   thread Y = {Sum 0 X} end
   {Browse Y}
end

import DataFlow._

def ints(n: Int, max: Int, stream: DataFlowStream[Int]): Unit =
  if (n != max) {
    println("Generating int: " + n)
    stream <<< n
    ints(n + 1, max, stream)
  }

def sum(s: Int, in: DataFlowStream[Int], out: DataFlowStream[Int]): Unit = {
  println("Calculating: " + s)
  out <<< s
  sum(in() + s, in, out)
}

def printSum(stream: DataFlowStream[Int]): Unit = {
  println("Result: " + stream())
  printSum(stream)
}

val producer = new DataFlowStream[Int]
val consumer = new DataFlowStream[Int]

thread { ints(0, 1000, producer) }
thread { sum(0, producer, consumer) }
thread { printSum(consumer) }
Shows how to shut down dataflow variables and how to bind threads to values so that you can interact with them (exit, etc.).
// =======================================
import DataFlow._

// create four 'Int' data flow variables
val x, y, z, v = new DataFlowVariable[Int]

val main = thread {
  println("Thread 'main'")
  x << 1
  println("'x' set to: " + x())
  println("Waiting for 'y' to be set...")
  if (x() > y()) {
    z << x
    println("'z' set to 'x': " + z())
  } else {
    z << y
    println("'z' set to 'y': " + z())
  }
  // main completed, shut down the data flow variables
  x.shutdown
  y.shutdown
  z.shutdown
  v.shutdown
}

val setY = thread {
  println("Thread 'setY', sleeping...")
  Thread.sleep(5000)
  y << 2
  println("'y' set to: " + y())
}

val setV = thread {
  println("Thread 'setV'")
  v << y
  println("'v' set to 'y': " + v())
}

// shut down the threads
main ! 'exit
setY ! 'exit
setV ! 'exit