ydf0509/funboost

使用延时任务发布的时候,任务是async的就会出错,是不支持吗还是我写错了

DeemoASCII opened this issue · 5 comments

@boost("register", broker_kind=BrokerEnum.REDIS_STREAM,
       concurrent_mode=ConcurrentModeEnum.ASYNC, log_level=20)
async def register(cookie_dict: dict, meta_dict: dict, mobile: str):
       pass

register.publish({'cookie_dict': cookie_dict, 'meta_dict': meta_dict, 'mobile': mobile},
                                 priority_control_config=PriorityConsumingControlConfig(countdown=40))

报错

Error submitting job "AsyncPoolExecutor.submit (trigger: date[2022-04-09 17:55:16 CST], next run at: 2022-04-09 17:55:16 CST)" to executor "default"
Traceback (most recent call last):
  File "C:\ProgramData\Miniconda3\lib\site-packages\apscheduler\schedulers\base.py", line 979, in _process_jobs
    executor.submit_job(job, run_times)
  File "C:\ProgramData\Miniconda3\lib\site-packages\apscheduler\executors\base.py", line 71, in submit_job
    self._do_submit_job(job, run_times)
  File "C:\ProgramData\Miniconda3\lib\site-packages\apscheduler\executors\pool.py", line 28, in _do_submit_job
    f = self._pool.submit(run_job, job, job._jobstore_alias, run_times, self._logger.name)
  File "C:\ProgramData\Miniconda3\lib\concurrent\futures\thread.py", line 161, in submit
    raise RuntimeError('cannot schedule new futures after shutdown')
RuntimeError: cannot schedule new futures after shutdown

while 1:
time.sleep(10)

在代码末尾加上这句话就可以了

了解了,感谢,但是觉得这里还是有点奇怪,这样做的话发布任务的方法就不能结束,得等着到下一个执行时间到了才能结束,这就跟先time.sleep()一段时间之后再直接发布任务是一样的了,跟预想的延时任务有些出入

了解了,感谢,但是觉得这里还是有点奇怪,这样做的话发布任务的方法就不能结束,得等着到下一个执行时间到了才能结束,这就跟先time.sleep()一段时间之后再直接发布任务是一样的了,跟预想的延时任务有些出入

只发布是不需要末尾加入这两行的,你肯定是启动消费了才报这个错。你截图肯定不全面肯定是是启动了消费。

消费任务本来就是永久循环不结束的,这个apschedule的backgroudshedule在python3.6不需要加入time.sleep末尾,版本高了就需要加,或者里设置boost装饰器的shedule_task_on_main_thread就不需要末尾加两行了

所以这里的延时任务是通过延时发送的方式实现的,消费端还是实时的,我之前理解的是已经发送完成了,只在消费端延时消费。非常感谢,顺便再问一个问题,这个消费有优先级的概念吗

延时任务就是立即把消息发布到消息队列的,发布完成后发布者脚本可以关了。消费时候取出来后用pashduer把任务定时丢到线程池运行