DavisVaughan/furrr

How to use ellipsis (...) with future_map?

Closed this issue · 3 comments

kbzsl commented

I was trying to naively replace purrr::map() with furrr::future_map(), but I can't find a way to use the ellipsis (...) to pass the function arguments.

library(tidyverse)

fun_int <- function(max_disp, df, ...){
  df |> 
    filter(disp < max_disp) |> 
    group_by(...) |> 
    summarize(avg_mpg = mean(mpg)) |> 
    ungroup()
}

fun_int(200, mtcars)
#> # A tibble: 1 x 1
#>   avg_mpg
#>     <dbl>
#> 1    24.5

fun_int(200, mtcars, gear)
#> # A tibble: 3 x 2
#>    gear avg_mpg
#>   <dbl>   <dbl>
#> 1     3    21.5
#> 2     4    24.5
#> 3     5    25.4

fun_int(200, mtcars, gear, carb)
#> `summarise()` has grouped output by 'gear'. You can override using the `.groups` argument.
#> # A tibble: 6 x 3
#>    gear  carb avg_mpg
#>   <dbl> <dbl>   <dbl>
#> 1     3     1    21.5
#> 2     4     1    29.1
#> 3     4     2    24.8
#> 4     4     4    19.8
#> 5     5     2    28.2
#> 6     5     6    19.7


fun_map <- function(df, ...){
  tibble(max_disp= seq(100, 500, 100)) |> 
    mutate(data =  map(max_disp, fun_int, df, ...)) |> 
    unnest(data)
}

fun_map(mtcars)
#> # A tibble: 5 x 2
#>   max_disp avg_mpg
#>      <dbl>   <dbl>
#> 1      100    30.9
#> 2      200    24.5
#> 3      300    22.9
#> 4      400    21.0
#> 5      500    20.1

fun_map(mtcars, gear)
#> # A tibble: 14 x 3
#>    max_disp  gear avg_mpg
#>       <dbl> <dbl>   <dbl>
#>  1      100     4    31  
#>  2      100     5    30.4
#>  3      200     3    21.5
#>  4      200     4    24.5
#>  5      200     5    25.4
#>  6      300     3    18.3
#>  7      300     4    24.5
#>  8      300     5    25.4
#>  9      400     3    17.0
#> 10      400     4    24.5
#> 11      400     5    21.4
#> 12      500     3    16.1
#> 13      500     4    24.5
#> 14      500     5    21.4

fun_map(mtcars, gear, carb)
#> `summarise()` has grouped output by 'gear'. You can override using the `.groups` argument.
#> `summarise()` has grouped output by 'gear'. You can override using the `.groups` argument.
#> `summarise()` has grouped output by 'gear'. You can override using the `.groups` argument.
#> `summarise()` has grouped output by 'gear'. You can override using the `.groups` argument.
#> `summarise()` has grouped output by 'gear'. You can override using the `.groups` argument.
#> # A tibble: 38 x 4
#>    max_disp  gear  carb avg_mpg
#>       <dbl> <dbl> <dbl>   <dbl>
#>  1      100     4     1    31.2
#>  2      100     4     2    30.4
#>  3      100     5     2    30.4
#>  4      200     3     1    21.5
#>  5      200     4     1    29.1
#>  6      200     4     2    24.8
#>  7      200     4     4    19.8
#>  8      200     5     2    28.2
#>  9      200     5     6    19.7
#> 10      300     3     1    20.3
#> # ... with 28 more rows


future::plan("multisession", workers = 4) 

fun_furrr <- function(df, ...){
  tibble(max_disp= seq(100, 500, 100)) |> 
    mutate(data =  furrr::future_map(max_disp, fun_int, df, ...)) |> 
    unnest(data)
}

fun_furrr(mtcars)
#> # A tibble: 5 x 2
#>   max_disp avg_mpg
#>      <dbl>   <dbl>
#> 1      100    30.9
#> 2      200    24.5
#> 3      300    22.9
#> 4      400    21.0
#> 5      500    20.1

fun_furrr(mtcars, gear)
#> Error: Problem with `mutate()` column `data`.
#> i `data = furrr::future_map(max_disp, fun_int, df, ...)`.
#> x object 'gear' not found

Can you recommend a way to solve this? Thank you for your help.

This is the first common gotcha listed in the furrr documentation: https://furrr.futureverse.org/articles/articles/gotchas.html#non-standard-evaluation-of-arguments-1

Essentially, in furrr we are forced to evaluate the ... before we send them off to the workers. So we try to evaluate gear on its own, which fails because nothing tells it to look in mtcars for a column called gear.
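A minimal base R sketch of the difference, under the assumption that furrr forces the dots roughly the way list(...) does (this is an illustration, not furrr's internal code). The helper names capture_dots_lazily() and capture_dots_eagerly() are hypothetical. Capturing the dots lazily keeps the bare symbol gear, which can still be resolved inside mtcars later, as group_by() does when purrr::map() forwards the dots. Forcing them eagerly fails immediately.

```r
# Hypothetical helper: capture the unevaluated expressions passed in `...`
# (the symbol `gear` stays a symbol and can be resolved later).
capture_dots_lazily <- function(...) as.list(substitute(list(...)))[-1]

# Hypothetical helper: force every argument to a value right away,
# roughly what has to happen before arguments can be sent to a worker.
capture_dots_eagerly <- function(...) list(...)

lazy <- capture_dots_lazily(gear)
# Evaluating the captured symbol *inside* the data frame works:
gear_values <- eval(lazy[[1]], datasets::mtcars)

# Forcing it up front fails, because no object called `gear` exists
# in the calling environment:
eager_error <- tryCatch(
  capture_dots_eagerly(gear),
  error = function(e) conditionMessage(e)
)
eager_error
```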

So you have two options. The first is to switch to a standard-evaluation approach based on a character vector of column names, like:

library(tidyverse)

fun_int <- function(max_disp, df, cols = character()){
  df |> 
    filter(disp < max_disp) |> 
    group_by(across(all_of(cols))) |> 
    summarize(avg_mpg = mean(mpg), .groups = "drop") |> 
    ungroup()
}

fun_furrr <- function(df, cols = character()) {
  tibble(max_disp = seq(100, 500, 100)) |> 
    mutate(data =  furrr::future_map(max_disp, fun_int, df, cols)) |> 
    unnest(data)
}

future::plan("multisession", workers = 4) 

fun_furrr(mtcars)
#> # A tibble: 5 × 2
#>   max_disp avg_mpg
#>      <dbl>   <dbl>
#> 1      100    30.9
#> 2      200    24.5
#> 3      300    22.9
#> 4      400    21.0
#> 5      500    20.1

fun_furrr(mtcars, "gear")
#> # A tibble: 14 × 3
#>    max_disp  gear avg_mpg
#>       <dbl> <dbl>   <dbl>
#>  1      100     4    31  
#>  2      100     5    30.4
#>  3      200     3    21.5
#>  4      200     4    24.5
#>  5      200     5    25.4
#>  6      300     3    18.3
#>  7      300     4    24.5
#>  8      300     5    25.4
#>  9      400     3    17.0
#> 10      400     4    24.5
#> 11      400     5    21.4
#> 12      500     3    16.1
#> 13      500     4    24.5
#> 14      500     5    21.4

fun_furrr(mtcars, c("gear", "cyl"))
#> # A tibble: 30 × 4
#>    max_disp  gear   cyl avg_mpg
#>       <dbl> <dbl> <dbl>   <dbl>
#>  1      100     4     4    31  
#>  2      100     5     4    30.4
#>  3      200     3     4    21.5
#>  4      200     4     4    26.9
#>  5      200     4     6    19.8
#>  6      200     5     4    28.2
#>  7      200     5     6    19.7
#>  8      300     3     4    21.5
#>  9      300     3     6    19.8
#> 10      300     3     8    16.3
#> # … with 20 more rows
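Why the character-vector version survives the trip to a worker can be sketched in base R. This assumes that sending arguments to a multisession worker amounts, roughly, to serializing them, so serialize()/unserialize() stand in for that trip here; aggregate() stands in for the dplyr pipeline in fun_int() and is a substitution for illustration only. The column names are plain values, and they are resolved against the data frame on the receiving side.

```r
# Column names as ordinary character values (standard evaluation).
cols <- c("gear", "cyl")

# Stand-in for sending the argument to another R process:
# the value round-trips unchanged.
shipped <- unserialize(serialize(cols, connection = NULL))
stopifnot(identical(shipped, cols))

# On the "worker" side, the names are looked up in the data itself,
# so nothing depends on the caller's environment.
df <- datasets::mtcars
filtered <- df[df$disp < 200, c(shipped, "mpg"), drop = FALSE]

# Base R analogue of group_by(across(all_of(cols))) + summarize(mean(mpg)).
avg_by_group <- aggregate(mpg ~ ., data = filtered, FUN = mean)
names(avg_by_group)
```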

The second option is to wrap this up a little more nicely and do the non-standard evaluation yourself before calling future_map(), like:

fun_furrr2 <- function(df, ...) {
  cols <- tidyselect::eval_select(rlang::expr(c(...)), data = df)
  cols <- names(cols)
  
  tibble(max_disp = seq(100, 500, 100)) |> 
    mutate(data =  furrr::future_map(max_disp, fun_int, df, cols)) |> 
    unnest(data)
}

fun_furrr2(mtcars, gear, cyl)
#> # A tibble: 30 × 4
#>    max_disp  gear   cyl avg_mpg
#>       <dbl> <dbl> <dbl>   <dbl>
#>  1      100     4     4    31  
#>  2      100     5     4    30.4
#>  3      200     3     4    21.5
#>  4      200     4     4    26.9
#>  5      200     4     6    19.8
#>  6      200     5     4    28.2
#>  7      200     5     6    19.7
#>  8      300     3     4    21.5
#>  9      300     3     6    19.8
#> 10      300     3     8    16.3
#> # … with 20 more rows
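The key step in fun_furrr2() is that tidyselect::eval_select() turns the user's ... into plain column names in the main R session, so only a character vector reaches future_map(). A small sketch isolating that step follows; select_names() is a hypothetical helper name, and the starts_with() call assumes tidyselect helpers work inside the dots the same way they do in dplyr::select().

```r
library(tidyselect)

# Hypothetical helper: resolve tidyselect-style `...` to column names
# in the calling process, the same move fun_furrr2() makes before
# handing the names to future_map().
select_names <- function(df, ...) {
  names(tidyselect::eval_select(rlang::expr(c(...)), data = df))
}

df <- datasets::mtcars

# Bare column names become a character vector.
select_names(df, gear, cyl)

# Selection helpers are resolved too, before anything is sent to workers.
select_names(df, starts_with("d"))
```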
kbzsl commented

Sorry, I should have read the documentation more carefully.

I am just wondering when (or how many times) the df data frame is copied to a particular worker. In this simple example only one worker gets two tasks (4 workers and 5 tasks in total), but in my application the data frame is quite big and the number of tasks is high.

Reading the referenced vignette, the sentence "the grouped nature of the data frame prevents furrr from doing what it is good at - sharding the x column into equally sized groups and sending them off to the workers to process them in parallel" suggests to me that df is copied only once to each worker, but I am not sure my interpretation is correct.

In the example you have above, df is copied once to each worker.

The vignette example would only apply here if the data frame you called mutate(<data>, result = future_map()) on were grouped. In your case that data frame was tibble(max_disp = seq(100, 500, 100)), which isn't grouped.
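A rough model of why df ends up on each worker once: with furrr's default furrr_options(scheduling = 1), .x is split into about one chunk per worker, each chunk runs as one future, and each future carries the globals it needs (here df). The splitting below uses cut() and is an assumption for illustration, not furrr's exact chunking code; the variable names are hypothetical. Raising scheduling, or setting a small chunk_size, would create more chunks and therefore more copies of df.

```r
# Rough model (assumption) of how future_map() splits .x into chunks.
n_elements <- 5   # length(seq(100, 500, 100))
n_workers  <- 4   # future::plan("multisession", workers = 4)
scheduling <- 1   # furrr_options() default: about one chunk per worker

n_chunks <- min(n_elements, ceiling(n_workers * scheduling))
chunks <- split(seq_len(n_elements),
                cut(seq_len(n_elements), n_chunks, labels = FALSE))

# Elements handled by each chunk: one chunk gets two, the rest get one.
lengths(chunks)

# One future per chunk, each exporting its own copy of `df`.
n_copies_of_df <- length(chunks)
n_copies_of_df
```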