Streaming Protocol Buffers messages over TCP in Golang
BuffStreams is a set of abstraction over TCPConns for streaming connections that write data in a format involving the length of the message + the message payload itself (like Protocol Buffers, hence the name).
BuffStreams gives you a simple interface to start a (blocking or non) listener on a given port, which will stream arrays of raw bytes into a callback you provide it. In this way, BuffStreams is not so much a daemon, but a library to build networked services that can communicate over TCP using Protocol Buffer messages.
I was writing a few different projects for fun in Golang, and kept writing code something like what is in the library, but less organized. I decided to focus on the networking code, pulling it out and improving it so I knew it could be trusted to perform reliably across projects.
There is nothing special or magical about Buffstreams, or the code in here. The idea isn't that it's a better, faster socket abstraction - it's to do as much of the boilerplate for you when handling streaming data like protobuff messages, with as little impact to performance as possible. Currently, Buffstreams is able to do over 1.1 million messsages per second, at 110 bytes per message on a single listening socket which saturates a 1gig nic.
The idea of Buffstreams is to do the boring parts and handle common errors, enabling you to write systems on top of it, while performing with as little overhead as possible.
Since protobuff messages lack any kind of natural delimeter, BuffStreams uses the method of adding a fixed header of bytes (which is configurable) that describes the size of the actual payload. This is handled for you, by the call to write. You never need to pack on the size yourself.
On the server side, it will listen for these payloads, read the fixed header, and then the subsequent message. The server must have the same maximum size as the client for this to work. BuffStreams will then pass the byte array to a callback you provided for handling messages received on that port. Deserializing the messages and interpreting their value is up to you.
One important note is that internally, BuffStreams does not actually use or rely on the Protocol Buffers library itself in any way. All serialization / deserialization is handled by the client prior to / after interactions with BuffStreams. In this way, you could theoretically use this library to stream any data over TCP that uses the same strategy of a fixed header of bytes + a subsequent message body.
Currently, I have only used it for ProtocolBuffers messages.
You can optionally enable logging of errors, although this naturally comes with a performance penalty under extreme load.
I've tried very hard to optimize BuffStreams as best as possible, striving to keep it's averages above 1M messages per second, with no errors during transit.
See Bench
Download the library
go get "github.com/barnettzqg/buffstreams"
Import the library
import "github.com/barnettzqg/buffstreams"
For a quick example of a complete end to end client and server, check out the examples in the test/ directory, namely test/client/test_client.go and test/server/test_server.go. These two files are designed to work together to demonstrate an end to end integration of Buffstreams, in the simplest possible way.
One of the core objects in Buffstreams is the TCPListener. This struct allows you to open a socket on a local port, and begin waiting for clients to connect. Once a connection is made, each full message written by the client will be received by the Listener, and a callback you define will be invoked with the message contents (an array of bytes).
To begin listening, first create a TCPListenerConfig object to define how the listener should behave. A sample TCPListenerConfig might look like this:
cfg := buffstreams.TCPListenerConfig{
EnableLogging: false,
MaxMessageSize: 4096,
Address: buffstreams.FormatAddress("", strconv.Itoa(5031)),
Callback: func(me []byte) error {
logrus.Info(string(me))
return nil
},
}
btl, err := buffstreams.ListenTCP(cfg)
Once you've opened a listener this way, the socket is now in use, but the listener itself has not yet begun to accept connections.
To do so, you have two choices. By default, this operation will block the current thread. If you want to avoid that, and use a fire and forget approach, you can call
err := btl.StartListeningAsync()
If there is an error while starting up, it will be returned by this method. Alternatively, if you want to handle running the call yourself, or don't care that it blocks, you can call
err := btl.StartListening()
The way Buffstreams handles acting over the incoming messages is to let you provide a callback to operate on the bytes. ListenCallback takes in an array/slice of bytes, and returns an error.
type ListenCallback func([]byte) error
The callback will receive the raw bytes for a given protobuff message. The header containing the size will have been removed. It is the callbacks responsibility to deserialize and act upon the message.
The Listener gets the message, your callback does the work.
A sample callback might start like so:
func ListenCallbackExample ([]byte data) error {
msg := &message.ImportantProtoBuffStreamingMessage{}
err := proto.Unmarshal(data, msg)
// Now you do some stuff with msg
...
}
The callback is currently run in it's own goroutine, which also handles reading from the connection until the reader disconnects, or there is an error. Any errors reading from a connection incoming will be up to the client to handle.
To begin writing messages to a new connection, you'll need to dial a using TCPConnConfig
cfg := buffstreams.TCPConnConfig{
MaxMessageSize: 2048,
Address: buffstreams.FormatAddress("127.0.0.1", strconv.Itoa(5031)),
}
Once you have a configuration object, you can Dial out.
btc, err := buffstreams.DialTCP(cfg)
This will open a connection to the endpoint at the specified location. Additionally, the TCPConn that the TCPListener returns will also allow you to write data, using the same methods as below.
From there, you can write your data
bytesWritten, err := btc.Write(msgBytes, true)
If there is an error in writing, that connection will be closed and be reopened on the next write. There is no guarantee if any the bytesWritten value will be >0 or not in the event of an error which results in a reconnect.
There is a third option, the provided Manager class. This class will give you a simple but effective Manager abstraction over dialing and listening over ports, managing the connections for you. You provide the normal configuration for dialing out or listening for incoming connections, and let the manager hold onto the references. The Manager is considered threadsafe, as it internally uses locks to ensure consistency and coordination between concurrent access to the connections being held.
The Manager is not really a "Pool", in that it doesn't open and hold X connections for you to re-use. However, it maintains many of the same behaviors as a pool, including caching and re-using connections, and as mentioned is threadsafe.
Creating a Manager
bm := buffstreams.NewManager()
Listening on a port. Manager always makes this asynchronous and non blocking
// Assuming you've got a configuration object cfg, see above
err := bm.StartListening(cfg)
Dialing out to a remote endpoint
// Assuming you've got a configuration object cfg, see above
err := bm.Dial(cfg)
Having opened a connection, writing to that connection in a constant fashion
bytesWritten, err := bm.Write("127.0.0.1:5031", dataBytes)
The Manager will keep listening and dialed out connections cached internally. Once you open one, it'll be kept open. The writer will match your incoming write destination, such that any time you write to that same address, the correct writer will be re-used. The listening connection will simply remain open, waiting to receive requests.
You can forcibly close these connections, by calling either
err := bm.CloseListener("127.0.0.1:5031")
or
err := bm.CloseWriter("127.0.0.1:5031")
Special thanks to those who have reported bugs or helped me improve Buffstreams
- Judson White
- Release proper set of benchmarks, including more real-world cases
- Configurable retry for the client, configurable errored-message queue for user to define failover process to handle.
- Optional channel based streaming approach instead of callbacks
- Further library optimizations via tools such as pprof
Apache v2 - See LICENSE