/scala-dataflow

Oz-style dataflow (single-assignment) variables and streams for Scala

Primary LanguageScala

Scala Dataflow Concurrency DSL

Description

Implements Oz-style dataflow concurrency through dataflow (single assignment) variables and streams as well as lightweight (event-based) processes/threads.

Currently implemented on top of Scala Actors, but could make use of the new delimited continuations support in upcoming Scala 2.8.

The best way to learn how to program with dataflow variables is to read the fantastic book Concepts, Techniques, and Models of Computer Programming. By Peter Van Roy and Seif Haridi.

TODO: Unification of dataflow variables.

Dataflow Variable

Dataflow Variable defines three different operations:

1. Define a Dataflow Variable

val x = new DataFlowVariable[Int]

2. Wait for Dataflow Variable to be bound

x()

3. Bind Dataflow Variable

x << 3 

A Dataflow Variable can only be bound once. Subsequent attempts to bind the variable will throw an exception.

You can also shutdown a dataflow variable like this:

x.shutdown

Dataflow Streams

Dataflow streams work like a BoundedQueue in Java. You can add an item to the tail and try to grab one from the head. Threads will block until data is available.

1. Define a Dataflow Stream

val producer = new DataFlowStream[Int]

2. Wait for an element in the stream to be available

producer()

3. Add an element to the stream

producer <<< 3 

Threads

You can easily create millions lightweight (event-driven) threads on a regular workstation.

thread { ... }

You can also set the thread to a reference to be able to control its life-cycle:

val t = thread { ... }

... // time passes

t ! 'exit // shut down the thread

Examples

Most of these examples are taken from the Oz wikipedia page.

To run these examples:

1. Compile:

cd src
scalac -cp . DataFlow.scala

2. Start REPL

scala -cp .
Welcome to Scala version 2.7.3.final (Java HotSpot(TM) Client VM, Java 1.6.0_06).
Type in expressions to have them evaluated.
Type :help for more information.

scala>

3. Paste the examples (below) into the Scala REPL.
Note: Do not try to run the Oz version, it is only there for reference.

4. Have fun.

Example 1

This example is from Oz wikipedia page: http://en.wikipedia.org/wiki/Oz_(programming_language).
Sort of the “Hello World” of dataflow concurrency.

Example in Oz

  thread 
    Z = X+Y     % will wait until both X and Y are bound to a value.
    {Browse Z}  % shows the value of Z.
  end
  thread X = 40 end
  thread Y = 2 end

Example in Scala

  import DataFlow._
  val x, y, z = new DataFlowVariable[Int]
  thread {
    z << x() + y()
    println("z = " + z())
  }
  thread { x << 40 }
  thread { y << 2 }

Example 2

Using DataFlowVariable and recursion to calculate sum.

Example in Oz

  fun {Ints N Max}
    if N == Max then nil
    else 
      {Delay 1000}
      N|{Ints N+1 Max}
    end
  end

  fun {Sum S Stream}
    case Stream of nil then S
    [] H|T then S|{Sum H+S T} end
  end

  local X Y in
    thread X = {Ints 0 1000} end
    thread Y = {Sum 0 X} end
    {Browse Y}
  end

Example in Scala

  import DataFlow._

  def ints(n: Int, max: Int): List[Int] =
    if (n == max) Nil
    else n :: ints(n + 1, max)

  def sum(s: Int, stream: List[Int]): List[Int] = stream match {
    case Nil => s :: Nil
    case h :: t => s :: sum(h + s, t)
  }

  val x = new DataFlowVariable[List[Int]]
  val y = new DataFlowVariable[List[Int]]

  thread { x << ints(0, 1000) }
  thread { y << sum(0, x()) }
  thread { println("List of sums: " + y()) }
  

Example 3

Using DataFlowStream and high-order functions to calculate sum.

Example in Oz

  fun {Ints N Max}
    if N == Max then nil
    else 
      {Delay 1000}
      N|{Ints N+1 Max}
    end
  end

  fun {Sum S Stream}
    case Stream of nil then S
    [] H|T then S|{Sum H+S T} end
  end

  local X Y in
    thread X = {Ints 0 1000} end
    thread Y = {Sum 0 X} end
    {Browse Y}
  end

Example in Scala

  import DataFlow._

  def ints(n: Int, max: Int, stream: DataFlowStream[Int]): Unit = if (n != max) { 
    println("Generating int: " + n)
    stream <<< n
    ints(n + 1, max, stream)
  }

  def sum(s: Int, in: DataFlowStream[Int], out: DataFlowStream[Int]): Unit = { 
    println("Calculating: " + s)
    out <<< s
    sum(in() + s, in, out)
  }

  def printSum(stream: DataFlowStream[Int]): Unit = {
    println("Result: " + stream())      
    printSum(stream)
  }

  val producer = new DataFlowStream[Int]
  val consumer = new DataFlowStream[Int]

  thread { ints(0, 1000, producer) }
  thread { 
    Thread.sleep(1000)
    println("Sum: " + producer.map(x => x * x).foldLeft(0)(_ + _)) 
  }

Example 4

Using DataFlowStream and recursive functions to calculate sum.

Example in Oz

  fun {Ints N Max}
    if N == Max then nil
    else 
      {Delay 1000}
      N|{Ints N+1 Max}
    end
  end

  fun {Sum S Stream}
    case Stream of nil then S
    [] H|T then S|{Sum H+S T} end
  end

  local X Y in
    thread X = {Ints 0 1000} end
    thread Y = {Sum 0 X} end
    {Browse Y}
  end

Example in Scala

  import DataFlow._

  def ints(n: Int, max: Int, stream: DataFlowStream[Int]): Unit = if (n != max) { 
    println("Generating int: " + n)
    stream <<< n
    ints(n + 1, max, stream)
  }

  def sum(s: Int, in: DataFlowStream[Int], out: DataFlowStream[Int]): Unit = { 
    println("Calculating: " + s)
    out <<< s
    sum(in() + s, in, out)
  }

  def printSum(stream: DataFlowStream[Int]): Unit = {
    println("Result: " + stream())      
    printSum(stream)
  }

  val producer = new DataFlowStream[Int]
  val consumer = new DataFlowStream[Int]

  thread { ints(0, 1000, producer) }
  thread { sum(0, producer, consumer) }
  thread { printSum(consumer) }

Example 5

Shows how to shutdown dataflow variables and bind threads to values to be able to interact with them (exit etc.).

Example in Scala

  // =======================================
  import DataFlow._

  // create four 'Int' data flow variables
  val x, y, z, v = new DataFlowVariable[Int]

  val main = thread {
    println("Thread 'main'")
   
    x << 1
    println("'x' set to: " + x())
   
    println("Waiting for 'y' to be set...")
   
    if (x() > y()) { 
      z << x
      println("'z' set to 'x': " + z())
    } else { 
      z << y
      println("'z' set to 'y': " + z())
    }
  
    // main completed, shut down the data flow variables
    x.shutdown
    y.shutdown
    z.shutdown
    v.shutdown
  }

  val setY = thread {
    println("Thread 'setY', sleeping...")
    Thread.sleep(5000)
    y << 2
    println("'y' set to: " + y())
  }  

  val setV = thread {
    println("Thread 'setV'")
    v << y
    println("'v' set to 'y': " + v())  
  }

  // shut down the threads  
  main ! 'exit
  setY ! 'exit
  setV ! 'exit