Allow passing kwargs/args to apply_async
thenewguy opened this issue · 10 comments
The celery docs show that one can specify the queue a task uses with apply_async:
>>> from feeds.tasks import import_feed
>>> import_feed.apply_async(args=['http://cnn.com/rss'],
... queue='feed_tasks',
... routing_key='feed.import')
It would be nice to expose this in the delay_or_fail
method. My use case is a view that exposes various django model methods as web hooks. The tasks need to be routed to different queues based on how long they are expected to run. This needs to use the queues kwarg of apply_async but isn't exposed via delay_or_fail().
It would be handy if there was a way to specify the args/kwargs delay_or_fail passed to apply_async
Something like:
@classmethod
def async_or_eager(self, **options):
"""
modified the delay_or_eager method to follow the apply_async signature
"""
possible_broker_errors = self._get_possible_broker_errors_tuple()
try:
return self.apply_async(**options)
except possible_broker_errors:
return self.apply(**options)
@classmethod
def async_or_fail(self, **options):
"""
modified the delay_or_fail method to follow the apply_async signature
"""
possible_broker_errors = self._get_possible_broker_errors_tuple()
try:
return self.apply_async(**options)
except possible_broker_errors as e:
return self.simulate_async_error(e)
That looks like a good function signature, to me. I'm +1 on adding that.
Had not attempted to use the previous code. Needed to modify to match the way JobtasticTask.apply_async expects args/kwargs
@classmethod
def async_or_eager(self, **options):
"""
modified the delay_or_eager method to follow the apply_async signature
"""
args = options.pop("args", None)
kwargs = options.pop("kwargs", None)
possible_broker_errors = self._get_possible_broker_errors_tuple()
try:
return self.apply_async(args, kwargs, **options)
except possible_broker_errors:
return self.apply(args, kwargs, **options)
@classmethod
def async_or_fail(self, **options):
"""
modified the delay_or_fail method to follow the apply_async signature
"""
args = options.pop("args", None)
kwargs = options.pop("kwargs", None)
possible_broker_errors = self._get_possible_broker_errors_tuple()
try:
return self.apply_async(args, kwargs, **options)
except possible_broker_errors as e:
return self.simulate_async_error(e)
I am going to use this for a little while in an abstract task subclass to make sure there are no issues and then I will put together a pull request
That sounds excellent! If you could also give documenting these new methods a shot, that would be appreciated. It seems like immediately after the delay_or_fail section might be a good spot.
As far as testing, I think we should be able to get away with just adding the async_or_fail_FOO
and async_or_eager_FOO
permutations to BrokenBrokerTestCase and to WorkingBrokerTestCase. That should be enough coverage to keep things stable.
Looking forward to hearing how things go when you try this out.
-Wes
PR submitted with code/tests/docs. It seems like everything will pass but still running at the moment
New version up on PyPi