
Asynchronous Event Driven IO for .NET

Primary language: C#. License: MIT.

Reactor

Asynchronous event driven IO for .net

Reactor.Loop.Start();

var server = Reactor.Http.Server.Create(context => {

	context.Response.Write("hello world!!");

	context.Response.End();
	
}).Listen(8080);

overview

Reactor is an event-driven, asynchronous IO and networking framework written for the Microsoft .NET, Mono, Xamarin, and Unity3D platforms. It is heavily influenced by libuv and nodejs, and aims to mirror their feature sets and ultimately provide easy interoperability between .NET applications and real-time network services.

Reactor is specifically written to target .NET applications running on framework versions as low as 2.0. Developers can use Reactor both to consume real-time network services and to expose real-time services of their own.

download reactor 0.9


The following section describes setting up a Reactor application.

At its core, reactor requires that users start an event loop. The reactor event loop internally demultiplexes asynchronous callback operations back to the caller. The following describes recommended approaches to running a loop.

The following describes running a Reactor event loop in a typical console application. Calling Reactor.Loop.Start() begins a background thread which enumerates Reactor's internal event queue. By doing this, Reactor dispatches any asynchronous completion callbacks to the caller. In the example below, we start the loop, make a request to google, then (optionally) stop the loop.

class Program 
{
	static void Main(string [] args)
	{
		Reactor.Loop.Start();

		Reactor.Http.Request.Get("http://google.com", (exception, buffer) => {

			Console.WriteLine(buffer.ToString("utf8"));

			Reactor.Loop.Stop(); // optional
		});
	}
}
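
To make the idea of "demultiplexing callbacks back to the caller" more concrete, here is a minimal sketch of a queue-backed loop built only on the BCL. It is not Reactor's actual implementation; MiniLoop, Post() and the other member names are hypothetical and exist only for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical illustration only: not Reactor's actual implementation.
public static class MiniLoop
{
    private static readonly Queue<Action> queue = new Queue<Action>();
    private static readonly object gate = new object();
    private static bool running;

    // Starts a background thread that drains the callback queue.
    public static void Start()
    {
        lock (gate)
        {
            if (running) return;
            running = true;
        }
        Thread thread = new Thread(Run);
        thread.IsBackground = true;
        thread.Start();
    }

    // Called from any thread to hand a completion callback to the loop.
    public static void Post(Action callback)
    {
        lock (gate)
        {
            queue.Enqueue(callback);
            Monitor.Pulse(gate);
        }
    }

    // Asks the loop thread to exit; callbacks still queued are not run.
    public static void Stop()
    {
        lock (gate)
        {
            running = false;
            Monitor.Pulse(gate);
        }
    }

    private static void Run()
    {
        while (true)
        {
            Action next;
            lock (gate)
            {
                while (running && queue.Count == 0) Monitor.Wait(gate);
                if (!running) return;
                next = queue.Dequeue();
            }
            next(); // invoked outside the lock, always on the loop thread
        }
    }
}
```

In this sketch, IO workers would call MiniLoop.Post() when an operation completes, so every user callback runs on the single loop thread regardless of which thread finished the work.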

When developing UI applications, handling asynchronous callbacks typically requires the developer to manage synchronization back onto the application UI thread by way of a synchronization context. Reactor provides a convenient overload for starting loops that accepts a System.Threading.SynchronizationContext as an argument. In the example below, the loop is started with System.Threading.SynchronizationContext.Current in OnLoad(). This ensures that all async completions are always returned to the UI thread.

public partial class Form1 : Form
{
    public Form1()
    {
        InitializeComponent();
    }

    protected override void OnLoad(EventArgs e)
    {
        base.OnLoad(e);

        Reactor.Loop.Start(System.Threading.SynchronizationContext.Current);
    }

    private void button1_Click(object sender, EventArgs e)
    {
        Reactor.Http.Request.Get("http://google.com", (exception, buffer) => {

            this.textbox1.Text = buffer.ToString("utf8");
        });
    }
}
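
As a rough sketch of what returning a completion through a synchronization context involves, the helper below runs work on the thread pool and delivers the outcome with SynchronizationContext.Post(). ContextDispatch and Run() are hypothetical names, and this is an assumption about the general technique rather than a description of how Reactor does it internally.

```csharp
using System;
using System.Threading;

// Hypothetical illustration only: not part of Reactor.
public static class ContextDispatch
{
    // Runs 'work' on a thread-pool thread, then hands the outcome to
    // 'context.Post' so the callback executes wherever that context decides.
    public static void Run<T>(SynchronizationContext context,
                              Func<T> work,
                              Action<Exception, T> callback)
    {
        ThreadPool.QueueUserWorkItem(unused =>
        {
            Exception error = null;
            T result = default(T);
            try { result = work(); }
            catch (Exception e) { error = e; }

            // Post is asynchronous: it queues the delegate on the context.
            context.Post(state => callback(error, result), null);
        });
    }
}
```

In a Windows Forms application, passing the SynchronizationContext.Current captured on the UI thread means the callback is queued to the UI message loop, which is why controls can be touched safely inside it.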

In Unity3D, a SynchronizationContext is not available to developers. Rather, Unity3D requires developers to use coroutines to orchestrate asynchrony. In these scenarios, Reactor provides a Reactor.Loop.Enumerator() that Unity can use to enumerate completed asynchronous operations. The example below demonstrates how.

using UnityEngine;
using System.Collections;

public class MyGameObject : MonoBehaviour {
	
	void Start () {
		
		Reactor.Http.Request.Get("http://google.com", (exception, buffer) => {

			// ready to go!!
		});
	}
	
	void Update () {

		StartCoroutine ( Reactor.Loop.Enumerator() );
	}
}
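
For readers unfamiliar with the pattern, the following is a minimal sketch of how an enumerator can drain completed callbacks on the thread that steps it. It uses no Unity types; PollingLoop and its members are hypothetical names, and the sketch only assumes that Reactor.Loop.Enumerator() works along broadly similar lines.

```csharp
using System;
using System.Collections;
using System.Collections.Generic;

// Hypothetical illustration only: not Reactor's actual implementation.
public static class PollingLoop
{
    private static readonly Queue<Action> completed = new Queue<Action>();
    private static readonly object gate = new object();

    // Called from worker threads when an operation has finished.
    public static void Post(Action callback)
    {
        if (callback == null) throw new ArgumentNullException("callback");
        lock (gate) completed.Enqueue(callback);
    }

    // Each step runs one queued callback on the thread that calls MoveNext().
    // The enumerator finishes as soon as the queue is found empty.
    public static IEnumerator Enumerator()
    {
        while (true)
        {
            Action next = null;
            lock (gate)
            {
                if (completed.Count > 0) next = completed.Dequeue();
            }
            if (next == null) yield break;

            next();
            yield return null;
        }
    }
}
```

Because the callbacks execute inside MoveNext(), a host that steps the enumerator from its main thread (as Unity does for coroutines) sees every completion on that thread.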

Reactor comes bundled with two timing primitives, Timeouts and Intervals. These are modelled on JavaScript's setTimeout() and setInterval() respectively.

Use the Timeout class to set a delay.

Reactor.Timeout.Create(() => {

	Console.WriteLine("this code will be run in 1 second");

}, 1000);

Use the Interval class to set up a repeating interval.

Reactor.Interval.Create(() => {

	Console.WriteLine("this code will be run every 2 seconds");

}, 2000);

Additionally, Intervals can be cleared...

Reactor.Interval interval = null;

interval = Reactor.Interval.Create(() => {

	Console.WriteLine("this code will be run once");

	interval.Clear();
	
}, 2000);
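
For comparison, one-shot and repeating timers with a Clear() operation can be sketched on top of System.Threading.Timer. MiniTimer, Once() and Every() are hypothetical names. Unlike Reactor, which dispatches callbacks through its event loop, this sketch fires callbacks on thread-pool threads.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical illustration only: not Reactor's actual implementation.
public sealed class MiniTimer
{
    // Keeps active timers reachable so they are not garbage collected
    // when the caller discards the returned instance.
    private static readonly HashSet<MiniTimer> alive = new HashSet<MiniTimer>();

    private readonly Timer timer;

    private MiniTimer(Action callback, int dueTime, int period)
    {
        bool once = period == Timeout.Infinite;
        lock (alive) alive.Add(this);

        // Created disarmed, then armed, so 'timer' is assigned before it can fire.
        timer = new Timer(state => { if (once) Clear(); callback(); },
                          null, Timeout.Infinite, Timeout.Infinite);
        timer.Change(dueTime, period);
    }

    // Fires once after 'milliseconds' (compare setTimeout).
    public static MiniTimer Once(Action callback, int milliseconds)
    {
        return new MiniTimer(callback, milliseconds, Timeout.Infinite);
    }

    // Fires repeatedly every 'milliseconds' (compare setInterval).
    public static MiniTimer Every(Action callback, int milliseconds)
    {
        return new MiniTimer(callback, milliseconds, milliseconds);
    }

    // Stops future firings; a callback already in flight may still complete.
    public void Clear()
    {
        lock (alive) alive.Remove(this);
        timer.Dispose();
    }
}
```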

Reactor has a single buffer primitive which is used to buffer data in memory, and to act as a container for data transmitted via a stream. The buffer supports read and write operations, and is the type passed back on all OnData events.

Reactor.Tcp.Server.Create(socket => {
	
	socket.OnData += data => {
	
		// data is of type Reactor.Buffer
	};

	var buffer = Reactor.Buffer.Create();

	buffer.Write(10.0f);

	buffer.Write(20.0f);

	buffer.Write(30.0f);

	socket.Write(buffer);

}).Listen(5000);

Reactor aligns closely with the evented IO model found in nodejs. Reactor implements the IReadable, IWriteable, or IDuplexable interfaces across file IO, tcp, http request / response, stdio etc, with the intent of enabling efficient, evented piping of data across transports.

Reactor.Http.Server.Create(context => {

	var readstream = Reactor.File.ReadStream.Create("c:/video.mp4");

	context.Response.ContentLength = readstream.Length;

	context.Response.ContentType = "video/mp4";

	readstream.Pipe(context.Response);

}).Listen(8080);
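
The essence of evented piping can be sketched with two small interfaces. Everything below is hypothetical: IMiniReadable, IMiniWriteable, MiniPipe and the in-memory stand-ins are simplified illustrations, not Reactor's real interfaces, which also cover errors and pausing.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, simplified interfaces for illustration only.
public interface IMiniReadable
{
    event Action<byte[]> OnData;
    event Action OnEnd;
}

public interface IMiniWriteable
{
    void Write(byte[] data);
    void End();
}

public static class MiniPipe
{
    // Forwards every chunk to the destination and ends it when the source ends.
    public static void Pipe(IMiniReadable source, IMiniWriteable destination)
    {
        source.OnData += destination.Write;
        source.OnEnd += destination.End;
    }
}

// In-memory stand-ins so the sketch can be exercised without real IO.
public sealed class MemorySource : IMiniReadable
{
    public event Action<byte[]> OnData;
    public event Action OnEnd;

    public void Emit(byte[] chunk) { if (OnData != null) OnData(chunk); }
    public void Finish() { if (OnEnd != null) OnEnd(); }
}

public sealed class MemorySink : IMiniWriteable
{
    public readonly List<byte> Received = new List<byte>();
    public bool Ended;

    public void Write(byte[] data) { Received.AddRange(data); }
    public void End() { Ended = true; }
}
```

This sketch ignores backpressure. A fuller version would pause the source while the destination is still flushing and resume it afterwards, which is presumably what Pause() and Resume() are for.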

Readable streams support OnData, OnEnd and OnError events, as well as Pause(), Resume() and Pipe().

The following demonstrates opening a file as a readstream.

var readstream = Reactor.File.ReadStream.Create("myfile.txt");

readstream.OnData += (data) => { 
	
	// fired when we have read data from the file system.
};

readstream.OnEnd += () => { 

	// fired when we have read to the end of the file.
};

readstream.OnError += (error) => { 
	
	// fired on error. error is of type System.Exception.
};

Writeable streams support Write(), Flush() and End() operations on an underlying stream.

var writestream = Reactor.File.WriteStream.Create("myfile.txt");

writestream.Write("hello");

writestream.Write(123);

writestream.Write(new byte[] {0, 1, 2, 3});

writestream.End();

Reactor provides an evented abstraction for the .NET type System.IO.FileStream. The following outlines its use.

The following creates a reactor file readstream. The example outputs its contents to the console window.

var readstream = Reactor.File.ReadStream.Create("input.txt");

readstream.OnData += (data) => Console.Write(data.ToString("utf8"));

readstream.OnEnd  += ()     => Console.Write("finished reading");

The following creates a reactor file writestream. The example writes data and ends the stream when complete.

var writestream = Reactor.File.WriteStream.Create("output.txt");

writestream.Write("hello world");

writestream.End();

Reactor provides an evented abstraction over the http BCL classes.

The following will create a simple http server and listen on port 8080.

var server = Reactor.Http.Server.Create(context => {

    context.Response.Write("hello world");

    context.Response.End();

}).Listen(8080);

The reactor http server passes a 'context' for each request. The context object contains Request and Response objects, which are themselves implementations of IReadable and IWritable respectively.

Reactor provides an evented abstraction over both the HttpWebRequest and HttpWebResponse classes.

Make a GET request.

var request = Reactor.Http.Request.Create("http://domain.com", (response) => {

	response.OnData += (data) => Console.WriteLine(data.ToString(Encoding.UTF8));

	response.OnEnd += ()      => Console.WriteLine("the response has ended");

});

request.End(); // signals to make the request.

Make a POST request.

var request = Reactor.Http.Request.Create("http://domain.com", (response) => {

	response.OnData += (data) => Console.WriteLine(data.ToString(Encoding.UTF8));

});

byte[] postdata = System.Text.Encoding.UTF8.GetBytes("this is some data");

request.Method         = "POST";

request.ContentLength  = postdata.Length;

request.Write(postdata);

request.End();

Reactor provides an evented abstraction over the System.Net.Sockets.Socket TCP socket.

Create a tcp socket server. The following example emits the message "hello there" to a connecting client, then closes the connection with End().

Reactor.Tcp.Server.Create(socket => {

	socket.Write("hello there");
	
	socket.End();

}).Listen(5000);

Reactor tcp sockets are evented abstractions over System.Net.Sockets.Socket. Tcp sockets are implementations of type IDuplexable. The following code connects to the server in the previous example (both are assumed to be running on localhost).

var client = Reactor.Tcp.Socket.Create(5000);

client.OnConnect += () => { // wait for connection.

    client.OnData += (d) => Console.WriteLine(d.ToString("utf8"));

    client.OnEnd  += ()  => Console.WriteLine("tcp transport closed");
};

Reactor provides an evented abstraction over System.Net.Sockets.Socket for UDP sockets. The following demonstrates setting up two udp endpoints, and sending a message from one to the other.

//--------------------------------------------------
// socket a: create a udp socket and bind to port. 
// on receiving a message. print to console.
//--------------------------------------------------

var a = Reactor.Udp.Socket.Create();
            
a.Bind(System.Net.IPAddress.Any, 5000);
           
a.OnMessage += (remote_endpoint, message) => {

    Console.WriteLine(System.Text.Encoding.UTF8.GetString(message));
};

//--------------------------------------------------
// socket b: create a udp socket and send message
// to localhost on port 5000.
//--------------------------------------------------

var b = Reactor.Udp.Socket.Create();

b.Send(System.Net.IPAddress.Loopback, 5000, System.Text.Encoding.UTF8.GetBytes("hello from b"));

Reactor can run work on the application's thread pool.

In the following example, a 'task' is created which accepts an integer argument, and returns an integer. Inside the body of the task, Thread.Sleep() is invoked to emulate some long running computation.

var task = Reactor.Async.Task<int, int>(interval => {

    Thread.Sleep(interval); // run computation here !

    return 0;
});

Once the task has been created, the user can invoke it as follows.

task(10000, (error, result) => {

    Console.WriteLine(result);
});
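
The shape of such a task wrapper can be sketched with ThreadPool.QueueUserWorkItem. MiniAsync is a hypothetical name and this is only an assumption about the general pattern. In particular, the sketch invokes the callback on the thread-pool thread, whereas Reactor would be expected to return it through its event loop.

```csharp
using System;
using System.Threading;

// Hypothetical illustration only: not Reactor's actual implementation.
public static class MiniAsync
{
    // Wraps 'work' in a delegate that, when invoked, runs the work on the
    // thread pool and reports the outcome through an (error, result) callback.
    public static Action<TIn, Action<Exception, TOut>> Task<TIn, TOut>(Func<TIn, TOut> work)
    {
        return (input, callback) =>
        {
            ThreadPool.QueueUserWorkItem(unused =>
            {
                Exception error = null;
                TOut result = default(TOut);
                try { result = work(input); }
                catch (Exception e) { error = e; }

                // Called exactly once, outside the try block, so an exception
                // thrown by the callback itself is not reported back to it.
                callback(error, result);
            });
        };
    }
}
```

With this shape, an exception thrown inside the task body arrives as the first callback argument instead of crashing the worker thread, which matches the (error, result) convention used in the example above.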