Add support for dramatiq async middleware
mhdzumair opened this issue · 1 comments
Issues
GitHub issues are for bugs. If you have questions, please ask them on the mailing list.
Checklist
- Does your title concisely summarize the problem?
- Did you include a minimal, reproducible example?
- What OS are you using?
- What version of Dramatiq are you using?
- What did you do?
- What did you expect would happen?
- What happened?
What OS are you using?
Debian 12
What version of Dramatiq are you using?
v1.16.0
code
class TaskManager(dramatiq.Middleware):
async def before_process_message(self, broker, message):
task_name = message.actor_name
args = message.args
kwargs = message.kwargs
# Retrieve the minimum run interval from the actor (if it's been set)
actor = broker.get_actor(task_name)
min_interval = getattr(actor, "_minimum_run_interval", None)
if not min_interval:
return
existing_task = await TaskDetail.find_one(
TaskDetail.task_name == task_name,
TaskDetail.args == args,
TaskDetail.kwargs == kwargs,
)
if existing_task:
if datetime.now() - existing_task.last_run < min_interval:
logging.warning(
f"Discarding task {task_name} due to minimum run interval."
)
raise SkipMessage(
f"Discarding task {task_name} due to minimum run interval."
)
async def after_process_message(
self, broker, message, *, result=None, exception=None
):
task_name = message.actor_name
args = message.args
kwargs = message.kwargs
existing_task = await TaskDetail.find_one(
TaskDetail.task_name == task_name,
TaskDetail.args == args,
TaskDetail.kwargs == kwargs,
)
# Update last_run or create a new task entry
if existing_task:
existing_task.last_run = datetime.now()
await existing_task.save()
else:
new_task = TaskDetail(
task_name=task_name,
args=args,
kwargs=kwargs,
last_run=datetime.now(),
)
await new_task.insert()
What did you do?
Trying to limit the background task to be run only if minimum run interval exceeding. I already used your wrapper for worker_rate_limit with ConcurrentRateLimiter and WindowRateLimiter, but not enough for me to precisely controlling what to run when. So I want to store the task function, args and kwargs in db and then validate whether to run or not. However im using async mongodb beanie library.
What did you expect would happen?
Support async before_process_message
and after_process_message
function execution.
What happened?
dramatiq/broker.py:105: RuntimeWarning: coroutine 'TaskManager.before_process_message' was never awaited
getattr(middleware, signal)(self, *args, **kwargs)
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
I don't think this is in the cards. I would call run_until_complete
(or a similar function) on the event loop from the hooks.