meltano/sdk

Parallelize taps by partition (configurable in meltano.yml)

melgazar9 opened this issue · 2 comments

Feature scope

Taps (catalog, state, stream maps, tests, etc.)

Description

I'm considering writing my own parallelized runner for a Meltano tap. I'm submitting this feature request because the idea generalizes: run taps in parallel by partition (or even by state, though it's less obvious to me how to do that). I think we could use the multiprocessing library to spawn one process per partition. The default (serial) behavior of Meltano taps would run unless both of these conditions are met:

  • There is a partition set in the tap
  • A setting such as parallelize or multiprocessing (or multithreading?) is enabled in meltano.yml
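
For illustration, such a setting might look like this in meltano.yml. These keys are hypothetical, not existing Meltano options:

```yaml
# Hypothetical meltano.yml fragment -- parallelize and max_processes
# do not exist today; they sketch what the proposed setting could look like.
plugins:
  extractors:
    - name: tap-example
      parallelize: true    # opt in to per-partition parallelism
      max_processes: 4     # number of worker processes to spawn
```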

Something like this:

from multiprocessing import Process

# n_processes stands in for the process count configured in meltano.yml;
# run_partition stands in for the `meltano run` invocation for one partition.
processes = []
for i in range(n_processes):
    p = Process(target=run_partition, args=(i,))
    p.start()
    processes.append(p)

for process in processes:
    process.join()

or similarly:

import concurrent.futures

# run_partition and n_processes are placeholders, as above.
with concurrent.futures.ProcessPoolExecutor() as executor:
    futures = [executor.submit(run_partition, i) for i in range(n_processes)]

    for f in concurrent.futures.as_completed(futures):
        f.result()

I think you can also do something like this:

executor.map(run_partition, range(n_processes))
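
Putting the pieces together, a self-contained sketch of the executor.map approach could look like the following. The build_command helper, the --state-id naming scheme, and the partition list are all hypothetical stand-ins; a real integration would invoke the actual meltano CLI via subprocess for each partition:

```python
import concurrent.futures
import shlex

# Hypothetical partition list -- in practice this would come from the
# tap's catalog or from meltano.yml.
PARTITIONS = ["2024-01", "2024-02", "2024-03"]


def build_command(partition):
    # Hypothetical command shape: one `meltano elt` invocation per
    # partition, each with its own state ID so runs don't collide.
    return shlex.split("meltano elt tap-example target-example") + [
        f"--state-id=tap-example-{partition}"
    ]


def run_partition(partition):
    # A real implementation would call subprocess.run(build_command(partition),
    # check=True); here we just return the command so the sketch is runnable
    # anywhere without a Meltano project.
    return build_command(partition)


if __name__ == "__main__":
    with concurrent.futures.ProcessPoolExecutor() as executor:
        # map() fans the partitions out across worker processes
        for cmd in executor.map(run_partition, PARTITIONS):
            print(" ".join(cmd))
```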

@melgazar9 - were you able to implement this?

Hey @iSidduHussain, I haven't attempted to generalize this and integrate it into Meltano, but I did write something rough in one of my side projects. I haven't fully tested it, and I don't know whether it follows Meltano best practices, but one thing I noticed is that it references the same .meltano/meltano.db even while running in parallel with different state IDs. I think this is OK since I'm auto-generating distinct state IDs, but I doubt it's a good idea when using the same state ID.
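
A minimal sketch of keeping per-partition state IDs distinct (the naming scheme here is an assumption for illustration, not necessarily what the linked code does):

```python
# Derive a unique state ID per partition so parallel runs against the
# same .meltano/meltano.db don't collide. Naming scheme is hypothetical.
def state_id_for(tap_name, partition):
    return f"{tap_name}--{partition}"


ids = [state_id_for("tap-example", p) for p in ["us", "eu", "apac"]]
assert len(ids) == len(set(ids))  # every partition gets its own state ID
```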

https://github.com/melgazar9/ds-mono-repo/blob/dev/projects/financial_elt/routes.py#L65-L131