run-once

Easy distributed task parallelism in Python. Inspired by joblib, but supports multi-machine distribution.

The package name is a bit of a misnomer. The idea is that you can run the same code multiple times without re-computing everything from scratch. This library supports persistent progress tracking in distributed systems.

pip install run-once

Instantly turn your loops into distributed queues: just wrap any iterable with run_once.DistributedIterator.

from run_once import DistributedIterator

task_keys = ['task_001', 'task_002', ...]
for key in DistributedIterator(task_keys):
    ...

You can run this script concurrently on multiple machines (e.g. over ssh, as kubernetes jobs), and the workload will be distributed evenly across all processes. Each iteration is computed exactly once globally regardless of how many times or where the code runs.

Tutorial

Suppose you have a list of string arguments. This could be the names of model checkpoints to evaluate or data you want to preprocess.

task_keys = [f'unique_task_name_{i}' for i in range(100000)]

A typical for-loop:

for key in task_keys:
    ...  # Independent computation. 

We want to (1) run this code on multiple machines simultaneously, (2) distribute the tasks evenly, (3) avoid duplicate work, and (4) recover easily from failures. DistributedIterator achieves (2) and (3). (1) is up to you: run the code as you would normally. (4) comes for free: progress is persistent between restarts, so you can simply rerun the script to pick up where you left off.

for key in DistributedIterator(task_keys):
    ... # Keys are claimed on a first-come, first-served basis. Other workers will skip this iteration.

Documentation is available as comments: run_once.py, distlock.proto

Repeating failed iterations

To be able to recover from catastrophic failures, set a timeout to let unconfirmed keys expire. Expired keys will be picked up in the next iteration attempt.

for key in DistributedIterator(task_keys, timeout=60):
    # Exclusive access to the key is granted for 60 seconds.
    try:
        ...  # This may fail.

        # Claim permanently.
        notify_success(key)
    except Exception:
        # Optional. Request immediate expiration on failure.
        notify_failure(key)

    # If neither is called, the key will time out eventually.

There is a decorator version that wraps the try-except block. This is more flexible but makes more RPC calls than the iterator version.

for key in task_keys:
    @run_once.distributed_task(key, timeout=3)
    def work():
        ...  # Throw on failure.

    work()  # The wrapper decides whether work() should be skipped.

See Usage for configuration. The server needs to listen on a port accessible from where the Python code runs.

Design

The backend is a key-value lock service built on LevelDB. Each string key represents a lock. The Python API sends RPC requests for exclusive access to a key. notify_success disables expiration, permanently locking the key; locked keys stay in the database and serve as a "cache-hit" lookup table. notify_failure removes the key from the database so the task can start over.

  • Implicit: No consumer-producer queue pattern. No worker pool.
  • Fault-tolerant: Reschedule failed tasks in-place.
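
For illustration, the design boils down to a single table mapping keys to lease expirations. The sketch below is not the server code (the class and method names are hypothetical, and the real backend persists this table in LevelDB behind an RPC interface), but it mirrors the semantics described above:

import time

# In-memory illustration of the lock semantics; the real backend stores this
# table in LevelDB and exposes it over RPC.
class LockTable:
    def __init__(self):
        # key -> lease expiration timestamp, or None once permanently locked
        self._leases = {}

    def try_acquire(self, key, timeout):
        """Claim `key` for `timeout` seconds. Returns False if it is already held."""
        now = time.time()
        if key in self._leases:
            expires = self._leases[key]
            if expires is None or expires > now:
                return False  # permanently locked ("cache hit") or lease still active
        self._leases[key] = now + timeout
        return True

    def notify_success(self, key):
        self._leases[key] = None  # disable expiration: the key stays locked forever

    def notify_failure(self, key):
        self._leases.pop(key, None)  # remove the key so the task can start over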

When do you need this package?

  • Ad-hoc scaling across multiple arbitrary machines.
  • Running unstable code that may fail: fix bug, sync code, rerun the same script and continue where you left off, skipping successful runs.
  • Avoiding duplicate work in distributed, cached pipeline jobs. The keys could be unique identifiers like S3 object URLs.

Caching example

Serialization and deserialization need to be implemented separately; there are many options to choose from. This package only provides the task-distribution part.

# Your own serialization and upload function.
def upload(output, url: str):
    ...

# Your own download and deserialization function.
def download(url: str):
    return ...

for url in s3_urls:
    # Increment the version if you ever need to recompute from scratch.
    @run_once(url, timeout=60, version=0)
    def work():
        output = ...
        upload(output, url)  
        return output
    status, output = work()
    
    if status == Status.SKIP_OK:
        # Cache hit.
        output = download(url)  
    elif status == Status.COMPUTE_OK:
        # Cache miss. `output` already holds the return value of work().
        pass  
    elif status in (Status.SKIP_IN_PROGRESS, Status.COMPUTE_ERROR):
        # Try again later when it is available. Skip for now.
        continue
    else:
        raise NotImplementedError(f'Unrecognized status: {status}')
    
    # Do something with `output`.
    ...

For single-machine task distribution and caching, you can try joblib. You can also use the two together.
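
For example, the two compose naturally: run-once decides which worker computes a key, and joblib caches the result on disk. The sketch below assumes the cache directory lives on storage shared by all workers; the path and the evaluate function are illustrative:

from joblib import Memory
from run_once import DistributedIterator

# The cache directory must be visible to every worker (e.g. a network mount).
memory = Memory('/shared/cache', verbose=0)

@memory.cache
def evaluate(key):
    result = ...  # expensive, independent computation
    return result

task_keys = [f'unique_task_name_{i}' for i in range(100000)]
for key in DistributedIterator(task_keys):
    evaluate(key)  # computed at most once globally, then served from the joblib cache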

Disadvantages

  • Not ideal for short-running (<1s) tasks. Non-local network latency seems to be the bottleneck.
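
A possible workaround, sketched below, is to claim one key per batch of short tasks so that a single round trip is amortized over the whole batch. This is not a built-in feature; the batch size and key naming are up to you:

from run_once import DistributedIterator

task_keys = [f'unique_task_name_{i}' for i in range(100000)]

# Group short tasks into batches of 100 and claim one lock key per batch.
BATCH = 100
batches = [task_keys[i:i + BATCH] for i in range(0, len(task_keys), BATCH)]

for batch_key in DistributedIterator([f'batch_{i}' for i in range(len(batches))]):
    batch_index = int(batch_key.split('_')[1])
    for key in batches[batch_index]:
        ...  # short per-key computation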

Usage

Install via pip.

pip install run-once

Server

distlock --db=/tmp/testdb 

See --help for all options.

$ ./distlock --help
Distributed lock service.
Usage:
  distlock [OPTION...]

      --db arg          Path to LevelDB database
      --cache_size arg  LRU cache size in MB (default: 200)
      --port arg        HTTP service port (default: 22113)
      --host arg        Hostname (default: 127.0.0.1)
  -h, --help            Print usage

Client

Create ~/.run_once.ini as follows. This step is optional if the server is accessible at 127.0.0.1:22113.

[DEFAULT]
address = <server ip address>
port = <server port>

Optionally, if the server is running on a remote machine, consider forwarding a local port.

ssh -N -L 22113:localhost:22113 <host>

Alternatively, via mutagen:

mutagen forward create --name=run-once tcp:localhost:22113 <host>:tcp::22113

Development

Build

Install dependencies via python-poetry.

conda create -n run-once python=3.6
conda activate run-once
poetry update

Build and run server.

bash ./build.sh
./cmake-build-release/distlock --db=/tmp/testdb --port=22113

Run tests

pytest -s distlock_test.py
pytest -s run_once_test.py