/go-jt808

jt808服务端 单机[2核4G]并发10w+ 100%测试覆盖率 有多种例子支持二次开发(每日保存亿+经纬度[2核4G] 平台下发数据获取回复 报文详细解析 JT1078流媒体播放等)

Primary LanguageGoMIT LicenseMIT

go-jt808

  • 本项目已更好支持二次开发为目标 可通过各种自定义事件去完成相应功能 常见案例如下
  1. 存储经纬度 代码参考
jt808服务端 模拟器 消息队列 数据库都运行在2核4G腾讯云服务器
测试每秒保存5000条的情况 约5.5小时保存了近1亿的经纬度
  1. 平台下发指令给终端 代码参考
主动下发给设备指令 获取应答的情况
  1. jt1078视频 代码参考
开启jt808服务 开启jt1078服务 开启客户端接收到0x9101指令开始发送流
在线流地址 http://49.234.235.7:8080/live/295696659617_1.flv
  1. 协议交互详情 代码参考
使用自定义模拟器 可以轻松生成测试用的报文 有详情描述
  1. 自定义协议扩展 代码参考
自定义附加信息处理 获取想要的扩展内容

  • 看飞哥的单机TCP百万并发 好奇有数据情况的表现 因此国庆准备试一试有数据的情况
  • 性能测试 单机[2核4G机器]并发10w+ 每日保存4亿+经纬度 详情
  • 安全可靠 核心协议部分测试覆盖率100% 纯原生go实现(不依赖任何库)
  • 简洁优雅 核心代码不到1000行 信令交互不使用任何锁 仅使用channel完成
  • 支持JT808(2011/2013/209) JT1078(需要其他流媒体服务) 支持分包和自动补传

快速开始

package main

import (
	"github.com/cuteLittleDevil/go-jt808/service"
	"log/slog"
	"os"
)

func init() {
	logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
		AddSource:   true,
		Level:       slog.LevelDebug,
		ReplaceAttr: nil,
	}))
	slog.SetDefault(logger)
}

func main() {
	goJt808 := service.New(
		service.WithHostPorts("0.0.0.0:8080"),
		service.WithNetwork("tcp"),
	)
	goJt808.Run()
}

  • 目前(2024-10-01)前的go语言版本个人觉得都不好因此都不推荐参考 推荐参考资料如下

参考资料

项目名称 语言 日期 Star 数 链接
JT808 C# 2024-10-01 534 JT808 C#
jt808-server Java 2024-10-01 1.4k+ JT808 Java

性能测试

  • java模拟器(QQ群下载 373203450)
  • go模拟器 详情点击

连接数测试

详情点击

  • 2台云服务器各开5w+客户端 总计10w+
服务端版本 场景 并发数 服务器配置 服务器使用资源情况 描述
v0.3.0 连接数测试 10w+ 2核4G 120%+cpu 1.7G内存 10.0.16.5上开启服务端和模拟器
10.0.16.14机器上开启模拟器

模拟经纬度存储测试

详情点击

  • save进程丢失了部分数据 channel队列溢出抛弃 (测试channel队列为100)
  • 保存1亿丢失826条 保存4.32亿丢失1216条(分两次测试)
服务端版本 客户端 服务器配置 服务使用资源情况 描述
v0.3.0 1w go模拟器 2核4G 35%cpu 180.4MB内存 每秒5000 一共保存经纬度1亿
实际保存99999174 成功率99.999%
服务 cpu 内存 描述
server 35% 180.4MB 808服务端
client 23% 196MB 模拟客户端
save 18% 68.8MB 存储数据服务
nats-server 20% 14.8MB 消息队列
taosadapter 37% 124.3MB tdengine数据库适配
taosd 15% 124.7MB tdengine数据库

使用案例

1. 协议处理

1.1 协议解析

func main() {
	t := terminal.New(terminal.WithHeader(consts.JT808Protocol2013, "1")) // 自定义模拟器 2013版本
	data := t.CreateDefaultCommandData(consts.T0100Register) // 生成的预值注册指令 0x0100
	fmt.Println(fmt.Sprintf("模拟器生成的[%x]", data))

	jtMsg := jt808.NewJTMessage()
	_ = jtMsg.Decode(data) // 解析固定请求头

	var t0x0100 model.T0x0100 // 把body数据解析到结构体中
	_ = t0x0100.Parse(jtMsg)
	fmt.Println(jtMsg.Header.String())
	fmt.Println(t0x0100.String())
}

部分输出 输出详情

模拟器生成的[7e010000300000000000010001001f006e63643132337777772e3830382e636f6d0000000000000000003736353433323101b2e2413132333435363738797e]
[0100] 消息ID:[256] [终端-注册]
消息体属性对象: {
        [0000000000110000] 消息体属性对象:[48]
        版本号:[JT2013]
        [bit15] [0]
        [bit14] 协议版本标识:[0]
        [bit13] 是否分包:[false]
        [bit10-12] 加密标识:[0] 0-不加密 1-RSA
        [bit0-bit9] 消息体长度:[48]
}
[000000000001] 终端手机号:[1]
[0001] 消息流水号:[1]
数据体对象:{
        终端-注册:[001f006e63643132337777772e3830382e636f6d0000000000000000003736353433323101b2e2413132333435363738]
        [001f] 省域ID:[31]
        [006e] 市县域ID:[110]
        [6364313233] 制造商ID(5):[cd123]
        [7777772e3830382e636f6d000000000000000000] 终端型号(20):[www.808.com]
        [37363534333231] 终端ID(7):[7654321]
        [01] 车牌颜色:[1]
        [b2e2413132333435363738] 车牌号:[测A12345678]
}

1.2 自定义协议扩展 (附加信息)

自定义解析扩展 0x33为例 关键代码如下

func (l *Location) Parse(jtMsg *jt808.JTMessage) error {
	l.T0x0200AdditionDetails.CustomAdditionContentFunc = func(id uint8, content []byte) (model.AdditionContent, bool) {
		if id == uint8(consts.A0x01Mile) {
			l.customMile = 100
		}
		if id == 0x33 {
			value := content[0]
			l.customValue = value
			return model.AdditionContent{
				Data:        content,
				CustomValue: value,
			}, true
		}
		return model.AdditionContent{}, false
	}
	return l.T0x0200.Parse(jtMsg)
}

func (l *Location) OnReadExecutionEvent(message *service.Message) {
	if v, ok := tmp.Additions[consts.A0x01Mile]; ok {
		fmt.Println(fmt.Sprintf("里程[%d] 自定义辅助里程[%d]", v.Content.Mile, tmp.customMile))
	}
	id := consts.JT808LocationAdditionType(0x33)
	if v, ok := tmp.Additions[id]; ok {
		fmt.Println("自定义未知信息扩展", v.Content.CustomValue, tmp.customValue)
	}
}

部分输出 输出详情

里程[11] 自定义辅助里程[100]
自定义未知信息扩展 32 32

1.3 平台下发给终端参数 (8104查询终端参数)

关键代码如下

	replyMsg := goJt808.SendActiveMessage(&service.ActiveMessage{
		Key:              phone,                           // 默认使用手机号作为唯一key 根据key找到对应终端的TCP链接
		Command:          consts.P8104QueryTerminalParams, // 下发的指令
		Body:             nil,                             // 下发的body数据 8104为空
		OverTimeDuration: 3 * time.Second,                 // 超时时间 设备这段时间没有回复则失败
	})
	var t0x0104 model.T0x0104
	if err := t0x0104.Parse(replyMsg.JTMessage); err != nil {
		panic(err)
	}
	fmt.Println(t0x0104.String())

部分输出 输出详情

数据体对象:{
        [0003] 应答消息流水号:[3]
        [5b] 应答参数个数:[91]
        终端-查询参数:

        {
                [0001]终端参数ID:1 终端心跳发送间隔,单位为秒(s)
                参数长度[4] 是否存在[true]
                [0000000a]参数值:[10]
        }
		...
        {
                [0110]终端参数ID:272 CAN总线ID单独采集设置:
                参数长度[8] 是否存在[true]
                [0000000000000101]参数值:[[0 0 0 0 0 0 1 1]]
        }
        未知终端参数id:[33 117 118 119 121 122 123 124]
}

2. 自定义保存经纬度

详情点击

  • 自定义实现0x0200的消息处理 把数据发送到nats 关键代码如下
type T0x0200 struct {
	model.T0x0200
}

func (t *T0x0200) OnReadExecutionEvent(message *service.Message) {
	var t0x0200 model.T0x0200
	if err := t0x0200.Parse(message.JTMessage); err != nil {
		fmt.Println(err)
		return
	}
	location := shared.NewLocation(message.Header.TerminalPhoneNo, t0x0200.Latitude, t0x0200.Longitude)
	if err := mq.Default().Pub(shared.SubLocation, location.Encode()); err != nil {
		fmt.Println(err)
		return
	}
}

func (t *T0x0200) OnWriteExecutionEvent(_ service.Message) {}
  • 使用自定义0x0200消息处理启动
	goJt808 := service.New(
		service.WithHostPorts("0.0.0.0:8080"),
		service.WithNetwork("tcp"),
		service.WithCustomHandleFunc(func() map[consts.JT808CommandType]service.Handler {
			return map[consts.JT808CommandType]service.Handler{
				consts.T0200LocationReport:      &T0x0200{},
			}
		}),
	)
	goJt808.Run()

3. jt808附件上传

4. jt1078相关

4.1 流媒体服务使用lal

  • 把1078格式流转对应格式 放入lal服务中 核心代码参考
func (j *jt1078) createStream(name string) chan<- *Packet {
	...
	ch := make(chan *Packet, 100)
	go func(session logic.ICustomizePubSessionContext, ch <-chan *Packet) {
		for v := range ch {
				...
				switch v.Flag.PT {
				case PTG711A:
					tmp.PayloadType = base.AvPacketPtG711A
				case PTG711U:
					tmp.PayloadType = base.AvPacketPtG711U
				case PTH264:
				case PTH265:
					tmp.PayloadType = base.AvPacketPtHevc
				default:
					slog.Warn("未知类型",
						slog.Any("pt", v.Flag.PT))
				}
				if err := session.FeedAvPacket(tmp); err != nil {
					slog.Warn("session.FeedAvPacket",
						slog.Any("err", err))
				}
			}
		}
	}(session, ch)
}

协议对接完成情况

JT808 终端通讯协议消息对照表

序号 消息 ID 完成情况 测试情况 消息体名称 2019 版本 2011 版本
1 0x0001 终端通用应答
2 0x8001 平台-通用应答
3 0x0002 终端心跳
5 0x0100 终端注册 修改 被修改
4 0x8003 补传分包请求 被新增
6 0x8100 平台-注册应答
8 0x0102 终端鉴权 修改
10 0x8104 平台-查询终端参数
11 0x0104 查询终端参数应答
18 0x0200 位置信息汇报 增加附加信息 被修改
49 0x0704 定位数据批量上传 修改 被新增

JT1078 扩展 JT808 议消息对照表

序号 消息 ID 完成情况 测试情况 消息体名称
13 0x1003 终端上传音视频属性
14 0x1005 终端上传乘客流量
15 0x1205 终端上传音视频资源列表
16 0x1206 文件上传完成通知
17 0x9003 平台-查询终端音视频属性
18 0x9101 平台-实时音视频传输请求
19 0x9102 平台-音视频实时传输控制
20 0x9105 平台-实时音视频传输状态通知
21 0x9201 平台-下发远程录像回放请求
22 0x9202 平台-下发远程录像回放控制
23 0x9205 平台-查询资源列表
24 0x9206 平台-文件上传指令
25 0x9207 平台-文件上传控制