Teleport is a versatile, high-performance and flexible socket framework.
It can be used for peer-peer, rpc, gateway, micro services, push services, game services and so on.
Test Case
- A server and a client process, running on the same machine
- CPU: Intel Xeon E312xx (Sandy Bridge) 16 cores 2.53GHz
- Memory: 16G
- OS: Linux 2.6.32-696.16.1.el6.centos.plus.x86_64, CentOS 6.4
- Go: 1.9.2
- Message size: 581 bytes
- Message codec: protobuf
- Sent total 1000000 messages
Test Results
- teleport
client concurrency | mean(ms) | median(ms) | max(ms) | min(ms) | throughput(TPS) |
---|---|---|---|---|---|
100 | 1 | 0 | 16 | 0 | 75505 |
500 | 9 | 11 | 97 | 0 | 52192 |
1000 | 19 | 24 | 187 | 0 | 50040 |
2000 | 39 | 54 | 409 | 0 | 42551 |
5000 | 96 | 128 | 1148 | 0 | 46367 |
- teleport/socket
client concurrency | mean(ms) | median(ms) | max(ms) | min(ms) | throughput(TPS) |
---|---|---|---|---|---|
100 | 0 | 0 | 14 | 0 | 225682 |
500 | 2 | 1 | 24 | 0 | 212630 |
1000 | 4 | 3 | 51 | 0 | 180733 |
2000 | 8 | 6 | 64 | 0 | 183351 |
5000 | 21 | 18 | 651 | 0 | 133886 |
- CPU torch of teleport/socket
version | status | branch |
---|---|---|
v3 | release | v3 |
v2 | release | v2 |
v1 | release | v1 |
go get -u github.com/henrylee2cn/teleport
- Server and client are peer-to-peer, have the same API method
- Support custom communication protocol
- Support set the size of socket I/O buffer
- Packet contains both Header and Body two parts
- Support for customizing head and body coding types separately, e.g
JSON
Protobuf
string
- Packet Header contains metadata in the same format as http header
- Support push, pull, reply and other means of communication
- Support plug-in mechanism, can customize authentication, heartbeat, micro service registration center, statistics, etc.
- Whether server or client, the peer support reboot and shutdown gracefully
- Support reverse proxy
- Detailed log information, support print input and output details
- Supports setting slow operation alarm threshold
- Use I/O multiplexing technology
- Support setting the size of the reading packet (if exceed disconnect it)
- Provide the context of the handler
- Client session support automatically redials after disconnection
- Support network list:
tcp
,tcp4
,tcp6
,unix
,unixpacket
and so on
- Peer: A communication instance may be a server or a client
- Socket: Base on the net.Conn package, add custom package protocol, transfer pipelines and other functions
- Packet: The corresponding structure of the data package content element
- Proto: The protocol interface of packet pack/unpack
- Codec: Serialization interface for
Packet.Body
- XferPipe: Packet bytes encoding pipeline, such as compression, encryption, calibration and so on
- XferFilter: A interface to handle packet data before transfer
- Plugin: Plugins that cover all aspects of communication
- Session: A connection session, with push, pull, reply, close and other methods of operation
- Context: Handle the received or send packets
- Pull-Launch: Pull data from the peer
- Pull-Handle: Handle and reply to the pull of peer
- Push-Launch: Push data to the peer
- Push-Handle: Handle the push of peer
- Router: Router that route the response handler by request information(such as a URI)
The contents of every one packet:
// in socket package
type (
// Packet a socket data packet.
Packet struct {
// packet sequence
seq uint64
// packet type, such as PULL, PUSH, REPLY
ptype byte
// URL string
uri string
// metadata
meta *utils.Args
// body codec type
bodyCodec byte
// body object
body interface{}
// newBodyFunc creates a new body by packet type and URI.
// Note:
// only for writing packet;
// should be nil when reading packet.
newBodyFunc NewBodyFunc
// XferPipe transfer filter pipe, handlers from outer-most to inner-most.
// Note: the length can not be bigger than 255!
xferPipe *xfer.XferPipe
// packet size
size uint32
next *Packet
}
// NewBodyFunc creates a new body by header info.
NewBodyFunc func(seq uint64, ptype byte, uri string) interface{}
)
// in xfer package
type (
// XferPipe transfer filter pipe, handlers from outer-most to inner-most.
// Note: the length can not be bigger than 255!
XferPipe struct {
filters []XferFilter
}
// XferFilter handles byte stream of packet when transfer.
XferFilter interface {
Id() byte
OnPack([]byte) ([]byte, error)
OnUnpack([]byte) ([]byte, error)
}
)
You can customize your own communication protocol by implementing the interface:
type (
// Proto pack/unpack protocol scheme of socket packet.
Proto interface {
// Version returns the protocol's id and name.
Version() (byte, string)
// Pack pack socket data packet.
// Note: Make sure to write only once or there will be package contamination!
Pack(*Packet) error
// Unpack unpack socket data packet.
// Note: Concurrent unsafe!
Unpack(*Packet) error
}
ProtoFunc func(io.ReadWriter) Proto
)
Next, you can specify the communication protocol in the following ways:
func SetDefaultProtoFunc(socket.ProtoFunc)
func (*Peer) ServeConn(conn net.Conn, protoFunc ...socket.ProtoFunc) Session
func (*Peer) DialContext(ctx context.Context, addr string, protoFunc ...socket.ProtoFunc) (Session, *Rerror)
func (*Peer) Dial(addr string, protoFunc ...socket.ProtoFunc) (Session, *Rerror)
func (*Peer) Listen(protoFunc ...socket.ProtoFunc) error
- Create a server or client peer
// Start a server
var peer1 = tp.NewPeer(tp.PeerConfig{
ListenAddress: "0.0.0.0:9090", // for server role
})
peer1.Listen()
...
// Start a client
var peer2 = tp.NewPeer(tp.PeerConfig{})
var sess, err = peer2.Dial("127.0.0.1:8080")
- Define a controller and handler for pull request
// Home controller
type Home struct {
tp.PullCtx
}
// Test handler
func (h *Home) Test(args *[2]int) (int, *tp.Rerror) {
a := (*args)[0]
b := (*args)[1]
return a + b, nil
}
- Define controller and handler for push request
// Msg controller
type Msg struct {
tp.PushCtx
}
// Test handler
func (m *Msg) Test(args *map[string]interface{}) {
tp.Infof("receive push(%s):\nargs: %#v\nquery: %#v\n", m.Ip(), args, m.Query())
}
- Define a handler for unknown pull request
func UnknownPullHandle(ctx tp.UnknownPullCtx, body *[]byte) (interface{}, *tp.Rerror) {
var v interface{}
codecId, err := ctx.Unmarshal(*body, &v, true)
if err != nil {
return nil, tp.New*Rerror(0, err.Error())
}
tp.Infof("receive unknown pull:\n codec: %s\n content: %#v", codecId, v)
return "this is reply string for unknown pull", nil
}
- Define a handler for unknown push request
func UnknownPushHandle(ctx tp.UnknownPushCtx, body *[]byte) {
var v interface{}
codecId, err := ctx.Unmarshal(*body, &v, true)
if err != nil {
tp.Errorf("%v", err)
} else {
tp.Infof("receive unknown push:\n codec: %s\n content: %#v", codecId, v)
}
}
- Define a plugin
// AliasPlugin can be used to set aliases for pull or push services
type AliasPlugin struct {
Aliases map[string]string
}
// NewAliasPlugin creates a new NewAliasPlugin
func NewAliasPlugin() *AliasPlugin {
return &AliasPlugin{Aliases: make(map[string]string)}
}
// Alias sets a alias for the uri.
// For example Alias("/arith/mul", "/mul")
func (p *AliasPlugin) Alias(alias string, uri string) {
p.Aliases[alias] = uri
}
// Name return name of this plugin.
func (p *AliasPlugin) Name() string {
return "AliasPlugin"
}
// PostReadPullHeader converts the alias of this service.
func (p *AliasPlugin) PostReadPullHeader(ctx tp.ReadCtx) *tp.Rerror {
var u = ctx.Input().Header.Uri
if p.Aliases != nil {
if a = p.Aliases[u]; a != "" {
ctx.Input().Header.Uri = a
}
}
return nil
}
- Register above handler and plugin
aliasesPlugin := NewAliasPlugin()
aliasesPlugin.Alias("/alias", "/origin")
{
pullGroup := peer.PullRouter.Group("pull", aliasesPlugin)
pullGroup.Reg(new(Home))
peer.PullRouter.SetUnknown(UnknownPullHandle)
}
{
pushGroup := peer.PushRouter.Group("push")
pushGroup.Reg(new(Msg), aliasesPlugin)
peer.PushRouter.SetUnknown(UnknownPushHandle)
}
package main
import (
"encoding/json"
"time"
tp "github.com/henrylee2cn/teleport"
)
func main() {
go tp.GraceSignal()
// tp.SetReadLimit(10)
tp.SetShutdown(time.Second*20, nil, nil)
var peer = tp.NewPeer(tp.PeerConfig{
SlowCometDuration: time.Millisecond * 500,
PrintBody: true,
CountTime: true,
ListenAddress: "0.0.0.0:9090",
})
group := peer.PullRouter.Group("group")
group.Reg(new(Home))
peer.PullRouter.SetUnknown(UnknownPullHandle)
peer.Listen()
}
// Home controller
type Home struct {
tp.PullCtx
}
// Test handler
func (h *Home) Test(args *map[string]interface{}) (map[string]interface{}, *tp.Rerror) {
h.Session().Push("/push/test?tag=from home-test", map[string]interface{}{
"your_id": h.Query().Get("peer_id"),
})
meta := h.CopyMeta()
meta.VisitAll(func(k, v []byte) {
tp.Infof("meta: key: %s, value: %s", k, v)
})
time.Sleep(5e9)
return map[string]interface{}{
"your_args": *args,
"server_time": time.Now(),
"meta": meta.String(),
}, nil
}
func UnknownPullHandle(ctx tp.UnknownPullCtx) (interface{}, *tp.Rerror) {
time.Sleep(1)
var v = struct {
RawMessage json.RawMessage
Bytes []byte
}{}
codecId, err := ctx.Bind(&v)
if err != nil {
return nil, tp.NewRerror(1001, "bind error", err.Error())
}
tp.Debugf("UnknownPullHandle: codec: %d, RawMessage: %s, bytes: %s",
codecId, v.RawMessage, v.Bytes,
)
ctx.Session().Push("/push/test?tag=from home-test", map[string]interface{}{
"your_id": ctx.Query().Get("peer_id"),
})
return map[string]interface{}{
"your_args": v,
"server_time": time.Now(),
"meta": ctx.CopyMeta().String(),
}, nil
}
package main
import (
"encoding/json"
"time"
tp "github.com/henrylee2cn/teleport"
"github.com/henrylee2cn/teleport/socket"
)
func main() {
go tp.GraceSignal()
tp.SetShutdown(time.Second*20, nil, nil)
var peer = tp.NewPeer(tp.PeerConfig{
SlowCometDuration: time.Millisecond * 500,
// DefaultBodyCodec: "json",
PrintBody: true,
CountTime: true,
RedialTimes: 3,
})
defer peer.Close()
peer.PushRouter.Reg(new(Push))
var sess, err = peer.Dial("127.0.0.1:9090")
if err != nil {
tp.Fatalf("%v", err)
}
sess.SetId("testId")
var reply interface{}
for {
var pullcmd = sess.Pull(
"/group/home/test?peer_id=call-1",
map[string]interface{}{
"bytes": []byte("test bytes"),
},
&reply,
socket.WithXferPipe('g'),
socket.WithSetMeta("set", "0"),
socket.WithAddMeta("add", "1"),
socket.WithAddMeta("add", "2"),
)
if pullcmd.Rerror() != nil {
tp.Errorf("pull error: %v", pullcmd.Rerror())
time.Sleep(time.Second * 2)
} else {
break
}
}
tp.Infof("test: %#v", reply)
var pullcmd = sess.Pull(
"/group/home/test_unknown?peer_id=call-2",
struct {
RawMessage json.RawMessage
Bytes []byte
}{
json.RawMessage(`{"RawMessage":"test_unknown"}`),
[]byte("test bytes"),
},
&reply,
socket.WithXferPipe('g'),
)
if pullcmd.Rerror() != nil {
tp.Fatalf("pull error: %v", pullcmd.Rerror())
}
tp.Infof("test_unknown: %#v", reply)
}
// Push controller
type Push struct {
tp.PushCtx
}
// Test handler
func (p *Push) Test(args *map[string]interface{}) {
tp.Infof("receive push(%s):\nargs: %#v\nquery: %#v\n", p.Ip(), args, p.Query())
}
Teleport is under Apache v2 License. See the LICENSE file for the full license text