gavinwahl/django-postgres-queue

how can different message queues be isolated?

Opened this issue · 13 comments

this seems very cool, I'm going to use it

in rabbitmq, amqp, kafka etc the name of a queue is an important design capability

for example, you might have a "real time queue" and a "batch queue" with different numbers of workers allocated

or, a "first time queue" and a "retry queue" or "failed queue"

or just, in general, two django apps or parts of a large system that both want to use a postgres-queue without getting crosstalk among their messages

unless I misunderstand, there isn't an equivalent here?

it seems like it could be added by having an indexed queue_name field on dpq.models.Job, then having the dequeue method accept the name and use it as part of the where clause.

for backwards compatibility, None or some default name could be used; but in the future dpq.queue.Queue could have a name field that it attaches to jobs before enqueueing

another design approach might be to actually have separate tables for each logical queue name; not sure if there are advantages to table-level isolation here or not
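
To make the queue_name idea concrete, here's a rough sketch of where the indexed field and the extra filter would go -- this isn't dpq's actual schema or dequeue SQL (which would also need row locking), just an illustration:

from django.contrib.postgres.fields import JSONField
from django.db import models
from django.utils import timezone

class Job(models.Model):
    queue_name = models.TextField(default='default', db_index=True)  # the new indexed field
    task = models.TextField()
    args = JSONField()
    execute_at = models.DateTimeField(db_index=True)

def dequeue(queue_name='default'):
    # workers only see jobs belonging to their own queue; a real implementation
    # would also lock the row (FOR UPDATE SKIP LOCKED) before handing it out
    return (
        Job.objects
        .filter(queue_name=queue_name, execute_at__lte=timezone.now())
        .order_by('execute_at')
        .first()
    )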

I think most uses of multiple queues are working around the lack of priority in amqp, kafka etc. I have implemented one myself, where there's a 'bulk' and a 'realtime' queue to ensure high-priority tasks don't get stuck behind 1000s of bulk tasks. In dpq though, this can be accomplished more directly with priority.

Are there other interesting uses of multiple queues that can't be implemented with priority?

Thanks for the prompt response!

Good point -- here are some plausible-ish use cases that can't be handled with priority:

1- heterogeneous workers

maybe for security reasons you want to put certain SSH keys only on the specific workers that need them, rather than throughout your whole system (e.g. the task is SFTP-ing data to a business partner's server)

maybe you are training a neural net and want to make sure those jobs are only run on the subset of machines which have GPUs

2- dead queue

if you want to implement a "dead queue" where jobs will no longer be executed but you can manually try to fix them up; that is, you can manually run a "cleanup" process that pulls all of the failures out of the dead queue and handles them, but when the system is in its normal online state nothing should ever read from this queue

More generally, I think the strongest argument is isolation. Just on object-oriented design principles, it would be nice to be able to instantiate two Queue instances and not get cross-talk between them.

Alternatively, you could make Queue into an explicit singleton that raises an exception if you try to instantiate it twice.

  1. I have the start of something in mind for this. Queue classes have their job_model parameterized (implemented). Most of the Job model can be moved to an abstract base class (not yet implemented), which would allow easily creating your own version of it. To make a separate queue you could do:

    class SomeSpecialJob(JobBase):
        pass
    
    class SomeSpecialQueue(AtLeastOnceQueue):
        job_model = SomeSpecialJob
    
    some_special_queue = SomeSpecialQueue()
  2. I'm not sure of the utility of using the Job model to store things that can't actually be executed. Why not store them in some model specific to your purpose? That said, I have been considering a design where Job.execute_at is nullable, with null indicating a task that exists but workers will never execute. I was exploring this design to implement what celery calls chords, tasks that don't execute until other tasks have finished.
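
     To sketch that nullable-execute_at idea (just an illustration of the design I'm considering, not code that exists):

     from django.db import models
     from django.utils import timezone

     class Job(models.Model):
         # null means the job exists but no worker will ever pick it up,
         # e.g. it is waiting on other jobs to finish (chord-style behavior)
         execute_at = models.DateTimeField(null=True, db_index=True)
         task = models.TextField()

     # the dequeue query would then simply skip unscheduled jobs:
     ready = Job.objects.filter(execute_at__isnull=False, execute_at__lte=timezone.now())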

I love the idea of users being able to have their own models participate in the process.

In addition to inheritance, two other approaches would be a OneToOne relationship from the user object to the job-queue item, and a GenericForeignKey from the job-queue item to the user object.

A neat feature of the GenericForeignKey would be that it allows you to enqueue anything, not just models that were built specifically as jobs.
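
A rough sketch of the GFK flavor (purely illustrative -- none of these field names exist in dpq):

from django.contrib.contenttypes.fields import GenericForeignKey
from django.contrib.contenttypes.models import ContentType
from django.db import models

class Job(models.Model):
    task = models.TextField()
    # the job can point at any model instance, so anything can be enqueued
    content_type = models.ForeignKey(ContentType, on_delete=models.CASCADE)
    object_id = models.PositiveIntegerField()
    payload = GenericForeignKey('content_type', 'object_id')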

Something else -- would it be possible to tighten up the interface by folding the core enqueue/wait functionality into the Job manager? It might make for a nice tight abstraction layer to have Job.objects.enqueue("task", some_model, args) and Job.objects.wait("task"). Maybe Job.objects.wait() could accept a list of task names?
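
A hypothetical sketch of what I mean, assuming the existing enqueue/wait machinery is just reused under the hood (the myapp.queues path, default_queue name, and filtering details are all made up):

from django.db import models

class JobManager(models.Manager):
    def enqueue(self, task, args=None):
        # delegate to whatever Queue instance the app has configured
        from myapp.queues import default_queue
        return default_queue.enqueue(task, args or {})

    def wait(self, tasks, timeout=None):
        # block until notified, then hand back the next job whose task name
        # is in `tasks` (a list of names) -- filtering happens at dequeue time
        from myapp.queues import default_queue
        default_queue.wait(timeout)
        return self.filter(task__in=tasks).order_by('execute_at').first()

class Job(models.Model):
    task = models.TextField()
    execute_at = models.DateTimeField(db_index=True)
    objects = JobManager()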

Another small-ball recommendation -- TextField + base64(pickle) would probably work better for args than JSON. In addition to restricting you to Django 1.11+, JSON silently modifies some types (tuples become lists, ordered dicts become plain dicts). Also, using pickle is consistent with the stdlib multiprocessing module. There seems to be a well-maintained PickledObjectField if you don't mind adding the dependency: https://pypi.python.org/pypi/django-picklefield.

I guess I see where you are coming from since celery moved to JSON for security reasons. Pickle makes it trivial to escalate a malicious message into arbitrary code execution. However, by putting the message queue into the database we have a huge advantage :-) There is no network communication.

Just throwing some ideas out there

I played around with it a bit today, got something working:

https://gist.github.com/kurtbrose/2a01f81d03e74e223b45ee03e73fcb62

This creates public APIs that look like this:

Messages.queue.put(some_model, 'type')
Messages.queue.get('type')

get() accepts multiple names, so you could implement the dispatching on top of it:

tasks = {'foo': foo, 'bar': bar}
while True:
    nxt = Messages.queue.get(tasks.keys())
    # IOU mechanism to tell which queue it came from;
    # `name` below would come from that mechanism
    try:
        tasks[name](nxt)
    except Exception:
        pass  # discard / log failed items

I guess this loses the function call semantics as well though -- now you are getting a single instance of a model instead of args/kwargs for a function.

Anyway, just playing around with it; maybe you'd like to use some of this code or it will give you an idea.

Regarding pickle vs json, that was an intentional decision. I don't think passing complex objects is a good idea. If you want to do something like a generic foreign key, I'd rather you just put an object id in the JSON, because it's not like you're getting any database support from a GFK anyway.

I'm not quite happy with the enqueue/dequeue API I've got now, but I'm not sure your suggestion is any better. I'm still trying to think of some other options.

Fair resource allocation / QoS might be another good use case for named queues. You might want to rate limit or throttle so that a bunch of messages of one type don't starve others from getting any worker time.

I picked up this project to try today and made some helper API for the enqueue functionality. It is somewhat based on how task registration happens in celery. For example:

@task('queue-name', max_retries=5)
def send_email(queue, job):
    ...

send_email.enqueue({"email": "gavinwahl@example.com", "body": "tnx"})

I tried to use slightly different names than celery, so that if someone is moving from that to this, they can see at a glance whether a particular task has been converted yet. It also uses some of the conventions you have established here (enqueue, etc.).

Would something along these lines be desirable?
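
Roughly, the shape of it is something like this -- a simplified sketch rather than my exact code; it assumes tasks are callables taking (queue, job) like dpq expects, that enqueueing goes through queue.enqueue(task_name, args), and that the queue-name-to-Queue wiring happens at startup (QUEUES, REGISTRY, and AsyncTask are just the names I used):

QUEUES = {}    # queue name -> Queue instance, populated at app setup
REGISTRY = {}  # queue name -> {task name: handler}, consumed at app setup

class AsyncTask:
    def __init__(self, func, queue_name, max_retries=0):
        self.func = func
        self.queue_name = queue_name
        self.max_retries = max_retries  # stored only; retry handling not shown

    def __call__(self, queue, job):
        # same (queue, job) signature that dpq task handlers use
        return self.func(queue, job)

    def enqueue(self, args, queue=None):
        q = queue if queue is not None else QUEUES[self.queue_name]
        return q.enqueue(self.func.__name__, args)

def task(queue_name, max_retries=0):
    def decorator(func):
        wrapped = AsyncTask(func, queue_name, max_retries)
        REGISTRY.setdefault(queue_name, {})[func.__name__] = wrapped
        return wrapped
    return decorator

At startup, each named queue then gets wired up with its slice of REGISTRY as its registered tasks.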

I also agree that JSON is a good option vs pickle. While pickle does have the advantages listed above, I don't think they outweigh the simplicity that JSON provides.

Final comment about the different queues concept. Is it not available out of the box (edit: perhaps not, see the update below)? I was able to set up three queues, with varying numbers of workers listening to each queue, and then register tasks to each queue.

Without changing any library code, I could even have tasks that are dynamically placed on a queue at enqueue time, e.g.

# send_email is an AsyncTask instantiated in an earlier comment
send_email.enqueue({"email": ...}, queue=queue)

(side note, I am still wondering about referring to queues by name or passing their direct instance, the above is closer to what the docs currently would use)


(edit)
I reviewed the code and I can see that creating separate queues would still use the same job table behind the scenes. So separate named queues would only benefit the listen/notify mechanism and control the subset of workers who are woken for those events?
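
To illustrate what I was doing -- two Queue instances, each with its own notify channel and its own workers, but (per the above) both writing to the same job table. The channel names and handlers are just examples, and I'm assuming the Queue constructor accepts the tasks mapping and notify channel as keyword arguments:

from dpq.queue import AtLeastOnceQueue

def send_email(queue, job):
    ...

def rebuild_index(queue, job):
    ...

realtime_queue = AtLeastOnceQueue(tasks={'send_email': send_email}, notify_channel='realtime')
bulk_queue = AtLeastOnceQueue(tasks={'rebuild_index': rebuild_index}, notify_channel='bulk')

# workers listening on 'realtime' are only woken by realtime notifications,
# but both queues' jobs end up in the same underlying Job table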

I would agree having access to namespace jobs into queues would be useful for all the reasons @kurtbrose mentions above. Built-in priorities are nice to have, but aren't a replacement for the features a separate queue provides.

I avoided using decorators to register tasks because I want to avoid import-time side effects. With decorators, there's nothing that's going to import the modules tasks are registered in. Celery does weird magic to import modules having certain names just to get their side effects.

I register them in the app's ready() method.

I'm going to try making the separate queues. It shouldn't need much of a change to this project, and I'll make a minimal-sized PR to show.