Add functionality to clean up errored tasks
Closed this issue · 2 comments
Failed tasks can be left in the ERROR state and must be manually removed. There should be an easy way to purge old errored tasks.
- Add method purge_errored_tasks(queues=None, exclude_queues=None, last_execution_before=None) to the TaskTiger class
  - queues and exclude_queues should function similarly to the way the Worker class processes them, using config values if they aren't specified in the function call
  - last_execution_before: seconds since the epoch, used to delete only errored tasks older than the specified age. Alternatively, we could specify the age based on last execution in seconds (e.g. tasks last executed more than 12 weeks ago = 7,257,600 seconds)
- Once tasks are identified their delete() method can be called to remove them.
- Might want to include a way to slow down the deletes in case there are millions to process and the caller doesn't want to impact Redis
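A minimal sketch of the selection logic proposed in the list above, using in-memory stand-ins rather than TaskTiger itself. The `ErroredTask` class, its `time_last_execution` field, and the `select_for_purge` helper are hypothetical names for illustration. Queue matching here is a plain exact-name check, which is a simplification and may not match how the Worker class interprets queue names, and the fallback to config values is not modeled.

```python
import time
from dataclasses import dataclass

# 12 weeks expressed in seconds, matching the figure quoted in the proposal.
TWELVE_WEEKS = 12 * 7 * 24 * 60 * 60  # 7,257,600


@dataclass
class ErroredTask:
    """Hypothetical stand-in for a task in the ERROR state."""
    queue: str
    time_last_execution: float  # seconds since the epoch
    deleted: bool = False

    def delete(self):
        self.deleted = True


def select_for_purge(tasks, queues=None, exclude_queues=None,
                     last_execution_before=None):
    """Return tasks that pass the queue filters and last ran before the cutoff."""
    selected = []
    for task in tasks:
        if queues is not None and task.queue not in queues:
            continue
        if exclude_queues is not None and task.queue in exclude_queues:
            continue
        if (last_execution_before is not None
                and task.time_last_execution >= last_execution_before):
            continue
        selected.append(task)
    return selected


now = time.time()
tasks = [
    ErroredTask("emails", now - TWELVE_WEEKS - 60),   # older than 12 weeks
    ErroredTask("emails", now - 3600),                # failed an hour ago
    ErroredTask("reports", now - TWELVE_WEEKS - 60),  # old, but queue excluded
]

# Passing an absolute cutoff: "last executed more than 12 weeks ago".
for task in select_for_purge(tasks, exclude_queues={"reports"},
                             last_execution_before=now - TWELVE_WEEKS):
    task.delete()
```

With an absolute timestamp the caller computes `now - age` once; with a relative age the method would do that subtraction itself. Either way the comparison against the task's last execution time is the same.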
> Might want to include a way to slow down the deletes in case there are millions to process and the caller doesn't want to impact Redis
Will the throttled call be the primary use case? My initial thought is to have purge_errored_tasks return an iterator where each item yielded represents processing batch_size items, but if throttled requests aren't the primary usage pattern I might want to do something different. Example usage:
    import time

    # process all
    all(purge_errored_tasks(...))

    # throttle batches
    for n_processed in purge_errored_tasks(...):
        print(f"Progress: {n_processed} errored tasks have been purged so far")
        time.sleep(.1)
    print('Done')
Then the time.sleep call could easily be replaced with some sort of smarter throttle using something like limitlion, based on current Redis usage metrics, etc.
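For illustration, the iterator idea can be sketched with a plain generator. This is not TaskTiger code: `purge_in_batches`, the `delete` callback, and the in-memory `store` are hypothetical stand-ins, and the sketch assumes each yielded value is the running total of deleted tasks.

```python
import time


def purge_in_batches(task_ids, delete, batch_size=100):
    """Delete tasks batch by batch, yielding the running total after each batch."""
    n_processed = 0
    for start in range(0, len(task_ids), batch_size):
        batch = task_ids[start:start + batch_size]
        for task_id in batch:
            delete(task_id)
        n_processed += len(batch)
        yield n_processed


# In-memory stand-in for the set of errored task IDs.
store = {f"task-{i}" for i in range(250)}

for n_processed in purge_in_batches(sorted(store), store.discard, batch_size=100):
    print(f"Progress: {n_processed} errored tasks have been purged so far")
    time.sleep(0.01)  # the caller decides how long to pause between batches
print("Done")
```

Because the generator does no work until it is iterated, the caller controls the pace. In this sketch every yielded running total is non-zero, so `all(...)` would drain the generator; a variant that could yield 0 would stop `all(...)` early.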
Per some offline discussion, I'll be updating the call signature from batch_size to limit, which makes the call pattern much better and allows it to return the number of deleted tasks rather than an iterator.
New example usage:
    # processing all
    limit = 10000
    n_processed = limit
    while n_processed == limit:
        n_processed = purge_errored_tasks(..., limit=limit)
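To see how that loop terminates, here is a small self-contained sketch. The `purge_errored_tasks` function below is a hypothetical in-memory stand-in that takes a list of errored tasks, not the TaskTiger method; it only mirrors the proposed contract of deleting at most `limit` tasks and returning the number deleted.

```python
def purge_errored_tasks(errored, limit=None):
    """Delete up to `limit` entries from `errored`; return how many were deleted."""
    n = len(errored) if limit is None else min(limit, len(errored))
    del errored[:n]
    return n


errored = list(range(25))  # stand-in for 25 errored tasks
limit = 10
n_processed = limit
calls = 0
while n_processed == limit:
    n_processed = purge_errored_tasks(errored, limit=limit)
    calls += 1

print(f"{calls} calls, {len(errored)} errored tasks left")
```

The loop stops as soon as a call deletes fewer tasks than `limit`. When the number of errored tasks is an exact multiple of `limit`, that takes one extra call that returns 0. A throttle can be added by sleeping inside the `while` body.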