Process-Schedule-Go(ps-go)是一个流程调度引擎,可以对流程进行可视化编排。
这听起来很不太容易理解,我们来想想这样一个需求场景,现在产品经理告诉你,针对某某节日,需要做一个针对于新用户的活动,希望能够在用户注册成功之后给用户发放一张优惠券,在发送短息通知用户。
一般在公司中,用户中心、优惠券中心、短信服务都是属于基础服务,都由分别的人进行负责。用户中心具有注册接口,优惠券中心具有优惠券发送接口,短信中心具有短信发送接口。
在这个需求背景下,可以由前端或者后端来进行解决,首先来说一下后端处理的方法,由用户中心去在用户注册接口去处理,当用户注册成功之后再去调用优惠券发放,短信通知接口, 但是用户中心本身就是一个独立的服务,再在里面加一些优惠券相关的代码,各个服务之间的耦合性太高,作为用户中心的服务负责人,他肯定也不愿意去为了这个需求而去改这个接口。 其次就是前端来进行解决,前端在调用用户注册接口成功之后,在调用优惠券发放和短信发放接口, 前端处理的坏处最直观的就是,你的分多次请求,请求次数越多,需要的时间越长,这里的请求都是从客户端直接请求,肯定没有从用户中心服务请求快,因为他们都是内网,而且客户端请求一次还要做判断。 当然前端也可以做异步请求,但是如果用户以为已经注册成功,从而关掉界面啥的,就会出问题。
针对于这个需求,调度引擎来进行处理就显得极为方便,我们来看看以下流程配置:
{
"request": {
"type": "json",
"body": {
"phone": {
"type": "string",
"required": true
}
}
},
"components": [
[
{
"name": "user_register",
"desc": "用户注册",
"type": "api",
"method":"get",
"url": "http://user_service/api/v/1/register",
"input": {
"phone": "{request.body.phone}"
},
"outputName": "userResponse",
"timeout": 10
},
{
"name": "grant_coupon",
"desc": "发放优惠券",
"type": "api",
"method":"get",
"url": "http://coupon_service/api/v/1/grant_coupon",
"input": {
"user_id": "{user.id}"
},
"outputName": "grantCouponResponse",
"timeout": 10
},
{
"name": "send_msg",
"desc": "发送短信",
"type": "api",
"method":"get",
"url": "http://msg_service/api/v/1/send_msg",
"input": {
"phone": "{request.body.phone}"
},
"outputName": "msgResponse",
"timeout": 10
}
]
],
"response": {
"type": "json",
"body": {
"code": "{response.code}",
"msg": "{response.msg}",
"data": "{response.data}"
},
"defaultBody": {
"code": 200,
"msg": "success",
"data": "注册成功"
}
}
}
看到以上配置是不是还是一脸懵逼,没关系,接下来我们一步步讲解。
流程主配置主要由请求配置(request)、组件配置(components)、返回配置(response)组成,主要结构如下:
{
"record": true,
"suspend": false, //是否支持任务挂起
"request": {}, //请求相关配置
"response": {}, //返回相关配置
"components": [ //执行组件相关配置
[
{}
]
]
}
# 字段含义解释
record:是否记录执行日志,当流程执行完成之后,是否需要将执行日志写入数据库,可以用作后续的执行流程查询等。
suspend:是否支持任务挂起,任务挂起就是指当我们的执行流程比较长的情况下,若遇到代码bug\接口bug\网络波动等情况,导致流程异常时,是否支持恢复重试,此时流程会保存当前执行状态,写入数据库,可以等待异常解决之后进行手动恢复。
request:请求相关配置,后续详细说明。
response:返回相关配置,后续详细说明。
components:执行组件相关配置,后续详细说明。
在讲解请求配置之前,我们先来认识以下,字段验证配置(FieldValidate),字段验证配置时用来对字段进行限制的,具体配置如下:
{
"type": "string", //字段类型 [int|float|string|slice|bool|object]
"attribute": FieldValidate, //子属性字段验证配置, 仅[object]支持
"required": true, //是否必填
"default": "1", //默认值
"maxLen": 10, //最大长度, 仅[string]支持
"minLen": 2, //最小长度, 仅[string]支持
"max": 10, //最大值, 仅[int|float]支持
"min": 4, //最小值, 仅[int|float]支持
"enum": ["1","2"] //枚举值, 仅[int|float|string|slice]支持
}
请求主配置字段:
{
"type": "json", //数据类型,比如发送post请求的时候,前端可能发送的时xml格式的数据,这时候type则填写xml
"query": FieldValidate, //通过url携带的参数规则校验
"body": FieldValidate, //通过body携带的参数规则校验
"header": FieldValidate //通过请求头携带的参数规则校验
}
我希望设置前端在请求的时候必须传一个数据为json,存在一个字段为phone,且必须为string类型的配置示例:
{
"type": "json",
"body": {
"phone": {
"type": "string",
"required": true
}
}
}
流程组件配置是一个二维数组,是由多个组件配置组成的,格式为[][]component,示例如下:
[
[ // step-1
{ // action-1
...
},
{ // action-2
...
},
...//具体的component的配置规则
],
[ // step-2
{ // action-1
...
},
{ // action-2
...
},
...//具体的component的配置规则
],
]
我们把这个而为数组的行称作step,把每一行中的配置叫做组件配置component。 在执行流程组件是,是从上往下执行的,第一层执行完成之后,执行第二层,第二层执行完成之后执行第三层...,知道流程结束。 而每一层中,具有N个组件配置,这些组件配置是真正需要执行的配置,每一层中的组件配置是通过并发进行执行的。当第一层的所有组件并发执行完成之后,才会执行第二层。
执行顺序从上往下,流程如下:
|-step-1->并发执行[action-1、action-2...]
|-step-2->并发执行[action-1、action-2...]
|-step-3->并发执行[action-1、action-2...]
|-step-4->并发执行[action-1、action-2...]
...
了解了执行规则之后,我们再来详细说一下组件配置,具体可配置字段如下:
{
"name": "devops", //组件名,同一个step层下,name不能重复
"desc": "流程描述", //组件描述
"type": "script", //组件类型 [api|script]
"url": "rule/api/test2.js", //type=script时则为具体的脚本文件,否则为api的url
"input": { //输入参数
"data": "{request.body}" //{request.body}表示去输入的request配置下的body字段的值,也就是请求时携带的body数据
},
"condition":"{request.body.phone} != '0000'" //执行准入条件,当条件符合才进行执行脚本,否则跳过执行
"isCache":true, //是否进行执行缓存,设置了之后,不会执行组件,直接取上一次的返回值。
"outputName": "devops", //输出的对象名。假如你的接口或者脚本最终返回数据为{"code":200}。则后续可以通过{devops.code}进行取值
"timeout": 10 //执行超时时间
"retryMaxCount":1,//最大重试次数
"retryMaxWait":10, //重试最大等待时长
"method":"get",//请求方法,仅api支持
"contentType":"", //数据类型,仅api支持
"auth":["123","456"],//请求header auth,仅api支持
"header":{}, //请求header头,仅api支持
"responseType":"json/xml", //返回数据类型,仅api支持
"dataType":"json/xml", //请求数据类型,仅api支持
// {code:200,msg:"success",data:{phone:"xxxx"}}
"responseCondition":"{code}==200", //返回条件判断
"outputData":"{data}", //返回数据
"errMsg":"{msg}",
"ignoreError":true, //是否忽略错误
"tls":{ //发送http请求携带的证书
"ca":"123", //ca 标志符。会从密钥库查询标志符对应的密钥
"key":"345" //key 标志符。会从密钥库查询标志符对应的密钥
}
}
组件主要分为两种,一种是脚本组件,一种是api组件。脚本组件我们可以用它来进行复杂的判断等,我们也可以通过javascript来进行编写脚本。比如上面的配置执行了一个rule/api/test2.js的脚本。我们来看看这个脚本的代码
function handler(ctx,input){
return ctx.request({"method":"get","url":"https://baidu.com"})
}
这是一个很简单的脚本代码,主要是发送了一个get请求。但是这里我们要注意的是,脚本文件具有固定的格式,也就是
function handler(ctx,input){
// do some
return xxx;
}
其中ctx可以用来调用一些系统提供的方法。比如刚才我们看到的ctx.request()就是发送请求的。 input是组件配置中input配置的输入的参数,函数return之后的数据就会被挂载到配置中的outputName设置的字段上。
ctx 目前提供的方法主要如下:
ctx.request //发送请求 ctx.request({})
ctx.log //打印日志 ctx.log.info()
ctx.data //设置上下文数据 ctx.data.load() ctx.data.store()
ctx.logId //获取日志链路id ctx.logId()
ctx.trx //获取唯一标志符 ctx.trx()
ctx.suspend //主动挂起流程 ctx.suspend("挂起code","挂起原因")
ctx.break //主动中断流程 ctx.break("中断原因")
ctx.response //主动返回数据 ctx.response({})
ctx.base64 //base64加解密相关 ctx.base64.encode() ctx.base64.decode()
ctx.uuid //生成唯一字符串 ctx.uuid()
ctx.aes //aes加解密相关
ctx.rsa //rsa加解密相关,关联密钥管理库
密钥管理库就是把所有的密钥信息进行统一管理,一个密钥存在一个对应的标志符。我们在对接一些接口的时候,存在需要使用密钥的情况,这种时候我们不需要在代码里面去处理密钥,直接使用密钥标志符就可以了,在代码里面会通过密钥标志符找到对应的密钥信息使用。
// 流程规则相关api
api.GET("/rule", handler.GetRule)
api.GET("/rule/page", handler.PageRule)
api.POST("/rule", handler.AddRule)
api.PUT("/rule/switch_version", handler.SwitchRuleVersion)
api.DELETE("/rule", handler.DeleteRule)
// 脚本相关api
api.GET("/script", handler.GetScript)
api.GET("/script/page", handler.PageScript)
api.POST("/script", handler.AddScript)
api.PUT("/script/switch_version", handler.SwitchScriptVersion)
api.DELETE("/script", handler.DeleteScript)
// 密钥管理相关
api.GET("/secret", handler.GetSecret)
api.GET("/secret/page", handler.PageSecret)
api.POST("/script", handler.AddSecret)
api.PUT("/secret", handler.UpdateSecret)
api.DELETE("/script", handler.DeleteSecret)
// 异常中断api
api.GET("/suspend/page", handler.PageSuspend)
api.GET("/suspend", handler.GetSuspend)
api.POST("/suspend/recover", handler.SuspendRecover) //异常中断恢复
api.PUT("/suspend", handler.UpdateSuspend)
// 执行日志相关api
api.GET("/run_log", handler.GetRunLog)
version: '3' services: db: #构建mysql镜像 image: mysql container_name: mysql-db # 容器名 command: mysqld --character-set-server=utf8mb4 --collation-server=utf8mb4_unicode_ci #设置utf8字符集 restart: always environment: MYSQL_ROOT_PASSWORD: root #root管理员用户密码 MYSQL_USER: test #创建test用户 MYSQL_PASSWORD: test #设置test用户的密码 ports:
- '3307:3306' #host物理直接映射端口为6606 volumes: #mysql数据库挂载到host物理机目录/e/docker/mysql/data/db
- "/e/docker/mysql/data/db:/var/lib/mysql"
#容器的配置目录挂载到host物理机目录/e/docker/mysql/data/conf - "/e/docker/mysql/data/conf:/etc/mysql/conf.d"