如何汇总一个ParallelWork的结果
duyixian1234 opened this issue · 6 comments
背景
我打算实现一个http服务,提供一个接口用来并行获取若干个http url的响应结果并汇总返回。基本上算是合并了tutorial-06-parallel_wget和tutorial-04-http_echo_server的功能。
存在的一点区别是tutorial-06-parallel_wget
只是在ParallelWork
的callback
中往标准输出中打印了每个请求的结果,我的需求是想要处理一下这些抓取结果并汇总作为http 接口的返回。
我想到的一个方案是在ParallelWork的callback里获取到当前http server task(不是很确认正确的获取方式),并设置响应。
void callback(const ParallelWork *pwork)
{
tutorial_series_context *ctx;
const void *body;
size_t size;
size_t i;
// 获取到当前http server task
HttpResponse *resp = task->get_resp();
for (i = 0; i < pwork->size(); i++)
{
ctx = (tutorial_series_context *)pwork->series_at(i)->get_context();
printf("%s\n", ctx->url.c_str());
if (ctx->state == WFT_STATE_SUCCESS)
{
ctx->resp.get_parsed_body(&body, &size);
resp->append_output_body_nocopy(body, size);
}
else
printf("ERROR! state = %d, error = %d\n", ctx->state, ctx->error);
delete ctx;
}
}
int process(WFHttpTask *server_task)
{
ParallelWork *pwork = Workflow::create_parallel_work(callback);
SeriesWork *series;
WFHttpTask *task;
HttpRequest *req;
tutorial_series_context *ctx;
int i;
for (i = 1; i < 100; i++)
{
std::string url = "http://localhost:8100/worker";
task = WFTaskFactory::create_http_task(
url,
REDIRECT_MAX, RETRY_MAX,
[](WFHttpTask *task) {
tutorial_series_context *ctx =
(tutorial_series_context *)series_of(task)->get_context();
ctx->state = task->get_state();
ctx->error = task->get_error();
ctx->resp = std::move(*task->get_resp());
});
req = task->get_req();
req->add_header_pair("Accept", "*/*");
req->add_header_pair("User-Agent", "Wget/1.14 (linux-gnu)");
req->add_header_pair("Connection", "close");
ctx = new tutorial_series_context;
ctx->url = std::move(url);
series = Workflow::create_series_work(task, nullptr);
series->set_context(ctx);
pwork->add_series(series);
}
WFFacilities::WaitGroup wait_group(1);
Workflow::start_series_work(pwork, [&wait_group](const SeriesWork *) {
wait_group.done();
});
wait_group.wait();
return 0;
}
问题
想知道如何在ParallelWork里获取到当前请求的Http server task,或者把http server task设置到ParallelWork的上下文中
注意到ParallelWork
有这两个方法,似乎可以在callback
中设置上下文,然后在process
中读取ParallelWork
的上下文并设置http请求的返回结果。
public:
void *get_context() const { return this->context; }
void set_context(void *context) { this->context = context; }
参考http proxy的例子。然后parallel也是一种task,所以把你的parallel串到server task所在的series里,最后parallel的callback里填写resp的body就好。这是一个标准异步server的玩法。
不要在process里等你的parallel结束啊。
谢谢指点,问题解决了。对与任务调度还不是很熟悉。有机会的话还是要研读源码。 @Barenboim
感谢使用,也欢迎之后把项目开源。
目前的文档看一遍应该可以解决大多数的使用问题,更多细节可能需要看看源码,都不太复杂。
另外注意不要在任何回调函数里同步等待噢
An experiment code here : https://github.com/chanchann/workflow_annotation/blob/main/demos/19_parallel/get_all_para.cc