/twisted-csp

Go-style concurrency for Twisted

Primary LanguagePythonEclipse Public License 1.0EPL-1.0

Twisted CSP

Communicating sequential processes for Twisted. Channels like Go, or Clojurescript's core.async.

WARNING: This is currently alpha software.

This is a very close port of Clojurescript's core.async. The significant difference is that light-weight processes are implemented using generators (yield) instead of macros.

  • Channel operations must happen inside "light-weight processes" (code flows, not actual threads).
  • Light-weight processes are spawn by calling go, go_channel, go_deferred or by using their decorator equivalents.
  • Most channel operations must follow the form of yield do_sth(...).
def slow_pipe(input, output):
    while True:
        value = yield take(input)
        yield sleep(0.5)
        if value is None: # input closed
            close(output)
            yield stop()
        else:
            yield put(output, value)

go(slow_pipe, chan1, chan2))

Examples

Function returning channel (http://talks.golang.org/2012/concurrency.slide#25).

def boring(message):
    c = Channel()
    def counter():
        i = 0
        while True:
            yield put(c, "%s %d" % (message, i))
            yield sleep(random.random())
            i += 1
    go(counter)
    return c


def main():
    b = boring("boring!")
    for i in range(5):
        print "You say: \"%s\"" % (yield take(b))
    print "You are boring; I'm leaving."

Pingpong (http://talks.golang.org/2013/advconc.slide#6).

class Ball:
    hits = 0


@process
def player(name, table):
    while True:
        ball = yield take(table)
        ball.hits += 1
        print name, ball.hits
        yield sleep(0.1)
        yield put(table, ball)


def main():
    table = Channel()

    player("ping", table)
    player("pong", table)

    yield put(table, Ball())
    yield sleep(1)

Timeout using alts (select in Go) (http://talks.golang.org/2012/concurrency.slide#35).

c = boring("Joe")
while True:
    value, chan = yield alts([c, timeout(0.8)])
    if chan is c:
        print value
    else:
        print "You're too slow."
        yield stop()

Running the examples

Use the run script, like

./run example.go.timeout_for_whole_conversation_using_select

Examples under example/go are ports of Go examples from:

Playing around in a REPL

Python 2.7.5+ (default, Sep 19 2013, 13:48:49)
[GCC 4.8.1] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> import thread
>>> from csp import *
>>> from twisted.internet import reactor
>>> thread.start_new_thread(reactor.run, (), {"installSignalHandlers": False})
140038185355008
>>> class Ball:
...     hits = 0
...
>>> def player(name, table):
...     while True:
...         ball = yield take(table)
...         if ball is None: # channel's closed
...             print name, "Ball's gone"
...             break
...         ball.hits += 1
...         print name, ball.hits
...         yield sleep(0.1)
...         yield put(table, ball)
...
>>> def main():
...     table = Channel()
...     go(player, "ping", table)
...     go(player, "pong", table)
...     yield put(table, Ball())
...     yield sleep(1)
...     close(table)
...
>>> reactor.callFromThread(lambda: go(main))
>>> ping 1
pong 2
ping 3
pong 4
ping 5
pong 6
ping 7
pong 8
ping 9
pong 10
ping Ball's gone
pong Ball's gone

>>>

Limitations

  • Does not work in a multi-threaded environment, at all (this is fixable though).
  • Channel's normal API cannot be used outside of a process (more precisely outside of the generator function of a process).
  • Generator functions must be used to spawn processes. This makes it less composable than in Go (where the constructs are built-in), or Clojurescript (where macros rule).
  • Forgetting to yield can cause subtle bugs.
  • Cooperative multi-processing (not sure if this is a big problem though).

TODO

  • Multiplexing, mixing, publishing/subscribing.
  • Channel operations (map, filter, reduce...).
  • Support multi-threaded environment (porting Clojure's core.async not Clojurescript's).
  • Write tests.
  • Think of a sensible error handling strategy (I think this should be decided by client code not library code though).
    • Should there be a separate error channel?
    • Should channels deliver (result, error) tuples?
    • Should errors be treated as special values (caught exceptions re-thrown when taken)?
  • Support other reactors, e.g. Tornado (should be easy, as the dispatcher is the only thing that depends on twisted).
  • More documentation.
  • More examples (focusing on leveraging Twisted's rich network capabilities).
  • put_then_callback, take_then_callback execute the supplied callback in the same tick if result is immediately available. This can cause problems (especially if they are public API).

Inspiration

Other Python CSP libraries:

These libraries use threads/processes (except for pycsp, which has support for greenlets (which is not portable)). This makes implementation simpler (in principle), sacrificing efficiency (but managing threads/processes can be a chore). On the other hand they are much more generic, and support networked channels (that is not necessarily a good thing though). TODO: More elaborate comparison.