qinxuye/cola

任务执行完成后为什么始终不退出

brightgems opened this issue · 5 comments

Task类的run方法内有两个循环,最外面循环只有在stop事件出现后才出退出, 为什么?

def run(self):
        try:
            curr_priority = 0
            while not self.stopped.is_set():
                priority_name = 'inc' if curr_priority == self.n_priorities \
                                    else curr_priority
                is_inc = priority_name == 'inc'
                
                while not self.nonsuspend.wait(5):
                    continue
                if self.stopped.is_set():
                    break
                
                self.logger.debug('start to process priority: %s' % priority_name)
                
                last = self.priorities_secs[curr_priority]
                clock = Clock()
                runnings = []
                try:
                    no_budgets_times = 0
                    while not self.stopped.is_set():
                        if clock.clock() >= last:
                            break
                        
                        if not is_inc:
                            status = self._apply(no_budgets_times)
                            if status == CANNOT_APPLY:
                                break
                            elif status == APPLY_FAIL:
                                no_budgets_times += 1
                                if not self._has_not_finished(curr_priority) and \
                                    len(runnings) == 0:
                                    continue
                                
                                if self._has_not_finished(curr_priority) and \
                                    len(runnings) == 0:
                                    self._get_unit(curr_priority, runnings)
                            else:
                                no_budgets_times = 0
                                self._get_unit(curr_priority, runnings)
                        else:
                            self._get_unit(curr_priority, runnings)
                            
                        if len(runnings) == 0:
                            break
                        if self.is_bundle:
                            self.logger.debug(
                                'process bundle from priority %s' % priority_name)
                            rest = min(last - clock.clock(), MAX_BUNDLE_RUNNING_SECONDS)
                            if rest <= 0:
                                break
                            obj = self.executor.execute(runnings.pop(), rest, is_inc=is_inc)
                        else:
                            obj = self.executor.execute(runnings.pop(), is_inc=is_inc)
                            
                        if obj is not None:
                            runnings.insert(0, obj)  
                finally:
                    self.priorities_objs[curr_priority].extend(runnings)
                    
                curr_priority = (curr_priority+1) % self.full_priorities
        finally:
            self.counter_client.sync()
            self.save()

不退出是在分布式模式下?在分布式模式下,需要用coca命令来kill job。
在local模式下,可以通过ctrl+c退出。

原因是,在分布式模式下,任意一个worker接受信号退出是不正确的行为。

我在执行weibo数据抓取时需要知道每次执行需要多少时间,因此当所有的URL抓取完毕时希望马上看到任务结束。
这里我在branch的版本上增加了一个功能,允许配置文件里的job.size设置成auto,这时budgets值正好等于需要抓取的URL/BUNDLE任务数量,并且在URL处理结束后将调用budget_client.finish;如果有新的URL产生时调用inc_budget增加budgets。在单机和分布模式下测试通过

请教大神上面的思路是否有错

我理解你是要看到执行时间等相关运行时信息吧。

其实有个counter_client,那里面应该有执行时间等的相关信息。

和counter_client打印信息的目的不太一样. 在没有auto size之前,如果size值设置大了任务总是无法结束;如果设置小了又会出现no budget的日志导致不能处理完所有的URL.
size设置成auto后,任务可以很快结束,这样能显现分步式爬虫的性能优势

All objects have been fetched, try to finish job
Counters during running:
{'finishes': 4,
 'pages': 5,
 'processed_weibo_list_page': 5,
 'secs': 42.1710000038147}
Processing shutting down
Shutdown finished
Job id:7EaR1ZsULyX finished, spend 55.28 seconds for running
Press any key to continue . . .

OK,当初的做法是单机模式下,会检查,过一段时间没有任务的话,才会退出;分布式下不做这个处理,等待budget完成,才会退出。

你的做法是想让他有要抓取的,才会去更新budget是吧?

不知道在多个worker的时候,会不会导致任务提前终止了,这个如果能解决应该还是不错的改进。