使用funboost-v18.5时,注册延迟任务失败
Hujian99 opened this issue · 5 comments
在执行下面语句时出现报错,报错信息提示未知参数 countdown:
delay_notify_service_logic.apply_async({"data_block": data_block_dict}, countdown=delay_seconds)
具体报错如下:
Traceback (most recent call last):
File "/Users/nothingtosay/opt/anaconda3/envs/schedule/lib/python3.8/site-packages/funboost/consumers/base_consumer.py", line 818, in _async_run_consuming_function_with_confirm_and_retry
rs = await corotinue_obj
File "/Users/nothingtosay/Documents/WorkSpace/wk_schedule/wk_schedule/logic/create_data_block_logic.py", line 42, in create_data_block_logic
delay_notify_service_logic.apply_async({"data_block": data_block_dict}, countdown=delay_seconds)
TypeError: publish() got an unexpected keyword argument 'countdown'
使用eta也存在该报错
不知道你的代码是怎么写的,没有贴出你的代码来。
已经解决了。顺带问一下,为什么启动消费者后,网络流出速率非常高?
消费者代码如下:
import asyncio
import os
import sys
from fastapi_libs.ctx.app_ctx import app_context
from fastapi_libs.script_tool.script_alive import script_alive
from fastapi_libs.asyncmsg import MsgEngine
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
from wk_schedule import app
from wk_schedule.constants.enums import AsyncMsg
# os.path.basename replaces the manual split('/') so the script name is
# extracted correctly regardless of the platform's path separator.
@script_alive(os.path.basename(__file__))
async def run():
    """Send the heartbeat message for this script.

    Sends ``AsyncMsg.heartbeat`` through ``MsgEngine`` with ``delay=30`` and
    ``script_name="batch_create_task"``.
    """
    # Heartbeat check.
    # NOTE(review): script_name is "batch_create_task" although this script
    # starts the notify consumer -- confirm the name is intended.
    await MsgEngine().send(AsyncMsg.heartbeat, delay=30, script_name="batch_create_task")
async def work():
    """Start the delayed-notify consumer within the app context, then run the heartbeat."""
    async with app_context(app):
        # Deferred import, kept inside the context exactly as in the original.
        from wk_schedule.logic.notify_service_logic import (
            delay_notify_service_logic as notify_consumer,
        )

        notify_consumer.consume()
        # NOTE(review): the pasted source lost its indentation; this call is
        # assumed to belong inside the ``async with`` block -- confirm.
        await run()
if __name__ == "__main__":
    # Schedule work() on the default event loop and block until it finishes.
    event_loop = asyncio.get_event_loop()
    event_loop.run_until_complete(asyncio.ensure_future(work()))
任务代码如下:
@boost(
    "notify_service_queue",
    broker_kind=BrokerEnum.REDIS_ACK_ABLE,
    concurrent_mode=ConcurrentModeEnum.ASYNC,
    max_retry_times=1,
)
async def notify_service_logic(data_block: dict):
    """Notify the downstream service about one data block.

    The message is dropped (an error is logged, nothing is raised) when
    ``is_time_out`` reports that ``current_overtime`` has passed, or when
    ``DrainerClient.get`` returns ``None`` for the block's ``data_type``.
    Otherwise ``last_try_time`` is stamped on ``data_block``, the client is
    executed with a ``DataBlock`` bean built from the payload, and a status
    update is published through ``update_data_block_logic``.

    Args:
        data_block: Message payload. Keys read here are ``current_overtime``,
            ``data_type`` and ``block_sn``; ``last_try_time`` is written.

    Raises:
        ReturnResultValueError: If the state returned by the client is not in
            ``DATA_BLOCK_STATE_RESULT``.
    """
    data_type = data_block.get('data_type')
    block_sn = data_block.get('block_sn')
    current_overtime = data_block.get("current_overtime")

    if await is_time_out(current_overtime):
        # The original message looked up the key 'data_block' here; the
        # parallel error message below identifies the block by 'data_type',
        # so the same key is used for both.
        logger.error(f"process data[{data_type} - {block_sn}] timeout, current_overtime {current_overtime}")
        return

    client = await DrainerClient.get(data_type)
    if client is None:
        logger.error(f"process data[{data_type} - {block_sn}] error, not box process")
        return

    # Record when this task was triggered (last_try_time) before notifying
    # the downstream service.
    data_block['last_try_time'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

    bean = DataBlock()
    await bean.init_bean(data_block)

    # Only the state is used; the second element of the result is ignored.
    state, _ = await client.execute(bean)
    if state not in DATA_BLOCK_STATE_RESULT:
        raise ReturnResultValueError(state)

    # Register the follow-up task that updates the data block's state.
    update_data_block_logic.apply_async({"data_block": data_block, "state": state})
大致逻辑是延迟通知下游服务器
可以给 boost 装饰器加上 qps 参数。