/arpc

More effective network communication, two-way calling, notify and broadcast supported.

Primary LanguageGoMIT LicenseMIT

ARPC - More Effective Network Communication

Slack

Mentioned in Awesome Go MIT licensed Build Status Go Report Card Coverage Statusd

Contents

Features

  • Two-Way Calling
  • Two-Way Notify
  • Sync and Async Calling
  • Sync and Async Response
  • Batch Write | Writev | net.Buffers
  • Broadcast
  • Middleware
  • Pub/Sub
  • Opentracing
Pattern Interactive Directions Description
call two-way:
c -> s
s -> c
request and response
notify two-way:
c -> s
s -> c
request without response

Performance

Here are some thirdparty benchmark including arpc, although these repos have provide the performance report, but I suggest you run the code yourself and get the real result, other than just believe other people's doc:

Header Layout

  • LittleEndian
bodyLen reserved cmd flag methodLen sequence method body
4 bytes 1 byte 1 byte 1 bytes 1 bytes 8 bytes methodLen bytes bodyLen-methodLen bytes

Installation

  1. Get and install arpc
$ go get -u github.com/lesismal/arpc
  1. Import in your code:
import "github.com/lesismal/arpc"

Quick Start

package main

import "github.com/lesismal/arpc"

func main() {
	server := arpc.NewServer()

	// register router
	server.Handler.Handle("/echo", func(ctx *arpc.Context) {
		str := ""
		if err := ctx.Bind(&str); err == nil {
			ctx.Write(str)
		}
	})

	server.Run("localhost:8888")
}
package main

import (
	"log"
	"net"
	"time"

	"github.com/lesismal/arpc"
)

func main() {
	client, err := arpc.NewClient(func() (net.Conn, error) {
		return net.DialTimeout("tcp", "localhost:8888", time.Second*3)
	})
	if err != nil {
		panic(err)
	}
	defer client.Stop()

	req := "hello"
	rsp := ""
	err = client.Call("/echo", &req, &rsp, time.Second*5)
	if err != nil {
		log.Fatalf("Call failed: %v", err)
	} else {
		log.Printf("Call Response: \"%v\"", rsp)
	}
}

API Examples

Register Routers

var handler arpc.Handler

// package
handler = arpc.DefaultHandler
// server
handler = server.Handler
// client
handler = client.Handler

// message would be default handled one by one  in the same conn reader goroutine
handler.Handle("/route", func(ctx *arpc.Context) { ... })
handler.Handle("/route2", func(ctx *arpc.Context) { ... })

// this make message handled by a new goroutine
async := true
handler.Handle("/asyncResponse", func(ctx *arpc.Context) { ... }, async)

Router Middleware

See router middleware, it's easy to implement middlewares yourself

import "github.com/lesismal/arpc/extension/middleware/router"

var handler arpc.Handler

// package
handler = arpc.DefaultHandler
// server
handler = server.Handler
// client
handler = client.Handler

handler.Use(router.Recover())
handler.Use(router.Logger())
handler.Use(func(ctx *arpc.Context) { ... })
handler.Handle("/echo", func(ctx *arpc.Context) { ... })
handler.Use(func(ctx *arpc.Context) { ... })

Coder Middleware

  • Coder Middleware is used for converting a message data to your designed format, e.g encrypt/decrypt and compress/uncompress
import "github.com/lesismal/arpc/extension/middleware/coder/gzip"

var handler arpc.Handler

// package
handler = arpc.DefaultHandler
// server
handler = server.Handler
// client
handler = client.Handler

handler.UseCoder(gzip.New())
handler.Handle("/echo", func(ctx *arpc.Context) { ... })

Client Call, CallAsync, Notify

  1. Call (Block, with timeout/context)
request := &Echo{...}
response := &Echo{}
timeout := time.Second*5
err := client.Call("/call/echo", request, response, timeout)
// ctx, cancel := context.WithTimeout(context.Background(), time.Second)
// defer cancel()
// err := client.CallWith(ctx, "/call/echo", request, response)
  1. CallAsync (Nonblock, with callback and timeout/context)
request := &Echo{...}

timeout := time.Second*5
err := client.CallAsync("/call/echo", request, func(ctx *arpc.Context) {
	response := &Echo{}
	ctx.Bind(response)
	...	
}, timeout)
  1. Notify (same as CallAsync with timeout/context, without callback)
data := &Notify{...}
client.Notify("/notify", data, time.Second)
// ctx, cancel := context.WithTimeout(context.Background(), time.Second)
// defer cancel()
// client.NotifyWith(ctx, "/notify", data)

Server Call, CallAsync, Notify

  1. Get client and keep it in your application
var client *arpc.Client
server.Handler.Handle("/route", func(ctx *arpc.Context) {
	client = ctx.Client
	// release client
	client.OnDisconnected(func(c *arpc.Client){
		client = nil
	})
})

go func() {
	for {
		time.Sleep(time.Second)
		if client != nil {
			client.Call(...)
			client.CallAsync(...)
			client.Notify(...)
		}
	}
}()
  1. Then Call/CallAsync/Notify

Broadcast - Notify

var mux = sync.RWMutex{}
var clientMap = make(map[*arpc.Client]struct{})

func broadcast() {
	var svr *arpc.Server = ... 
	msg := svr.NewMessage(arpc.CmdNotify, "/broadcast", fmt.Sprintf("broadcast msg %d", i))
	mux.RLock()
	for client := range clientMap {
		client.PushMsg(msg, arpc.TimeZero)
	}
	mux.RUnlock()
}

Async Response

var handler arpc.Handler

// package
handler = arpc.DefaultHandler
// server
handler = server.Handler
// client
handler = client.Handler

asyncResponse := true // default is true, or set false
handler.Handle("/echo", func(ctx *arpc.Context) {
	req := ...
	err := ctx.Bind(req)
	if err == nil {
		ctx.Write(data)
	}
}, asyncResponse)

Handle New Connection

// package
arpc.DefaultHandler.HandleConnected(func(c *arpc.Client) {
	...
})

// server
svr := arpc.NewServer()
svr.Handler.HandleConnected(func(c *arpc.Client) {
	...
})

// client
client, err := arpc.NewClient(...)
client.Handler.HandleConnected(func(c *arpc.Client) {
	...
})

Handle Disconnected

// package
arpc.DefaultHandler.HandleDisconnected(func(c *arpc.Client) {
	...
})

// server
svr := arpc.NewServer()
svr.Handler.HandleDisconnected(func(c *arpc.Client) {
	...
})

// client
client, err := arpc.NewClient(...)
client.Handler.HandleDisconnected(func(c *arpc.Client) {
	...
})

Handle Client's send queue overstock

// package
arpc.DefaultHandler.HandleOverstock(func(c *arpc.Client) {
	...
})

// server
svr := arpc.NewServer()
svr.Handler.HandleOverstock(func(c *arpc.Client) {
	...
})

// client
client, err := arpc.NewClient(...)
client.Handler.HandleOverstock(func(c *arpc.Client) {
	...
})

Custom Net Protocol

// server
var ln net.Listener = ...
svr := arpc.NewServer()
svr.Serve(ln)

// client
dialer := func() (net.Conn, error) { 
	return ... 
}
client, err := arpc.NewClient(dialer)

Custom Codec

import "github.com/lesismal/arpc/codec"

var codec arpc.Codec = ...

// package
codec.Defaultcodec = codec

// server
svr := arpc.NewServer()
svr.Codec = codec

// client
client, err := arpc.NewClient(...)
client.Codec = codec

Custom Logger

import "github.com/lesismal/arpc/log"

var logger arpc.Logger = ...
log.SetLogger(logger) // log.DefaultLogger = logger

Custom operations before conn's recv and send

arpc.DefaultHandler.BeforeRecv(func(conn net.Conn) error) {
	// ...
})

arpc.DefaultHandler.BeforeSend(func(conn net.Conn) error) {
	// ...
})

Custom arpc.Client's Reader by wrapping net.Conn

arpc.DefaultHandler.SetReaderWrapper(func(conn net.Conn) io.Reader) {
	// ...
})

Custom arpc.Client's send queue capacity

arpc.DefaultHandler.SetSendQueueSize(4096)

JS Client

Web Chat Examples

Pub/Sub Examples

  • start a server
import "github.com/lesismal/arpc/extension/pubsub"

var (
	address = "localhost:8888"

	password = "123qwe"

	topicName = "Broadcast"
)

func main() {
	s := pubsub.NewServer()
	s.Password = password

	// server publish to all clients
	go func() {
		for i := 0; true; i++ {
			time.Sleep(time.Second)
			s.Publish(topicName, fmt.Sprintf("message from server %v", i))
		}
	}()

	s.Run(address)
}
  • start a subscribe client
import "github.com/lesismal/arpc/log"
import "github.com/lesismal/arpc/extension/pubsub"

var (
	address = "localhost:8888"

	password = "123qwe"

	topicName = "Broadcast"
)

func onTopic(topic *pubsub.Topic) {
	log.Info("[OnTopic] [%v] \"%v\", [%v]",
		topic.Name,
		string(topic.Data),
		time.Unix(topic.Timestamp/1000000000, topic.Timestamp%1000000000).Format("2006-01-02 15:04:05.000"))
}

func main() {
	client, err := pubsub.NewClient(func() (net.Conn, error) {
		return net.DialTimeout("tcp", address, time.Second*3)
	})
	if err != nil {
		panic(err)
	}
	client.Password = password

	// authentication
	err = client.Authenticate()
	if err != nil {
		panic(err)
	}

	// subscribe topic
	if err := client.Subscribe(topicName, onTopic, time.Second); err != nil {
		panic(err)
	}

	<-make(chan int)
}
  • start a publish client
import "github.com/lesismal/arpc/extension/pubsub"

var (
	address = "localhost:8888"

	password = "123qwe"

	topicName = "Broadcast"
)

func main() {
	client, err := pubsub.NewClient(func() (net.Conn, error) {
		return net.DialTimeout("tcp", address, time.Second*3)
	})
	if err != nil {
		panic(err)
	}
	client.Password = password

	// authentication
	err = client.Authenticate()
	if err != nil {
		panic(err)
	}

	for i := 0; true; i++ {
		if i%5 == 0 {
			// publish msg to all clients
			client.Publish(topicName, fmt.Sprintf("message from client %d", i), time.Second)
		} else {
			// publish msg to only one client
			client.PublishToOne(topicName, fmt.Sprintf("message from client %d", i), time.Second)
		}
		time.Sleep(time.Second)
	}
}

More Examples