ydf0509/funboost

Registering a delayed task fails with funboost v18.5

Hujian99 opened this issue · 5 comments

Running the statement below raises an error; the message says countdown is an unexpected keyword argument:
delay_notify_service_logic.apply_async({"data_block": data_block_dict}, countdown=delay_seconds)

The full error is:
Traceback (most recent call last):
File "/Users/nothingtosay/opt/anaconda3/envs/schedule/lib/python3.8/site-packages/funboost/consumers/base_consumer.py", line 818, in _async_run_consuming_function_with_confirm_and_retry
rs = await corotinue_obj
File "/Users/nothingtosay/Documents/WorkSpace/wk_schedule/wk_schedule/logic/create_data_block_logic.py", line 42, in create_data_block_logic
delay_notify_service_logic.apply_async({"data_block": data_block_dict}, countdown=delay_seconds)
TypeError: publish() got an unexpected keyword argument 'countdown'

Using eta instead produces the same error.

I don't know how your code is written; you haven't posted it.

Read the documentation first; following the examples there should be enough.
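For anyone hitting the same TypeError: the traceback shows that apply_async() ends up in publish(), and publish() only accepts its named parameters, so a bare countdown=... keyword is rejected. The sketch below reproduces that with stand-in classes only. FakePublisher and DelayConfig are hypothetical, and the parameter names (task_id, priority_control_config) are an assumption based on my recollection of the delayed-task example in the funboost docs, where the delay is passed inside a PriorityConsumingControlConfig object. Check the docs for your version before relying on it.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class DelayConfig:
    """Hypothetical stand-in for a per-message options object."""
    countdown: Optional[float] = None  # seconds to wait before running


class FakePublisher:
    """Hypothetical stand-in for a publisher; not funboost's real class."""

    def __init__(self):
        self.sent = []

    def publish(self, msg: dict, task_id: Optional[str] = None,
                priority_control_config: Optional[DelayConfig] = None):
        # Only the named parameters above are accepted, so passing
        # countdown=... directly raises TypeError before anything is sent.
        self.sent.append((msg, priority_control_config))

    # The traceback shows apply_async() resolving to publish();
    # a plain alias models that behaviour.
    apply_async = publish


publisher = FakePublisher()

# Reproduces the failure from the issue: countdown is not a parameter.
try:
    publisher.apply_async({"data_block": {}}, countdown=5)
except TypeError as exc:
    error_message = str(exc)

# Passing the delay inside the options object is accepted.
publisher.apply_async({"data_block": {}},
                      priority_control_config=DelayConfig(countdown=5))
```

The first call fails the same way as in the report, and the second one goes through because the delay travels inside the options object rather than as its own keyword.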

That's solved now. A side question: why is the outbound network traffic so high after starting the consumer?

The consumer code is as follows:

import asyncio
import os
import sys
from fastapi_libs.ctx.app_ctx import app_context
from fastapi_libs.script_tool.script_alive import script_alive
from fastapi_libs.asyncmsg import MsgEngine

sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))

from wk_schedule import app
from wk_schedule.constants.enums import AsyncMsg


@script_alive(__file__.split('/')[-1])
async def run():
    # Heartbeat check
    await MsgEngine().send(AsyncMsg.heartbeat, delay=30, script_name="batch_create_task")


async def work():
    async with app_context(app):
        from wk_schedule.logic.notify_service_logic import delay_notify_service_logic
        delay_notify_service_logic.consume() 
        await run()


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    task = asyncio.ensure_future(work())
    loop.run_until_complete(task)

The task code is as follows:

@boost("notify_service_queue", broker_kind=BrokerEnum.REDIS_ACK_ABLE, concurrent_mode=ConcurrentModeEnum.ASYNC, max_retry_times=1)
async def notify_service_logic(data_block: dict):
    if await is_time_out(data_block.get("current_overtime")):
        logger.error(f"process data[{data_block.get('data_block')} - {data_block.get('block_sn')}] timeout, current_overtime {data_block.get('current_overtime')}")
        return

    client = await DrainerClient.get(data_block.get('data_type'))
    if client is None:
        logger.error(f"process data[{data_block.get('data_type')} - {data_block.get('block_sn')}] error, not box process")
        return
    # Record when this task was triggered (last_try_time) before notifying the downstream service
    data_block['last_try_time'] = datetime.strftime(datetime.now(), '%Y-%m-%d %H:%M:%S')
    bean = DataBlock()
    await bean.init_bean(data_block)
    state, info = await client.execute(bean)
    if state not in DATA_BLOCK_STATE_RESULT:
        raise ReturnResultValueError(state)
    # Register a task that updates the task status
    update_data_block_logic.apply_async({"data_block": data_block, "state": state})

The rough logic is to notify the downstream server after a delay.

You can add a qps setting to the boost decorator.
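To illustrate what a qps cap amounts to, here is a minimal asyncio sketch that spaces task starts at least 1/qps seconds apart. QpsLimiter and demo are hypothetical names made up for this example, and this is not how funboost implements its own rate control; it only shows the general idea of pacing work so the consumer does not run as fast as it possibly can.

```python
import asyncio
import time


class QpsLimiter:
    """Hypothetical helper: spaces calls at least 1/qps seconds apart."""

    def __init__(self, qps: float):
        self.interval = 1.0 / qps
        self._next_slot = time.monotonic()
        self._lock = asyncio.Lock()

    async def wait(self):
        # Reserve the next free time slot under the lock, then sleep
        # outside the lock until that slot arrives.
        async with self._lock:
            now = time.monotonic()
            delay = self._next_slot - now
            self._next_slot = max(now, self._next_slot) + self.interval
        if delay > 0:
            await asyncio.sleep(delay)


async def demo(qps: float, calls: int) -> float:
    limiter = QpsLimiter(qps)
    start = time.monotonic()

    async def job():
        await limiter.wait()  # each job waits for its slot before doing work

    await asyncio.gather(*(job() for _ in range(calls)))
    return time.monotonic() - start


# Five jobs at 20 per second: the last one starts about 0.2 s after the first.
elapsed = asyncio.run(demo(qps=20, calls=5))
```

With the limiter in place the five jobs are spread over roughly 0.2 seconds instead of all starting at once, which is the kind of throttling a qps value on the decorator is meant to give you.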