apache/dolphinscheduler-sdk-python

workflow defined by python doesn't create task definitions in the database

Closed this issue · 12 comments

I guess this is what causes problems when running tasks with relationships: the second task restarts again and again.

[screenshots attached: the dependent task keeps being restarted]

environment

python sdk: installed from source
dolphinscheduler: 3.2.0
started from Docker

I will take a look at this issue today.

Do you mean you cannot create the workflow via the Python SDK, or that you cannot run it?

I am able to create workflows normally, but according to my analysis, the process of creating a workflow should automatically create or update tasks in the workflow.

However, currently the Python SDK creates workflows but does not correctly create their tasks (at least they do not show up in the UI, and the HTTP API cannot retrieve tasks created this way either).

As a result, workflows containing a single task with no dependencies run normally, but workflows with task dependencies are not created properly: the scheduler cannot read the relationships of these tasks from the database (I guess), leading to a restart loop.
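For concreteness, the failing case is a workflow like the following (a minimal sketch based on the pydolphinscheduler tutorial; the import paths and the Shell task are assumptions on my side, and names may differ slightly between SDK versions):

from pydolphinscheduler.core.workflow import Workflow
from pydolphinscheduler.tasks.shell import Shell

with Workflow(name="two_step_demo") as wf:
    first = Shell(name="first", command="echo step one")
    second = Shell(name="second", command="echo step two")

    # "second" depends on "first"; it is this relationship that never shows up
    # as a task definition in the database
    first >> second

    wf.submit()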

Last night I attempted to replace the workflow.submit function with a custom submit() function that uses the HTTP API. However, the API listed at swagger-ui/index.html?urls.primaryName=v1 was still not functioning properly, so I ultimately reverse-engineered the API through the UI and worked around the issue. My program is now running smoothly.

The replacement submit function is posted below; I hope it is helpful.


import json
from datetime import datetime
from typing import List

import requests

# Workflow is the workflow class from the Python SDK (the import path may differ
# by SDK version); AUTH_TOKEN and auth_request() are small helpers of mine, see
# the sketch after this snippet.
from pydolphinscheduler.core.workflow import Workflow


def get_task_code(project_code, num=1) -> List[int]:
    ret = auth_request(
        requests.get,
        f"http://172.32.2.55:12345/dolphinscheduler/projects/{project_code}/task-definition/gen-task-codes?genNum={num}",
    )
    return json.loads(ret.content)["data"]


def get_project(name: str):
    ret = requests.get(
        "http://172.32.2.55:12345/dolphinscheduler/projects/list",
        headers={
            "token": AUTH_TOKEN,
            "Accept": "application/json",
        },
    )

    for i in json.loads(ret.content)["data"]:
        if name == i["name"]:
            return i
    return None


def get_workflow(project_code: int, name: str):
    ret = requests.get(
        f"http://172.32.2.55:12345/dolphinscheduler/projects/{project_code}/process-definition/list",
        headers={
            "token": "25fc3ddf36991fe6f95ce6bdd0d36448",
            "Accept": "application/json",
        },
    )
    for workflow in json.loads(ret.content)["data"]:
        if workflow["processDefinition"]["name"] == name:
            return workflow["processDefinition"]
    return None


def get_tasks(project_code: int):
    ret = requests.get(
        f"http://172.32.2.55:12345/dolphinscheduler/projects/{project_code}/task-definition?pageNo=1&pageSize=30",
        headers={
            "token": "25fc3ddf36991fe6f95ce6bdd0d36448",
            "Accept": "application/json",
        },
    )
    return json.loads(ret.content)["data"]["totalList"]


def get_resource_id(name: str):
    ret = auth_request(
        requests.get,
        "http://172.32.2.55:12345/dolphinscheduler/datasources/list?type=SSH&testFlag=0",
    )
    # breakpoint()

    for resource in json.loads(ret.content)["data"]:
        if resource["name"] == name:
            return resource["id"]
    raise KeyError()


def release(project_code, workflow_code):
    auth_request(
        requests.post,
        f"http://172.32.2.55:12345/dolphinscheduler/projects/{project_code}/process-definition/{workflow_code}/release",
    )


def submit(workflow: Workflow, online=True):

    workflow_def = workflow.get_define()

    project = get_project(workflow_def["project"])
    assert project is not None, workflow_def["project"]
    project_code = project["code"]

    url_workflow = get_workflow(project_code, name=workflow_def["name"])
    # [{"taskCode":12534793670368,"x":160,"y":74},{"taskCode":12535087404769,"x":135.7613525390625,"y":250.74148559570312}]

    tasks = get_tasks(project_code)

    task_codes = set([task["taskCode"] for task in tasks])
    locations = []

    x_offset = 100
    y_offset = 100

    task_code_map = {}
    for task in workflow_def["taskDefinitionJson"]:  # type: dict
        task.setdefault("createTime", datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
        task.setdefault("updateTime", datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
        task.setdefault("projectName", project["name"])
        task.setdefault("projectCode", project["code"])
        task.setdefault("operator", 1)
        task.setdefault("modifyBy", None)
        task["delayTime"] = str(task["delayTime"])
        task["description"] = str(task["description"])
        task["failRetryTimes"] = str(task["failRetryInterval"])
        task["failRetryInterval"] = str(task["failRetryInterval"])
        if task.get("environmentCode", None) is None:
            task["environmentCode"] = -1
        if task.get("timeoutNotifyStrategy", None) is None:
            task["timeoutNotifyStrategy"] = ""
        task.setdefault("userName", None)
        task.setdefault("userId", 1)
        task.setdefault("taskParamList", [])
        task.setdefault("taskParamMap", {})
        task.setdefault("taskExecuteType", "BATCH")

        # locations.append(
        #     {
        #         "taskCode": task["code"],
        #         "x": x_offset,
        #         "y": y_offset,
        #     }
        # )
        x_offset += 30
        y_offset += 30

        # not needed
        # if task["code"] not in task_codes:
        #     gen_task_code = get_task_code(project_code)[0]
        #     task_code_map.setdefault(task["code"], gen_task_code)
        #     task["code"] = gen_task_code
        #     ret = create_single_task(project_code, url_workflow["code"], task)

    for relation in workflow_def["taskRelationJson"]:
        relation["preTaskCode"] = task_code_map.get(
            relation["preTaskCode"], relation["preTaskCode"]
        )
        relation["postTaskCode"] = task_code_map.get(
            relation["postTaskCode"], relation["postTaskCode"]
        )

    data = {
        "taskDefinitionJson": json.dumps(workflow_def["taskDefinitionJson"]),
        "taskRelationJson": json.dumps(workflow_def["taskRelationJson"]),
        "locations": "",
        "name": workflow_def["name"],
        "executionType": workflow_def["executionType"],
        "description": workflow_def["description"],
        "globalParams": json.dumps(workflow.param_json),
        "timeout": workflow_def["timeout"],
        "releaseState": "ONLINE" if online else "OFFLINE",
    }

    workflow = get_workflow(project["code"], workflow_def["name"])

    if workflow is None:
        # create
        ret = auth_request(
            requests.post,
            f"http://172.32.2.55:12345/dolphinscheduler/projects/{project['code']}/process-definition",
            data=data,
        )
    else:
        if workflow_def["releaseState"] == 1:
            release(project_code, workflow["code"])
        # update
        ret = auth_request(
            requests.put,
            f"http://172.32.2.55:12345/dolphinscheduler/projects/{project['code']}/process-definition/{workflow['code']}",
            data=data,
        )
    print(json.loads(ret.content))
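The snippet above also relies on an auth_request helper and an AUTH_TOKEN constant that I did not paste. They look roughly like this (a minimal sketch; AUTH_TOKEN is an API access token created in DolphinScheduler's security settings and is sent in the token header, just like the other calls above):

import requests

AUTH_TOKEN = "25fc3ddf36991fe6f95ce6bdd0d36448"  # DolphinScheduler API access token


def auth_request(method, url, **kwargs):
    """Call a requests method (get/post/put) with the token and Accept headers attached."""
    headers = kwargs.pop("headers", {})
    headers.setdefault("token", AUTH_TOKEN)
    headers.setdefault("Accept", "application/json")
    ret = method(url, headers=headers, **kwargs)
    ret.raise_for_status()
    return ret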

By the way, sub-workflows also did not work properly; I worked around that by replacing the definition of process_definition_code:

from pydolphinscheduler.constants import TaskType
from pydolphinscheduler.core.task import Task


class CustomSubWorkflow(Task):
    """Task SubWorkflow object, declare behavior for SubWorkflow task to dolphinscheduler."""

    _task_custom_attr = {"process_definition_code"}

    def __init__(self, name: str, workflow_name: str, *args, **kwargs):
        super().__init__(name, TaskType.SUB_WORKFLOW, *args, **kwargs)
        self.workflow_name = workflow_name

    @property
    def process_definition_code(self) -> int:
        """Get the referenced workflow's code, a thin wrapper around :func:`get_workflow`.

        We cannot rename this property to ``workflow_code`` because
        ``process_definition_code`` is the attribute name dolphinscheduler itself expects.
        """
        project_code = get_project(self.workflow.project.name)["code"]
        return get_workflow(project_code, self.workflow_name)["code"]
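For reference, this is roughly how I wire it into a parent workflow (a sketch; I am assuming the Workflow context-manager style of the Python SDK, and the workflow names here are placeholders):

from pydolphinscheduler.core.workflow import Workflow

# assumes a workflow named "child_workflow" already exists in the same project
with Workflow(name="parent_workflow") as wf:
    CustomSubWorkflow(name="run_child", workflow_name="child_workflow")
    submit(wf)  # the custom HTTP-based submit() posted above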

In my experience, a Python SDK built purely on the HTTP API might be a better fit.

I think the problem here is that when submitting with the Python SDK, the task definition isn't following the right task schema.


Could you share what kind of tasks you use in pydolphinscheduler?

Hi @sailist, this issue is fixed in #140.

I find DolphinScheduler to be an incredibly useful and powerful tool for improving efficiency. Currently I use it for daily builds of my repo and for running regression tests, which may not be worth sharing. I am still exploring scenarios that are suitable for delegating to DolphinScheduler; perhaps a little later, when the project becomes more complex, I will be willing to share.

As I create more workflows and aim for easier management of data sources and environments, I may consider redeveloping a Python SDK that moves the backend from Py4J to the HTTP API, to cover functionality that Py4J cannot provide.

I would like your opinion on this. Do you think resource management and the other functions the HTTP API offers can be achieved through the Py4J backend? Do you think this is a good approach?

I find DolphinScheduler to be an incredibly useful and powerful tool for improving efficiency. Currently I use it for daily builds of my repo and for running regression tests, which may not be worth sharing. I am still exploring scenarios that are suitable for delegating to DolphinScheduler; perhaps a little later, when the project becomes more complex, I will be willing to share.

Sounds cool. Currently most of our users use DolphinScheduler to schedule data workflows, like SQL, Spark, or Hive jobs, and you are applying it in a new field. May I ask whether you did it via the UI or the Python SDK?

Furthermore, could you share some of the workloads you are running in production?

As I create more workflows and aim for easier management of data sources and environments, I may consider redeveloping a Python SDK that moves the backend from Py4J to the HTTP API, to cover functionality that Py4J cannot provide.

The initial reasons we chose Py4J over HTTP are:

  • the HTTP protocol only supports single-round CRUD, but we want to create some resources in batch
  • when we started the Python SDK we also planned to refactor the HTTP entry points, so we used Py4J to do that

I would like your opinion on this. Do you think resource management and the other functions the HTTP API offers can be achieved through the Py4J backend? Do you think this is a good approach?

Of course we can add more entry points to the Python gateway to support all of the resources, or we can reuse the existing HTTP protocol.