QOGIR

A task execution framework.

  • Installation
pip install qogir
  • Create a job
qogir createjob <job-name> --type <job-type>[default|pyspark]

The following files are generated in the current directory:

./job-name
    ├── config.yaml
    ├── __init__.py
    ├── job.py
    └── requirements.txt

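requirements.txt lists the Python dependencies the job needs, and config.yaml defines the basic parameters for running the job. When job-type is default, config.yaml is initialized as: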

job-type:
 default

entry:
 job:main   # Entry function of job

python:
 <python-version> # python version, must be set before running

include_paths:  # Directory to insert to PYTHONPATH
 - /path/to/include_dir1
 - /path/to/include_dir2

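The include_paths parameter inserts the listed directories into PYTHONPATH. With this feature, you can call modules, classes, and methods that are not part of the Qogir job itself.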
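The effect of include_paths can be illustrated with a minimal sketch. It does not show Qogir's internal implementation; it only demonstrates, with the standard library, how placing a directory on the module search path (which is what PYTHONPATH feeds at interpreter start) makes external code importable. The temporary directory and the module name shared_utils are hypothetical.

```python
import importlib
import os
import sys
import tempfile

# Hypothetical external directory, standing in for one entry of include_paths.
include_dir = tempfile.mkdtemp()

# Create a small module that lives outside the job directory.
with open(os.path.join(include_dir, "shared_utils.py"), "w") as f:
    f.write("def greet(name):\n    return 'hello ' + name\n")

# Putting the directory on the module search path is what makes the import work.
sys.path.insert(0, include_dir)
importlib.invalidate_caches()

shared_utils = importlib.import_module("shared_utils")
message = shared_utils.greet("qogir")
print(message)
```

Inside a real job, the same import statement would simply be written at the top of job.py once the directory is listed under include_paths.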

When job-type is pyspark, config.yaml is initialized as:

job-type:
 pyspark

entry:
 job:main   # Entry function of job

app-name:
 app        # Spark app name

log-level:
 INFO

hdfs-client-cls:
  hdfs.client:InsecureClient

hdfs-url:
 http://hadoop-host:50070

python:
 <python-version> # python version, must be set before running

command-params: # Parameters of command `spark-submit`
 HADOOP_CONF_DIR:
  /etc/hadoop
 MASTER:
  yarn
 SUBMIT_PARAMS:
  --driver-memory 6G
  --conf spark.default.parallelism=200
  --conf spark.driver.maxResultSize=2G
  --num-executors 20
  --executor-memory 4G
  --executor-cores 2

include_paths:  # Directory to insert to PYTHONPATH
 - /path/to/include_dir1
 - /path/to/include_dir2

job.py defines the basic entry point of the job:

# -*- coding: utf-8 -*-

from __future__ import print_function, absolute_import, unicode_literals

def main(cls):
    cls.spark   # spark session
    cls.sc      # spark context
    cls.hdfs    # hdfs client, an instance of InsecureClient from hdfs
    cls.configs # configurations from config.yaml
    cls.params  # dict parsed from the -P/--params option
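As an illustration of the entry function, here is a minimal sketch of a job body that reads run-time parameters and configuration. The parameter name k1, the assumption that cls.configs behaves like a dict, and the stand-in object built with SimpleNamespace are for demonstration only; in a real run Qogir supplies cls.

```python
from types import SimpleNamespace


def main(cls):
    # cls.params holds the dict parsed from -P/--params (treated as optional here).
    params = cls.params or {}
    # Assumption: cls.configs exposes the values of config.yaml like a dict.
    job_type = cls.configs.get("job-type", "default")
    # "k1" is a hypothetical parameter name used only for this example.
    value = params.get("k1", "no value")
    return "{} job received k1={}".format(job_type, value)


# Stand-in for the object Qogir passes to main(); for local experimentation only.
fake_cls = SimpleNamespace(params={"k1": "v1"}, configs={"job-type": "default"})
result = main(fake_cls)
print(result)
```

Calling main with a stand-in object like this is also a convenient way to exercise job logic in a unit test without starting Spark.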
  • Once everything is ready, run the job:

From the bash command line:

qogir runjob <job-name> -P/--params <params-json>

The -P/--params option passes parameters to the job at run time. The value must be a JSON string; it is parsed into a dict and injected into cls.params.
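Because the parameters travel as a JSON string on the command line, quoting matters. The sketch below builds such a string with the standard library and shows that it parses back into the same dict. The job name my-job and the parameter names are hypothetical, and the parsing step only mirrors what the text describes, not Qogir's actual code.

```python
import json
import shlex

# Hypothetical parameters for a job run.
params = {"k1": "v1", "limit": 10}

# Serialize to JSON and quote it so the shell passes it as a single argument.
params_json = json.dumps(params, sort_keys=True)
command = "qogir runjob my-job --params " + shlex.quote(params_json)
print(command)

# On the receiving side, a JSON string like this parses back into a dict.
parsed = json.loads(params_json)
```

Generating the command this way avoids hand-escaping the double quotes that JSON requires.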

Qogir also supports running jobs dynamically from a Python script, so you can integrate Qogir jobs into your web service, scheduled tasks, or unit tests:

# -*- coding: utf-8 -*-
from qogir.core.runner import JobRunner

job = JobRunner('/path/to/job')

job.run(params={'k1': 'v1'})

If you use Python 3.6 or later, you can use AsyncJobRunner, the asynchronous job runner provided by Qogir, to run your jobs efficiently:

from qogir.core.arunner import AsyncJobRunner
import asyncio


loop = asyncio.get_event_loop()

async def run_qogir_job():
    job = await AsyncJobRunner('/path/to/job')
    await job.run()

loop.run_until_complete(run_qogir_job())
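One reason an asynchronous runner can help is that several jobs can wait concurrently on a single event loop. The sketch below illustrates that pattern with asyncio.gather. It uses a hypothetical stand-in coroutine, fake_run_job, in place of AsyncJobRunner, so it makes no claim about Qogir's API or about how Qogir schedules work internally.

```python
import asyncio


async def fake_run_job(job_path, delay):
    # Hypothetical stand-in for awaiting a real job; sleeps to simulate waiting.
    await asyncio.sleep(delay)
    return job_path


async def run_all(job_paths):
    # Schedule all jobs at once; gather returns results in input order.
    return await asyncio.gather(*(fake_run_job(p, 0.01) for p in job_paths))


loop = asyncio.new_event_loop()
try:
    finished = loop.run_until_complete(run_all(["/path/to/job_a", "/path/to/job_b"]))
finally:
    loop.close()
print(finished)
```

With a real runner, the body of fake_run_job would be replaced by the two awaited calls shown in run_qogir_job.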