PolicyStat/jobtastic

Allow passing kwargs/args to apply_async

thenewguy opened this issue · 10 comments

The celery docs show that one can specify the queue a task uses with apply_async:

>>> from feeds.tasks import import_feed
>>> import_feed.apply_async(args=['http://cnn.com/rss'],
...                         queue='feed_tasks',
...                         routing_key='feed.import')

It would be nice to expose this in the delay_or_fail method. My use case is a view that exposes various django model methods as web hooks. The tasks need to be routed to different queues based on how long they are expected to run. This needs to use the queues kwarg of apply_async but isn't exposed via delay_or_fail().

It would be handy if there was a way to specify the args/kwargs delay_or_fail passed to apply_async

Something like:

    @classmethod
    def async_or_eager(self, **options):
        """
        modified the delay_or_eager method to follow the apply_async signature
        """
        possible_broker_errors = self._get_possible_broker_errors_tuple()
        try:
            return self.apply_async(**options)
        except possible_broker_errors:
            return self.apply(**options)

    @classmethod
    def async_or_fail(self, **options):
        """
        modified the delay_or_fail method to follow the apply_async signature
        """
        possible_broker_errors = self._get_possible_broker_errors_tuple()
        try:
            return self.apply_async(**options)
        except possible_broker_errors as e:
            return self.simulate_async_error(e)

That looks like a good function signature, to me. I'm +1 on adding that.

Had not attempted to use the previous code. Needed to modify to match the way JobtasticTask.apply_async expects args/kwargs

    @classmethod
    def async_or_eager(self, **options):
        """
        modified the delay_or_eager method to follow the apply_async signature
        """
        args = options.pop("args", None)
        kwargs = options.pop("kwargs", None)
        possible_broker_errors = self._get_possible_broker_errors_tuple()
        try:
            return self.apply_async(args, kwargs, **options)
        except possible_broker_errors:
            return self.apply(args, kwargs, **options)

    @classmethod
    def async_or_fail(self, **options):
        """
        modified the delay_or_fail method to follow the apply_async signature
        """
        args = options.pop("args", None)
        kwargs = options.pop("kwargs", None)
        possible_broker_errors = self._get_possible_broker_errors_tuple()
        try:
            return self.apply_async(args, kwargs, **options)
        except possible_broker_errors as e:
            return self.simulate_async_error(e)

I am going to use this for a little while in an abstract task subclass to make sure there are no issues and then I will put together a pull request

That sounds excellent! If you could also give documenting these new methods a shot, that would be appreciated. It seems like immediately after the delay_or_fail section might be a good spot.

As far as testing, I think we should be able to get away with just adding the async_or_fail_FOO and async_or_eager_FOO permutations to BrokenBrokerTestCase and to WorkingBrokerTestCase. That should be enough coverage to keep things stable.

Looking forward to hearing how things go when you try this out.

-Wes

PR submitted with code/tests/docs. It seems like everything will pass but still running at the moment

@winhamwr Have you had a chance to look over this PR?

@winhamwr not trying to pester you... but just checking back in on this one too

@winhamwr any chance of a pypi release with this PR in it?

New version up on PyPi