earthlab/rslurm

Specify number of cores per node for parallel MCMC chains on each node


I am just switching over from the slurmR package to this package, because I started getting strange errors with slurmR. rslurm seems to be working well for me so far. One of the things that I need to do is fit a Bayesian model (using cmdstanr) on multiple data sets. Each model for each data set will be estimated on its own node, so if I have 50 data sets, I want to estimate 50 models in parallel. But each model estimation needs 4 cores for 4 parallel MCMC chains. I do not think the cpus_per_node argument will help, because my guess is that it will try to put one data set on each node/cpu. I was thinking processes_per_node might work, but it doesn't seem to, and the documentation isn't entirely clear. (In slurmR I was able to specify an mc.cores = 4 argument so that I could have 4 parallel chains within each node.) Here's the call to cmdstan where I request 4 parallel chains:

fit <- mod$sample(
  data = data_list,
  refresh = 0,
  chains = 4L,
  parallel_chains = 4L,
  iter_warmup = 500,
  iter_sampling = 2500,
  step_size = 0.1,
  show_messages = FALSE
)

Thanks so much for any advice you might have. And if you'd prefer I post these questions in another forum, let me know.

Update: It looks like I don't need to specify anything as long as parallel_chains is specified in my sample statement.

Hi there, I greatly apologize for not seeing this a while ago. I have moved institutions and rslurm doesn't really have a regular maintainer at the moment -- the developers have all moved on to other positions and we only occasionally look into the issues when we have time.

That said, we have gotten several different people asking about this kind of nested situation where you are using rslurm to run a function in parallel that itself is running across multiple cores. rslurm is not really set up well to deal with that. I see from your edit that you have a workable solution. Would you mind sharing your code so I can see what you did? Thanks!

No worries. The code is a little involved, but I'll include it here. There is a blog post describing what I'm doing, which includes all of the code.

bayes_fit <- function(dx, p_sigma, m_effect, decision_rule, x) {
  
  # 1: estimate model
  
  data_list <- list(N = nrow(dx), y = dx$y, rx = dx$rx, p_mu = 0, p_sigma = p_sigma)

  fit <- mod$sample(
    data = data_list,
    refresh = 0,
    chains = 4L,
    parallel_chains = 4L,    # <------- this is where I am able to create an "internal" parallel process
    iter_warmup = 500,
    iter_sampling = 2500,
    step_size = 0.1,
    show_messages = FALSE
  )
  
  # 2: collect sample of betas from posterior
  
  df <- data.frame(as_draws_rvars(fit$draws(variables = "beta")))
  
  # 3: evaluate success based on desired decision rule
  
  if (decision_rule == 1) {
    return((mean(df$beta > 0) > 0.95))
  } else { # decision_rule == 2
    return( ((mean(df$beta > 0) > 0.95) & (mean(df$beta > m_effect ) > 0.5)) )  
  }
}

s_replicate <- function(iter, p_sigma, decision_rule, m_effect, seq) {
  
  set_cmdstan_path(path = "/gpfs/.../cmdstan/2.25.0")
  
  def <- defData(varname = "rx", formula = "1;1", dist = "trtAssign")
  def <- defData(def, varname = "y", formula = 0, variance = 1, dist = "normal")
  
  dd <- genData(end, def)
  
  freq_ps <- sapply(seq(start, end, by = by), function(x) freq_fit(dd[1:x]))
  freq_effect <- any(freq_ps < 0.05)
  
  bayes_ci <- sapply(seq(start, end, by = by), 
    function(x) bayes_fit(dd[1:x], p_sigma, m_effect, decision_rule, x))
  bayes_effect <- any(bayes_ci)
  
  return(data.table(seq, iter, p_sigma, m_effect, decision_rule, 
    freq_effect, bayes_effect))  
}

### Set simulation parameters

scenario_dt <- function(...) {
  argdt <- data.table(expand.grid(...))
  argdt[, seq := .I]
  argdt[]
}

iter <- 1:1000
p_sigma <- c(1, 5, 10)
decision_rule <- 2
m_effect <- c(0.2, 0.3, 0.4) # used if decision_rule == 2

start <- 100L
end <- 1000L
by <- 100L

scenarios <- scenario_dt(
  iter = iter, 
  p_sigma = p_sigma, 
  decision_rule = decision_rule,
  m_effect = m_effect
)

### Compile stan code

set_cmdstan_path(path = "/gpfs/.../cmdstan/2.25.0")
mod <- cmdstan_model("multiple.stan")

### Set rslurm arguments

sopts <- list(time = '12:00:00', partition = "cpu_short", `mem-per-cpu` = "5G")
sobjs <- c("freq_fit", "bayes_fit", "mod", "start", "end", "by")

### Replicate over iterations

sjob <- slurm_apply(
  f = s_replicate, # the function
  params = scenarios, # a data frame
  jobname = 'mult_i',
  nodes = 50, 
  slurm_options = sopts,
  global_objects = sobjs,
  submit = TRUE
)

### Collect the results and save them

res <- get_slurm_out(sjob, outtype = 'table', wait = TRUE)
save(res, file = "/gpfs/.../mult.rda")

So, I've done a little benchmarking with a simpler example (the R code is at the end), and the results are indeed curious. When run with 4 parallel chains as specified in the sample statement, the chains do indeed run in parallel, but each chain takes over 3 seconds. Here is a snippet from one of the slurm_xx.out files:

Running MCMC with 4 parallel chains...

Chain 2 finished in 3.2 seconds.
Chain 1 finished in 3.5 seconds.
Chain 3 finished in 3.5 seconds.
Chain 4 finished in 3.5 seconds.

All 4 chains finished successfully.
Mean chain execution time: 3.4 seconds.
Total execution time: 3.7 seconds.

When the chains are set to run sequentially, each chain is much faster - perhaps because there is less overhead? Each chain runs in under 1 second, and the total is not that different from the parallel execution. This is puzzling - maybe it will make sense to you.

Running MCMC with 4 sequential chains...

Chain 1 finished in 0.9 seconds.
Chain 2 finished in 0.9 seconds.
Chain 3 finished in 0.9 seconds.
Chain 4 finished in 0.9 seconds.

All 4 chains finished successfully.
Mean chain execution time: 0.9 seconds.
Total execution time: 4.0 seconds.

I did notice that rslurm was running slower than slurmR (which I had been using for a while, but for some reason started failing), and this might be the reason.

Here is the code that I used to test this out:

library(simstudy)
library(data.table)
library(parallel)
library(cmdstanr)
library(posterior)
library(rslurm)

bayes_fit <- function(dx) {
  
  data_list <- list(N=nrow(dx), y=dx$y, rx=dx$rx, p_mu=0, p_sigma=10)

  fit <- mod$sample(
    data = data_list,
    refresh = 0,
    chains = 4L,
    parallel_chains = 4,    # <--------------- to compare with serial chains, set to 1
    iter_warmup = 1000,
    iter_sampling = 4000,
    show_messages = FALSE
  )
  
  df <- data.frame(as_draws_rvars(fit$draws(variables = "beta")))
  median(df$beta)
  
}

s_replicate <- function(iter) {
  
  set_cmdstan_path(path = "/gpfs/share/apps/cmdstan/2.25.0")
  
  def <- defData(varname = "rx", formula = "1;1", dist = "trtAssign")
  def <- defData(def, varname = "y", formula = "0 + 5*rx", variance = 25, dist = "normal")
  
  dd <- genData(250, def)
  
  bayes_fit(dd)
}

set_cmdstan_path(path = "/gpfs/share/apps/cmdstan/2.25.0")
mod <- cmdstan_model("simple.stan")

###

sopts <- list(time = '12:00:00', partition = "cpu_short", `mem-per-cpu` = "5G")
sobjs <- c("bayes_fit", "mod")

sjob <- slurm_map(
  x = as.list(1:1000),
  f = s_replicate, # the function
  jobname = 'simp_i',
  nodes = 50, 
  slurm_options = sopts,
  global_objects = sobjs,
  submit = TRUE
)

res <- data.table(get_slurm_out(sjob, outtype = 'table', wait = TRUE))

path <- "/gpfs/.../data/"
fn <- "simple.rda"

save(res, file = paste0(path, fn))

Thanks a lot for testing that out. Unfortunately, I think the "nested" parallelism is not working properly for you. The default cpus_per_node for slurm_map() is 2, so your job as written will use 100 cores in total. But rslurm will only allocate 1 core per element of slurm_map() (or per data frame row if you were using slurm_apply()). So cmdstan "sees" that it is supposed to run 4 chains in parallel but only has access to one core, and it runs them sequentially.
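
If you want to experiment in the meantime, one thing to try (a sketch only - rslurm is not documented to support nested parallelism this way, and I haven't verified it on your cluster) is to request more CPUs per node than processes, using the cpus_per_node and processes_per_node arguments mentioned above, so that each replicate at least has spare cores on its node for cmdstan to use:

sjob <- slurm_map(
  x = as.list(1:1000),
  f = s_replicate,
  jobname = 'simp_i',
  nodes = 50,
  cpus_per_node = 4,       # ask SLURM for 4 CPUs on each node...
  processes_per_node = 1,  # ...but run only one replicate at a time per node,
                           # leaving cores free for parallel_chains = 4
  slurm_options = sopts,
  global_objects = sobjs,
  submit = TRUE
)

Whether the chains actually spread across those cores depends on how your cluster enforces the allocation, so it is worth checking the chain timings again after trying it.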

I definitely appreciate that nested parallelism would be a useful feature to have. I just looked back at old issues and recalled #61, which is essentially the same issue. You can see there that I decided not to try to add that feature; I think it would require rewriting a lot of the code. The underlying problem is that rslurm was written a few years ago, when the best available parallel back-end was the parallel package, which cannot really nest in that way. Since then the future package has really taken off. It would probably be better to modernize rslurm by replacing the old back-end with a new one based on future, which would let us allow the kind of nested parallel structure you want to use. In fact, I use future to fit Stan or brms models in parallel myself, though I cannot use rslurm for that and have to submit the job scripts to the remote cluster "the old-fashioned way."
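
Just to illustrate the kind of topology future allows, here is a rough sketch - not rslurm code, and not exactly what I run myself. It assumes the future, future.apply, and future.batchtools packages and a SLURM template file (here called "slurm.tmpl") that passes the requested resources through:

library(future)
library(future.apply)
library(future.batchtools)

# Outer level: submit work to SLURM via batchtools, each job asking for 4 CPUs.
# Inner level: left sequential, because cmdstanr's parallel_chains handles the
# within-model parallelism on those 4 CPUs itself.
plan(list(
  tweak(batchtools_slurm, template = "slurm.tmpl",
        resources = list(ncpus = 4, walltime = 43200, memory = "5G")),
  sequential
))

res <- future_lapply(1:1000, s_replicate, future.seed = TRUE)

The resource names (ncpus, walltime, memory) depend entirely on what the template expects, and in practice you would also want to control how the 1000 replicates are chunked into jobs.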

I really would like to implement that major change, but I am not sure I will be able to commit the time anytime soon. I hate that this is an unsatisfying answer!

I guess no single package is ever perfect, and there is never enough time to make it so (unless you are Hadley Wickham and get paid to do it). I face that issue myself. And now I have found one key difference between rslurm and slurmR: slurmR has implemented nested parallel processes - I think using parallel, but I could be wrong. (The other key difference being that slurmR stopped working for some unknown reason - and its developer also has a new job and no time.) I guess I will have to learn how to do this the "old-fashioned" way, since I have had the advantage of using rslurm and slurmR from the start. Thanks again for your help.

Well, I would start here: https://rpubs.com/mvuorre/brms-parallel . That's what I used to get started. It requires you to write an R script that manually sets up the backend with future, and then a shell script to submit it to the cluster. Annoying, but for now that would probably be the best way to solve your problem. I really do want to revisit this when I have time and try to add that feature ... I'll leave this issue open to remind myself to do it at some point!
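
The R-script half of that approach can be quite small. A minimal sketch (the file name and replicate count are illustrative, and it assumes the sbatch script you write requests the matching number of CPUs):

library(future)
library(future.apply)

# One worker per model fit; with this setup you would let future handle the
# between-model parallelism and set parallel_chains = 1 inside the fit (or
# size 'workers' so that chains x models matches the CPUs requested in the
# sbatch script) to avoid oversubscribing the node.
plan(multisession, workers = 4)

fits <- future_lapply(1:20, s_replicate, future.seed = TRUE)
saveRDS(fits, "fits.rds")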

If I understand well, the timings you report are generated by brms itself from running the 4 chains sequentially vs. in parallel, so presumably each case is within one execution of bayes_fit. In that case it should not include the rslurm overhead, since that occurs outside the replicated function. It would, however, include the overhead cost of parallelizing in Stan. The fact that the totals match may be a coincidence, which you can check by, for example, doubling the number of iterations (this should not affect the overhead portion of the timing).
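
As a concrete version of that check (the same call as in your test code, just with the sampling iterations doubled):

fit <- mod$sample(
  data = data_list,
  refresh = 0,
  chains = 4L,
  parallel_chains = 4L,
  iter_warmup = 1000,
  iter_sampling = 8000,   # doubled from 4000
  show_messages = FALSE
)

If the difference between the parallel and sequential totals comes from a fixed overhead, it should stay roughly the same when the iterations double; if it grows in proportion, something else is going on.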

Just to make sure, did you run the bayes_fit function on your cluster by calling sbatch manually (i.e. outside of any R-to-SLURM interface package) to see what the execution time was?

Just to be clear, I am not using brms, but coding directly in Stan. But, that is not really relevant.

Yes, each case is within one execution of bayes_fit. As for overhead, I was imagining the overhead associated with the call to Stan, but that seems minimal. I can tell that it is running in sequence because when I specify 4 parallel chains, each chain seems to wait for the others to finish before finishing. This is the same behavior I observe on my local machine when I specify 4 parallel chains but limit myself to a single core. When I specify 4 cores on my local machine, the total time is under 1 second (just longer than the longest chain), so it is clear that on my local machine the chains are running in parallel.

Hi, I misunderstood the issue with the new code - it's not the overhead, but still the lack of parallelization within a replicate. I wonder if it would make a difference to use RStan vs. cmdstan, since RStan uses the R parallelization infrastructure just like rslurm does (I could check later).

However, if you're going to run 1000 replicates, it might not matter that much whether your individual replicates are parallelized, since it's just a matter of running more replicates at once serially vs. fewer replicates at once with nested parallelism, for the same computing resources.

The original design for rslurm was based on the idea that if you're going to run a function say 1000 times, it's wasteful in terms of overhead to create 1000 slurm jobs in an array (in fact most clusters would put a limit on that number), especially if each function execution takes a few seconds. Therefore, rslurm automatically packs the replicates of the function into fewer jobs based on how many processes you realistically expect to run in parallel. I haven't checked slurmR in detail, but it might be better suited for the case where each replicate of the function is more complex and could be considered one job in the array.
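
For the numbers in this thread, the packing works out roughly like this (illustrative arithmetic, not anything printed by rslurm):

replicates <- 1000
nodes <- 50
cpus_per_node <- 2                           # slurm_map() default
running_at_once <- nodes * cpus_per_node     # 100 parallel processes
per_process <- replicates / running_at_once  # ~10 replicates each, run serially

So the 1000 replicates become a 50-task array rather than a 1000-task one.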

Yes - I have been considering each line of my data frame (using slurm_apply) or each element of the list (using slurm_map) that contains the replication parameters as its own job in the array - much easier for me to conceptualize, especially when things get more complicated. And if I have 1000 replications (i.e., lines in my data frame or elements in my list), I typically spread them out over 50 nodes (so 20 jobs run in sequence on each node), but I can use 4 cores on each node so that I can run a parallel process for each replication. slurmR works great and supports this structure, but I've been getting some strange errors lately related to memory allocation. Just like you guys, they have limited time to focus on this. I appreciate that you're thinking about this.