- 本项目已更好支持二次开发为目标 可通过各种自定义事件去完成相应功能 常见案例如下
- 存储经纬度 代码参考
jt808服务端 模拟器 消息队列 数据库都运行在2核4G腾讯云服务器
测试每秒保存5000条的情况 约5.5小时保存了近1亿的经纬度
- 平台下发指令给终端 代码参考
主动下发给设备指令 获取应答的情况
- jt1078视频 代码参考
开启jt808服务 开启jt1078服务 开启客户端接收到0x9101指令开始发送流
在线流地址 http://49.234.235.7:8080/live/295696659617_1.flv
- 协议交互详情 代码参考
使用自定义模拟器 可以轻松生成测试用的报文 有详情描述
- 自定义协议扩展 代码参考
自定义附加信息处理 获取想要的扩展内容
- 看飞哥的单机TCP百万并发 好奇有数据情况的表现 因此国庆准备试一试有数据的情况
- 性能测试 单机[2核4G机器]并发10w+ 每日保存4亿+经纬度 详情
- 安全可靠 核心协议部分测试覆盖率100% 纯原生go实现(不依赖任何库)
- 简洁优雅 核心代码不到1000行 信令交互不使用任何锁 仅使用channel完成
- 支持JT808(2011/2013/209) JT1078(需要其他流媒体服务) 支持分包和自动补传
package main
import (
"github.com/cuteLittleDevil/go-jt808/service"
"log/slog"
"os"
)
func init() {
logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
AddSource: true,
Level: slog.LevelDebug,
ReplaceAttr: nil,
}))
slog.SetDefault(logger)
}
func main() {
goJt808 := service.New(
service.WithHostPorts("0.0.0.0:8080"),
service.WithNetwork("tcp"),
)
goJt808.Run()
}
- 目前(2024-10-01)前的go语言版本个人觉得都不好因此都不推荐参考 推荐参考资料如下
项目名称 | 语言 | 日期 | Star 数 | 链接 |
---|---|---|---|---|
JT808 | C# | 2024-10-01 | 534 | JT808 C# |
jt808-server | Java | 2024-10-01 | 1.4k+ | JT808 Java |
- java模拟器(QQ群下载 373203450)
- go模拟器 详情点击
- 2台云服务器各开5w+客户端 总计10w+
服务端版本 | 场景 | 并发数 | 服务器配置 | 服务器使用资源情况 | 描述 |
---|---|---|---|---|---|
v0.3.0 | 连接数测试 | 10w+ | 2核4G | 120%+cpu 1.7G内存 | 10.0.16.5上开启服务端和模拟器 10.0.16.14机器上开启模拟器 |
- save进程丢失了部分数据 channel队列溢出抛弃 (测试channel队列为100)
- 保存1亿丢失826条 保存4.32亿丢失1216条(分两次测试)
服务端版本 | 客户端 | 服务器配置 | 服务使用资源情况 | 描述 |
---|---|---|---|---|
v0.3.0 | 1w go模拟器 | 2核4G | 35%cpu 180.4MB内存 | 每秒5000 一共保存经纬度1亿 实际保存99999174 成功率99.999% |
服务 | cpu | 内存 | 描述 |
---|---|---|---|
server | 35% | 180.4MB | 808服务端 |
client | 23% | 196MB | 模拟客户端 |
save | 18% | 68.8MB | 存储数据服务 |
nats-server | 20% | 14.8MB | 消息队列 |
taosadapter | 37% | 124.3MB | tdengine数据库适配 |
taosd | 15% | 124.7MB | tdengine数据库 |
func main() {
t := terminal.New(terminal.WithHeader(consts.JT808Protocol2013, "1")) // 自定义模拟器 2013版本
data := t.CreateDefaultCommandData(consts.T0100Register) // 生成的预值注册指令 0x0100
fmt.Println(fmt.Sprintf("模拟器生成的[%x]", data))
jtMsg := jt808.NewJTMessage()
_ = jtMsg.Decode(data) // 解析固定请求头
var t0x0100 model.T0x0100 // 把body数据解析到结构体中
_ = t0x0100.Parse(jtMsg)
fmt.Println(jtMsg.Header.String())
fmt.Println(t0x0100.String())
}
部分输出 输出详情
模拟器生成的[7e010000300000000000010001001f006e63643132337777772e3830382e636f6d0000000000000000003736353433323101b2e2413132333435363738797e]
[0100] 消息ID:[256] [终端-注册]
消息体属性对象: {
[0000000000110000] 消息体属性对象:[48]
版本号:[JT2013]
[bit15] [0]
[bit14] 协议版本标识:[0]
[bit13] 是否分包:[false]
[bit10-12] 加密标识:[0] 0-不加密 1-RSA
[bit0-bit9] 消息体长度:[48]
}
[000000000001] 终端手机号:[1]
[0001] 消息流水号:[1]
数据体对象:{
终端-注册:[001f006e63643132337777772e3830382e636f6d0000000000000000003736353433323101b2e2413132333435363738]
[001f] 省域ID:[31]
[006e] 市县域ID:[110]
[6364313233] 制造商ID(5):[cd123]
[7777772e3830382e636f6d000000000000000000] 终端型号(20):[www.808.com]
[37363534333231] 终端ID(7):[7654321]
[01] 车牌颜色:[1]
[b2e2413132333435363738] 车牌号:[测A12345678]
}
自定义解析扩展 0x33为例 关键代码如下
func (l *Location) Parse(jtMsg *jt808.JTMessage) error {
l.T0x0200AdditionDetails.CustomAdditionContentFunc = func(id uint8, content []byte) (model.AdditionContent, bool) {
if id == uint8(consts.A0x01Mile) {
l.customMile = 100
}
if id == 0x33 {
value := content[0]
l.customValue = value
return model.AdditionContent{
Data: content,
CustomValue: value,
}, true
}
return model.AdditionContent{}, false
}
return l.T0x0200.Parse(jtMsg)
}
func (l *Location) OnReadExecutionEvent(message *service.Message) {
if v, ok := tmp.Additions[consts.A0x01Mile]; ok {
fmt.Println(fmt.Sprintf("里程[%d] 自定义辅助里程[%d]", v.Content.Mile, tmp.customMile))
}
id := consts.JT808LocationAdditionType(0x33)
if v, ok := tmp.Additions[id]; ok {
fmt.Println("自定义未知信息扩展", v.Content.CustomValue, tmp.customValue)
}
}
部分输出 输出详情
里程[11] 自定义辅助里程[100]
自定义未知信息扩展 32 32
关键代码如下
replyMsg := goJt808.SendActiveMessage(&service.ActiveMessage{
Key: phone, // 默认使用手机号作为唯一key 根据key找到对应终端的TCP链接
Command: consts.P8104QueryTerminalParams, // 下发的指令
Body: nil, // 下发的body数据 8104为空
OverTimeDuration: 3 * time.Second, // 超时时间 设备这段时间没有回复则失败
})
var t0x0104 model.T0x0104
if err := t0x0104.Parse(replyMsg.JTMessage); err != nil {
panic(err)
}
fmt.Println(t0x0104.String())
部分输出 输出详情
数据体对象:{
[0003] 应答消息流水号:[3]
[5b] 应答参数个数:[91]
终端-查询参数:
{
[0001]终端参数ID:1 终端心跳发送间隔,单位为秒(s)
参数长度[4] 是否存在[true]
[0000000a]参数值:[10]
}
...
{
[0110]终端参数ID:272 CAN总线ID单独采集设置:
参数长度[8] 是否存在[true]
[0000000000000101]参数值:[[0 0 0 0 0 0 1 1]]
}
未知终端参数id:[33 117 118 119 121 122 123 124]
}
- 自定义实现0x0200的消息处理 把数据发送到nats 关键代码如下
type T0x0200 struct {
model.T0x0200
}
func (t *T0x0200) OnReadExecutionEvent(message *service.Message) {
var t0x0200 model.T0x0200
if err := t0x0200.Parse(message.JTMessage); err != nil {
fmt.Println(err)
return
}
location := shared.NewLocation(message.Header.TerminalPhoneNo, t0x0200.Latitude, t0x0200.Longitude)
if err := mq.Default().Pub(shared.SubLocation, location.Encode()); err != nil {
fmt.Println(err)
return
}
}
func (t *T0x0200) OnWriteExecutionEvent(_ service.Message) {}
- 使用自定义0x0200消息处理启动
goJt808 := service.New(
service.WithHostPorts("0.0.0.0:8080"),
service.WithNetwork("tcp"),
service.WithCustomHandleFunc(func() map[consts.JT808CommandType]service.Handler {
return map[consts.JT808CommandType]service.Handler{
consts.T0200LocationReport: &T0x0200{},
}
}),
)
goJt808.Run()
- 把1078格式流转对应格式 放入lal服务中 核心代码参考
func (j *jt1078) createStream(name string) chan<- *Packet {
...
ch := make(chan *Packet, 100)
go func(session logic.ICustomizePubSessionContext, ch <-chan *Packet) {
for v := range ch {
...
switch v.Flag.PT {
case PTG711A:
tmp.PayloadType = base.AvPacketPtG711A
case PTG711U:
tmp.PayloadType = base.AvPacketPtG711U
case PTH264:
case PTH265:
tmp.PayloadType = base.AvPacketPtHevc
default:
slog.Warn("未知类型",
slog.Any("pt", v.Flag.PT))
}
if err := session.FeedAvPacket(tmp); err != nil {
slog.Warn("session.FeedAvPacket",
slog.Any("err", err))
}
}
}
}(session, ch)
}
序号 | 消息 ID | 完成情况 | 测试情况 | 消息体名称 | 2019 版本 | 2011 版本 |
---|---|---|---|---|---|---|
1 | 0x0001 | ✅ | ✅ | 终端通用应答 | ||
2 | 0x8001 | ✅ | ✅ | 平台-通用应答 | ||
3 | 0x0002 | ✅ | ✅ | 终端心跳 | ||
5 | 0x0100 | ✅ | ✅ | 终端注册 | 修改 | 被修改 |
4 | 0x8003 | ✅ | ✅ | 补传分包请求 | 被新增 | |
6 | 0x8100 | ✅ | ✅ | 平台-注册应答 | ||
8 | 0x0102 | ✅ | ✅ | 终端鉴权 | 修改 | |
10 | 0x8104 | ✅ | ✅ | 平台-查询终端参数 | ||
11 | 0x0104 | ✅ | ✅ | 查询终端参数应答 | ||
18 | 0x0200 | ✅ | ✅ | 位置信息汇报 | 增加附加信息 | 被修改 |
49 | 0x0704 | ✅ | ✅ | 定位数据批量上传 | 修改 | 被新增 |
序号 | 消息 ID | 完成情况 | 测试情况 | 消息体名称 |
---|---|---|---|---|
13 | 0x1003 | ✅ | ✅ | 终端上传音视频属性 |
14 | 0x1005 | ✅ | ✅ | 终端上传乘客流量 |
15 | 0x1205 | ✅ | ✅ | 终端上传音视频资源列表 |
16 | 0x1206 | ✅ | ✅ | 文件上传完成通知 |
17 | 0x9003 | ✅ | ✅ | 平台-查询终端音视频属性 |
18 | 0x9101 | ✅ | ✅ | 平台-实时音视频传输请求 |
19 | 0x9102 | ✅ | ✅ | 平台-音视频实时传输控制 |
20 | 0x9105 | ✅ | ✅ | 平台-实时音视频传输状态通知 |
21 | 0x9201 | ✅ | ✅ | 平台-下发远程录像回放请求 |
22 | 0x9202 | ✅ | ✅ | 平台-下发远程录像回放控制 |
23 | 0x9205 | ✅ | ✅ | 平台-查询资源列表 |
24 | 0x9206 | ✅ | ✅ | 平台-文件上传指令 |
25 | 0x9207 | ✅ | ✅ | 平台-文件上传控制 |