mschubert/clustermq

Trouble using clustermq as the backend for BiocParallel

bhayete-empress opened this issue · 6 comments

To begin, I've read the description here:
https://mschubert.github.io/clustermq/articles/userguide.html#troubleshooting
I've just cross-posted the issue here:
https://support.bioconductor.org/p/9152460/
The crux of the matter is that the example in the clustermq troubleshooting guide works for foreach, but bplapply generates errors along the lines of

(Error #1) could not find function ".bpworker_EXEC"

For reference, here's the example that I tried to reproduce:
clustermq::register_dopar_cmq(n_jobs=2, memory=1024) # this accepts same arguments as Q
x = foreach(i=1:3) %dopar% sqrt(i) # this will be executed as jobs
#> Running sequentially ('LOCAL') ...
As BiocParallel supports foreach too, this means we can run all packages that use BiocParallel on the cluster as well via DoparParam.

library(BiocParallel)
register(DoparParam()) # after register_dopar_cmq(...)
bplapply(1:3, sqrt)

I thought I'd add a small, complete, reproducible example, along with the template file:


library(foreach)
library(clustermq)
library(doParallel)
library(BiocParallel)

TIMEOUT = 10000
NJOBS = 100
options(
  clustermq.scheduler = "slurm",
  clustermq.template = 'slurmMq.tmpl',
  clustermq.data.warning=5000 #megabytes
)
register_dopar_cmq(n_jobs=NJOBS,
                   fail_on_error=FALSE,
                   verbose=TRUE,
                   log_worker=TRUE,
                   timeout = TIMEOUT, #how long to wait on MQ side
                   pkgs=c('BiocParallel'), 
                   template=list(
                     timeout=TIMEOUT, #how long to wait on SLURM side
                     memory=5000,
                     cores=1, # how many cores to use (to throttle down memory usage)
                     partition = 'compute',
                     r_path = file.path(R.home("bin"), "R")
                   )  
)

print(paste(getDoParWorkers(), "workers", sep = '_'))
p <- DoparParam()
register(p, default=TRUE)
x = foreach(i=1:300) %dopar% sqrt(i)
opts <- bpoptions(bplog=TRUE, log=TRUE, packages='BiocParallel', workers = NJOBS, tasks = NJOBS,
                  exportglobals=TRUE, exportvariables=TRUE)
x2 = bplapply(1:300, sqrt, BPPARAM=p, BPOPTIONS=opts)

The template file (slurmMq.tmpl) referenced above:

#!/bin/bash

#SBATCH --job-name={{ job_name }}
#SBATCH --output={{ log_file | /dev/null }}
#SBATCH --error={{ log_file | /dev/null }}
#SBATCH --mem-per-cpu={{ memory | 7000 }}
#SBATCH --partition={{ partition }} #intentionally no default - be cognizant of where you are running!
#SBATCH --array=1-{{ n_jobs }}
#SBATCH --cpus-per-task={{ cores | 1 }}
#SBATCH --time={{ timeout }}
##SBATCH --log_file="/path/to.file.%a"

CMQ_AUTH={{ auth }} {{ r_path }} --no-save --no-restore -e 'clustermq:::worker("{{ master }}")'
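
For completeness, the same error should be reproducible without a SLURM allocation by pointing clustermq at its local multiprocess scheduler, since the workers are still separate R sessions. A minimal sketch (assuming a clustermq version that supports clustermq.scheduler = "multiprocess"):

library(foreach)
library(clustermq)
library(BiocParallel)

options(clustermq.scheduler = "multiprocess")  # local worker processes instead of SLURM
register_dopar_cmq(n_jobs = 2)

x  <- foreach(i = 1:3) %dopar% sqrt(i)  # works
register(DoparParam())
x2 <- bplapply(1:3, sqrt)               # expected to fail with
                                        # 'could not find function ".bpworker_EXEC"'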

Yes, that's a bug - I can confirm on my machine with the CRAN version

Is this fixed in the GitHub version, or a problem across the code base?

The underlying issue is that the following expression is to be evaluated on workers

{
    task <- .task_remake(task, const.value)
    if (task$type == "EXEC")
        value <- .bpworker_EXEC(task)
    else value <- NULL
    list(value = value)
}

but foreach::getexports(expr, e=export_env, env=envir) does not correctly identify .task_remake and .bpworker_EXEC as required on the workers (both are in namespace:BiocParallel, the latter unexported).
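
To make this concrete, here is a small sketch (mine, for illustration only; export_env and the globalenv() lookup are stand-ins for what the %dopar% machinery actually passes) that runs foreach's export detection on the quoted worker expression:

library(BiocParallel)   # loads the namespace that holds .task_remake / .bpworker_EXEC

# the expression DoparParam() ships to each worker, quoted for inspection
expr <- quote({
    task <- .task_remake(task, const.value)
    if (task$type == "EXEC")
        value <- .bpworker_EXEC(task)
    else value <- NULL
    list(value = value)
})

# getexports() copies the globals it detects into the environment `e`
export_env <- new.env(parent = emptyenv())
foreach::getexports(expr, e = export_env, env = globalenv())
ls(export_env)   # per the diagnosis above, neither helper ends up in here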

By contrast, future:::getGlobalsAndPackages(expr, envir=envir) does identify both .bpworker_EXEC and the BiocParallel package. Using the globals package directly, the call should probably be:

globals::globalsOf(expr, envir=envir)

but for now I'm getting:

Error: Identified global objects via static code inspection ({; task <- .task_remake(task, const.value); if (task$type== "EXEC"); value <- .bpworker_EXEC(task); else value <- NULL; list(value = value); }). Failed to locate global object in the relevant environments: ‘task’
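
That error appears to come from globalsOf()'s default mustExist = TRUE: task and const.value only exist on the worker at run time, so they cannot be resolved at inspection time. A sketch of a more forgiving call, purely for diagnosis (the choice of asNamespace("BiocParallel") as the lookup environment is my stand-in for the real envir, not necessarily what DoparParam should adopt):

library(globals)
library(BiocParallel)

expr <- quote({
    task <- .task_remake(task, const.value)
    if (task$type == "EXEC")
        value <- .bpworker_EXEC(task)
    else value <- NULL
    list(value = value)
})

# mustExist = FALSE drops unresolvable globals (task, const.value) instead of erroring;
# looking names up in the namespace lets the unexported helpers be found
g <- globals::globalsOf(expr, envir = asNamespace("BiocParallel"), mustExist = FALSE)
names(g)        # should include .task_remake and .bpworker_EXEC (plus base functions)
packagesOf(g)   # should list BiocParallel alongside base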

A possible workaround is:

register(DoparParam())
register_dopar_cmq(n_jobs=2, memory=1024, pkgs="BiocParallel", export=list(
    .bpworker_EXEC=BiocParallel:::.bpworker_EXEC,
    .log_buffer_get=BiocParallel:::.log_buffer_get,
    .log_data=BiocParallel:::.log_data,
    .log_buffer_init=BiocParallel:::.log_buffer_init,
    .VALUE=BiocParallel:::.VALUE
))
bplapply(1:3, sqrt)

Pinging @HenrikBengtsson, what would your current recommendation be to get DoparParam() working with the missing BiocParallel exports? (ideally minimizing the number of dependencies outside of foreach)

Thank you, this works for the test example; will now try in real code to double-check, but it's looking great!

This is now fixed on the current develop branch and will be included in the next CRAN release.

Thanks for reporting!