Performing chunkified tasks in parallel
What about concurrency? How could I implement or emulate it if I have more than 1 worker for my chunkified tasks?
I see 2 ways:
- Slice the initial chunk into X parts (for X workers) and create X tasks with different initial chunks. For example, `[1..12]` would be chunked into `[1..3]`, `[4..6]`, `[7..9]`, `[10..12]` for 4 workers (a minimal sketch follows this list).
- Provide a `concurrency=X` parameter for the `@chunkify_task` decorator and implement the slicing of the initial chunk into X initial tasks inside `@chunkify_task`.
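A minimal sketch of the first option, assuming nothing beyond plain integer ranges (`split_range` is a hypothetical helper, not part of chunkificator):

```python
# Hypothetical helper for option 1: split an inclusive integer range
# [start..stop] into `parts` contiguous sub-ranges, one per worker.
def split_range(start: int, stop: int, parts: int) -> list[tuple[int, int]]:
    total = stop - start + 1
    size, extra = divmod(total, parts)
    ranges = []
    lo = start
    for i in range(parts):
        hi = lo + size - 1 + (1 if i < extra else 0)  # spread the remainder
        ranges.append((lo, hi))
        lo = hi + 1
    return ranges

# [1..12] split for 4 workers, as in the example above
assert split_range(1, 12, 4) == [(1, 3), (4, 6), (7, 9), (10, 12)]
```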
Hi! As you noticed, splitting into sub-chunks is not supported, but could you please describe the real-life use case you need such functionality for? I suppose you have a huge number of workers and really huge chunks, correct?
Anyway, you can implement the required behaviour using an intermediate dispatcher task:
```python
@task
@chunkify_task(
    sleep_timeout=5,
    initial_chunk=get_initial_chunk
)
def send_push_notifications(chunk: Chunk):
    # dispatcher: fan the current chunk out into sub-chunks,
    # each handled by a separate task
    for sub_chunk in split_into_chunks(chunk):
        process_sub_chunk.delay(sub_chunk)


@task
def process_sub_chunk(chunk):
    # the sub-chunk arrives serialized, so rebuild the Chunk object
    chunk = Chunk(*chunk)
    chunked_qs = (
        users_queryset
        .filter(pk__range=chunk.range)
        .values_list('pk', flat=True)
        .order_by('pk')
    )
    for user_id in chunked_qs:
        send_push_notifications_for_user.delay(user_id)
```
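For reference, one possible shape for the `split_into_chunks` helper used above. It is not part of chunkificator; it relies only on `chunk.range` (already used by the snippet, and treated as inclusive bounds to match the `pk__range` filter) and on the fact that `process_sub_chunk` rebuilds its argument with `Chunk(*chunk)`:

```python
N_SUB_CHUNKS = 4  # assumption: roughly one sub-chunk per available worker

def split_into_chunks(chunk, parts=N_SUB_CHUNKS):
    start, stop = chunk.range
    step = max(1, (stop - start + 1 + parts - 1) // parts)  # ceiling division
    return [
        (lo, min(lo + step - 1, stop))
        for lo in range(start, stop + 1, step)
    ]
```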
No, I have 4-8 workers and a small amount of data to process (500-1500 MB of CSV), but I want to utilize all workers for this job.
By default, chunkificator utilizes only one worker, because only one task is running at any moment.
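To make that serial behaviour concrete, here is a simplified sketch of the pattern being described (not chunkificator's actual internals, and `process_range` is a hypothetical work function): each invocation handles one chunk and then re-schedules itself, so at most one task instance runs at a time:

```python
@task
def process_serially(start=0, size=1000, stop=12_000):
    process_range(start, start + size)  # hypothetical work function
    if start + size < stop:
        # hand the next chunk to a future task instance;
        # countdown mirrors a sleep_timeout between chunks
        process_serially.apply_async(
            kwargs={'start': start + size, 'size': size, 'stop': stop},
            countdown=5,
        )
```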
Thank you so much for your example!
Good to hear that I was helpful. Just to clarify: you can use the chunkificator task to schedule subtasks, so it only creates a few tasks (which is usually pretty fast), and those tasks are then handled by your 4-8 workers, e.g.:
```python
@task(queue='system')  # different queue
@chunkify_task(
    sleep_timeout=5,
    initial_chunk=get_initial_chunk
)
def process_files(chunk: Chunk):
    chunked_qs = (
        files_queryset
        .filter(pk__range=chunk.range)
        .values_list('path', flat=True)
        .order_by('pk')
    )
    for path in chunked_qs:
        # the line below is going to be executed very quickly
        # since it creates a task and does not execute it inline
        process_single_file.delay(path)


@task(queue='files_processing')  # the workers are listening to this queue
def process_single_file(file_path):
    # ... handle file_path
    ...
```
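Note the queue split in the sketch above: the chunkified dispatcher sits on its own `system` queue while the per-file tasks go to `files_processing`, presumably so the 4-8 workers can be pointed at `files_processing` alone and a backlog of file work never blocks the chunk iteration itself.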