/wangle

fork facebook Wangle for learning folly lib

Primary LanguageC++Apache License 2.0Apache-2.0

Travis Build Status CI Status

Wangle

C++ networking library

Wangle is a library that makes it easy to build protocols, application clients, and application servers.

It's like Netty + Finagle smooshed together, but in C++

Building and Installing

The main dependencies are:

Once folly is installed, run the following inside the wangle directory to build, test, and install wangle:

cmake .
make
ctest
sudo make install

Tutorial

There is a tutorial here that explains the basics of Wangle and shows how to build an echo server/client.

Examples

See the examples/ directory for some example Wangle servers and clients

License

Wangle is Apache 2.0-licensed.

Contributing

See the CONTRIBUTING file for how to help out.

Documentation

Wangle interfaces are asynchronous. Interfaces are currently based on Futures, but we're also exploring how to support fibers

Client / Server abstraction #

You're probably familiar with Java's Netty, or Python's twisted, or similar libraries.

It is built on top of folly/async/io, so it's one level up the stack from that (or similar abstractions like boost::asio)

ServerBootstrap - easily manage creation of threadpools and pipelines

ClientBootstrap - the same for clients

Pipeline - set up a series of handlers that modify your socket data

Request / Response abstraction #

This is roughly equivalent to the Finagle library.

Aims to provide easy testing, load balancing, client pooling, retry logic, etc. for any request/response type service - i.e. thrift, http, etc.

Service - a matched interface between client/server. A server will implement this interface, and a client will call in to it. These are protocol-specific

ServiceFilter - a generic filter on a service. Examples: stats, request timeouts, rate limiting

ServiceFactory - A factory that creates client connections. Any protocol specific setup code goes here

ServiceFactoryFilter - Generic filters that control how connections are created. Client examples: load balancing, pooling, idle timeouts, markdowns, etc.

ServerBootstrap

Easily create a new server

ServerBootstrap does the work to set up one or multiple acceptor threads, and one or multiple sets of IO threads. The thread pools can be the same. SO_REUSEPORT is automatically supported for multiple accept threads. tcp is most common, although udp is also supported.

Methods #

childPipeline(PipelineFactory<Pipeline>)

Sets the pipeline factory for each new connection. One pipeline per connection will be created.

group(IOThreadPoolExecutor accept, IOThreadPoolExecutor io)

Sets the thread pools for accept and io thread pools. If more than one thread is in the accept group, SO_REUSEPORT is used. Defaults to a single accept thread, and one io thread per core.

bind(SocketAddress),bind(port)

Binds to a port. Automatically starts to accept after bind.

stop()

Stops listening on all sockets.

join()

Joins all threadpools - all current reads and writes will be completed before this method returns.

NOTE: however that both accept and io thread pools will be stopped using this method, so the thread pools can't be shared, or care must be taken using shared pools during shutdown.

waitForStop()

Waits for stop() to be called from another thread.

Other methods #

channelFactory(ServerSocketFactory)

Sets up the type of server. Defaults to TCP AsyncServerSocket, but AsyncUDPServerSocket is also supported to receive udp messages. In practice, ServerBootstrap is only useful for udp if you need to multiplex the messages across many threads, or have TCP connections going on at the same time, etc. Simple usages of AsyncUDPSocket probably don't need the complexity of ServerBootstrap.

pipeline(PipelineFactory<AcceptPipeline>)

This pipeline method is used to get the accepted socket (or udp message) *before* it has been handed off to an IO thread. This can be used to steer the accept thread to a particular thread, or for logging.

See also AcceptRoutingHandler and RoutingDataHandler for additional help in reading data off of the accepted socket before it gets attached to an IO thread. These can be used to hash incoming sockets to specific threads.

childHandler(AcceptorFactory)

Previously facebook had lots of code that used AcceptorFactories instead of Pipelines, this is a method to support this code and be backwards compatible. The AcceptorFactory is responsible for creating acceptors, setting up pipelines, setting up AsyncSocket read callbacks, etc.

Examples #

A simple example:

ServerBootstrap<TelnetPipeline> server;                                                                                                      
server.childPipeline(std::make_shared<TelnetPipelineFactory>());                                                                             
server.bind(FLAGS_port);                                                                                                                     
server.waitForStop();

ClientBootstrap

Create clients easily

ClientBootstrap is a thin wrapper around AsyncSocket that provides a future interface to the connect callback, and a Pipeline interface to the read callback.

Methods #

group(IOThreadPoolExecutor)

Sets the thread or group of threads where the IO will take place. Callbacks are also made on this thread.

bind(port)

Optionally bind to a specific port

Future<Pipeline*> connect(SocketAddress)

Connect to the selected address. When the future is complete, the initialized pipeline will be returned.

NOTE: future.cancel() can be called to cancel an outstanding connection attempt.

pipelineFactory(PipelineFactory<Pipeline>)

Set the pipeline factory to use after a connection is successful.

Example #

ClientBootstrap<TelnetPipeline> client;
client.group(std::make_shared<folly::wangle::IOThreadPoolExecutor>(1));
client.pipelineFactory(std::make_shared<TelnetPipelineFactory>());
// synchronously wait for the connect to finish
auto pipeline = client.connect(SocketAddress(FLAGS_host,FLAGS_port)).get();

// close the pipeline when finished pipeline->close();

Pipeline

Send your socket data through a series of tubes

A Pipeline is a series of Handlers that intercept inbound or outbound events, giving full control over how events are handled. Handlers can be added dynamically to the pipeline.

When events are called, a Context* object is passed to the Handler - this means state can be stored in the context object, and a single instantiation of any individual Handler can be used for the entire program.

Netty's documentation: ChannelHandler

Usually, the bottom of the Pipeline is a wangle::AsyncSocketHandler to read/write to a socket, but this isn't a requirement.

A pipeline is templated on the input and output types:

EventBase base_;
Pipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
pipeline.addBack(AsyncSocketHandler(AsyncSocket::newSocket(eventBase)));

The above creates a pipeline and adds a single AsyncSocket handler, that will push read events through the pipeline when the socket gets bytes. Let's try handling some socket events:

class MyHandler : public InboundHandler<folly::IOBufQueue&> {
 public:

void read(Context* ctx, folly::IOBufQueue& q) override { IOBufQueue data;
if (q.chainLength() >= 4) { data.append(q.split(4)); ctx->fireRead(data); } } };

This handler only handles read (inbound) data, so we can inherit from InboundHandler, and ignore the outbound type (so the ordering of inbound/outbound handlers in your pipeline doesn't matter). It checks if there are at least 4 bytes of data available, and if so, passes them on to the next handler. If there aren't yet four bytes of data available, it does nothing, and waits for more data.

We can add this handler to our pipeline like so:

pipeline.addBack(MyHandler());

and remove it just as easily:

pipeline.remove<MyHandler>();

StaticPipeline #

Instantiating all these handlers and pipelines can hit the allocator pretty hard. There are two ways to try to do fewer allocations. StaticPipeline allows *all* the handlers, and the pipeline, to be instantiated all in the same memory block, so we only hit the allocator once.

The other option is to allocate the handlers once at startup, and reuse them in many pipelines. This means all state has to be saved in the HandlerContext object instead of the Handler itself, since each handler can be in multiple pipelines. There is one context per pipeline to get around this limitation.

Built-in handlers

The stuff that comes with the box

Byte to byte handlers #

AsyncSocketHandler #

This is almost always the first handler in the pipeline for clients and servers - it connects an AsyncSocket to the pipeline. Having it as a handler is nice, because mocking it out for tests becomes trivial.

OutputBufferingHandler #

Output is buffered and only sent once per event loop. This logic is exactly what is in ThriftServer, and very similar to what exists in proxygen - it can improve throughput for small writes by up to 300%.

EventBaseHandler #

Putting this right after an AsyncSocketHandler means that writes can happen from any thread, and eventBase->runInEventBaseThread() will automatically be called to put them in the correct thread. It doesn't intrinsically make the pipeline thread-safe though, writes from different threads may be interleaved, other handler stages must be only used from one thread or be thread safe, etc.

In addition, reads are still always called on the eventBase thread.

Codecs #

FixedLengthFrameDecoder #

A decoder that splits received IOBufs by a fixed number of bytes. Used for fixed-length protocols

LengthFieldPrepender #

Prepends a fixed-length field length. Field length is configurable.

LengthFieldBasedFrameDecoder #

The receiving portion of LengthFieldPrepender - decodes based on a fixed frame length, with optional header/tailer data sections.

LineBasedFrameDecoder #

Decodes by line (with optional ending detection types), to be used for text-based protocols

StringCodec #

Converts from IOBufs to std::strings and back for text-based protocols. Must be used after one of the above frame decoders

Services

How to add a new protocol

Finagle's documentation on Services is highly recommended

Services #

A Pipeline was read() and write() methods - it streams bytes in one or both directions. write() returns a future, but the future is set when the bytes are successfully written. Using pipeline there is no easy way to match up requests and responses for RPC.

A Service is an RPC abstraction - Both clients and servers implement the interface. Servers implement it by handling the request. Clients implement it by sending the request to the server to complete.

A Dispatcher is the adapter between the Pipeline and Service that matches up the requests and responses. There are several built in Dispatchers, however if you are doing anything advanced, you may need to write your own.

Because both clients and servers implement the same interface, mocking either clients or servers is trivially easy.

ServiceFilters #

ServiceFilters provide a way to wrap filters around every request and response. Things like logging, timeouts, retrying requests, etc. can be implemented as ServiceFilters.

Existing ServiceFilters include:

  • CloseOnReleaseFilter - rejects requests after connection is closed. Often used in conjunction with
  • ExpiringFilter - idle timeout and max connection time (usually used for clients)
  • TimeoutFilter - request timeout time. Usually used on servers. Clients can use future.within to specify timeouts individually.
  • ExecutorFilter - move requests to a different executor.

ServiceFactories #

For some services, a Factory can help instantiate clients. In Finagle, these are frequently provided for easy use with specific protocols, i.e. http, memcache, etc.

ServiceFactoryFilters #

ServiceFactoryFilters provide filters for getting clients. These include most connection-oriented things, like connection pooling, selection, dispatch, load balancing, etc.

Existing ServiceFactoryFilters: