PyO3/rust-numpy

[Question] Parallelizing over a list of PyArrays with rayon

jatentaki opened this issue

I have a simple extension that operates on a single PyReadonlyArray1<'_, f64>, and now I want it to take a list of such arrays and apply the same function to all of them in parallel. The serial version of a simplified example (I'm not really implementing cumsum) is below:

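use numpy::ndarray::{Array1, ArrayView1};
use numpy::{IntoPyArray, PyArray1, PyReadonlyArray1};
use pyo3::prelude::*;

#[pymodule]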
fn repro(_py: Python<'_>, m: &PyModule) -> PyResult<()> {
    // Running sum of a 1-D view; panics if the view is not contiguous.
    fn cumsum(array: ArrayView1<'_, f64>) -> Array1<f64> {
        let mut total = 0.;
        Array1::from_iter(
            array
                .as_slice()
                .expect("input not contiguous")
                .iter()
                .map(|v| {
                    total += v;
                    total
                }) 
        )
    }

    #[pyfn(m)]
    #[pyo3(name = "cumsum_single")]
    fn cumsum_single_py<'py>(
        py: Python<'py>,
        array: PyReadonlyArray1<'_, f64>,
    ) -> &'py PyArray1<f64> {
        cumsum(array.as_array()).into_pyarray(py)
    }

    #[pyfn(m)]
    #[pyo3(name = "cumsum_many_sequential")]
    fn cumsum_many_sequential_py<'py>(
        py: Python<'py>,
        arrays: Vec<PyReadonlyArray1<'_, f64>>,
    ) -> Vec<&'py PyArray1<f64>> {
        arrays.into_iter().map(|arr| cumsum_single_py(py, arr)).collect()
    }

    Ok(())
}
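The kernel itself never touches Python objects: `cumsum` only needs the contiguous `&[f64]` that `as_slice()` returns. As a minimal std-only sketch (assuming nothing beyond the standard library), the same computation over a plain slice looks like this; `cumsum_slice` is a hypothetical name, not part of rust-numpy or ndarray, and it uses `Iterator::scan` instead of a captured mutable accumulator.

```rust
/// Inclusive running sum over a contiguous slice.
/// Hypothetical helper mirroring the `cumsum` kernel above; it takes the
/// `&[f64]` that `ArrayView1::as_slice()` would return, so it depends on
/// neither PyO3 nor ndarray.
fn cumsum_slice(values: &[f64]) -> Vec<f64> {
    values
        .iter()
        // `scan` carries the running total as explicit state instead of
        // mutating a variable captured by the closure.
        .scan(0.0, |total, &v| {
            *total += v;
            Some(*total)
        })
        .collect()
}

fn main() {
    let out = cumsum_slice(&[1.0, 2.0, 3.0, 4.0]);
    println!("{:?}", out); // prints [1.0, 3.0, 6.0, 10.0]
}
```

Because a function of this shape is a plain `fn(&[f64]) -> Vec<f64>`, it should be callable from any worker thread; only the conversions from and to Python types are tied to the GIL.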

The problem arises when I try to use rayon and change into_iter to into_par_iter. The compiler complains as follows:

error[E0599]: the method `into_par_iter` exists for struct `Vec<PyReadonlyArray<'_, f64, Dim<[usize; 1]>>>`, but its trait bounds were not satisfied
  --> src/lib.rs:46:16
   |
46 |         arrays.into_par_iter().map(|arr| cumsum_single_py(py, arr)).collect()
   |                ^^^^^^^^^^^^^ method cannot be called on `Vec<PyReadonlyArray<'_, f64, Dim<[usize; 1]>>>` due to unsatisfied trait bounds
   |
   = note: the following trait bounds were not satisfied:
           `[PyReadonlyArray<'_, f64, Dim<[usize; 1]>>]: Sized`
           which is required by `[PyReadonlyArray<'_, f64, Dim<[usize; 1]>>]: rayon::iter::IntoParallelIterator`
           `[PyReadonlyArray<'_, f64, Dim<[usize; 1]>>]: rayon::iter::ParallelIterator`
           which is required by `[PyReadonlyArray<'_, f64, Dim<[usize; 1]>>]: rayon::iter::IntoParallelIterator`

The constraints look a bit weird on the rayon side (to the best of my understanding, [T]: !Sized regardless of T), but perhaps there's a workaround? What I arrived at is below:

    #[pyfn(m)]
    #[pyo3(name = "cumsum_many_rayon")]
    fn cumsum_many_rayon_py<'py>(
        py: Python<'py>,
        arrays: Vec<PyReadonlyArray1<'_, f64>>,
    ) -> Vec<&'py PyArray1<f64>> {
        let arrays: Vec<_> = arrays
            .iter()
            .map(|pa| pa.as_array())
            .collect();
        // first collect: for some reason PyReadonlyArray<_, _> cannot be sent
        // across threads, while ArrayBase<ViewRepr<_>, _> can. But the views
        // borrow from the PyReadonlyArrays in a way that forces me to
        // materialize a vector instead of using par_bridge() directly

        let results: Vec<_> = arrays
            .into_par_iter()
            .map(cumsum)
            .collect();
        // second collect: need to turn the parallel iterator back to sequential
        // for into_pyarray
        
        results
            .into_iter()
            .map(|result| result.into_pyarray(py))
            .collect()
        // third collect: to create the actual returned Python list
    }
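A likely explanation for the error, to the best of my understanding: rayon implements `IntoParallelIterator` for `Vec<T>` only when `T: Send`. `PyReadonlyArray` borrows a GIL-bound `&PyArray`, which is not `Send`, so that impl does not apply; the compiler then seems to auto-deref `Vec<T>` to `[T]` and report the unrelated `[T]: Sized` bound instead. `ArrayView1<'_, f64>` is `Send` because `f64` is `Sync`, which is why the views work. The std-only sketch below mirrors that bound, with `std::thread::scope` standing in for rayon (an assumption for illustration; rayon uses a thread pool rather than one thread per item), and `&[f64]` standing in for `ArrayView1`. `parallel_map` is a hypothetical helper.

```rust
use std::thread;

/// Hypothetical helper: apply `f` to every item on its own scoped thread.
/// The `T: Send` bound plays the same role as rayon's
/// `impl<T: Send> IntoParallelIterator for Vec<T>`: each item is moved to
/// another thread, so its type must be safe to send there.
fn parallel_map<T, R, F>(items: Vec<T>, f: F) -> Vec<R>
where
    T: Send,
    R: Send,
    F: Fn(T) -> R + Sync,
{
    // `&F` is `Send` because `F: Sync`, so every thread can share it.
    let f = &f;
    thread::scope(|s| {
        // One thread per item is fine for a sketch, not for production.
        let handles: Vec<_> = items
            .into_iter()
            .map(|item| s.spawn(move || f(item)))
            .collect();
        // Join in order so results line up with the inputs.
        handles.into_iter().map(|h| h.join().unwrap()).collect()
    })
}

fn main() {
    let data: Vec<Vec<f64>> = vec![vec![1.0, 2.0], vec![3.0, 4.0, 5.0]];
    // Borrowed slices are Send (f64 is Sync), like ArrayView1<'_, f64>.
    let views: Vec<&[f64]> = data.iter().map(|v| v.as_slice()).collect();
    let sums = parallel_map(views, |xs: &[f64]| xs.iter().sum::<f64>());
    println!("{:?}", sums); // prints [3.0, 12.0]
    // A Vec<std::rc::Rc<Vec<f64>>> would be rejected here, because Rc is not
    // Send, in the same way a Vec of GIL-bound PyReadonlyArrays is rejected.
}
```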

This solution uses three separate collect calls. It is unclear to me how much overhead they add: do those intermediate structs just hold references, or am I copying large arrays of input data? Is there a better way to achieve my goal?
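On the overhead question, a hedged reading: the first collect gathers only `ArrayView1` headers (a data pointer plus shape and stride), not the element data; the second collects the owned output arrays, which have to be allocated anyway; and rust-numpy documents `into_pyarray` as handing the Rust-owned buffer to NumPy rather than copying it. The std-only sketch below checks the first point, using `&[f64]` as a stand-in for `ArrayView1` (an assumption: it shows the analogous behaviour for slices, not ndarray itself). `borrow_views` is a hypothetical name.

```rust
/// Hypothetical helper: borrow each input as a slice, the std analogue of
/// `arrays.iter().map(|pa| pa.as_array()).collect()` in the workaround.
fn borrow_views(inputs: &[Vec<f64>]) -> Vec<&[f64]> {
    inputs.iter().map(|v| v.as_slice()).collect()
}

fn main() {
    // One large input so that a copy would be easy to spot.
    let inputs: Vec<Vec<f64>> = vec![vec![1.0; 1_000_000], vec![2.0; 10]];
    let views = borrow_views(&inputs);

    for (input, view) in inputs.iter().zip(&views) {
        // Same address and length: the view points at the original buffer.
        assert_eq!(input.as_ptr(), view.as_ptr());
        assert_eq!(input.len(), view.len());
    }
    // Each element of `views` is a (pointer, length) pair,
    // 16 bytes on 64-bit targets, regardless of the input size.
    println!("view size = {} bytes", std::mem::size_of::<&[f64]>());
}
```

If this reading is right, the collects cost one small allocation per list plus the per-array headers, while the input data itself is never copied.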