plan(cluster, workers = <cluster>) has more overhead than plan(cluster, workers = <integer>)
traversc opened this issue · 6 comments
Describe the bug
I am trying to use plan(cluster) in order to export globals and set options (#273), but it is a lot slower than plan(multisession).
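For context, here is a minimal sketch of the kind of setup I am after (the option value and the exported global below are just placeholders):
library(future)
library(parallelly)
cl <- makeClusterPSOCK(50)
## Configure each worker once, up front, before any futures run
parallel::clusterEvalQ(cl, options(arrow.use_altrep = FALSE))  # placeholder option value
my_global <- list()  # placeholder global to export
parallel::clusterExport(cl, "my_global")
plan(cluster, workers = cl)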
Reproducible example
Running with plan(multisession)
# Install latest
# install.packages(c("parallelly", "future", "future.apply"))
library(parallelly)
library(future)
library(future.apply)
library(tictoc)
plan(multisession, workers = 50)
tic()
future_lapply(1:50, function(i) getOption("arrow.use_altrep"))
toc()
# 24.244 sec linux / 18 sec mac
Running with plan(cluster)
# new instance
cl <- parallelly::makeClusterPSOCK(workers = 50)
plan(cluster, workers = cl)
rm(cl) # makes no difference
tic()
future_lapply(1:50, function(i) getOption("arrow.use_altrep"))
toc()
# 42.983 sec linux / 38 sec mac
Expected behavior
I expect roughly the same performance for both plans.
Session information
(Tested on various machines)
R version 4.2.0 (2022-04-22)
Platform: x86_64-apple-darwin17.0 (64-bit)
Running under: macOS Big Sur/Monterey 10.16
Matrix products: default
BLAS: /Library/Frameworks/R.framework/Versions/4.2/Resources/lib/libRblas.0.dylib
LAPACK: /Library/Frameworks/R.framework/Versions/4.2/Resources/lib/libRlapack.dylib
locale:
[1] en_US.UTF-8/en_US.UTF-8/en_US.UTF-8/C/en_US.UTF-8/en_US.UTF-8
attached base packages:
[1] stats graphics grDevices utils datasets methods base
other attached packages:
[1] tictoc_1.2 future.apply_1.11.1 future_1.33.1
[4] parallelly_1.36.0
loaded via a namespace (and not attached):
[1] compiler_4.2.0 tools_4.2.0 parallel_4.2.0 listenv_0.9.0
[5] codetools_0.2-18 digest_0.6.31 globals_0.16.2
…
> future::futureSessionInfo()
*** Package versions
future 1.33.1, parallelly 1.36.0, parallel 4.2.0, globals 0.16.2, listenv 0.9.0
*** Allocations
availableCores():
system
12
availableWorkers():
$system
[1] "localhost" "localhost" "localhost" "localhost" "localhost" "localhost"
[7] "localhost" "localhost" "localhost" "localhost" "localhost" "localhost"
*** Settings
- future.plan=<not set>
- future.fork.multithreading.enable=<not set>
- future.globals.maxSize=<not set>
- future.globals.onReference=<not set>
- future.resolve.recursive=<not set>
- future.rng.onMisuse=<not set>
- future.wait.timeout=<not set>
- future.wait.interval=<not set>
- future.wait.alpha=<not set>
- future.startup.script=<not set>
*** Backends
Number of workers: 50
List of future strategies:
1. multisession:
- args: function (..., workers = 50, envir = parent.frame())
- tweaked: TRUE
- call: plan(multisession, workers = 50)
*** Basic tests
Main R session details:
pid r sysname release
1 64842 4.2.0 Darwin 23.2.0
version
1 Darwin Kernel Version 23.2.0: Wed Nov 15 21:54:10 PST 2023; root:xnu-10002.61.3~2/RELEASE_X86_64
nodename machine login user effective_user
1 host001 x86_64 user001 user001 user001
…
Thanks for reporting. I can reproduce this with:
library(future.apply)
benchmark <- function(title) {
  n <- nbrOfWorkers()
  ## Burn-in (to rule out potential startup costs)
  void <- future_lapply(1:n, FUN = identity)
  gc()
  dt <- system.time({
    replicate(n = 10L, { void <- future_lapply(1:n, FUN = identity) })
  })
  cat(sprintf("%s:\n", deparse(attr(plan(), "call"))))
  print(dt)
  gc()
}
nworkers <- 8L
plan(multisession, workers = nworkers)
benchmark()
plan(sequential)
plan(cluster, workers = nworkers)
benchmark()
plan(sequential)
cl <- parallelly::makeClusterPSOCK(workers = nworkers)
plan(cluster, workers = cl)
benchmark()
plan(sequential)
parallel::stopCluster(cl)
which gives:
plan(multisession, workers = nworkers):
user system elapsed
2.777 0.031 3.283
plan(cluster, workers = nworkers):
user system elapsed
2.871 0.024 3.365
plan(cluster, workers = cl):
user system elapsed
5.083 0.052 5.587
This shows that plan(cluster, workers = n) and plan(multisession, workers = n) have the same overhead, as expected. The only difference between these two should be that multisession also passes rscript_libs = .libPaths() to makeClusterPSOCK().
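As a minimal sketch, assuming that really is the only difference, a cluster created like the following should be equivalent to what multisession sets up (reusing nworkers from the benchmark above):
## Mimic the multisession setup by also propagating the current library paths
cl <- parallelly::makeClusterPSOCK(nworkers, rscript_libs = .libPaths())
plan(cluster, workers = cl)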
Now, what's interesting is that plan(cluster, workers = cl) comes with a greater overhead than plan(cluster, workers = n). It's not clear to me why that would be the case. I'm sure the reason will be obvious once it has been tracked down, but from a quick code inspection I couldn't spot anything. The two approaches create the same type of cluster with the same options and parameters, so I think it's something else.
The "journal" plots created by future.tools, confirm the observations:
I'll try to investigate this further.
Thanks for detailing your thought process when debugging, Henrik. I know it must take extra time to write up things like this for us, but it's both educational (I keep notes on debugging issues with the future framework) and enjoyable from a curiosity perspective to see how you think through things.
Here's a reproducible example that does not depend on future.apply;
library(future)
benchmark <- function(title) {
  dt <- system.time({
    replicate(n = 50L, { void <- value(future(NULL)) })
  })
  cat(sprintf("%s:\n", deparse(attr(plan(), "call"))))
  print(dt)
}
plan(cluster, workers = 1L)
benchmark()
plan(sequential)
cl <- parallelly::makeClusterPSOCK(workers = 1L)
plan(cluster, workers = cl)
benchmark()
plan(sequential)
parallel::stopCluster(cl)
which gives:
plan(cluster, workers = 1L):
user system elapsed
0.491 0.004 1.322
plan(cluster, workers = cl):
user system elapsed
1.086 0.022 3.373
I think I've narrowed it down to Lines 745 to 759 in cbbd847. Basically, expr2 is bigger than expr1 below:
plan(cluster, workers = 1)
void <- value(f <- future(NULL))
expr1 <- getExpression(f)
cl <- parallelly::makeClusterPSOCK(1)
plan(cluster, workers = cl)
void <- value(f <- future(NULL))
expr2 <- getExpression(f)
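A quick, illustrative way to confirm this is to compare the serialized sizes of the two expressions (exact byte counts will vary):
## expr2 embeds the 'cl' cluster object, so it serializes to a much larger payload
length(serialize(expr1, connection = NULL))
length(serialize(expr2, connection = NULL))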
This causes workers = cl to be passed along to each parallel worker in the latter case. So, the performance difference for every future launched is basically:
library(parallelly)
cl <- parallelly::makeClusterPSOCK(workers = 1L)
## plan(cluster, workers = 1L)
args <- list(workers = 1L)
dt <- system.time(void <- parallel::clusterExport(cl, "args"))
print(dt)
## plan(cluster, workers = cl)
args <- list(workers = cl)
dt <- system.time(void <- parallel::clusterExport(cl, "args"))
print(dt)
e.g.
user system elapsed
0.000 0.000 0.001
user system elapsed
0.004 0.000 0.015
I now have a prototype in the feature/getExpression-performance
branch that avoids this extra overhead. It can be installed as:
remotes::install_github("HenrikBengtsson/future", ref = "feature/getExpression-performance")
With the above benchmark example;
library(future)
benchmark <- function(title) {
  dt <- system.time({
    replicate(n = 50L, { void <- value(future(NULL)) })
  })
  cat(sprintf("%s:\n", deparse(attr(plan(), "call"))))
  print(dt)
}
plan(cluster, workers = 1L)
benchmark()
plan(sequential)
cl <- parallelly::makeClusterPSOCK(workers = 1L)
plan(cluster, workers = cl)
benchmark()
plan(sequential)
parallel::stopCluster(cl)
we see that the difference is now gone;
plan(cluster, workers = 1L):
user system elapsed
0.325 0.001 0.399
plan(cluster, workers = cl):
user system elapsed
0.325 0.000 0.395
This new implementation passes all checks for:
- future
- future.apply
- doFuture
- furrr
- future.batchtools
- future.callr
- doFuture.tests.extra
- ...?
I'll merge into the develop branch when I feel 100% confident this won't have any unknown side effects.
I ran all the tests I could think of and the patch seems good. I've merged it into the develop branch, so it'll be part of the next release.
Thanks again for reporting.
Forgot to say, the overhead would grow with the number of parallel workers in the cluster object.
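As a rough illustration (the exact byte counts will differ by setup), the serialized size of a workers = cl argument grows with the number of nodes in the cluster:
library(parallelly)
for (n in c(2L, 8L, 32L)) {
  cl <- makeClusterPSOCK(n)
  ## the payload that would effectively be shipped with every future
  args <- list(workers = cl)
  cat(sprintf("%2d workers: %d bytes\n", n, length(serialize(args, connection = NULL))))
  parallel::stopCluster(cl)
}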