sogou/workflow

我有一个需求,希望能动态创建多个task,又希望这些task能被顺序的执行来避免多线程竞争,该如何做。

lordoffox opened this issue · 28 comments

如题。

感谢小伙伴的使用~这是一个比较经典的场景,一种解决办法是创建一个series来保证多个task的顺序执行:

SeriesWork *series = Workflow::create_series_work(first_task, series_callback);
series->start();

只要这个series指针可以被共享,就可以被多方往里放任务。但需要注意的是,series如果执行完、没有任务了就会结束,所以可以使用WFCounterTask作为内存开关,series内没有任务的时候放一个

WFCounterTask *counter = WFTaskFactory::create_counter_task(1, nullptr);

在task的回调里判断一下,当前series里是否还有后续任务,如果没有,就放入一个counter。counter可以用带名字的counter来全局技术,避免自己保存counter task指针的麻烦~

void mytask_callback(MyTask *task)
{
    if (/* series里没有其他任务了*/ )
    {
        WFCounterTask *counter = WFTaskFactory::create_counter_task("COUNTER_A", 1, nullptr);
        series->push_back(counter);
    }
    ...
}

然后每个想往series里放的task,放入的时候都配合打开一下开关;

series->push_back(my_task);
WFTaskFactory::count_by_name("COUNTER_A");

就可以做到task本身被顺序执行,又能长期使用同一个series的做法了~麻烦看看是否符合当前的需求,如果还有其他用法,欢迎多多交流~

这个需求挺常见的,其实你需要的就是一个不会自动结束的series,你可以向这个series里不断的增加任务,这样子这些任务就可以顺序的被执行。方法是每次push_back任务时,除了push_back当前任务,还需要再push_back一个目标值为1的counter任务。接下来,上一个counter打开,让当前任务可以被拉起。counter相当于一个塞子,用于堵住series,让series不会自动结束。
其实我们series的push和pop操作都是加锁的,也就是为了用户可以实现这个功能。
@holmes1412 你的示例代码在callback里push_back(counter)应该是不对的。我一会写个block series的示例。

@Barenboim 在callback里面push计数器有什么不对的地方吗

大概写了一下代码,没有测,方法没问题:

#include <string>
#include <mutex>
#include "workflow/Workflow.h"
#include "workflow/WFTaskFactory.h"

class BlockSeries
{
public:
	BlockSeries(SubTask *first, const std::string& name) :
		counter_name(name)
	{
		WFCounterTask *counter;
		this->series = Workflow::create_series_work(first, nullptr);
		counter = WFTaskFactory::create_counter_task(this->counter_name, 1, nullptr);
		this->series->push_back(counter);
	}

	~BlockSeries()
	{
		// 假设用户已经调用过start()
		WFTaskFactory::count_by_name(this->counter_name);
	}

        void start()
        {
                this->series->start();
        }

	void push_back(SubTask *task)
	{
		WFCounterTask *counter;
		this->mutex.lock();
		counter = WFTaskFactory::create_counter_task(this->counter_name, 1, nullptr);
		this->series->push_back(task);
		this->series->push_back(counter);
		this->mutex.unlock();
                // 这个counter_by_name打开的是上一个push_back添加的counter。
		WFTaskFactory::count_by_name(this->counter_name);
	}

private:
	SeriesWork *series;
	std::string counter_name;
	std::mutex mutex;
};

其中name是一个全局唯一的名字,用来给counter起名的。series没有callback,如果需要的话可以自己改一下代码。

@Barenboim @MaybeShewill-CV 我是在执行的task的callback里检查series的状态并push_back(counter)哈

@Barenboim @MaybeShewill-CV 我是在执行的task的callback里检查series的状态并push_back(counter)哈

这个在并发访问的时候就有问题了。

@Barenboim @MaybeShewill-CV 我是在执行的task的callback里检查series的状态并push_back(counter)哈

这个在并发访问的时候就有问题了。

嗯嗯,了解,要保证原子性的话,交给series做才行~

非常感谢回答。

如果在一段时间内没有新的task,我希望回收这个serise的相关资源,比如没有你说的那个“塞子”,让这个serise结束掉,又该如何处理呢?

想让series结束,就count一下这个塞子就可以了。
counter的定义就是当count的次数达到目标值,就callback。因此可以当成塞子使用。
你说没有“塞子”是什么意思?

主要是想实现这样一个功能:
可以动态创建task,如果一段时间内没有新的任务到来就结束掉这个serise,我在想应该是有一个定时器来判断这个时间,时间到了就结束掉serise。
我说的“没有塞子”,是想用定时器来替代counter来实现这个功能,刚接触workflow不久,还没有想到如何来做这个功能

或者说我这个想法有没有实现的必要

我想了想,这个挺好做的。首先,你个每个塞子都起一个独立的名字,并且每次创建塞子,也同时启动一个定时打开塞子的定时器。上面push_back操作大概写成这样:

class BlockSeries
{
public:
	void push_back(SubTask *task)
	{
		WFCounterTask *counter;
		this->mutex.lock();
                int counter_id = ++this->counter_id;
		counter = WFTaskFactory::create_counter_task(std::to_string(counter_id), 1, nullptr);
		this->series->push_back(task);
		this->series->push_back(counter);
                // 启动一个定时器,10秒之后打开塞子。
                WFTimerTask *timer = create_timer_task(10, 0, [counter_id](WFTimerTask *) {
                        WFTaskFactory::count_by_name(std::to_string(counter_id));
                });
                timer->start(); 
		this->mutex.unlock();
                // 这个counter_by_name打开的是上一个push_back添加的counter。
		WFTaskFactory::count_by_name(std::to_string(id - 1));
	}
private:
        int counter_id = 0;
}

大概的逻辑就是用一个定时器,到时间通过name来打开塞子。所以每个塞子的name在这里是唯一的。如果塞子已经被下一个任务打开,定时器的count_by_name操作并不会产生任何作用。
Timer所占资源非常小,可在随便创建。不用担心之前的timer没有被取消的问题。

说得很明朗了,谢谢指教,具体实施的时候我觉得还应该有一些工作要做,比如检查BlockSeries里的series是否已经被释放了,因为没有task后,最后一个counter塞子会被定时器拔掉,导致serise被释放,这时BlockSeries里的series就是一个野指针了。
不过思路给出来了,我按这个思路弄完备就完事儿了

现在这个功能也可以用WFResourcePool来实现。
https://github.com/sogou/workflow/blob/master/docs/about-conditional.md

大概写了一下代码,没有测,方法没问题:

#include <string>
#include <mutex>
#include "workflow/Workflow.h"
#include "workflow/WFTaskFactory.h"

class BlockSeries
{
public:
	BlockSeries(SubTask *first, const std::string& name) :
		counter_name(name)
	{
		WFCounterTask *counter;
		this->series = Workflow::create_series_work(first, nullptr);
		counter = WFTaskFactory::create_counter_task(this->counter_name, 1, nullptr);
		this->series->push_back(counter);
	}

	~BlockSeries()
	{
		// 假设用户已经调用过start()
		WFTaskFactory::count_by_name(this->counter_name);
	}

        void start()
        {
                this->series->start();
        }

	void push_back(SubTask *task)
	{
		WFCounterTask *counter;
		this->mutex.lock();
		counter = WFTaskFactory::create_counter_task(this->counter_name, 1, nullptr);
		this->series->push_back(task);
		this->series->push_back(counter);
		this->mutex.unlock();
                // 这个counter_by_name打开的是上一个push_back添加的counter。
		WFTaskFactory::count_by_name(this->counter_name);
	}

private:
	SeriesWork *series;
	std::string counter_name;
	std::mutex mutex;
};

其中name是一个全局唯一的名字,用来给counter起名的。series没有callback,如果需要的话可以自己改一下代码。

太好了,正好现在需要这个,大概理解了下,

第一步:这个start第一次调用就是为了开始运行没有任务不自动退出,固定的名字和固定的值1
counter = WFTaskFactory::create_counter_task(this->counter_name, 1, nullptr);
第二步: 这一段是添加新任务,且也是为了保证没有任务的时候不退出
this->series->push_back(task);
this->series->push_back(counter);
第三步:这个其实是为了把第一步的阻塞塞子去掉,好让第二步的task任务得到执行
WFTaskFactory::count_by_name(this->counter_name);

可以这么理解吗,大佬

你自己试一下吧。我倒不觉得你真的需要这么复杂的东西。

大概写了一下代码,没有测,方法没问题:

#include <string>
#include <mutex>
#include "workflow/Workflow.h"
#include "workflow/WFTaskFactory.h"

class BlockSeries
{
public:
	BlockSeries(SubTask *first, const std::string& name) :
		counter_name(name)
	{
		WFCounterTask *counter;
		this->series = Workflow::create_series_work(first, nullptr);
		counter = WFTaskFactory::create_counter_task(this->counter_name, 1, nullptr);
		this->series->push_back(counter);
	}

	~BlockSeries()
	{
		// 假设用户已经调用过start()
		WFTaskFactory::count_by_name(this->counter_name);
	}

        void start()
        {
                this->series->start();
        }

	void push_back(SubTask *task)
	{
		WFCounterTask *counter;
		this->mutex.lock();
		counter = WFTaskFactory::create_counter_task(this->counter_name, 1, nullptr);
		this->series->push_back(task);
		this->series->push_back(counter);
		this->mutex.unlock();
                // 这个counter_by_name打开的是上一个push_back添加的counter。
		WFTaskFactory::count_by_name(this->counter_name);
	}

private:
	SeriesWork *series;
	std::string counter_name;
	std::mutex mutex;
};

其中name是一个全局唯一的名字,用来给counter起名的。series没有callback,如果需要的话可以自己改一下代码。

太好了,正好现在需要这个,大概理解了下,

第一步:这个start第一次调用就是为了开始运行没有任务不自动退出,固定的名字和固定的值1 counter = WFTaskFactory::create_counter_task(this->counter_name, 1, nullptr); 第二步: 这一段是添加新任务,且也是为了保证没有任务的时候不退出 this->series->push_back(task); this->series->push_back(counter); 第三步:这个其实是为了把第一步的阻塞塞子去掉,好让第二步的task任务得到执行 WFTaskFactory::count_by_name(this->counter_name);

可以这么理解吗,大佬

你自己试一下吧。我倒不觉得你真的需要这么复杂的东西。

大概写了一下代码,没有测,方法没问题:

#include <string>
#include <mutex>
#include "workflow/Workflow.h"
#include "workflow/WFTaskFactory.h"

class BlockSeries
{
public:
	BlockSeries(SubTask *first, const std::string& name) :
		counter_name(name)
	{
		WFCounterTask *counter;
		this->series = Workflow::create_series_work(first, nullptr);
		counter = WFTaskFactory::create_counter_task(this->counter_name, 1, nullptr);
		this->series->push_back(counter);
	}

	~BlockSeries()
	{
		// 假设用户已经调用过start()
		WFTaskFactory::count_by_name(this->counter_name);
	}

        void start()
        {
                this->series->start();
        }

	void push_back(SubTask *task)
	{
		WFCounterTask *counter;
		this->mutex.lock();
		counter = WFTaskFactory::create_counter_task(this->counter_name, 1, nullptr);
		this->series->push_back(task);
		this->series->push_back(counter);
		this->mutex.unlock();
                // 这个counter_by_name打开的是上一个push_back添加的counter。
		WFTaskFactory::count_by_name(this->counter_name);
	}

private:
	SeriesWork *series;
	std::string counter_name;
	std::mutex mutex;
};

其中name是一个全局唯一的名字,用来给counter起名的。series没有callback,如果需要的话可以自己改一下代码。

太好了,正好现在需要这个,大概理解了下,
第一步:这个start第一次调用就是为了开始运行没有任务不自动退出,固定的名字和固定的值1 counter = WFTaskFactory::create_counter_task(this->counter_name, 1, nullptr); 第二步: 这一段是添加新任务,且也是为了保证没有任务的时候不退出 this->series->push_back(task); this->series->push_back(counter); 第三步:这个其实是为了把第一步的阻塞塞子去掉,好让第二步的task任务得到执行 WFTaskFactory::count_by_name(this->counter_name);
可以这么理解吗,大佬

我感觉你在看不起我(哈哈,开玩笑哈),现在就是需要这个,现在业务有个状态需要维护,然后要根据各种处理结果来维护,这个正合适的,本来用同步编程,我取数据放到内存,调用hredis 操作的话这个就简单了,这不向大佬们学习使用异步嘛。

你自己试一下吧。我倒不觉得你真的需要这么复杂的东西。

大概写了一下代码,没有测,方法没问题:

#include <string>
#include <mutex>
#include "workflow/Workflow.h"
#include "workflow/WFTaskFactory.h"

class BlockSeries
{
public:
	BlockSeries(SubTask *first, const std::string& name) :
		counter_name(name)
	{
		WFCounterTask *counter;
		this->series = Workflow::create_series_work(first, nullptr);
		counter = WFTaskFactory::create_counter_task(this->counter_name, 1, nullptr);
		this->series->push_back(counter);
	}

	~BlockSeries()
	{
		// 假设用户已经调用过start()
		WFTaskFactory::count_by_name(this->counter_name);
	}

        void start()
        {
                this->series->start();
        }

	void push_back(SubTask *task)
	{
		WFCounterTask *counter;
		this->mutex.lock();
		counter = WFTaskFactory::create_counter_task(this->counter_name, 1, nullptr);
		this->series->push_back(task);
		this->series->push_back(counter);
		this->mutex.unlock();
                // 这个counter_by_name打开的是上一个push_back添加的counter。
		WFTaskFactory::count_by_name(this->counter_name);
	}

private:
	SeriesWork *series;
	std::string counter_name;
	std::mutex mutex;
};

其中name是一个全局唯一的名字,用来给counter起名的。series没有callback,如果需要的话可以自己改一下代码。

太好了,正好现在需要这个,大概理解了下,
第一步:这个start第一次调用就是为了开始运行没有任务不自动退出,固定的名字和固定的值1 counter = WFTaskFactory::create_counter_task(this->counter_name, 1, nullptr); 第二步: 这一段是添加新任务,且也是为了保证没有任务的时候不退出 this->series->push_back(task); this->series->push_back(counter); 第三步:这个其实是为了把第一步的阻塞塞子去掉,好让第二步的task任务得到执行 WFTaskFactory::count_by_name(this->counter_name);
可以这么理解吗,大佬

我感觉你在看不起我(哈哈,开玩笑哈),现在就是需要这个,现在业务有个状态需要维护,然后要根据各种处理结果来维护,这个正合适的,本来用同步编程,我取数据放到内存,调用hredis 操作的话这个就简单了,这不向大佬们学习使用异步嘛。

没有看不起,只是希望你多做一些实验,有可能问题没有你想的那么复杂。

从你刚才贴的一些代码,我觉得你还是没有看我们的文档和示例。

恩恩,我先把你这个blackseries先用上吧,我感觉很适合我的这个的

你自己试一下吧。我倒不觉得你真的需要这么复杂的东西。

大概写了一下代码,没有测,方法没问题:

#include <string>
#include <mutex>
#include "workflow/Workflow.h"
#include "workflow/WFTaskFactory.h"

class BlockSeries
{
public:
	BlockSeries(SubTask *first, const std::string& name) :
		counter_name(name)
	{
		WFCounterTask *counter;
		this->series = Workflow::create_series_work(first, nullptr);
		counter = WFTaskFactory::create_counter_task(this->counter_name, 1, nullptr);
		this->series->push_back(counter);
	}

	~BlockSeries()
	{
		// 假设用户已经调用过start()
		WFTaskFactory::count_by_name(this->counter_name);
	}

        void start()
        {
                this->series->start();
        }

	void push_back(SubTask *task)
	{
		WFCounterTask *counter;
		this->mutex.lock();
		counter = WFTaskFactory::create_counter_task(this->counter_name, 1, nullptr);
		this->series->push_back(task);
		this->series->push_back(counter);
		this->mutex.unlock();
                // 这个counter_by_name打开的是上一个push_back添加的counter。
		WFTaskFactory::count_by_name(this->counter_name);
	}

private:
	SeriesWork *series;
	std::string counter_name;
	std::mutex mutex;
};

其中name是一个全局唯一的名字,用来给counter起名的。series没有callback,如果需要的话可以自己改一下代码。

太好了,正好现在需要这个,大概理解了下,
第一步:这个start第一次调用就是为了开始运行没有任务不自动退出,固定的名字和固定的值1 counter = WFTaskFactory::create_counter_task(this->counter_name, 1, nullptr); 第二步: 这一段是添加新任务,且也是为了保证没有任务的时候不退出 this->series->push_back(task); this->series->push_back(counter); 第三步:这个其实是为了把第一步的阻塞塞子去掉,好让第二步的task任务得到执行 WFTaskFactory::count_by_name(this->counter_name);
可以这么理解吗,大佬

我感觉你在看不起我(哈哈,开玩笑哈),现在就是需要这个,现在业务有个状态需要维护,然后要根据各种处理结果来维护,这个正合适的,本来用同步编程,我取数据放到内存,调用hredis 操作的话这个就简单了,这不向大佬们学习使用异步嘛。

没有看不起,只是希望你多做一些实验,有可能问题没有你想的那么复杂。

从你刚才贴的一些代码,我觉得你还是没有看我们的文档和示例。

验证了下,确实不是我要的那个场景,哈哈,但是就是说,假如我有10个任务,我想感知到第10任务执行完了后,再重新执行一个任务,这种要如何做适合,都是异步的有点不知道咋搞,又不想用成员容器来保存数据记录状态这种,就是通过任务来感知,因为我不到10个任务执行完,我直接开始第11个任务的话,有可能拿到我还处理完的数据又去处理了。

你自己试一下吧。我倒不觉得你真的需要这么复杂的东西。

大概写了一下代码,没有测,方法没问题:

#include <string>
#include <mutex>
#include "workflow/Workflow.h"
#include "workflow/WFTaskFactory.h"

class BlockSeries
{
public:
	BlockSeries(SubTask *first, const std::string& name) :
		counter_name(name)
	{
		WFCounterTask *counter;
		this->series = Workflow::create_series_work(first, nullptr);
		counter = WFTaskFactory::create_counter_task(this->counter_name, 1, nullptr);
		this->series->push_back(counter);
	}

	~BlockSeries()
	{
		// 假设用户已经调用过start()
		WFTaskFactory::count_by_name(this->counter_name);
	}

        void start()
        {
                this->series->start();
        }

	void push_back(SubTask *task)
	{
		WFCounterTask *counter;
		this->mutex.lock();
		counter = WFTaskFactory::create_counter_task(this->counter_name, 1, nullptr);
		this->series->push_back(task);
		this->series->push_back(counter);
		this->mutex.unlock();
                // 这个counter_by_name打开的是上一个push_back添加的counter。
		WFTaskFactory::count_by_name(this->counter_name);
	}

private:
	SeriesWork *series;
	std::string counter_name;
	std::mutex mutex;
};

其中name是一个全局唯一的名字,用来给counter起名的。series没有callback,如果需要的话可以自己改一下代码。

太好了,正好现在需要这个,大概理解了下,
第一步:这个start第一次调用就是为了开始运行没有任务不自动退出,固定的名字和固定的值1 counter = WFTaskFactory::create_counter_task(this->counter_name, 1, nullptr); 第二步: 这一段是添加新任务,且也是为了保证没有任务的时候不退出 this->series->push_back(task); this->series->push_back(counter); 第三步:这个其实是为了把第一步的阻塞塞子去掉,好让第二步的task任务得到执行 WFTaskFactory::count_by_name(this->counter_name);
可以这么理解吗,大佬

我感觉你在看不起我(哈哈,开玩笑哈),现在就是需要这个,现在业务有个状态需要维护,然后要根据各种处理结果来维护,这个正合适的,本来用同步编程,我取数据放到内存,调用hredis 操作的话这个就简单了,这不向大佬们学习使用异步嘛。

没有看不起,只是希望你多做一些实验,有可能问题没有你想的那么复杂。
从你刚才贴的一些代码,我觉得你还是没有看我们的文档和示例。

验证了下,确实不是我要的那个场景,哈哈,但是就是说,假如我有10个任务,我想感知到第10任务执行完了后,再重新执行一个任务,这种要如何做适合,都是异步的有点不知道咋搞,又不想用成员容器来保存数据记录状态这种,就是通过任务来感知,因为我不到10个任务执行完,我直接开始第11个任务的话,有可能拿到我还处理完的数据又去处理了。

那不就是在第10个任务的callback里,push_back第11个任务吗?

10个相同的任务,比如redis del指令,我不知道哪个是第10啊,你的意思在创建第10个的时候set_callback?

放到一个series应该是串行的,但我也不知道哪个是第十个啊,任务不是异步执行的嘛,难道我加个计数?

我知道了,有个那个user_data可以用对吧

我真不想回你的问题了。。。 我教不会你。你自己看文档吧。

---原始邮件--- 发件人: @.> 发送时间: 2024年6月6日(周四) 晚上7:46 收件人: @.>; 抄送: @.@.>; 主题: Re: [sogou/workflow] 我有一个需求,希望能动态创建多个task,又希望这些task能被顺序的执行来避免多线程竞争,该如何做。 (#301) 我知道了,有个那个user_data可以用对吧 — Reply to this email directly, view it on GitHub, or unsubscribe. You are receiving this because you were mentioned.Message ID: @.***>

别阿大哥,能者多劳嘛,那个user_data可以用了,因为我一次10个数据处理,会建10个任务,我在建最后一个任务绑定一下user_data,待任务执行完回到call_back的时候判断一下就行了

我真不想回你的问题了。。。 我教不会你。你自己看文档吧。

---原始邮件--- 发件人: @.> 发送时间: 2024年6月6日(周四) 晚上7:46 收件人: _@**._>; 抄送: _@.@._>; 主题: Re: [sogou/workflow] 我有一个需求,希望能动态创建多个task,又希望这些task能被顺序的执行来避免多线程竞争,该如何做。 (#301) 我知道了,有个那个user_data可以用对吧 — Reply to this email directly, view it on GitHub, or unsubscribe. You are receiving this because you were mentioned.Message ID: _@_.*>

别阿大哥,能者多劳嘛,那个user_data可以用了,因为我一次10个数据处理,会建10个任务,我在建最后一个任务绑定一下user_data,待任务执行完回到call_back的时候判断一下就行了

不要再提问了。或者你去找个同步框架用吧。