Teleport
Teleport is a versatile, high-performance and flexible socket framework.
It can be used for peer-peer, rpc, gateway, micro services, push services, game services and so on.
Benchmark
Self Test
-
A server and a client process, running on the same machine
-
CPU: Intel Xeon E312xx (Sandy Bridge) 16 cores 2.53GHz
-
Memory: 16G
-
OS: Linux 2.6.32-696.16.1.el6.centos.plus.x86_64, CentOS 6.4
-
Go: 1.9.2
-
Message size: 581 bytes
-
Message codec: protobuf
-
Sent total 1000000 messages
-
teleport
client concurrency | mean(ms) | median(ms) | max(ms) | min(ms) | throughput(TPS) |
---|---|---|---|---|---|
100 | 1 | 0 | 16 | 0 | 75505 |
500 | 9 | 11 | 97 | 0 | 52192 |
1000 | 19 | 24 | 187 | 0 | 50040 |
2000 | 39 | 54 | 409 | 0 | 42551 |
5000 | 96 | 128 | 1148 | 0 | 46367 |
- teleport/socket
client concurrency | mean(ms) | median(ms) | max(ms) | min(ms) | throughput(TPS) |
---|---|---|---|---|---|
100 | 0 | 0 | 14 | 0 | 225682 |
500 | 2 | 1 | 24 | 0 | 212630 |
1000 | 4 | 3 | 51 | 0 | 180733 |
2000 | 8 | 6 | 64 | 0 | 183351 |
5000 | 21 | 18 | 651 | 0 | 133886 |
Comparison Test
Environment | Throughputs | Mean Latency | P99 Latency |
---|---|---|---|
- Profile torch of teleport/socket
- Heap torch of teleport/socket
Version
version | status | branch |
---|---|---|
v5 | release | v5 |
v4 | release | v4 |
v3 | release | v3 |
v2 | release | v2 |
v1 | release | v1 |
Install
go get -u -f github.com/henrylee2cn/teleport
Feature
- Server and client are peer-to-peer, have the same API method
- Support custom communication protocol
- Support set the size of socket I/O buffer
- Message contains both
Header
andBody
two parts - Message
Header
contains metadata in the same format as HTTP header - Support for customizing
Body
coding types separately, e.gJSON
Protobuf
string
- Support push, call, reply and other means of communication
- Support plug-in mechanism, can customize authentication, heartbeat, micro service registration center, statistics, etc.
- Whether server or client, the peer support reboot and shutdown gracefully
- Support reverse proxy
- Detailed log information, support print input and output details
- Supports setting slow operation alarm threshold
- Use I/O multiplexing technology
- Support setting the size of the reading message (if exceed disconnect it)
- Provide the context of the handler
- Client session support automatically redials after disconnection
- Support network list:
tcp
,tcp4
,tcp6
,unix
,unixpacket
and so on - Provide an operating interface to control the connection file descriptor
Example
server.go
package main
import (
"fmt"
"time"
tp "github.com/henrylee2cn/teleport"
)
func main() {
// graceful
go tp.GraceSignal()
// server peer
srv := tp.NewPeer(tp.PeerConfig{
CountTime: true,
ListenPort: 9090,
PrintDetail: true,
})
// router
srv.RouteCall(new(Math))
// broadcast per 5s
go func() {
for {
time.Sleep(time.Second * 5)
srv.RangeSession(func(sess tp.Session) bool {
sess.Push(
"/push/status",
fmt.Sprintf("this is a broadcast, server time: %v", time.Now()),
)
return true
})
}
}()
// listen and serve
srv.ListenAndServe()
}
// Math handler
type Math struct {
tp.CallCtx
}
// Add handles addition request
func (m *Math) Add(arg *[]int) (int, *tp.Rerror) {
// test query parameter
tp.Infof("author: %s", m.Query().Get("author"))
// add
var r int
for _, a := range *arg {
r += a
}
// response
return r, nil
}
client.go
package main
import (
"time"
tp "github.com/henrylee2cn/teleport"
)
func main() {
// log level
tp.SetLoggerLevel("ERROR")
cli := tp.NewPeer(tp.PeerConfig{})
defer cli.Close()
cli.RoutePush(new(Push))
sess, err := cli.Dial(":9090")
if err != nil {
tp.Fatalf("%v", err)
}
var result int
rerr := sess.Call("/math/add?author=henrylee2cn",
[]int{1, 2, 3, 4, 5},
&result,
).Rerror()
if rerr != nil {
tp.Fatalf("%v", rerr)
}
tp.Printf("result: %d", result)
tp.Printf("wait for 10s...")
time.Sleep(time.Second * 10)
}
// Push push handler
type Push struct {
tp.PushCtx
}
// Push handles '/push/status' message
func (p *Push) Status(arg *string) *tp.Rerror {
tp.Printf("%s", *arg)
return nil
}
Design
Keywords
- Peer: A communication instance may be a server or a client
- Socket: Base on the net.Conn package, add custom package protocol, transfer pipelines and other functions
- Message:* The corresponding structure of the data package content element
- Proto: The protocol interface of message pack/unpack
- Codec: Serialization interface for
Body
- XferPipe: Message bytes encoding pipeline, such as compression, encryption, calibration and so on
- XferFilter: A interface to handle message data before transfer
- Plugin: Plugins that cover all aspects of communication
- Session: A connection session, with push, call, reply, close and other methods of operation
- Context: Handle the received or send messages
- Call-Launch: Call data from the peer
- Call-Handle: Handle and reply to the calling of peer
- Push-Launch: Push data to the peer
- Push-Handle: Handle the pushing of peer
- Router: Router that route the response handler by request information(such as a URI)
Data Message
Abstracts the data message(Message Object) of the application layer and is compatible with HTTP message:
Protocol
You can customize your own communication protocol by implementing the interface:
type (
// Proto pack/unpack protocol scheme of socket message.
Proto interface {
// Version returns the protocol's id and name.
Version() (byte, string)
// Pack writes the Message into the connection.
// NOTE: Make sure to write only once or there will be package contamination!
Pack(Message) error
// Unpack reads bytes from the connection to the Message.
// NOTE: Concurrent unsafe!
Unpack(Message) error
}
ProtoFunc func(io.ReadWriter) Proto
)
Next, you can specify the communication protocol in the following ways:
func SetDefaultProtoFunc(ProtoFunc)
type Peer interface {
...
ServeConn(conn net.Conn, protoFunc ...ProtoFunc) Session
DialContext(ctx context.Context, addr string, protoFunc ...ProtoFunc) (Session, *Rerror)
Dial(addr string, protoFunc ...ProtoFunc) (Session, *Rerror)
Listen(protoFunc ...ProtoFunc) error
...
}
Default protocol RawProto
(Big Endian):
{4 bytes message length}
{1 byte protocol version}
{1 byte transfer pipe length}
{transfer pipe IDs}
# The following is handled data by transfer pipe
{2 bytes sequence length}
{sequence}
{1 byte message type} # e.g. CALL:1; REPLY:2; PUSH:3
{2 bytes URI length}
{URI}
{2 bytes metadata length}
{metadata(urlencoded)}
{1 byte body codec id}
{body}
XferPipe
Transfer filter pipe, handles byte stream of message when transfer.
// XferFilter handles byte stream of message when transfer.
type XferFilter interface {
// ID returns transfer filter id.
ID() byte
// Name returns transfer filter name.
Name() string
// OnPack performs filtering on packing.
OnPack([]byte) ([]byte, error)
// OnUnpack performs filtering on unpacking.
OnUnpack([]byte) ([]byte, error)
}
// Get returns transfer filter by id.
func Get(id byte) (XferFilter, error)
// GetByName returns transfer filter by name.
func GetByName(name string) (XferFilter, error)
// XferPipe transfer filter pipe, handlers from outer-most to inner-most.
// NOTE: the length can not be bigger than 255!
type XferPipe struct {
// Has unexported fields.
}
func NewXferPipe() *XferPipe
func (x *XferPipe) Append(filterID ...byte) error
func (x *XferPipe) AppendFrom(src *XferPipe)
func (x *XferPipe) IDs() []byte
func (x *XferPipe) Len() int
func (x *XferPipe) Names() []string
func (x *XferPipe) OnPack(data []byte) ([]byte, error)
func (x *XferPipe) OnUnpack(data []byte) ([]byte, error)
func (x *XferPipe) Range(callback func(idx int, filter XferFilter) bool)
func (x *XferPipe) Reset()
Codec
The body's codec set.
type Codec interface {
// ID returns codec id.
ID() byte
// Name returns codec name.
Name() string
// Marshal returns the encoding of v.
Marshal(v interface{}) ([]byte, error)
// Unmarshal parses the encoded data and stores the result
// in the value pointed to by v.
Unmarshal(data []byte, v interface{}) error
}
Plugin
Plug-ins during runtime.
type (
// Plugin plugin background
Plugin interface {
Name() string
}
// PreNewPeerPlugin is executed before creating peer.
PreNewPeerPlugin interface {
Plugin
PreNewPeer(*PeerConfig, *PluginContainer) error
}
...
)
Usage
Peer(server or client) Demo
// Start a server
var peer1 = tp.NewPeer(tp.PeerConfig{
ListenPort: 9090, // for server role
})
peer1.Listen()
...
// Start a client
var peer2 = tp.NewPeer(tp.PeerConfig{})
var sess, err = peer2.Dial("127.0.0.1:8080")
Call-Controller-Struct API template
type Aaa struct {
tp.CallCtx
}
func (x *Aaa) XxZz(arg *<T>) (<T>, *tp.Rerror) {
...
return r, nil
}
- register it to root router:
// register the call route
// HTTP mapping: /aaa/xx_zz
// RPC mapping: Aaa.XxZz
peer.RouteCall(new(Aaa))
// or register the call route
// HTTP mapping: /xx_zz
// RPC mapping: XxZz
peer.RouteCallFunc((*Aaa).XxZz)
Call-Handler-Function API template
func XxZz(ctx tp.CallCtx, arg *<T>) (<T>, *tp.Rerror) {
...
return r, nil
}
- register it to root router:
// register the call route
// HTTP mapping: /xx_zz
// RPC mapping: XxZz
peer.RouteCallFunc(XxZz)
Push-Controller-Struct API template
type Bbb struct {
tp.PushCtx
}
func (b *Bbb) YyZz(arg *<T>) *tp.Rerror {
...
return nil
}
- register it to root router:
// register the push handler
// HTTP mapping: /bbb/yy_zz
// RPC mapping: Bbb.YyZz
peer.RoutePush(new(Bbb))
// or register the push handler
// HTTP mapping: /yy_zz
// RPC mapping: YyZz
peer.RoutePushFunc((*Bbb).YyZz)
Push-Handler-Function API template
// YyZz register the handler
func YyZz(ctx tp.PushCtx, arg *<T>) *tp.Rerror {
...
return nil
}
- register it to root router:
// register the push handler
// HTTP mapping: /yy_zz
// RPC mapping: YyZz
peer.RoutePushFunc(YyZz)
Unknown-Call-Handler-Function API template
func XxxUnknownCall (ctx tp.UnknownCallCtx) (interface{}, *tp.Rerror) {
...
return r, nil
}
- register it to root router:
// register the unknown call route: /*
peer.SetUnknownCall(XxxUnknownCall)
Unknown-Push-Handler-Function API template
func XxxUnknownPush(ctx tp.UnknownPushCtx) *tp.Rerror {
...
return nil
}
- register it to root router:
// register the unknown push route: /*
peer.SetUnknownPush(XxxUnknownPush)
Plugin Demo
// NewIgnoreCase Returns a ignoreCase plugin.
func NewIgnoreCase() *ignoreCase {
return &ignoreCase{}
}
type ignoreCase struct{}
var (
_ tp.PostReadCallHeaderPlugin = new(ignoreCase)
_ tp.PostReadPushHeaderPlugin = new(ignoreCase)
)
func (i *ignoreCase) Name() string {
return "ignoreCase"
}
func (i *ignoreCase) PostReadCallHeader(ctx tp.ReadCtx) *tp.Rerror {
// Dynamic transformation path is lowercase
ctx.UriObject().Path = strings.ToLower(ctx.UriObject().Path)
return nil
}
func (i *ignoreCase) PostReadPushHeader(ctx tp.ReadCtx) *tp.Rerror {
// Dynamic transformation path is lowercase
ctx.UriObject().Path = strings.ToLower(ctx.UriObject().Path)
return nil
}
Register above handler and plugin
// add router group
group := peer.SubRoute("test")
// register to test group
group.RouteCall(new(Aaa), NewIgnoreCase())
peer.RouteCallFunc(XxZz, NewIgnoreCase())
group.RoutePush(new(Bbb))
peer.RoutePushFunc(YyZz)
peer.SetUnknownCall(XxxUnknownCall)
peer.SetUnknownPush(XxxUnknownPush)
Config
type PeerConfig struct {
Network string `yaml:"network" ini:"network" comment:"Network; tcp, tcp4, tcp6, unix or unixpacket"`
LocalIP string `yaml:"local_ip" ini:"local_ip" comment:"Local IP"`
ListenPort uint16 `yaml:"listen_port" ini:"listen_port" comment:"Listen port; for server role"`
DefaultDialTimeout time.Duration `yaml:"default_dial_timeout" ini:"default_dial_timeout" comment:"Default maximum duration for dialing; for client role; ns,µs,ms,s,m,h"`
RedialTimes int32 `yaml:"redial_times" ini:"redial_times" comment:"The maximum times of attempts to redial, after the connection has been unexpectedly broken; for client role"`
RedialInterval time.Duration `yaml:"redial_interval" ini:"redial_interval" comment:"Interval of redialing each time, default 100ms; for client role; ns,µs,ms,s,m,h"`
DefaultBodyCodec string `yaml:"default_body_codec" ini:"default_body_codec" comment:"Default body codec type id"`
DefaultSessionAge time.Duration `yaml:"default_session_age" ini:"default_session_age" comment:"Default session max age, if less than or equal to 0, no time limit; ns,µs,ms,s,m,h"`
DefaultContextAge time.Duration `yaml:"default_context_age" ini:"default_context_age" comment:"Default CALL or PUSH context max age, if less than or equal to 0, no time limit; ns,µs,ms,s,m,h"`
SlowCometDuration time.Duration `yaml:"slow_comet_duration" ini:"slow_comet_duration" comment:"Slow operation alarm threshold; ns,µs,ms,s ..."`
PrintDetail bool `yaml:"print_detail" ini:"print_detail" comment:"Is print body and metadata or not"`
CountTime bool `yaml:"count_time" ini:"count_time" comment:"Is count cost time or not"`
}
Optimize
-
SetMessageSizeLimit sets max message size. If maxSize<=0, set it to max uint32.
func SetMessageSizeLimit(maxMessageSize uint32)
-
SetSocketKeepAlive sets whether the operating system should send keepalive messages on the connection.
func SetSocketKeepAlive(keepalive bool)
-
SetSocketKeepAlivePeriod sets period between keep alives.
func SetSocketKeepAlivePeriod(d time.Duration)
-
SetSocketNoDelay controls whether the operating system should delay message transmission in hopes of sending fewer messages (Nagle's algorithm). The default is true (no delay), meaning that data is sent as soon as possible after a Write.
func SetSocketNoDelay(_noDelay bool)
-
SetSocketReadBuffer sets the size of the operating system's receive buffer associated with the connection.
func SetSocketReadBuffer(bytes int)
-
SetSocketWriteBuffer sets the size of the operating system's transmit buffer associated with the connection.
func SetSocketWriteBuffer(bytes int)
Extensions
Codec
package | import | description |
---|---|---|
json | import "github.com/henrylee2cn/teleport/codec" |
JSON codec(teleport own) |
protobuf | import "github.com/henrylee2cn/teleport/codec" |
Protobuf codec(teleport own) |
plain | import "github.com/henrylee2cn/teleport/codec" |
Plain text codec(teleport own) |
form | import "github.com/henrylee2cn/teleport/codec" |
Form(url encode) codec(teleport own) |
Plugin
package | import | description |
---|---|---|
auth | import "github.com/henrylee2cn/teleport/plugin/auth" |
A auth plugin for verifying peer at the first time |
binder | import binder "github.com/henrylee2cn/teleport/plugin/binder" |
Parameter Binding Verification for Struct Handler |
heartbeat | import heartbeat "github.com/henrylee2cn/teleport/plugin/heartbeat" |
A generic timing heartbeat plugin |
proxy | import "github.com/henrylee2cn/teleport/plugin/proxy" |
A proxy plugin for handling unknown calling or pushing |
secure | import secure "github.com/henrylee2cn/teleport/plugin/secure" |
Encrypting/decrypting the message body |
Protocol
package | import | description |
---|---|---|
rawproto | import "github.com/henrylee2cn/teleport/proto/rawproto |
A fast socket communication protocol(teleport default protocol) |
jsonproto | import "github.com/henrylee2cn/teleport/proto/jsonproto" |
A JSON socket communication protocol |
pbproto | import "github.com/henrylee2cn/teleport/proto/pbproto" |
A Protobuf socket communication protocol |
thriftproto | import "github.com/henrylee2cn/teleport/proto/thriftproto" |
A Thrift communication protocol |
Transfer-Filter
package | import | description |
---|---|---|
gzip | import "github.com/henrylee2cn/teleport/xfer/gzip" |
Gzip(teleport own) |
md5 | import "github.com/henrylee2cn/teleport/xfer/md5" |
Provides a integrity check transfer filter |
Mixer
package | import | description |
---|---|---|
multiclient | import "github.com/henrylee2cn/teleport/mixer/multiclient" |
Higher throughput client connection pool when transferring large messages (such as downloading files) |
websocket | import "github.com/henrylee2cn/teleport/mixer/websocket" |
Makes the Teleport framework compatible with websocket protocol as specified in RFC 6455 |
html | html "github.com/xiaoenai/tp-micro/helper/mod-html" |
HTML render for http client |
Projects based on Teleport
project | description |
---|---|
TP-Micro | TP-Micro is a simple, powerful micro service framework based on Teleport |
Pholcus | Pholcus is a distributed, high concurrency and powerful web crawler software |
Business Users
License
Teleport is under Apache v2 License. See the LICENSE file for the full license text