/process_definition_import_tool

Dolphinscheduler 外部作业导入工具(2.1)

Primary LanguagePython

process_definition_import_tool

Dolphinscheduler 外部作业导入工具(2.1)


用途

将通用的外部定义的作业配置导入到 Dolphinscheduler 指定的项目中, 目前支持版本2.0.5, 主要适用于作业迁移和开发/测试与生产隔离的情况.

  • 2.1 改为从excel离线导入增量配置, 几个要点:

    1. 作业的shell节点必须在项目的整个DAG中保持唯一, 用作业所在流程的完整路径(流程/子流程/子流程...../任务节点)匹配作业
    2. 上线时不能再对生产环境的调度进行开发类配置操作, 符合金融机构环境管理要求
  • 由于2.0使用了特定code作为实际id的机制, 无法像1.3.x一样直接操作数据库, 当前版本通过调用DS的API实现项目的导入

** dependent任务和sub_process任务的参数中,指向其他作业的code在导入时无法自动映射, 在生成dependent任务和sub_process任务时, 引用的流程必须已经存在 **

** 因为数仓任务涉及业务连续性问题, 不允许跨日执行, 大部分作业节点必须有前一天的任务自依赖, 但迁移后首次执行时不能有依赖节点, 否则会等待不存在的任务实例(空跑仍然会等待依赖节点) **

  • 以上问题使用以下解决办法: ** 直接在数据库中生成上一天的伪实例,用于下一天的依赖检测 **

  • 同时更新作业定义时, 已存在的作业定义必须保留, 否则自依赖引用会失效, 使用以下流程实现完全增量更新:

  1. 先导入服务器中的所有作业,同时导入excel配置中的作业定义(导入时检测是否有重叠的节点,有重叠则更新)
  2. 用名称匹配项目下已有的任务code, 不存在时生成新任务code
  3. 遍历dag生成流程, 将流程定义更新到服务中

代码概述

  • main_general.py 主程序, 负责导入服务器及excel的作业,生成DAG树.
  • main_ui.py 主程序的tk ui
  • ds_config.py DS服务的口令及地址
  • ds_api.py http api的封装
  • ds_db.py 简单的数据库操作函数
  • name_standardizer.py 名称标准化函数, 依赖excel中的配置
  • schedule_config.py 用于导入excel配置
  • dag_node 实现DAG到流和作业的json的转换, 以及对比差异

特性

** 由于数仓作业种类多且杂且DS目前不支持项目级的作业操作, 每个作业一个流程会导致任务数巨大且触发配置困难, 上万个有复杂依赖关系的任务只能分层切分至多个流程组, 通过子流程完成整体任务的触发 **

  • 3级以下 分组的 任务的依赖 会上升为同级别(3级以下)中 子任务间的DAG依赖, 3级以下(或DAG_DEP_LEVELS定义) 分组找不到共同分组时, 自动添加任务的 依赖节点

生成过程(2.1)

  1. DS中添加 租户, 用户, 项目环境
  2. 配置ds_config.py(配置口令及服务IP)
  3. 执行python/python3 main_general.py [excel配置文件路径]
    1. 执行python/python3 main_ui.py 以ui形式进行作业配置导入