一个基于Kafka的中间件,旨在简化服务间异步Http调用的复杂度
- Kafka >= 0.9
- Go >= 1.8
- 设置GOPATH为项目根目录
- 进入GOPATH目录,安装glide包管理:mkdir -p bin && curl https://glide.sh/get | sh
- 执行sh build.sh
异步调用
curl localhost:10086/rpc/call -d '{"acl": {"name":"system-1","secret":"i am good"},"url":"http://localhost:10086/rpc/server/mock","data":"hello123123123","partition_key": "暂时用不到"}'
{
"data": "",
"errno": 0,
"msg": "发送成功"
}
统计信息
curl localhost:10086/stats
{
"data": {
"clientStats": {
"test": {
"rpcFail": 0,
"rpcRetries": 0,
"rpcSuccess": 22,
"rpcTotal": 22
}
},
"consumerStats": [
{
"groupId": "G1",
"handleMessage": 11,
"invalidMessage": 0,
"topic": "test"
},
{
"groupId": "G2",
"handleMessage": 11,
"invalidMessage": 0,
"topic": "test"
}
],
"producerStats": {
"test": {
"deliveryFail": 0,
"deliverySuccess": 11
}
},
"serverStats": {
"acceptedCall": 11,
"overloadCall": 0,
"receivedCall": 11
}
},
"errno": 0,
"msg": "success"
}
{
"log.level": 5,
"log.directory": "./logs",
"kafka.bootstrap.servers": "localhost:9092",
"kafka.topics": [
{"name": "test", "partitions": 3}
],
"kafka.producer.channel.size" : 200000,
"kafka.producer.retries": 2,
"kafka.producer.acl": [
{"name": "system-1", "secret": "you are good", "topic": "test"},
{"name": "system-2", "secret": "you are bad", "topic": "test"}
],
"kafka.consumer.list": [
{"topic": "test", "groupId": "G1", "rateLimit": 100, "timeout": 3000, "retries": 2, "concurrency": 5},
{"topic": "test", "groupId": "G2", "rateLimit": 100, "timeout": 3000, "retries": 2, "concurrency": 5}
],
"http.server.port": 10086,
"http.server.read.timeout": 5000,
"http.server.write.timeout": 5000
}
- server模块:接收异步Http调用
- producer模块:将异步调用序列化,投递到kafka
- log模块:线程安全的异步日志
- config模块:基于json的配置
- client模块:异步http客户端,支持超时、重试、并发控制、流速控制
- consumer模块:读取kafka中的消息,发送给下游
- stats模块:基于原子变量的程序统计
- 性能压测和BUG修复
- bigpipe基于rebalanced consumer group工作,可以多进程部署,自动负载均衡
- bigpipe支持优雅退出,不损失任何数据
- bigpipe在正常退出的情况下,保障at least once的投递语义
- bigpipe永远不会阻塞客户端的返回