parapurrr: Do purrr in Parallel (Alpha version)

Moosa Rezwani 2022-07-08

Short -Yet Enough- Introduction

Run purrr’s mapping functions in parallel (i.e., use multiple CPU cores instead of the default single core). The parapurrr package does this by connecting purrr to the foreach package and its adaptors. You only need to add the prefix “pa_” to the desired purrr function (e.g., pa_map instead of map). All map family functions and all foreach adaptors on CRAN are supported. That’s it! You do not need to worry about anything else; the rest is handled internally for you.

library(purrr)
library(parapurrr)
x <- map(1:10, sqrt)
y <- pa_map(1:10, sqrt)

# the two objects are identical, but x was computed sequentially and y was
# computed using parallel computation
identical(x, y)
#> [1] TRUE

Currently, the following map variants are supported:

  • map family: map, map_chr, map_dbl, map_df, map_dfc, map_dfr, map_int, map_lgl

  • map2 family: map2, map2_chr, map2_dbl, map2_df, map2_dfc, map2_dfr, map2_int, map2_lgl

  • Conditional map family: map_at, map_if

  • pmap family: pmap, pmap_chr, pmap_dbl, pmap_df, pmap_dfc, pmap_dfr, pmap_int, pmap_lgl

  • imap family: imap, imap_chr, imap_dbl, imap_df, imap_dfc, imap_dfr, imap_int, imap_lgl

  • walk family: walk, walk2, iwalk, pwalk


How to Install

Currently, parapurrr is only published on GitHub. To install the latest development version of parapurrr, you can use the remotes package:

install.packages("remotes")
remotes::install_github("moosa-r/parapurrr")

A Lengthy Introduction

Broadly speaking, computations can be performed sequentially or in parallel. These terms are self-explanatory, but what we mean here by sequential is that a single CPU core is dedicated to performing your job. Your function is applied to the elements of your input one by one, meaning that each call has to wait for the previous call to finish (hence the term sequential). On the other hand, what we mean here by parallel is that your job is split into multiple segments, and each segment is sent to a separate process, and then to a distinct CPU core, to be computed simultaneously (hence the term parallel).

This package aims to make the latter possible while requiring minimal effort or background knowledge from the user. As mentioned earlier, you can simply add the prefix “pa_” to the name of a purrr mapping function (e.g., pa_map instead of map). Of course, there is much more, and you have control over various aspects of the parallelization job. For instance, you can choose the backend, the parallelization strategy, and the number of workers. You can do that using arguments available in every parapurrr function, but they are optional, so more on that later.

Under the hood, after splitting your job into multiple segments, parapurrr hands each segment over to the well-known foreach package and its adaptor. Every foreach adaptor available on CRAN is supported; thus, you have a variety of choices depending on your running environment. The adaptor, the number of CPU cores to use, and several other options can be changed within each parapurrr function call. Nevertheless, each of these arguments is either optional or has a default value, so you can leave them as they are.
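The split, dispatch, and recombine flow described above can be sketched with base R's parallel package standing in for foreach and its adaptors. This is only an illustration under that assumption, not parapurrr's actual implementation; the helper names map_segment and chunked_parallel_map are hypothetical.

```r
library(parallel)

# Applied on a worker: map `element_fn` over every element of one segment
map_segment <- function(segment, element_fn) lapply(segment, element_fn)

# Hypothetical helper: split the input, dispatch the segments, recombine
chunked_parallel_map <- function(x, f, workers = 2) {
  # One block of consecutive indexes per worker
  segments <- splitIndices(length(x), workers)

  cl <- makePSOCKcluster(workers)
  on.exit(stopCluster(cl), add = TRUE)

  # Send each segment to its own R process
  per_segment <- clusterApply(
    cl,
    lapply(segments, function(idx) x[idx]),
    map_segment,
    element_fn = f
  )

  # Flatten the per-segment results back into one list, in input order
  unlist(per_segment, recursive = FALSE)
}

result <- chunked_parallel_map(1:10, sqrt)
identical(result, lapply(1:10, sqrt))
```

The final comparison mirrors the identical(x, y) check from the short introduction: the parallel result should match what a plain sequential lapply() returns.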


Does it Really Run in Parallel?

To be sure, you can check the process ID (PID) within your .f function. In sequential mode, because a single R process executes your code, we expect all the returned PIDs to be the same:

pid_seq <- map(1:4, ~Sys.getpid())
print(pid_seq)
#> [[1]]
#> [1] 12080
#> 
#> [[2]]
#> [1] 12080
#> 
#> [[3]]
#> [1] 12080
#> 
#> [[4]]
#> [1] 12080

As you can see, all of the .f calls were performed by a single process. Now, let us check parapurrr:

pid_par <- pa_map(1:4, ~Sys.getpid())
print(pid_par)
#> [[1]]
#> [1] 2276
#> 
#> [[2]]
#> [1] 2276
#> 
#> [[3]]
#> [1] 14580
#> 
#> [[4]]
#> [1] 14580

You can see that different PIDs have been returned. This is because different R processes were spawned or forked to do your job in parallel. You can even confirm that the number of unique PIDs corresponds to the “cores” argument. Another, perhaps more tangible, approach is to return the time within your .f function:

# This function will wait for 3 seconds, then return the current time:
check_time <- function(...) {
    Sys.sleep(3)
    Sys.time()
}

time_seq <- map(1:2, check_time)
time_par <- pa_map(1:2, check_time)

# You can see that in sequential mode, there is a 3-second difference between
# the recorded times:
print(time_seq)
#> [[1]]
#> [1] "2022-07-08 13:01:58 +0430"
#> 
#> [[2]]
#> [1] "2022-07-08 13:02:01 +0430"

# But in parallel mode, the recorded times are the same, because they were run
# together in parallel
print(time_par)
#> [[1]]
#> [1] "2022-07-08 13:02:05 +0430"
#> 
#> [[2]]
#> [1] "2022-07-08 13:02:05 +0430"
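The PID check shown earlier in this section can be condensed into a count of distinct PIDs. The sketch below uses base R's parallel package directly as a stand-in, an assumption made so that it runs without parapurrr installed. With parapurrr, the same count applied to the output of pa_map(1:4, ~Sys.getpid(), cores = 2) should, according to the text above, match the “cores” argument.

```r
library(parallel)

cl <- makePSOCKcluster(2)

# Ask for the process ID of whichever worker handles each of the four tasks
worker_pids <- unlist(parLapply(cl, 1:4, function(i) Sys.getpid()))
stopCluster(cl)

# Number of distinct R processes that did the work
n_workers_used <- length(unique(worker_pids))
print(n_workers_used)

# TRUE would mean that some task ran in the main R session
print(Sys.getpid() %in% worker_pids)
```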

Exercising More Control: Extra Arguments

As stated earlier, parapurrr does its job by linking the purrr package to the foreach package. Thus, there are some extra arguments that govern this process, either by directly affecting parapurrr’s behavior or by being handed down to foreach or the foreach adaptor.

cores

This argument, which is equivalent to the number of workers, controls the number of CPU cores used for your job. The default value is the number of your system’s CPU cores minus 1. More precisely, it is the number of parts into which your input will be split and, subsequently, the number of child processes that will be created to do your job.
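The documented default can be approximated with base R as follows. The helper default_cores is hypothetical, and the floor of one worker, together with the handling of an undetectable core count, is an assumption of this sketch rather than documented parapurrr behavior.

```r
library(parallel)

# Hypothetical helper mirroring the documented default: all cores but one.
# Assumption: never go below one worker, and fall back to one worker when
# detectCores() cannot determine the number of cores (it may return NA).
default_cores <- function() {
  n <- detectCores()
  if (is.na(n)) 1L else max(1L, n - 1L)
}

cores <- default_cores()

# The input is split into as many segments as there are workers
segments <- splitIndices(20, cores)
length(segments) == cores
```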

adaptor

foreach uses “adaptor” packages, which bridge foreach and the backend package that performs the parallel computation. There are many doPar adaptor packages available on CRAN. In fact, this is one of the reasons I decided to rely on foreach constructs as the internal mechanism of parapurrr. My strategy is to support every available foreach adaptor package. You can choose one using this argument. Currently, parapurrr supports doFuture, doMC, doMPI, doParallel, and doSNOW. Because doParallel builds on the parallel package, which is distributed with R, it is the default adaptor.

cluster_type

Each foreach adaptor relies on a specific architecture to run your code in parallel. Most backends offer several options to choose from based on your system and needs. The allowed values depend on your selected adaptor and your operating system.

Supported parallel adaptors and their available cluster types.

| adaptor | available cluster_type | default cluster_type value |
|---|---|---|
| “doFuture” | “multisession”, “multicore” (only on UNIX), “cluster_FORK” (only on UNIX), “cluster_PSOCK” | on Windows: “multisession”; on UNIX: “multicore” |
| “doMC” (only on UNIX) | “FORK” | “FORK” |
| “doMPI” | “MPI” | “MPI” |
| “doParallel” (default adaptor) | “FORK” (only on UNIX), “PSOCK” | on Windows: “PSOCK”; on UNIX: “FORK” |
| “doSNOW” | “MPI”, “NWS”, “SOCK” | “SOCK” |

Note that to use MPI, you have to set it up on your machine before starting your R session. Read the guide from the Rmpi developer for Windows, Linux, or macOS.
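As an illustration of how the default cluster type depends on the operating system, the sketch below encodes the doParallel defaults listed for this argument. The helper default_cluster_type is hypothetical and is not part of parapurrr. The guarded call at the end uses only the adaptor, cluster_type, and cores arguments described in this document and runs only if parapurrr and doParallel are installed.

```r
# Hypothetical helper mirroring the documented doParallel defaults:
# "FORK" is only available on UNIX, so Windows falls back to "PSOCK"
default_cluster_type <- function(os_type = .Platform$OS.type) {
  if (identical(os_type, "windows")) "PSOCK" else "FORK"
}

default_cluster_type("windows")
default_cluster_type("unix")

# "PSOCK" is listed as available on every operating system, so it is a
# portable explicit choice
if (requireNamespace("parapurrr", quietly = TRUE) &&
    requireNamespace("doParallel", quietly = TRUE)) {
  res <- parapurrr::pa_map(1:4, sqrt,
                           cores = 2,
                           adaptor = "doParallel",
                           cluster_type = "PSOCK")
}
```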

splitter

To do your job in parallel, parapurrr will split your input into multiple segments. This split is as even as possible and considers only your input’s length and the number of available workers (cores). However, you can override this by explicitly telling parapurrr how to split the input. To do this, supply a list in which each element is a vector of integers or integer-like numbers (i.e., no decimal points) holding the indexes of your input elements.

For example, suppose you have 2 CPU cores, and your input’s length is 6. parapurrr will split your input into 2 segments: The first segment will consist of elements 1, 2, and 3, and the second segment will have elements 4, 5 and 6:

auto_split <- pa_map(1:6, ~Sys.getpid(), cores = 2)
# You can see that the first three elements returned a unique PID and the
# second three elements returned another PID. This shows how your job was
# distributed across the workers.
print(auto_split)
#> [[1]]
#> [1] 8252
#> 
#> [[2]]
#> [1] 8252
#> 
#> [[3]]
#> [1] 8252
#> 
#> [[4]]
#> [1] 14720
#> 
#> [[5]]
#> [1] 14720
#> 
#> [[6]]
#> [1] 14720

You can change this by, for example, requiring elements 1 and 2 to be sent to the first CPU core and the rest to the second CPU core. This is done by adding the argument splitter = list(1:2, 3:6).

man_split <- pa_map(1:6, ~Sys.getpid(), cores = 2, splitter = list(1:2, 3:6))
# You can see that the first two elements returned a unique PID and the last
# four returned another PID. This shows how your job was distributed across the
# workers.
print(man_split)
#> [[1]]
#> [1] 10448
#> 
#> [[2]]
#> [1] 10448
#> 
#> [[3]]
#> [1] 7780
#> 
#> [[4]]
#> [1] 7780
#> 
#> [[5]]
#> [1] 7780
#> 
#> [[6]]
#> [1] 7780

Note that some complications may arise when you manually supply the splitter argument:

  1. You may supply duplicated element indexes or miss some elements. In such cases, parapurrr will halt the code execution and issue an error.

  2. There may be an inconsistency between your supplied number of CPU cores (i.e., workers) and the length of the splitter list (which implicitly dictates the number of workers to use). In this case, parapurrr will issue a warning, ignore your supplied “cores” argument, and continue the code execution based on the splitter argument.

  3. When using pa_map_if or pa_map_at, only the elements of .x that have been selected by .p or .at, respectively, will be used as the input. Thus, the splitter should correspond to the selected elements of .x, not to .x itself. parapurrr will try to detect this issue and inform you about it. For example, if your input has 10 elements and 4 of them will be selected by .at, your splitter should contain the indexes 1 to 4.
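The first two checks in the list above can be sketched in a few lines of base R. The function check_splitter is a hypothetical validator written for illustration; it is not parapurrr's internal code, and its error messages are invented.

```r
# Hypothetical validator: a splitter must cover every input index exactly once
check_splitter <- function(splitter, input_length) {
  idx <- unlist(splitter)
  expected <- seq_len(input_length)

  duplicated_idx <- unique(idx[duplicated(idx)])
  if (length(duplicated_idx) > 0) {
    stop("Duplicated indexes: ", paste(duplicated_idx, collapse = ", "))
  }
  missing_idx <- setdiff(expected, idx)
  if (length(missing_idx) > 0) {
    stop("Missing indexes: ", paste(missing_idx, collapse = ", "))
  }
  unknown_idx <- setdiff(idx, expected)
  if (length(unknown_idx) > 0) {
    stop("Indexes outside the input: ", paste(unknown_idx, collapse = ", "))
  }

  # The number of segments implies the number of workers
  length(splitter)
}

# Valid splitter for an input of length 6: two segments, hence two workers
n_workers <- check_splitter(list(1:2, 3:6), input_length = 6)

# Index 3 appears twice, so validation fails
bad <- tryCatch(
  check_splitter(list(1:3, 3:6), input_length = 6),
  error = function(e) conditionMessage(e)
)
print(bad)
```

For the pa_map_at case in item 3, the same check would be run against the number of selected elements, for example check_splitter(list(1:2, 3:4), input_length = 4).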

auto_export

The foreach package exports every object in your calling environment to the workers. By default, parapurrr mimics this behavior and automatically exports the objects present in the environment from which the parapurrr function was called to the workers. You can disable that using this argument. The default is TRUE for convenience, but to reduce the performance and memory costs, consider turning auto_export off and manually setting the exported variables using the .export argument.
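Why exporting matters can be seen with a plain socket cluster from base R's parallel package, used here as a stand-in for the foreach machinery. The object name multiplier and the environment main_env are illustrative. The worker function is deliberately bound to the global environment so that nothing travels to the workers implicitly.

```r
library(parallel)

cl <- makePSOCKcluster(2)

# `multiplier` exists only in an environment of the main session
main_env <- new.env()
assign("multiplier", 10, envir = main_env)

# On a worker, this function looks `multiplier` up in the worker's own
# global environment
scale_value <- function(i) i * multiplier
environment(scale_value) <- globalenv()

# Without an export, the workers cannot find `multiplier` and the call fails
before_export <- tryCatch(
  parLapply(cl, 1:3, scale_value),
  error = function(e) "failed: object not available on the workers"
)

# After an explicit export, the same call succeeds
clusterExport(cl, "multiplier", envir = main_env)
after_export <- parLapply(cl, 1:3, scale_value)

stopCluster(cl)

print(before_export)
print(unlist(after_export))
```

With parapurrr, the explicit route corresponds to a call with auto_export = FALSE and .export = "multiplier", which sends only the named object instead of the whole calling environment.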

Arguments passed to foreach package

Again, parapurrr uses the foreach package to make your job parallel. The following arguments are passed directly to the foreach function and give you more control over the parallelization process: .export, .packages, .noexport, .errorhandling, .inorder, and .verbose. Read the manual of any parapurrr function to learn more, as this part of the manual is imported directly from foreach. Here are some supplementary notes:

  • .errorhandling: note that this argument affects the whole segment sent to the worker (CPU core), not each element of the list independently. For example, if you set .errorhandling = “remove” and applying your expression to one element fails, the whole segment that contains the faulty element is omitted from the results, not that element alone. Also, when using purrr’s map variations that coerce the output to a certain class (e.g., pa_map_df), be careful about setting .errorhandling = “pass”, because the returned error objects may not be coercible.

  • .inorder: As stated in the functions’ manuals, setting this to FALSE may change the order of the results’ elements. Nevertheless, parapurrr will preserve the elements’ names.
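The segment-level effect of error handling noted above can be reproduced sequentially in base R. In foreach, .errorhandling = "remove" drops a failed task from the results; because each task here is a whole segment, one faulty element takes its neighbours with it. The helper below is a hypothetical imitation of that behavior, not parapurrr code.

```r
# Hypothetical helper: apply `f` to each segment and drop any segment in which
# at least one element raises an error
map_segments_dropping_failures <- function(x, f, segments) {
  per_segment <- lapply(segments, function(idx) {
    tryCatch(lapply(x[idx], f), error = function(e) NULL)
  })
  # Failed segments are NULL and vanish when the results are flattened
  unlist(per_segment, recursive = FALSE)
}

fails_on_five <- function(i) if (i == 5) stop("bad element") else i * 2

res <- map_segments_dropping_failures(1:6, fails_on_five, list(1:3, 4:6))

# Only the first segment survives: elements 4 and 6 are lost together with
# the faulty element 5
length(res)
```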


Manually handling the parallel backends

By default, when you call a parapurrr function, a series of actions will be performed internally:

  1. A parallel cluster or any equivalent action will be initiated.

  2. The cluster will be registered as the doPar backend (and hence will be available to foreach).

  3. After finishing the execution of your code, the cluster and its relevant processes will be terminated and your foreach environment will be restored to the state before running the parapurrr function.
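The lifecycle in the steps above resembles the following base R pattern, given here as a sketch only. The helper run_with_temporary_cluster is hypothetical. Base R's parallel package has no registration step, so step 2 appears only as a comment, and restoring a previously registered foreach backend is not covered.

```r
library(parallel)

# Hypothetical helper that follows the same lifecycle with base R only
run_with_temporary_cluster <- function(x, f, workers = 2) {
  # Step 1: start the cluster
  cl <- makePSOCKcluster(workers)

  # Step 3: guarantee that the cluster is stopped when the function exits,
  # even if `f` raises an error
  on.exit(stopCluster(cl), add = TRUE)

  # Step 2 has no base R equivalent: with foreach, this is where a call such
  # as doParallel::registerDoParallel(cl) would register the backend
  parLapply(cl, x, f)
}

res <- run_with_temporary_cluster(1:4, sqrt)
identical(res, lapply(1:4, sqrt))
```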

No matter what adaptor and cluster type you have chosen, the above steps will be performed. For a variety of reasons, you may want to override that manually. For example, you may want to register your doPar backend, run a series of parapurrr functions, and then terminate the backend manually. Or you may want to register a cluster of multiple computers in your network. You can disable the automatic handling of backends and do that manually in two ways:

Run your parapurrr functions with adaptor = NULL

parapurrr will look for any registered doPar backend and hand your job over to it. If no backend is registered, a warning will be generated and your function will run in sequential mode (i.e., as if you were using purrr normally).

# No parallel backend was registered, so we would expect a warning:
x <- pa_map(1:3, ~Sys.getpid(), adaptor = NULL)
#> Warning: By calling manual_backend(TRUE) or providing adaptor = NULL, you have forced manual doPar backend handeling;
#> But you did `not` register any backends before calling this function:
#> *** Running your code in `Sequential` mode. ***

# we can confirm that by checking that only one PID was returned:
print(x)
#> [[1]]
#> [1] 12080
#> 
#> [[2]]
#> [1] 12080
#> 
#> [[3]]
#> [1] 12080
# we register our doParallel backend with 2 workers
library(parallel)
library(doParallel)
cl <- makePSOCKcluster(2)
registerDoParallel(cl)
x1 <- pa_map(1:10, ~Sys.getpid(), adaptor = NULL)
# you can run another parapurrr function and check the PIDs to see that the
# same cluster is being used:
x2 <- pa_map(1:10, ~Sys.getpid(), adaptor = NULL)
print(x1)
print(x2)

# finally, you should stop your cluster:
stopCluster(cl)

Force by manual_backend(TRUE)

This will force parapurrr functions to ignore any supplied “adaptor” argument. The rest is identical to the first way: if a backend is registered, parapurrr will use it; otherwise, your functions will run in sequential mode. To revert to the automatic handling, simply run manual_backend(FALSE).

Note: This is different from using force_adaptor(). manual_backend() prevents every parapurrr function from registering and handling a foreach adaptor. force_adaptor(), on the other hand, makes all parapurrr functions register, use, and handle a specified foreach adaptor, ignoring any adaptor supplied in the individual parapurrr function call.

manual_backend(TRUE)
# We expect 2 warnings here:
x <- pa_map(1:10, ~Sys.getpid(), adaptor = "doParallel")
#> Warning: adaptor = doParallel is ignored, because forcing manual backend handling was enabled.
#> To revert that and re-enable automatic backend registeration, run:manual_backend(FALSE)
#> Warning: By calling manual_backend(TRUE) or providing adaptor = NULL, you have forced manual doPar backend handeling;
#> But you did `not` register any backends before calling this function:
#> *** Running your code in `Sequential` mode. ***

# we can revert to the default mode:
manual_backend(FALSE)

Force a parallel backend with force_adaptor()

You can force parapurrr to use a specific foreach parallel adaptor and cluster type in a session. The forced values will be respected throughout the session, and any values supplied via the adaptor and cluster_type arguments of individual parapurrr function calls will be ignored. This is useful when, for example, you want to specify the adaptor once and not repeat it with each parapurrr function call. To revert to the automatic handling, simply run force_adaptor().

Note: This is different from using manual_backend(). manual_backend() prevents every parapurrr function from registering and handling a foreach adaptor. force_adaptor(), on the other hand, makes all parapurrr functions register, use, and handle a specified foreach adaptor, ignoring any adaptor supplied in the individual parapurrr function call.

force_adaptor(force_adaptor = "doFuture")
# From now on, any parapurrr function will use the Future parallel backend. Even
# if you explicitly specify another backend in a parapurrr function call, the
# values provided by force_adaptor() take priority. Thus, any of these
# function calls will use the 'doFuture' adaptor:
pa_map(1:10, sqrt)
#> Warning: adaptor = doParallel is ignored, because forcing adaptor selection was enabled.
#> To revert that and re-enable automatic backend registeration, run:force_adaptor(FALSE)
#> [[1]]
#> [1] 1
#> 
#> [[2]]
#> [1] 1.414214
#> 
#> [[3]]
#> [1] 1.732051
#> 
#> [[4]]
#> [1] 2
#> 
#> [[5]]
#> [1] 2.236068
#> 
#> [[6]]
#> [1] 2.44949
#> 
#> [[7]]
#> [1] 2.645751
#> 
#> [[8]]
#> [1] 2.828427
#> 
#> [[9]]
#> [1] 3
#> 
#> [[10]]
#> [1] 3.162278
pa_map(1:10, sqrt, adaptor = NULL)
#> Warning: By calling manual_backend(TRUE) or providing adaptor = NULL, you have forced manual doPar backend handeling;
#> But you did `not` register any backends before calling this function:
#> *** Running your code in `Sequential` mode. ***
#> [[1]]
#> [1] 1
#> 
#> [[2]]
#> [1] 1.414214
#> 
#> [[3]]
#> [1] 1.732051
#> 
#> [[4]]
#> [1] 2
#> 
#> [[5]]
#> [1] 2.236068
#> 
#> [[6]]
#> [1] 2.44949
#> 
#> [[7]]
#> [1] 2.645751
#> 
#> [[8]]
#> [1] 2.828427
#> 
#> [[9]]
#> [1] 3
#> 
#> [[10]]
#> [1] 3.162278
pa_map(1:10, sqrt, adaptor = "doParallel")
#> Warning: adaptor = doParallel is ignored, because forcing adaptor selection was enabled.
#> To revert that and re-enable automatic backend registeration, run:force_adaptor(FALSE)
#> [[1]]
#> [1] 1
#> 
#> [[2]]
#> [1] 1.414214
#> 
#> [[3]]
#> [1] 1.732051
#> 
#> [[4]]
#> [1] 2
#> 
#> [[5]]
#> [1] 2.236068
#> 
#> [[6]]
#> [1] 2.44949
#> 
#> [[7]]
#> [1] 2.645751
#> 
#> [[8]]
#> [1] 2.828427
#> 
#> [[9]]
#> [1] 3
#> 
#> [[10]]
#> [1] 3.162278

# we can revert to the default mode by one of the -equivalent- following calls:
force_adaptor()
force_adaptor(NULL)
force_adaptor(force_adaptor = NULL, force_cluster_type = NULL)

Use doRNG for reproducibility

Reproducibility is always a concern when it comes to random number generators. If this issue concerns you, you will not need any further elaboration on the matter here. doRNG makes it possible for foreach users to set seeds and run reproducible loops. You can use it in parapurrr by running use_doRNG(TRUE). This will internally replace %dopar% with %dorng%. To revert to normal, run use_doRNG(FALSE).

library(doRNG)

set.seed(100)
x1 <- pa_map(1:2, runif)
set.seed(100)
x2 <- pa_map(1:2, runif)
# Here we expect FALSE:
identical(x1, x2)
#> [1] FALSE

# But we can incorporate doRNG for reproducibility:
use_doRNG(TRUE)
set.seed(100)
y1 <- pa_map(1:2, runif)
set.seed(100)
y2 <- pa_map(1:2, runif)
# Here we expect TRUE:
identical(y1, y2)
#> [1] TRUE

# We can revert to normal:
use_doRNG(FALSE)
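For comparison, the same reproducibility problem can be addressed without foreach by using clusterSetRNGStream() from base R's parallel package. This is a different mechanism from doRNG and is shown only to illustrate why seeding has to happen on the workers rather than in the main session.

```r
library(parallel)

cl <- makePSOCKcluster(2)

# set.seed() in the main session does not reach the workers. Instead, give
# every worker its own reproducible L'Ecuyer-CMRG stream derived from one seed
clusterSetRNGStream(cl, iseed = 100)
y1 <- parLapply(cl, 1:2, runif)

# Resetting the streams with the same seed replays the same random numbers
clusterSetRNGStream(cl, iseed = 100)
y2 <- parLapply(cl, 1:2, runif)

stopCluster(cl)

identical(y1, y2)
```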

FAQs

If parapurrr is using foreach to do the job, will it not be faster to code directly using only foreach?

Of course! To execute your code, several of parapurrr’s internal functions are run just to pass your code to foreach and then to the foreach adaptor. After that, parapurrr handles the results again to give you output identical to what you would expect from the purrr equivalent. As a matter of fact, you can write even more optimized, faster, and more efficient code by dropping the foreach package and directly using packages such as parallel.

But one should distinguish between “CPU time” and what I will call here “human time”. There is a famous quotation from Uwe Ligges: “RAM is cheap and thinking hurts.” All of the above-mentioned overhead comes at the expense of CPU time (i.e., it will make your scripts take some fractions of a second longer to run and use more of your system’s resources). At the same time, just putting a prefix before a function call, without needing to do anything else, is easier to implement and more convenient to code, and thus saves some of your human time at the expense of CPU time.

In short, depending on various factors, you should strike a balance between performance (and reliability) and ease of coding. At one end of this spectrum lie wrapper packages such as parapurrr; at the other end is implementing your code more directly or even switching languages.

A final remark: one typically switches to parallel computation when the job takes a significantly long time to compute. In this scenario, the overhead caused by parallelization and, in our case, by parapurrr is negligible.

Which adaptor should I use?

I can’t make any recommendations. My strategy was to support every foreach adaptor available on CRAN and let users freely choose a suitable adaptor based on their system configuration, their script, and, in short, their own judgment. Nevertheless, because the “parallel” package is distributed with R and thus available to every R user, I have chosen it as the default parallel backend to keep the required packages to a minimum.

Why does map() run slower when I switch to parallel? Shouldn’t it be the opposite?

As explained earlier, switching to parallel computation has overhead or, simply put, initial costs, because initializing the clusters, splitting the data, recombining the results, etc. take some computation time. Thus, when your expression or function runs fast enough, these time costs will outweigh the time you hope to save by running your code in parallel instead of sequentially.

In short, only switch to parallel when your expression takes a significant time to run or your input has a large number of elements. Otherwise, sticking to sequential computation is the better strategy.
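The fixed cost can be made visible by timing a deliberately cheap task both ways. The sketch below uses base R's parallel package as a stand-in for parapurrr, which is an assumption. The actual timings depend on the machine, so none are shown.

```r
library(parallel)

# A deliberately cheap task: the work itself takes almost no time
cheap_task <- function(i) sqrt(i)

t_seq <- system.time(
  res_seq <- lapply(1:100, cheap_task)
)[["elapsed"]]

# The parallel timing includes starting and stopping the worker processes,
# which is the fixed cost discussed above
t_par <- system.time({
  cl <- makePSOCKcluster(2)
  res_par <- parLapply(cl, 1:100, cheap_task)
  stopCluster(cl)
})[["elapsed"]]

# Same results, different cost
identical(res_seq, res_par)
print(c(sequential = t_seq, parallel = t_par))
```

Replacing cheap_task with a function that takes seconds per element is what shifts the balance in favour of the parallel run.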

Why are some of the variables (or objects) in my current R session not available when I use parapurrr functions?

Simply put, in some parallelization methods, your code runs in separate, new R environments that have no access to their parent’s data. So, it is necessary to export the needed objects into those R environments. foreach does this automatically by exporting every object that exists in the calling environment. parapurrr mimics this behavior and automatically exports any objects that exist in the environment from which you called the function. Nevertheless, some problems may arise when, for example, you call parapurrr within another function. In such cases, you can manually export any needed object by providing its name in the .export argument.

I am calling functions from other packages within my .f, but parapurrr does not find them. Why?

You have two solutions. First, you can export the package (the same concept as exporting objects applies here) using the “.packages” argument. This way, foreach will load those packages on your workers, and their functions will be accessible. The second solution is to include the namespace in your function calls. For example, instead of rba_connection_test(), write rbioapi::rba_connection_test().
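Both solutions can be tried on a plain socket cluster from base R's parallel package, used here as a stand-in for the foreach workers. The tools package, which ships with R but is not attached by default, plays the role of the external package; clusterEvalQ() does the job that the “.packages” argument does for foreach.

```r
library(parallel)

cl <- makePSOCKcluster(2)
paths <- c("a.csv", "b.txt")

# Solution 2 from the text: qualify the call with its namespace, so the
# worker does not need the package to be attached
ext_qualified <- parLapply(cl, paths, function(path) tools::file_ext(path))

# Solution 1 from the text: attach the package on every worker first, then
# call the function without a prefix
invisible(clusterEvalQ(cl, library(tools)))
ext_attached <- parLapply(cl, paths, function(path) file_ext(path))

stopCluster(cl)

identical(ext_qualified, ext_attached)
```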

Can I prevent parapurrr from handling parallel backend and foreach adaptor and control that manually?

Yes, it is covered above in the section “Manually handling the parallel backends”.

What about furrr?

The fantastic furrr package by Davis Vaughan is the original tool for making purrr’s functions parallel by adding a prefix to the function names. Although parapurrr follows the same principle, the implementations are different. The main difference is that furrr uses the future parallel backend and is more mature, whereas parapurrr is in the alpha stage and uses foreach to make it possible to support a broader range of parallel backends.

This package, parapurrr, is an extended version of private code I initially wrote to use purrr with R’s parallel package. I do not currently intend to submit parapurrr to CRAN, and I certainly would not do so without asking permission from the author of the furrr package.


Session info

#> R version 4.1.2 (2021-11-01)
#> Platform: x86_64-w64-mingw32/x64 (64-bit)
#> Running under: Windows 10 x64 (build 19044)
#> 
#> Matrix products: default
#> 
#> locale:
#> [1] LC_COLLATE=English_United States.1252 
#> [2] LC_CTYPE=English_United States.1252   
#> [3] LC_MONETARY=English_United States.1252
#> [4] LC_NUMERIC=C                          
#> [5] LC_TIME=English_United States.1252    
#> system code page: 1256
#> 
#> attached base packages:
#> [1] stats     graphics  grDevices utils     datasets  methods   base     
#> 
#> other attached packages:
#> [1] doRNG_1.8.2     rngtools_1.5.2  foreach_1.5.2   parapurrr_0.1.0
#> [5] purrr_0.3.4    
#> 
#> loaded via a namespace (and not attached):
#>  [1] parallelly_1.30.0 knitr_1.37        magrittr_2.0.2    doParallel_1.0.17
#>  [5] rlang_1.0.2       fastmap_1.1.0     stringr_1.4.0     globals_0.14.0   
#>  [9] tools_4.1.2       parallel_4.1.2    xfun_0.30         cli_3.2.0        
#> [13] htmltools_0.5.2   iterators_1.0.14  yaml_2.3.5        digest_0.6.29    
#> [17] formatR_1.11      codetools_0.2-18  evaluate_0.15     rmarkdown_2.13   
#> [21] stringi_1.7.6     compiler_4.1.2    doFuture_0.12.0   future_1.24.0    
#> [25] listenv_0.8.0