ytyht226/taskflow

循环节点

cookie2013 opened this issue · 6 comments

循环节点应该如何编排?

循环节点应该如何编排?

是说某个节点需要执行多次?目前引擎在编排层面不支持这种方式,不过你可以在op的具体实现里面控制循环的次数。另外,循环节点的使用场景可以举例说明一下么,我看看后续版本是否需要支持?

循环节点应该如何编排?

是说某个节点需要执行多次?目前引擎在编排层面不支持这种方式,不过你可以在op的具体实现里面控制循环的次数。另外,循环节点的使用场景可以举例说明一下么,我看看后续版本是否需要支持?

是的。比如不断判断某个节点的返回数据是否有值,如果还有值,则参数+1后继续执行该节点,直到取出所有数据。比如某个http调用是需要分页,不断查询的

循环节点应该如何编排?

是说某个节点需要执行多次?目前引擎在编排层面不支持这种方式,不过你可以在op的具体实现里面控制循环的次数。另外,循环节点的使用场景可以举例说明一下么,我看看后续版本是否需要支持?

是的。比如不断判断某个节点的返回数据是否有值,如果还有值,则参数+1后继续执行该节点,直到取出所有数据。比如某个http调用是需要分页,不断查询的

除了上面说的可以直接在op逻辑内部实现循环控制次数,还可以结合op的回调接口实现,下面是一个例子,可以参考:
CycleTest代码实现:
image
image
Operator1代码实现:
image
Operator2代码实现:
image

next的下个节点,加了judge是false的也会执行,是否有问题

next的下个节点,加了judge是false的也会执行,是否有问题

@Test
    public void test2() {
        DagEngine engine = new DagEngine(executor);
        int param = 1;
        OperatorWrapper<Integer, Integer> wrapper1 = new OperatorWrapper<Integer, Integer>()
                .id("1")
                .engine(engine)
                .operator(operator1)
                .context(param)
                .next("2")
                ;
        OperatorWrapper<Integer, Integer> wrapper2 = new OperatorWrapper<Integer, Integer>()
                .id("2")
                .engine(engine)
                .operator(operator2)
                .addParamFromWrapperId("1")
                .condition(new Wrapper2Condition())
                ;
        engine.runAndWait(900_000);
    }

private static class Wrapper2Condition implements ICondition {

        @Override
        public boolean judge(OperatorWrapper wrapper) {
            OperatorResult<Integer> wrapper1Result = DagContextHolder.getOperatorResult("1");
            int result = 0;
            if (wrapper1Result != null && wrapper1Result.getResultState() == ResultState.SUCCESS) {
                result = wrapper1Result.getResult();
            }
            System.out.println("Wrapper2Condition result: " + result);
            return result == 0;
        }
    }

next的下个节点,加了judge是false的也会执行,是否有问题

@Test
    public void test2() {
        DagEngine engine = new DagEngine(executor);
        int param = 1;
        OperatorWrapper<Integer, Integer> wrapper1 = new OperatorWrapper<Integer, Integer>()
                .id("1")
                .engine(engine)
                .operator(operator1)
                .context(param)
                .next("2")
                ;
        OperatorWrapper<Integer, Integer> wrapper2 = new OperatorWrapper<Integer, Integer>()
                .id("2")
                .engine(engine)
                .operator(operator2)
                .addParamFromWrapperId("1")
                .condition(new Wrapper2Condition())
                ;
        engine.runAndWait(900_000);
    }

private static class Wrapper2Condition implements ICondition {

        @Override
        public boolean judge(OperatorWrapper wrapper) {
            OperatorResult<Integer> wrapper1Result = DagContextHolder.getOperatorResult("1");
            int result = 0;
            if (wrapper1Result != null && wrapper1Result.getResultState() == ResultState.SUCCESS) {
                result = wrapper1Result.getResult();
            }
            System.out.println("Wrapper2Condition result: " + result);
            return result == 0;
        }
    }

目前DAG图中的节点引擎只会执行一次,这个例子中节点2只有一个前继节点1,这种情况加在节点2上的condition其实是不起作用的,不管judge返回true、false当前节点都会执行;你可以试想一下,如果judge返回false,此时节点1已经执行过(state != DagState.RUNNING)不会再次执行了,那么引擎就没有机会再判断加在节点2上的condition,节点2就不可能执行了。代码中如果judge返回false时,其实有一个逻辑是看还有没有执行中的前继节点,如果没有是会执行当前节点的:
image