For Okku version 0.1.5.
Welcome to the first tutorial on how to get started with Okku and Clojure. This tutorial is heavily inspired by the Akka from Java tutorial. Just as that tutorial assumes the reader knows Java and the general ideas behind Akka, this one assumes that the reader knows Clojure and the general ideas behind Akka.
In this tutorial, we will first set up a new project with Leiningen, and then see how to use Okku actors to compute Pi using the following algorithm:
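Judging from the calculate-pi-for function defined later in this tutorial, the series being summed appears to be the Leibniz formula; the notation below is a reconstruction from that code rather than a quote from the Akka tutorial:

```latex
\frac{\pi}{4} = \sum_{i=0}^{\infty} \frac{(-1)^{i}}{2i+1}
\qquad\Longleftrightarrow\qquad
\pi = \sum_{i=0}^{\infty} \frac{4\,(-1)^{i}}{2i+1}
```

Each unit of work then covers a contiguous range of indices i, and the partial sums are added together at the end.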
You should be able to completely reproduce the source code for this tutorial by following along with this file. Should you not want to type it all yourself, however, you can clone this git repository.
Please be aware that the tutorial repository is sometimes updated to match the development version of Okku. Unless you want to help update the tutorial, it is recommended to check out the latest tag after cloning (version tags in the tutorial match the corresponding version tags for the library).
This tutorial assumes you have Java 1.6 or later and access to the Leiningen script.
If you have cloned this git repository (and checked out the latest version tag), you can run
lein run
to download all the dependencies, compile all the code and run the program.
If, however, you prefer to create your own version of the program step by step, you can create a new empty project by issuing the
lein new okku-pi
command. Once the project is initialized, you should add the line
[org.clojure.gaverhae/okku "0.1.5"]
to the :dependencies vector of your project.clj.
All of the code will be written in the single file src/okku_pi/core.clj.
In order to make a runnable program, edit the project.clj file to add the following key to your defproject map:
:main okku-pi.core
You can now run your program with
lein run
though it won't do much right now.
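Put together, the resulting project.clj could look roughly like the sketch below. Only the Okku dependency and the :main key come from this tutorial; the project version, the description and the Clojure version are assumptions, so keep whatever lein new generated for you.

```clojure
;; project.clj (sketch; version, description and Clojure version are assumptions)
(defproject okku-pi "0.1.0-SNAPSHOT"
  :description "Computing Pi with Okku actors"
  :dependencies [[org.clojure/clojure "1.4.0"]          ; assumed version
                 [org.clojure.gaverhae/okku "0.1.5"]]   ; line added in this tutorial
  ;; Namespace whose -main function is called by `lein run`.
  :main okku-pi.core)
```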
The last step before we start writing the actual code is to add a dependency on Okku to the okku-pi.core namespace. Your namespace declaration should be modified to look like:
(ns okku-pi.core
  (:use okku.core))
From the Akka tutorial:
The design we are aiming for is to have one Master actor initiating the computation, creating a set of Worker actors. Then it splits up the work into discrete chunks, and sends these chunks to the different workers in a round-robin fashion. The master waits until all the workers have completed their work and sent back results for aggregation. When computation is completed the master sends the result to the Listener, which prints out the result.
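To make "round-robin" concrete, here is a small plain-Clojure sketch of how chunks would be paired with workers. It does not use Okku or Akka at all, and round-robin-assignments is a hypothetical helper name introduced only for illustration.

```clojure
;; Hypothetical helper, not part of Okku: pairs each chunk index with the
;; index of the worker that would receive it under round-robin routing.
(defn round-robin-assignments [n-workers n-chunks]
  (map vector
       (range n-chunks)              ; chunk indices 0, 1, 2, ...
       (cycle (range n-workers))))   ; worker indices repeating forever

(round-robin-assignments 3 7)
;; => ([0 0] [1 1] [2 2] [3 0] [4 1] [5 2] [6 0])
```

Each pair reads [chunk worker]: with three workers, chunk 3 wraps around to worker 0 again.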
With this in mind, we need four types of messages:
compute: sent from the main function to the Master to start the computation
work: sent from the Master to a Worker, contains an assignment
result: sent from a Worker to the Master with a result
approx: sent from the Master to the Listener with the final result (and the computation time)
Since we're working in Clojure, an easy way to represent these messages is to use maps, and as we'll have to create them multiple times, let's define some functions to do that:
(defn m-compute []
  {:type :compute})
(defn m-work [start n-elem]
  {:type :work :start start :n-elem n-elem})
(defn m-result [value]
  {:type :result :value value})
(defn m-approx [pi dur]
  {:type :approx :pi pi :dur dur})
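As a quick illustration of why maps are convenient here, the sketch below builds two messages and takes them apart again with destructuring, which is the same mechanism the actors later in this tutorial use in their argument vectors. Two of the constructors are repeated so that the snippet runs on its own.

```clojure
;; Constructors repeated from the tutorial so this snippet is self-contained.
(defn m-work [start n-elem]
  {:type :work :start start :n-elem n-elem})
(defn m-result [value]
  {:type :result :value value})

(def msg (m-work 3 10000))

;; Map destructuring picks out the fields by key.
(let [{t :type s :start n :n-elem} msg]
  [t s n])
;; => [:work 3 10000]

;; Keys that a message does not carry simply destructure to nil.
(let [{t :type s :start} (m-result 0.5)]
  [t s])
;; => [:result nil]
```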
We can now define our workers. First of all, each worker will be asked to compute parts of the sum; specifically, it will receive a starting point (a chunk number, which the function below multiplies by the chunk size to get the first index) and a number of elements to compute, and will have to compute the result. To help with that, let's define a function calculate-pi-for:
(defn calculate-pi-for [^long st ^long n]
  (let [limit (* (inc st) n)]
    (loop [i (* st n) tot 0.0]
      (if (= i limit)
        tot
        (recur (unchecked-inc i) (+ tot
                                    (* 4.0 (/ (double (unchecked-add 1 (unchecked-negate (unchecked-multiply 2 (unchecked-remainder-int i 2)))))
                                              (double (unchecked-add 1 (unchecked-multiply 2 i)))))))))))
As an aside, this function is rather ugly. Do not hesitate to contact me if you know how to make it more beautiful while retaining its performance.
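For comparison, here is a slower but more readable sketch of the same computation, useful mostly as a sanity check. It is not a replacement for calculate-pi-for: leibniz-term and chunk-sum are hypothetical names, and the lazy sequences make no attempt to match the performance of the primitive loop.

```clojure
;; Hypothetical reference version for checking results; not part of Okku.
(defn leibniz-term
  "Term i of the series for pi: 4 * (-1)^i / (2i + 1)."
  [i]
  (/ (if (even? i) 4.0 -4.0)
     (inc (* 2 i))))

(defn chunk-sum
  "Sum of the terms with indices in [st*n, (st+1)*n), i.e. chunk number st
  when the series is cut into chunks of n terms."
  [st n]
  (reduce + (map leibniz-term (range (* st n) (* (inc st) n)))))

;; 100 chunks of 1000 terms cover the first 100000 terms of the series.
(def approx-pi
  (reduce + (map #(chunk-sum % 1000) (range 100))))
```

With this many terms approx-pi should agree with Math/PI to about four decimal places, since the series converges slowly.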
We can now define our worker actor. To do that, Okku defines a helper macro called actor, which is a wrapper around a call to proxy to define a subclass of akka.actor.UntypedActor. UntypedActor has many overridable methods, which you can read about in the Akka documentation, but the one method you should always redefine is onReceive, which receives a message as an argument and reacts to it. The actor macro actually returns an akka.actor.Props object.
Since most of the time, when you receive a message, you have to react in a different way based on what message it is, Okku provides a second helper macro to handle dispatching on parts of the message. The worker actor can thus be defined by:
(def worker
  (actor (onReceive [{t :type s :start n :n-elem}]
           (dispatch-on t
             :work (! (m-result (calculate-pi-for s n)))))))
where the ! macro means "reply to the sender" when it has only one argument (it can be used with two arguments, (! target msg), to send a message to an arbitrary actor).
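If the dispatching idea is unfamiliar, the following plain-Clojure analog may help. It is not how dispatch-on is implemented and uses no Okku code; handle is a hypothetical function, the returned vector merely stands in for "reply with the result", and the :unhandled default is an assumption made for this sketch.

```clojure
;; Plain-Clojure analog of dispatching on the :type of a message.
;; NOT Okku's dispatch-on; names and the default branch are assumptions.
(defn handle [{t :type s :start n :n-elem}]
  (case t
    ;; A real worker would compute the chunk and reply to the sender here.
    :work [:compute-chunk s n]
    ;; Fallback for message types this handler does not know about.
    :unhandled))

(handle {:type :work :start 2 :n-elem 100})
;; => [:compute-chunk 2 100]
(handle {:type :something-else})
;; => :unhandled
```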
The master actor is a bit more complex than the worker, as it can receive multiple message types and has to somehow keep track of some internal state.
The first thing that the Master actor must do when initialized is to create the workers. For the Master actor we will need four parameters: the number of worker actors to create, the number of work messages we want to send (which will be equal to the number of "chunks" to compute), the number of elements in each chunk, and a reference to the listener to which it must send the final result. To that end, we shall create a function that generates a master based on these four parameters. This also allows us to solve the internal state problem by closing over a let form in the function.
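The "closing over a let form" trick works independently of actors. A minimal sketch in plain Clojure, with the hypothetical name make-accumulator, shows that every call to the generating function gets its own private state:

```clojure
;; Hypothetical example, not part of Okku: each call to make-accumulator
;; creates a fresh atom that only the returned function can reach.
(defn make-accumulator []
  (let [total (atom 0)]
    (fn [x]
      ;; swap! returns the new value of the atom.
      (swap! total + x))))

(def acc-a (make-accumulator))
(def acc-b (make-accumulator))

(acc-a 1)
(acc-a 2)    ; acc-a's private total is now 3
(acc-b 10)   ; acc-b's private total is 10, unaffected by acc-a
```

In the master function, the atoms bound in the let play the role of total, and the actor definition plays the role of the returned function.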
Finally, upon initialization, the master actor has to create the workers, and that is done through the spawn macro. This macro takes as first argument a Props, such as is returned from actor, and any of the following three keyword arguments: in, router and name.
The last useful macro we're going to need for the master actor is stop, which stops the current actor (along with all its children, in this case the workers).
We can now write the master actor in a pretty straightforward way:
(defn master [number-of-workers number-of-messages number-of-elements result-listener]
  (let [workerRouter (atom nil)
        res (atom {:pi 0 :nr 0})
        start (System/currentTimeMillis)]
    (actor
      (preStart [] (reset! workerRouter (spawn worker :name "workerRouter"
                                               :router (round-robin-router number-of-workers))))
      (onReceive [{t :type v :value}]
        (dispatch-on t
          :compute (dotimes [n number-of-messages]
                     (! @workerRouter (m-work n number-of-elements)))
          :result (do (swap! res #(merge-with + % {:pi v :nr 1}))
                      (when (= (:nr @res) number-of-messages)
                        (! result-listener (m-approx (:pi @res)
                                                     (- (System/currentTimeMillis) start)))
                        (stop))))))))
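The aggregation step in the :result branch can be tried on its own at the REPL. The sketch below feeds three made-up partial results (not real chunk values) into an atom shaped like res, using the same swap! and merge-with expression as the master:

```clojure
;; Same shape as the master's res atom: running sum and number of results.
(def res (atom {:pi 0 :nr 0}))

;; Three made-up partial results, standing in for values sent by workers.
(doseq [v [4.0 -1.0 0.5]]
  ;; merge-with + adds values key by key: :pi accumulates v, :nr counts up.
  (swap! res #(merge-with + % {:pi v :nr 1})))

@res
;; => {:pi 3.5, :nr 3}
```

Once :nr reaches the number of messages that were sent out, the master knows every chunk has been accounted for.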
The listener needs only one capability that we have not discussed yet: it must shut down the entire system when it receives the final message. This is done through the helper macro shutdown:
(def listener
  (actor
    (onReceive [{t :type pi :pi dur :dur}]
      (dispatch-on t
        :approx (do (println (format "\n\tPi approximation: \t\t%1.8f\n\tCalculation time: \t%8d millis"
                                     pi dur))
                    (shutdown))))))
All that is needed now is to create the actor system and start the calculation by creating the Master actor and sending it the compute message from the -main function of our program. There is no big surprise here, except that, as ! does not work outside of an actor, we have to send this first message through direct Java interop:
(defn -main [& args]
  (let [workers (if args (Integer/parseInt (first args)) 4)
        elements 10000 messages 10000
        system (actor-system "PiSystem" :local true)
        result-printer (spawn listener :in system :name "listener")
        coordinator (spawn (master workers messages elements result-printer)
                           :in system :name "master")]
    (println "Number of workers: " workers)
    (.tell coordinator (m-compute))
    (.awaitTermination system)))
You can run the program by typing lein run at the command prompt from the root of the project.
Note that you can pass an integer argument, as in lein run 3, to change the number of working actors from the default 4. This allows you to quickly test how much the computation time varies with the number of actors (the optimum will likely be for a number of actors somewhere between one and two times your number of cores).
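The argument handling in -main can be isolated into a small function if you want to experiment with it at the REPL. This is a sketch only: parse-workers is a hypothetical name that does not appear in the tutorial code, and it mirrors the default of 4 used there.

```clojure
;; Hypothetical helper mirroring the argument handling in -main.
(defn parse-workers
  "Returns the first command-line argument as an integer, or 4 when no
  argument was given. Throws NumberFormatException on non-numeric input."
  [args]
  (if (seq args)
    (Integer/parseInt (first args))
    4))

(parse-workers nil)     ; as with a plain `lein run`
;; => 4
(parse-workers ["3"])   ; as with `lein run 3`
;; => 3
```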
See the Akka documentation for more information on configuration.
So this is how to use the Okku library to simplify interop with Akka. This tutorial will, however, leave a Clojure programmer pretty much unimpressed by Akka, as this computation could just as easily have been done with Clojure agents.
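To give an idea of what the agent-based alternative might look like, here is a rough sketch using only clojure.core. It is one possible arrangement among many rather than a benchmarked equivalent of the actor version, and series-chunk and pi-with-agents are hypothetical names.

```clojure
;; Sketch only: these names are hypothetical and not part of Okku.
(defn series-chunk
  "Sum of the terms 4 * (-1)^i / (2i + 1) for i in [st*n, (st+1)*n)."
  [st n]
  (reduce + (map (fn [i] (/ (if (even? i) 4.0 -4.0) (inc (* 2 i))))
                 (range (* st n) (* (inc st) n)))))

(defn pi-with-agents [n-workers n-chunks n-elem]
  (let [workers (vec (repeatedly n-workers #(agent 0.0)))]
    ;; Hand chunks out round-robin; each agent adds its chunks to its own
    ;; running total. The chunk is computed inside the function passed to
    ;; send, so the work happens on the agent thread pool.
    (doseq [[chunk a] (map vector (range n-chunks) (cycle workers))]
      (send a (fn [total] (+ total (series-chunk chunk n-elem)))))
    ;; Block until every agent has processed everything sent so far.
    (apply await workers)
    (reduce + (map deref workers))))

(pi-with-agents 4 100 1000)
```

In a standalone program, remember to call (shutdown-agents) once you are done, otherwise the agent thread pool keeps the JVM alive for a while after -main returns.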
Check out the remote tutorial to see how to distribute computation with Okku.