Beehive

Distributed Programming Framework in GoLang



Beehive is a distributed programming framework that comes with built-in transactions, replication, fault-tolerance, runtime instrumentation, and optimized placement.

Installation

Option 1. Install Beehive using goget:

curl -sL https://git.io/goget | bash -s -- github.com/kandoo/beehive

Option 2. Install Go 1.4+, set up your GOPATH, and install Beehive using go get:

go get github.com/kandoo/beehive

Test Your Setup. Enter Beehive's root directory ($GOPATH/src/github.com/kandoo/beehive) and run:

go test -v

Hello World

Let's write a simple example that counts the number of times we have said hello to each person. You can find the complete example in the GoDoc.

Message

Beehive is based on asynchronous message passing. Naturally, the first step is to define a Hello message:

// Hello represents a message in our hello world example.
type Hello struct {
	Name string // Name is the name of the person saying hello.
}

Message Handler

To handle Hello messages, we need to write an application that has a message handler for Hello. This is analogous to HTTP handlers, except that we process messages rather than HTTP requests.

A message handler in Beehive consists of two functions: (i) Rcv and (ii) Map. Rcv is the function that actually processes a message. Since Beehive provides a generic runtime Map function that works for all applications, let's skip the Map function for now; we will explain it in a later section.

This is a simple Rcv function that handles Hello messages (don't panic, it's mostly comments ;) ):

// Rcvf receives the message and the context.
func Rcvf(msg beehive.Msg, ctx beehive.RcvContext) error {
    // msg is an envelope around the Hello message.
    // You can retrieve the Hello, using msg.Data() and then
    // you need to assert that it's a Hello.
    hello := msg.Data().(Hello)
    // Using ctx.Dict you can get (or create) a dictionary.
    dict := ctx.Dict("hello_dict")
    // Using Get(), you can get the value associated with
    // a key in the dictionary. Keys are always string
    // and values are generic interface{}'s.
    v, err := dict.Get(hello.Name)
    // If there is an error, the entry is not in the
    // dictionary. Otherwise, we set cnt based on
    // the value we already have in the dictionary
    // for that name.
    cnt := 0
    if err == nil {
        cnt = v.(int)
    }
    // Now we increment the count.
    cnt++
    // And then we print the hello message.
    ctx.Printf("hello %s (%d)!\n", hello.Name, cnt)
    // Finally we update the count stored in the dictionary.
    return dict.Put(hello.Name, cnt)
}

To register a message handler, we first create an application and then we register the Hello handler for our application:

// Create the hello world application and make it persistent.
app := beehive.NewApp("hello-world", beehive.Persistent(1))
// Register the handler for Hello messages.
app.HandleFunc(Hello{}, beehive.RuntimeMap(Rcvf), Rcvf)

Note that our application is persistent and will save its state on 1 node (i.e., persistent but not replicated).

Emit Hello

Now, to send a Hello message, you can emit it:

// Emit simply emits a message, here a
// Hello message carrying your name.
go beehive.Emit(Hello{Name: "your name"})
// Emit another message with the same name
// to test the counting feature.
go beehive.Emit(Hello{Name: "your name"})

Whenever you emit a Hello message, it will be processed by all applications that have a handler for Hello. Here, we have only one application, but you could create different applications with different handlers for Hello. All of them would receive the Hello message.
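To make this fan-out concrete, here is a minimal, self-contained sketch of type-based dispatch. It does not use Beehive: the `registry` type and its `handle`/`emit` methods are hypothetical stand-ins that only illustrate how one emitted message reaches every application that registered a handler for its type. Unlike Beehive, this sketch delivers messages synchronously and on a single go-routine.

```go
package main

import (
	"fmt"
	"reflect"
)

// Hello mirrors the message type from the example above.
type Hello struct {
	Name string
}

// handler is a hypothetical stand-in for a Rcv function.
type handler func(msg interface{})

// registry maps a message type to the handlers of every application
// that registered for it (hypothetical, not Beehive's API).
type registry struct {
	handlers map[reflect.Type][]handler
}

func newRegistry() *registry {
	return &registry{handlers: make(map[reflect.Type][]handler)}
}

// handle registers h for messages of the same type as proto.
func (r *registry) handle(proto interface{}, h handler) {
	t := reflect.TypeOf(proto)
	r.handlers[t] = append(r.handlers[t], h)
}

// emit delivers msg to every handler registered for its type and
// returns how many handlers received it.
func (r *registry) emit(msg interface{}) int {
	hs := r.handlers[reflect.TypeOf(msg)]
	for _, h := range hs {
		h(msg)
	}
	return len(hs)
}

func main() {
	r := newRegistry()
	r.handle(Hello{}, func(m interface{}) { fmt.Println("app1 got", m.(Hello).Name) })
	r.handle(Hello{}, func(m interface{}) { fmt.Println("app2 got", m.(Hello).Name) })
	n := r.emit(Hello{Name: "your name"})
	fmt.Println("delivered to", n, "handlers")
}
```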

Start

Finally, we need to start Beehive:

beehive.Start()

When you run the application (say, go run helloworld.go), you should see the following output:

bee 1/HelloWorld/0000000000000402> hello your name (1)!
bee 1/HelloWorld/0000000000000402> hello your name (2)!

When you run the application one more time, you will see the following output:

bee 1/HelloWorld/0000000000000402> hello your name (3)!
bee 1/HelloWorld/0000000000000402> hello your name (4)!

Note that the counter is saved on disk, so you can safely restart your application.

Run a Cluster

This simple hello world application is actually a distributed application. The message handler is automatically sharded by Hello.Name. Later, we will explain how that happens. For now, let's just try to run our hello world application in a cluster.

Run the first node as you have done previously (go run helloworld.go). Wait until you see the hello messages:

bee 1/HelloWorld/0000000000000402> hello your name (5)!
bee 1/HelloWorld/0000000000000402> hello your name (6)!

Then, run a new node using the following command:

go run helloworld.go -addr localhost:7678 -paddrs localhost:7677 -statepath /tmp/beehive2

After you connect the second node, the first node should generate the following output:

bee 1/HelloWorld/0000000000000402> hello your name (7)!
bee 1/HelloWorld/0000000000000402> hello your name (8)!

Note that in the last command, -addr sets the listening address of the beehive server, -paddrs sets the address of the peers (the first node is listening on the default port, 7677), and -statepath sets where beehive should store its state and the dictionaries.

Note: You can reinitialize the cluster by removing both /tmp/beehive and /tmp/beehive2 and re-running the commands.

Deep Dive

Hives

A Hive is basically a Beehive server, representing one logical unit of computing (say, a physical or virtual machine). Hives can form, join, and leave a cluster. Beehive clusters are homogeneous, meaning that all hives in the same cluster run the same set of applications.

Applications, Dictionaries, and Message Handlers

A Beehive application is a set of asynchronous message handlers. Message handlers process asynchronous messages and store their state in dictionaries. A dictionary is basically a hash map; behind the scenes, dictionaries are saved to disk and replicated. A message handler is composed of a Rcv function that actually processes the message and a Map function that declares how messages should be sharded (partitioned). Beehive provides a generic Map function (as you saw in the Hello World example) and also has a compiler that can generate Map functions from your Rcv functions. That said, Map functions are almost always one-liners and are easy to implement.
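As a rough illustration of why Map functions tend to be one-liners, here is a self-contained sketch for the hello world handler. The `CellKey` and `MappedCells` types are hypothetical stand-ins modeled on the description above (a cell is a dictionary name plus a key); they are defined locally so the sketch compiles on its own and may not match Beehive's actual API.

```go
package main

import "fmt"

// Hello mirrors the message type from the hello world example.
type Hello struct {
	Name string
}

// CellKey identifies one entry (cell) in an application dictionary.
// Hypothetical stand-in, not imported from Beehive.
type CellKey struct {
	Dict string
	Key  string
}

// MappedCells is the set of cells a message will touch.
type MappedCells []CellKey

// mapHello declares that processing a Hello only touches the
// hello_dict entry keyed by the sender's name, so Hello messages
// are effectively sharded by Hello.Name.
func mapHello(h Hello) MappedCells {
	return MappedCells{{Dict: "hello_dict", Key: h.Name}}
}

func main() {
	fmt.Println(mapHello(Hello{Name: "alice"}))
}
```

This matches the Rcvf shown earlier, which only reads and writes hello_dict[hello.Name].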

Map and Consistent Concurrency

To make message handlers distributed and concurrent, we want to balance the load of message processing among multiple go-routines across multiple hives. We need to do this in a way that keeps the application's behavior identical to running it on a single, centralized go-routine. To do so, we need to preserve the consistency of application dictionaries.

In other words, we want to make sure that each entry (or as we call them, cell) in an application dictionary is always accessed on the same logical go-routine. Otherwise, we can't guarantee that the application behaves consistently when distributed over multiple hives. For example, what would happen to our hello world application if two different go-routines could read and modify the same entry concurrently?

To that end, for each message, we need to know which keys the Rcv function of a message handler uses to process it. We call these the mapped cells of that message. Each message handler, in addition to its Rcv function, needs to provide a Map function that maps an incoming message to cells, i.e., keys in application dictionaries. Map functions are usually very simple to implement, but you can also use Beehive's generic RuntimeMap function or the Beehive compiler to generate them.
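Once a message's mapped cells are known, one simple way to guarantee that each cell is always accessed on the same go-routine within a single process is to hash each key to a fixed worker. The sketch below does exactly that with `hash/fnv`. Note that this is static hash sharding, chosen for illustration only; it is not Beehive's mechanism, which assigns cells to bees dynamically (see Bees below). Because every update for a given name goes through the same worker, no two go-routines ever modify the same counter concurrently.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sync"
)

// shardFor maps a key to one of n workers; the same key always
// lands on the same worker.
func shardFor(key string, n int) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32() % uint32(n))
}

// countHellos processes names on n worker go-routines. Each worker
// owns a private map, so the counters need no locking.
func countHellos(names []string, n int) map[string]int {
	inboxes := make([]chan string, n)
	results := make([]map[string]int, n)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		inboxes[i] = make(chan string)
		results[i] = make(map[string]int)
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			for name := range inboxes[i] {
				results[i][name]++ // only worker i touches results[i]
			}
		}(i)
	}
	for _, name := range names {
		inboxes[shardFor(name, n)] <- name
	}
	for _, in := range inboxes {
		close(in)
	}
	wg.Wait()
	// Merge the per-worker maps; keys never overlap across workers.
	total := make(map[string]int)
	for _, r := range results {
		for k, v := range r {
			total[k] += v
		}
	}
	return total
}

func main() {
	counts := countHellos([]string{"alice", "bob", "alice", "carol", "alice"}, 4)
	fmt.Println(counts["alice"], counts["bob"], counts["carol"])
}
```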

Bees

Applications and their message handlers are passive in Beehive. Internally, each hive has a set of go-routines called bees that run the message handlers for each application. Each bee exclusively owns a set of cells. These cells are the cells that must be accessed by the same go-routine to preserve consistency. Cells are locked by bees using an internal distributed consensus mechanism implemented using Raft. Bees persist their cells if needed and, when a hive restarts, we reload all the bees.

Moreover, for replicated applications, each bee forms a colony (itself plus some bees on other hives) and consistently replicates its cells using Raft. When a bee fails, its cells and workload are handed to other bees in the colony. The size of a colony equals the application's replication factor.

Life of a Message

When a message is emitted on a hive, we first pass it to the Map function of each message handler registered for that message type. The Map function returns the mapped cells of that message. Then, we relay the message to the bee that owns any of the mapped cells, and that bee calls the handler's Rcv function. This bee may be on the same hive or on another hive.

If there is no such bee (i.e., this is the first message mapped to those cells), we elect a hive, create one bee on it, and relay the message. By default, we create the bee on the local hive, but applications can register custom placement methods to change this behavior; for example, one could implement random placement this way.
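The routing step can be sketched as an ownership table from cells to bees. In this hypothetical, single-process sketch, `router`, `beeID`, and `route` are illustrative names, not Beehive APIs: a message goes to the bee that owns any of its mapped cells, and if none exists, a new bee is created (always "locally" here) and takes ownership of all the mapped cells. It ignores Raft-based locking, placement policies, and the case where the mapped cells are owned by different bees.

```go
package main

import "fmt"

// cell identifies one dictionary entry.
type cell struct {
	dict, key string
}

type beeID int

// router is a hypothetical, single-process ownership table.
type router struct {
	owner map[cell]beeID
	next  beeID
}

func newRouter() *router {
	return &router{owner: make(map[cell]beeID), next: 1}
}

// route returns the bee that should receive a message with the given
// mapped cells. If no bee owns any of them, it creates a new bee and
// assigns all the cells to it. If some cells are already owned, it
// returns the first owner found and leaves the other cells as they are
// (a simplification).
func (r *router) route(mapped []cell) beeID {
	for _, c := range mapped {
		if b, ok := r.owner[c]; ok {
			return b
		}
	}
	b := r.next // "create" a new bee on the local hive
	r.next++
	for _, c := range mapped {
		r.owner[c] = b
	}
	return b
}

func main() {
	r := newRouter()
	fmt.Println(r.route([]cell{{"hello_dict", "alice"}})) // new bee
	fmt.Println(r.route([]cell{{"hello_dict", "bob"}}))   // another new bee
	fmt.Println(r.route([]cell{{"hello_dict", "alice"}})) // same bee as alice's first message
}
```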

Reply Messages

So far, we have only emitted messages, which by themselves give us no way to communicate back, say, to implement a request-response system between applications. For example, a better way to implement our hello world application would be to emit a Hello message and wait for a response that contains the count.

In Beehive, you can reply to a message using the ReplyTo method. For example, we can rewrite our hello world application using reply messages:

func Rcvf(msg beehive.Msg, ctx beehive.RcvContext) error {
	hello := msg.Data().(Hello)
	dict := ctx.Dict("hello_dict")
	v, err := dict.Get(hello.Name)
	cnt := 0
	if err == nil {
		cnt = v.(int)
	}
	cnt++
	// Reply to the message with the count of hellos.
	ctx.ReplyTo(msg, cnt)
	// Store the updated count under the sender's name.
	return dict.Put(hello.Name, cnt)
}

To use this version, we also need to emit Hello messages and wait for the application's response. We can implement this using Sync:

go beehive.Start()
defer beehive.Stop()

name := "your name"
// Sync sends the Hello message and waits until it receives the reply.
res, err := beehive.Sync(context.TODO(), Hello{Name: name})
if err != nil {
	// ... handle the error ...
	return
}
cnt := res.(int)
fmt.Printf("%s (%d)!\n", name, cnt)

HTTP Handlers

Beehive applications can register custom HTTP handlers that handle requests to URLs with the /apps/APP_NAME/ prefix. To register an HTTP handler, applications can use HandleHTTP:

app.HandleHTTP("/", httpHandler)

Internally, we use Gorilla mux and expose the sub-router of each application. As a result, applications can match against regular expressions, use parameters in the URL, and specify HTTP methods:

app.HandleHTTP("/{name}", httpHandler).Methods("POST")

HTTP handlers usually communicate with the application using synchronous messaging. For example, we can implement an HTTP handler for our hello world application as follows:

type HelloHTTPHandler struct{}

func (h *HelloHTTPHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	vars := mux.Vars(r)
	name, ok := vars["name"]
	if !ok {
		http.Error(w, "no name", http.StatusBadRequest)
		return
	}

	res, err := beehive.Sync(context.TODO(), Hello{Name: name})
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	fmt.Fprintf(w, "hello %s (%d)\n", name, res.(int))
}

Now you can send HTTP requests to this handler using curl:

curl -X POST localhost:7677/apps/hello-world/yourname

And you should see the following output:

hello yourname (1)

Detached Handlers

Sometimes you need to create go-routines that read data from network connections or the file system and generate messages. For example, to implement a network listener for a custom protocol, we need to run a listener and then a go-routine for each established connection. To implement such functionality in Beehive, we can use detached handlers. A detached handler differs from a message handler in that it is started through its Start method and only receives replies.

The Detached Example demonstrates how we used detached handlers to implement a text-based protocol in Beehive. When you run this example, you can telnet to port 6789 for sending names to the hello world application:

telnet localhost 6789
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
your name
hello your name (1)!
your name
hello your name (2)!
your name
hello your name (3)!

Transactions

By default, all applications are transactional in Beehive, meaning that the bee opens a transaction for each call to the Rcv function. If Rcv panics or returns an error, the transaction is automatically aborted; otherwise, it is committed.

In Beehive, transactions include all modifications to the dictionaries and all messages emitted in a Rcv function. That is, when a transaction is aborted, all of its emitted messages and dictionary modifications are dropped.

Transactions are implicit by default, but message handlers can control them explicitly. To open and close transactions in a Rcv function, use BeginTx(), CommitTx(), and AbortTx():

func Rcvf(msg bh.Msg, ctx bh.RcvContext) error {
	ctx.BeginTx()
	d1 := ctx.Dict("d1")
	d1.Put("k", []byte{1})
	ctx.Emit(MyMsg("test1"))
	// Update d1/k and emit test1.
	ctx.CommitTx()

	ctx.BeginTx()
	d2 := ctx.Dict("d2")
	d2.Put("k", []byte{2})
	ctx.Emit(MyMsg("test2"))
	// DO NOT update d2/k and DO NOT emit test2.
	ctx.AbortTx()
	return nil
}

To disable automatic transactions for an application (say, for performance reasons), use the NonTransactional option.
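To make the commit/abort semantics concrete, here is a self-contained sketch of a transaction that buffers dictionary writes and emitted messages and applies them only on commit. The `store` and `tx` types and their methods are hypothetical and far simpler than Beehive's implementation (one in-memory store, no persistence, no replication); the sketch only reproduces the outcome of the explicit-transaction example above: d1/k is updated and test1 is emitted, while d2/k and test2 are discarded.

```go
package main

import "fmt"

// store is a hypothetical stand-in for a bee's dictionaries plus the
// messages it has sent out.
type store struct {
	dicts   map[string]map[string][]byte
	emitted []string
}

// tx buffers dictionary writes and emitted messages until commit.
type tx struct {
	s      *store
	writes map[[2]string][]byte // (dict, key) -> value
	msgs   []string
}

func (s *store) begin() *tx {
	return &tx{s: s, writes: make(map[[2]string][]byte)}
}

func (t *tx) put(dict, key string, v []byte) { t.writes[[2]string{dict, key}] = v }

func (t *tx) emit(msg string) { t.msgs = append(t.msgs, msg) }

// reset discards the buffered state so the tx can be reused.
func (t *tx) reset() {
	t.writes = make(map[[2]string][]byte)
	t.msgs = nil
}

// commit applies every buffered write, then releases buffered messages.
func (t *tx) commit() {
	for k, v := range t.writes {
		d, ok := t.s.dicts[k[0]]
		if !ok {
			d = make(map[string][]byte)
			t.s.dicts[k[0]] = d
		}
		d[k[1]] = v
	}
	t.s.emitted = append(t.s.emitted, t.msgs...)
	t.reset()
}

// abort drops every buffered write and message.
func (t *tx) abort() { t.reset() }

func main() {
	s := &store{dicts: make(map[string]map[string][]byte)}

	t1 := s.begin()
	t1.put("d1", "k", []byte{1})
	t1.emit("test1")
	t1.commit()

	t2 := s.begin()
	t2.put("d2", "k", []byte{2})
	t2.emit("test2")
	t2.abort()

	fmt.Println(s.dicts["d1"]["k"], s.dicts["d2"]["k"], s.emitted)
}
```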

Runtime Instrumentation and Optimization

Beehive is capable of automatic runtime instrumentation. It measures the messages exchanged between different bees and can use these measurements to live-migrate bees to minimize latency. You can enable instrumentation and optimization by passing the -instrument command-line argument to your Beehive program. You can also view the instrumentation data on Beehive's web interface (by default, http://localhost:7677/).
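As an illustration of how message statistics can drive placement, here is a deliberately simplified heuristic: count the messages each bee receives per source hive and suggest moving the bee to the hive that sends it strictly more messages than its current hive. This is a hypothetical sketch with made-up names (`traffic`, `record`, `suggestMigration`) that uses message counts as a rough proxy for latency; it is not Beehive's optimizer.

```go
package main

import "fmt"

// traffic counts messages received by each bee, keyed by source hive.
type traffic map[string]map[string]int // bee -> source hive -> count

// record notes that bee received one message from hive.
func (t traffic) record(bee, hive string) {
	if t[bee] == nil {
		t[bee] = make(map[string]int)
	}
	t[bee][hive]++
}

// suggestMigration returns the hive that sent bee the most messages
// and whether it differs from current. A bee only moves if another
// hive sent strictly more messages than its current hive; ties among
// other hives are broken by name so the result is deterministic.
func (t traffic) suggestMigration(bee, current string) (string, bool) {
	best, bestCnt := current, t[bee][current]
	for hive, cnt := range t[bee] {
		if hive == current {
			continue
		}
		if cnt > bestCnt || (cnt == bestCnt && best != current && hive < best) {
			best, bestCnt = hive, cnt
		}
	}
	return best, best != current
}

func main() {
	t := traffic{}
	for i := 0; i < 3; i++ {
		t.record("bee-1", "hive-2")
	}
	t.record("bee-1", "hive-1")
	hive, move := t.suggestMigration("bee-1", "hive-1")
	fmt.Println(hive, move)
}
```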

Projects using Beehive

Mailing List

Google group: https://groups.google.com/forum/#!forum/beehive-dev

Please report bugs on GitHub, not in the group.

Publications