HenrikBengtsson/future

plan(cluster, workers = <cluster>) has more overhead than plan(cluster, workers = <integer>)

traversc opened this issue · 6 comments

Describe the bug

I am trying to use plan(cluster) in order to export globals and set options (#273), but it is a lot slower than plan(multisession).

Reproduce example

Running with plan(multisession)

# Install latest
# install.packages(c("parallelly", "future", "future.apply"))
library(parallelly)
library(future)
library(future.apply)
library(tictoc)
plan(multisession, workers = 50)

tic()
future_lapply(1:50, function(i) getOption("arrow.use_altrep"))
toc()
# 24.244 sec linux / 18 sec mac

Running with plan(cluster)

# new instance
cl <- parallelly::makeClusterPSOCK(workers=50)
plan(cluster, workers = cl)
rm(cl) # makes no difference

tic()
future_lapply(1:50, function(i) getOption("arrow.use_altrep"))
toc()
# 42.983 sec linux / 38 sec mac

Expected behavior

I expect roughly the same performance for both plans.

Session information
(Tested on various machines)

R version 4.2.0 (2022-04-22)
Platform: x86_64-apple-darwin17.0 (64-bit)
Running under: macOS Big Sur/Monterey 10.16

Matrix products: default
BLAS:   /Library/Frameworks/R.framework/Versions/4.2/Resources/lib/libRblas.0.dylib
LAPACK: /Library/Frameworks/R.framework/Versions/4.2/Resources/lib/libRlapack.dylib

locale:
[1] en_US.UTF-8/en_US.UTF-8/en_US.UTF-8/C/en_US.UTF-8/en_US.UTF-8

attached base packages:
[1] stats     graphics  grDevices utils     datasets  methods   base     

other attached packages:
[1] tictoc_1.2          future.apply_1.11.1 future_1.33.1      
[4] parallelly_1.36.0  

loaded via a namespace (and not attached):
[1] compiler_4.2.0   tools_4.2.0      parallel_4.2.0   listenv_0.9.0   
[5] codetools_0.2-18 digest_0.6.31    globals_0.16.2> future::futureSessionInfo()
*** Package versions
future 1.33.1, parallelly 1.36.0, parallel 4.2.0, globals 0.16.2, listenv 0.9.0

*** Allocations
availableCores():
system 
    12 
availableWorkers():
$system
 [1] "localhost" "localhost" "localhost" "localhost" "localhost" "localhost"
 [7] "localhost" "localhost" "localhost" "localhost" "localhost" "localhost"


*** Settings
- future.plan=<not set>
- future.fork.multithreading.enable=<not set>
- future.globals.maxSize=<not set>
- future.globals.onReference=<not set>
- future.resolve.recursive=<not set>
- future.rng.onMisuse=<not set>
- future.wait.timeout=<not set>
- future.wait.interval=<not set>
- future.wait.alpha=<not set>
- future.startup.script=<not set>

*** Backends
Number of workers: 50
List of future strategies:
1. multisession:
   - args: function (..., workers = 50, envir = parent.frame())
   - tweaked: TRUE
   - call: plan(multisession, workers = 50)

*** Basic tests
Main R session details:
    pid     r sysname release
1 64842 4.2.0  Darwin  23.2.0
                                                                                           version
1 Darwin Kernel Version 23.2.0: Wed Nov 15 21:54:10 PST 2023; root:xnu-10002.61.3~2/RELEASE_X86_64
  nodename machine   login    user effective_user
1  host001  x86_64 user001 user001        user001

Thanks for reporting. I can reproduce this with:

library(future.apply)

benchmark <- function(title) {
  n <- nbrOfWorkers()
  ## Burn in (to rule out potentional startup costs)
  void <- future_lapply(1:n, FUN = identity)
  gc()

  dt <- system.time({
    replicate(n = 10L, { void <- future_lapply(1:n, FUN = identity) })
  })
  cat(sprintf("%s:\n", deparse(attr(plan(), "call"))))
  print(dt)
  gc()
}

nworkers <- 8L

plan(multisession, workers = nworkers)
benchmark()
plan(sequential)

plan(cluster, workers = nworkers)
benchmark()
plan(sequential)

cl <- parallelly::makeClusterPSOCK(workers = nworkers)
plan(cluster, workers = cl)
benchmark()
plan(sequential)
parallel::stopCluster(cl)

which gives:

plan(multisession, workers = nworkers):
   user  system elapsed 
  2.777   0.031   3.283 
plan(cluster, workers = nworkers):
   user  system elapsed 
  2.871   0.024   3.365 
plan(cluster, workers = cl):
   user  system elapsed 
  5.083   0.052   5.587 

This shows that plan(cluster, workers = n) and plan(multisession, workers = n) has the same overhead, as expected. The only difference between these two should be that multisession passes also rscript_libs = .libPaths() to makeClusterPSOCK().

Now, what's interesting is that plan(cluster, workers = cl) comes with a greater overhead than plan(cluster, workers = n). It's not clear to me why this would. I'm sure the reason is obvious, when it's been tracked down, but from a quick code inspection I couldn't spot anything. The two approaches creates the same type of cluster with the same options/parameters, so I think it's something else.

The "journal" plots created by future.tools, confirm the observations:

plan(multisession, workers = nworkers) by future

plan(cluster, workers = nworkers) by future

plan(cluster, workers = cl) by future

I'll try to investigate this further.

Thanks for detailing your thought process when debugging, Henrik. I know it must take extra time to write up things like this for us, but it's both educational (I keep notes on debugging issues with the future framework) and enjoyable from a curiosity perspective to see how you think through things.

Here's a reproducible example that does not depend on future.apply;

library(future)

benchmark <- function(title) {
  dt <- system.time({
    replicate(n = 50L, { void <- value(future(NULL)) })
  })
  cat(sprintf("%s:\n", deparse(attr(plan(), "call"))))
  print(dt)
}

plan(cluster, workers = 1L)
benchmark()
plan(sequential)

cl <- parallelly::makeClusterPSOCK(workers = 1L)
plan(cluster, workers = cl)
benchmark()
plan(sequential)
parallel::stopCluster(cl)

which gives:

plan(cluster, workers = 1L):
   user  system elapsed 
  0.491   0.004   1.322 
plan(cluster, workers = cl):
   user  system elapsed 
  1.086   0.022   3.373 

I think I've narrowed it down to:

future/R/Future-class.R

Lines 745 to 759 in cbbd847

## Reset future strategies when done
tmpl_exit_plan <- bquote_compile({
## covr: skip=2
.(exit)
## Reset option 'future.plan' and env var 'R_FUTURE_PLAN'
options(future.plan = .(getOption("future.plan")))
if (is.na(.(oenv <- Sys.getenv("R_FUTURE_PLAN", NA_character_))))
Sys.unsetenv("R_FUTURE_PLAN")
else
Sys.setenv(R_FUTURE_PLAN = .(oenv))
future::plan(.(strategies), .cleanup = FALSE, .init = FALSE)
## FIXME: If we move .(exit) here, then 'R CMD check' on MS Windows
## complain about leftover RscriptXXXXX temporary files. /2022-07-21
## .(exit)
})

Basically, expr2 is bigger than expr1 below:

plan(cluster, workers = 1)
void <- value(f <- future(NULL))
expr1 <- getExpression(f)

cl <- parallelly::makeClusterPSOCK(1)
plan(cluster, workers = cl)
void <- value(f <- future(NULL))
expr2 <- getExpression(f)

causing workers = cl to be passed along to each parallel worker in the latter case. So, the performance difference for every future launched is basically:

library(parallelly)
cl <- parallelly::makeClusterPSOCK(workers = 1L)

## plan(cluster, workers = 1L)
args <- list(workers = 1L)
dt <- system.time(void <- parallel::clusterExport(cl, "args"))
print(dt)

## plan(cluster, workers = cl)
args <- list(workers = cl)
dt <- system.time(void <- parallel::clusterExport(cl, "args"))
print(dt)

e.g.

   user  system elapsed 
  0.000   0.000   0.001 
   user  system elapsed 
  0.004   0.000   0.015 

I now have a prototype in the feature/getExpression-performance branch that avoids this extra overhead. It can be installed as:

remotes::install_github("HenrikBengtsson/future", ref = "feature/getExpression-performance")

With the above benchmark example;

library(future)

benchmark <- function(title) {
  dt <- system.time({
    replicate(n = 50L, { void <- value(future(NULL)) })
  })
  cat(sprintf("%s:\n", deparse(attr(plan(), "call"))))
  print(dt)
}

plan(cluster, workers = 1L)
benchmark()
plan(sequential)

cl <- parallelly::makeClusterPSOCK(workers = 1L)
plan(cluster, workers = cl)
benchmark()
plan(sequential)
parallel::stopCluster(cl)

we see that the difference is now gone;

plan(cluster, workers = 1L):
   user  system elapsed 
  0.325   0.001   0.399 

plan(cluster, workers = cl):
   user  system elapsed 
  0.325   0.000   0.395

This new implementation passes all checks for:

I'll merge into the develop branch when I feel 100% confident this won't have any unknown side effects.

I ran all the tests I can think of and the patch seems good. I've merged into the develop branch, so it'll be part of the next release.

Thanks again for reporting.

Forgot to say, the overhead would grow with the number of parallel workers in the cluster object.