sogou/workflow

如何汇总一个ParallelWork的结果

duyixian1234 opened this issue · 6 comments

背景

我打算实现一个http服务,提供一个接口用来并行获取若干个http url的响应结果并汇总返回。基本上算是合并了tutorial-06-parallel_wgettutorial-04-http_echo_server的功能。

存在的一点区别是tutorial-06-parallel_wget只是在ParallelWorkcallback中往标准输出中打印了每个请求的结果,我的需求是想要处理一下这些抓取结果并汇总作为http 接口的返回。

我想到的一个方案是在ParallelWork的callback里获取到当前http server task(不是很确认正确的获取方式),并设置响应。

void callback(const ParallelWork *pwork)
{
	tutorial_series_context *ctx;
	const void *body;
	size_t size;
	size_t i;
        // 获取到当前http server task 
        HttpResponse *resp = task->get_resp();

	for (i = 0; i < pwork->size(); i++)
	{
		ctx = (tutorial_series_context *)pwork->series_at(i)->get_context();
		printf("%s\n", ctx->url.c_str());
		if (ctx->state == WFT_STATE_SUCCESS)
		{
			ctx->resp.get_parsed_body(&body, &size);
			resp->append_output_body_nocopy(body, size);
		}
		else
			printf("ERROR! state = %d, error = %d\n", ctx->state, ctx->error);

		delete ctx;
	}
}
int process(WFHttpTask *server_task)
{
	ParallelWork *pwork = Workflow::create_parallel_work(callback);
	SeriesWork *series;
	WFHttpTask *task;
	HttpRequest *req;
	tutorial_series_context *ctx;
	int i;

	for (i = 1; i < 100; i++)
	{
		std::string url = "http://localhost:8100/worker";

		task = WFTaskFactory::create_http_task(
                                   url, 
                                   REDIRECT_MAX, RETRY_MAX,
				   [](WFHttpTask *task) {
						tutorial_series_context *ctx =
						 (tutorial_series_context *)series_of(task)->get_context();
						ctx->state = task->get_state();
						ctx->error = task->get_error();
						ctx->resp = std::move(*task->get_resp());
						});

		req = task->get_req();
		req->add_header_pair("Accept", "*/*");
		req->add_header_pair("User-Agent", "Wget/1.14 (linux-gnu)");
		req->add_header_pair("Connection", "close");

		ctx = new tutorial_series_context;
		ctx->url = std::move(url);
		series = Workflow::create_series_work(task, nullptr);
		series->set_context(ctx);
		pwork->add_series(series);
	}

	WFFacilities::WaitGroup wait_group(1);

	Workflow::start_series_work(pwork, [&wait_group](const SeriesWork *) {
		wait_group.done();
	});

	wait_group.wait();
	return 0;
}

问题

想知道如何在ParallelWork里获取到当前请求的Http server task,或者把http server task设置到ParallelWork的上下文中

注意到ParallelWork有这两个方法,似乎可以在callback中设置上下文,然后在process中读取ParallelWork 的上下文并设置http请求的返回结果。

public:
	void *get_context() const { return this->context; }
	void set_context(void *context) { this->context = context; }

参考http proxy的例子。然后parallel也是一种task,所以把你的parallel串到server task所在的series里,最后parallel的callback里填写resp的body就好。这是一个标准异步server的玩法。

不要在process里等你的parallel结束啊。

谢谢指点,问题解决了。对与任务调度还不是很熟悉。有机会的话还是要研读源码。 @Barenboim

感谢使用,也欢迎之后把项目开源。
目前的文档看一遍应该可以解决大多数的使用问题,更多细节可能需要看看源码,都不太复杂。
另外注意不要在任何回调函数里同步等待噢