jean-zay-users/jean-zay-doc

Add simple example for dask-jobqueue

lesteve opened this issue · 17 comments

from dask_jobqueue import SLURMCluster 
cluster = SLURMCluster(
 cores=1, memory='20GB', job_extra=['--gres=gpu:1'],
 walltime='02:00:00',
 interface='ib0', job_cpu=10)

cluster.scale(jobs=1) 

from dask.distributed import Client 
client = Client(cluster) 

def func(arg): 
    return arg + 1 

print(cluster.job_script()) 

futures = [client.submit(func, i) for i in range(100)] 
result = client.gather(futures) 
print('result:', result)

So #30 was merged, but maybe we could add a super-simple example to complement that.

Any guidance or example on running an xarray (dask parallelized) job?

I am trying to do a large bootstrap computation on netCDF data. My cluster-spawning code is the following:

from dask_jobqueue import SLURMCluster 

cluster = SLURMCluster(project='myproject', cores=40, processes=40, memory='160GB', walltime='10:00:00', interface='ib0', death_timeout=300)
cluster.scale(40*20) # total 800 processes

from dask.distributed import Client 
client = Client(cluster) 

# xarray dataload with chunk
# xarray computation
.
.
.
# data save

But when I try to do this, SLURM opens 20 separate jobs (instead of 1 SLURM job with 20 nodes). Is this normal, or am I missing something in the cluster creation?

To put it differently, I am trying to achieve the following job spec to run with Dask -

#!/bin/bash
#SBATCH --partition=cpu_p1
#SBATCH --account=uhy@cpu
#SBATCH --job-name=ci
#SBATCH --ntasks=800
#SBATCH --ntasks-per-node=40
#SBATCH --hint=nomultithread
#SBATCH --time=10:00:00

Additionally, the Jean-Zay manual indicates that --mem does not have any effect, but for SLURMCluster it seems to be a necessary parameter. How can I book the full memory of a node? Any guidance?

Thanks.

Great to see that there is some interest for Dask workflows on Jean-Zay!

But, when I try to do this, SLURM opens 20 separate jobs (instead of 1 slurm job with 20 nodes). Is it normal?

I think this is normal. dask-jobqueue launches single-node jobs as far as I know. FYI, you can do cluster.scale(jobs=20) which I find easier to grok than cluster.scale(40*20).
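To make the relation between the two calls concrete, here is a small sketch of the arithmetic. It assumes that scale(n) counts Dask workers, that scale(jobs=n) counts SLURM jobs, and that each job hosts `processes` workers. The helper name jobs_for_workers is made up for illustration and is not part of dask-jobqueue.

```python
import math


def jobs_for_workers(n_workers, processes_per_job):
    """Number of single-node jobs needed to host n_workers Dask workers.

    Illustrative helper (not a dask-jobqueue function): each job is assumed
    to start `processes_per_job` worker processes.
    """
    if processes_per_job < 1:
        raise ValueError("processes_per_job must be at least 1")
    return math.ceil(n_workers / processes_per_job)


# cluster.scale(40 * 20) with processes=40 asks for 800 workers,
# which corresponds to the same request as cluster.scale(jobs=20).
print(jobs_for_workers(40 * 20, 40))
```

With the numbers from the post above, both forms end up requesting 20 single-node jobs, which matches the 20 separate SLURM jobs that were observed.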

Additionally, the Jean-Zay manual indicates that --mem does not have any effect,

OK, I did not know this: according to http://www.idris.fr/eng/jean-zay/cpu/jean-zay-cpu-exec_alloc-mem-eng.html the memory is computed automatically from the number of CPUs you ask for. If passing --mem has no effect, you can leave it as is; if it gives you an error, you can use the header_skip parameter, see http://jobqueue.dask.org/en/latest/advanced-tips-and-tricks.html#skipping-unrecognised-line-in-submission-script-with-header-skip.
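As a rough illustration of what skipping a header line means, here is a stand-alone sketch that drops every line of a job script containing one of the given patterns. It only mimics the effect described in the linked page; the function name and the sample header are assumptions made up for this example, not dask-jobqueue code or real Jean-Zay output.

```python
def skip_header_lines(job_script, header_skip):
    """Return job_script without the lines that contain any skip pattern.

    Simplified stand-in for the header_skip behaviour: plain substring
    matching, applied to every line of the script.
    """
    kept = [
        line
        for line in job_script.splitlines()
        if not any(pattern in line for pattern in header_skip)
    ]
    return "\n".join(kept)


# Hypothetical header, loosely shaped like a generated SLURM script
sample_script = "\n".join(
    [
        "#!/usr/bin/env bash",
        "#SBATCH -n 1",
        "#SBATCH --cpus-per-task=40",
        "#SBATCH --mem=149G",
        "#SBATCH -t 10:00:00",
    ]
)

filtered = skip_header_lines(sample_script, ["--mem"])
print(filtered)
```

In the real library you would pass something like header_skip=['--mem'] when creating the SLURMCluster and check the result with print(cluster.job_script()).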

Out of interest, where do you run your cluster spawning script? On the login node, inside a SLURM job, somewhere else?

A limitation of using Dask on Jean-Zay is the following (there may be more details in dask/dask-jobqueue#471):

  • a convenient way for the user is to run the cluster spawning script on a login node
  • Jean-Zay has CPU time limits on the login node (30 minutes of CPU time IIRC). Running a Dask scheduler consumes ~5% CPU, so after a while (~10 hours) you will hit the CPU time limit, your scheduler will be killed, and the jobs it manages will be killed as well. This is a problem if you have long jobs.
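The ~10 hours figure follows directly from the two numbers quoted in the list above. A quick sketch of the arithmetic, taking the 30 minutes limit and the ~5% scheduler load at face value (both are recollections, not measured values):

```python
def hours_until_cpu_limit(cpu_limit_minutes, cpu_fraction):
    """Wall-clock hours before a process using `cpu_fraction` of one core
    has accumulated `cpu_limit_minutes` of CPU time."""
    if not 0 < cpu_fraction <= 1:
        raise ValueError("cpu_fraction must be in (0, 1]")
    return cpu_limit_minutes / cpu_fraction / 60


# 30 minutes of CPU time consumed at ~5% of one core
print(f"{hours_until_cpu_limit(30, 0.05):.1f} hours")
```

A busier scheduler (many workers, many small tasks) would reach the limit sooner, so the estimate is best read as an upper bound.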

Hi!

Great to see that there is some interest for Dask workflows on Jean-Zay!

Thanks for your enthusiasm. Your initial script helped me a lot to set up the process. I was getting by without a distributed cluster so far on Jean-Zay, but now it has gotten out of hand with TBs of data to process.

But, when I try to do this, SLURM opens 20 separate jobs (instead of 1 slurm job with 20 nodes). Is it normal?

I think this is normal. dask-jobqueue launches single-node jobs as far as I know. FYI, you can do cluster.scale(jobs=20) which I find easier to grok than cluster.scale(40*20).

Thanks for the tip. The official Dask documentation is hard to follow sometimes.

Additionally, the Jean-Zay manual indicates that --mem does not have any effect,

OK, I did not know this: according to http://www.idris.fr/eng/jean-zay/cpu/jean-zay-cpu-exec_alloc-mem-eng.html the memory is computed automatically from the number of CPUs you ask for. If passing --mem has no effect, you can leave it as is; if it gives you an error, you can use the header_skip parameter, see http://jobqueue.dask.org/en/latest/advanced-tips-and-tricks.html#skipping-unrecognised-line-in-submission-script-with-header-skip.

It seems to me that the distributor in dask needs this parameter to estimate if it can complete the job or will go out of memory. Hence I chose the highest value each Jean-Zay node offers. This is a guess. I will test in the coming days and will let you know here if it is confirmed.

A limitation of using Dask on Jean-Zay is the following (there may be more details in dask/dask-jobqueue#471):

  • a convenient way for the user is to run the cluster spawning script on a login node
  • Jean-Zay has CPU time limits on the login node (30 minutes of CPU time IIRC). Running a Dask scheduler consumes ~5% CPU, so after a while (~10 hours) you will hit the CPU time limit, your scheduler will be killed, and the jobs it manages will be killed as well. This is a problem if you have long jobs.

I read the discussion last night after facing similar issues. I ended up following someone's comment about spawning from another node. In the end, I submitted a job to the prepost node on Jean-Zay to run the Python script that spawns the Dask cluster nodes. This is nice because the prepost node is shared, hours spent on the prepost node are not counted against the allocation, and it gives access to 2.8TB total RAM (48 total CPU). The time limit there is 20 hours though, which does not help if you need a prepost job that takes days.

It seems to me that the distributor in dask needs this parameter to estimate if it can complete the job or will go out of memory.

Actually you are right: the memory parameter is used both by the Dask worker (see http://distributed.dask.org/en/latest/worker.html#memory-management) and for SLURM --mem. You can look at the job script generated by dask-jobqueue with print(cluster.job_script()).
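To give a feel for what the memory parameter means on the worker side, here is a small sketch. It assumes the job memory is split evenly between the worker processes of a job, and it uses the fractions 0.60, 0.70, 0.80 and 0.95, which are the default thresholds described on the memory-management page as far as I know (they are configurable, so treat them as assumptions).

```python
def per_worker_memory_gb(job_memory_gb, processes):
    """Memory limit of one worker, assuming an even split across processes."""
    if processes < 1:
        raise ValueError("processes must be at least 1")
    return job_memory_gb / processes


# Assumed default fractions of the per-worker limit (configurable in Dask)
WORKER_MEMORY_FRACTIONS = {
    "target (start spilling to disk)": 0.60,
    "spill (based on process memory)": 0.70,
    "pause (stop running new tasks)": 0.80,
    "terminate (nanny restarts worker)": 0.95,
}

# memory='160GB' and processes=40, as in the cluster definition above
limit_gb = per_worker_memory_gb(160, 40)
print(f"per-worker limit: {limit_gb:.1f} GB")
for name, fraction in WORKER_MEMORY_FRACTIONS.items():
    print(f"  {name}: {limit_gb * fraction:.1f} GB")
```

So with 40 processes per node, each worker only has a few GB to work with, which matters when choosing xarray chunk sizes.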

Also I forgot to say and you probably know this already but you may be interested to take a look at Pangeo which is a good place for Dask and geosciences applications. For example they have a Discourse forum

Actually you are right: the memory parameter is used both by the Dask worker (see http://distributed.dask.org/en/latest/worker.html#memory-management) and for SLURM --mem. You can look at the job script generated by dask-jobqueue with print(cluster.job_script()).

Thanks for digging it out. I will keep it in my notes for reference.

Also I forgot to say and you probably know this already but you may be interested to take a look at Pangeo which is a good place for Dask and geosciences applications. For example they have a Discourse forum

Thanks, I have heard about it but I did not know about the forum. I will surely check it out.

One other thing that you might have info on... while testing dask I have noticed some jobs with weird jobid pattern. For example -

         2001390_1    cpu_p1 empirica  my_name  R 1-15:38:38      1 node25 
         2001390_2    cpu_p1 empirica  my_name  R 1-15:38:38      1 node26 
         2001390_3    cpu_p1 empirica  my_name  R 1-15:38:38      1 node26 
         2001390_4    cpu_p1 empirica  my_name  R 1-15:38:38      1 node26 
         2001390_6    cpu_p1 empirica  my_name  R 1-15:38:38      1 node26 
         2001390_7    cpu_p1 empirica  my_name  R 1-15:38:38      1 node26 

Notice the same job id, but with a suffix. Any idea how they are submitted? It seems to me they are some kind of distributed CPU load, submitted as part of a single job, getting a single job id suffixed with another id for the machine number. They look awfully like the Dask jobs, but with a special job id pattern. With Dask, I am getting a different job id for each node.

Weird, those look like SLURM job arrays (see https://crc.ku.edu/hpc/slurm/how-to/arrays for example), but I am pretty sure dask-jobqueue does not use job arrays. I tried with a similar script as the one in the top post and I get "standard" job ids, i.e. something like 2088632, 2088633, etc ...

Maybe you have a script around dask-jobqueue that uses job arrays (sbatch --array or -a) ?
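If you want to tell the two kinds of ids apart programmatically, a minimal sketch could look like the following. It assumes array tasks are displayed as `<jobid>_<taskid>` (as in the squeue output above) and plain jobs as a bare number; any other form is rejected. The function name split_job_id is made up for this example.

```python
import re

# "<jobid>" for a plain job, "<jobid>_<taskid>" for a job-array task
JOB_ID_PATTERN = re.compile(r"^(?P<job>\d+)(?:_(?P<task>\d+))?$")


def split_job_id(job_id):
    """Split a squeue job id into (job, task); task is None for plain jobs."""
    match = JOB_ID_PATTERN.match(job_id.strip())
    if match is None:
        raise ValueError(f"unrecognised job id: {job_id!r}")
    task = match.group("task")
    return int(match.group("job")), (int(task) if task is not None else None)


print(split_job_id("2001390_1"))  # array task, like the ones in the queue
print(split_job_id("2088632"))    # plain job, like the ones dask-jobqueue submits
```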

Small remark, Jean-Zay people are quite security-oriented so we try our best to avoid showing info that they deem sensitive in this github repo. Such sensitive info includes your Jean-Zay logins, compute node names, and even project name. Their main argument is that this kind of info could be used for phishing. I edited your previous posts where appropriate (e.g. squeue output).

@lesteve : Thanks for editing out the sensitive info. :)

Regarding @jamal919's question, I think those are not his jobs but rather someone else's that he saw in the queue.

Weird, those look like SLURM job arrays (see https://crc.ku.edu/hpc/slurm/how-to/arrays for example), but I am pretty sure dask-jobqueue does not use job arrays. I tried with a similar script as the one in the top post and I get "standard" job ids, i.e. something like 2088632, 2088633, etc ...

Maybe you have a script around dask-jobqueue that uses job arrays (sbatch --array or -a) ?

I do not know actually. I just found them in the global job queue (not mine).

Small remark, Jean-Zay people are quite security-oriented so we try our best to avoid showing info that they deem sensitive in this github repo. Such sensitive info includes your Jean-Zay logins, compute node names, and even project name. Their main argument is that this kind of info could be used for phishing. I edited your previous posts where appropriate (e.g. squeue output).

Thanks for editing it out. I do agree with this security policy. My bad. I was not thinking it through. Will keep it in mind.

I do not know actually. I just found them in the global job queue (not mine).

Ah OK makes sense.

Thanks for editing it out. I do agree with this security policy. My bad. I was not thinking it through. Will keep it in mind.

No worries, this kind of thing is hard to guess before you know it exists 😉

I think I have reached some consensus about what works for me and what does not. I have managed to do a distributed bootstrap computation for 600k times 4k data points and 10k iterations. The most nodes I could use was 4 (x40 processes). Going above that always quit at some point due to some timeout at a node. In any case, your scripts and the discussion here were a big help in finishing the work.

Something I noticed in the log, which does not always appear:

distributed.utils - ERROR - addresses should be strings or tuples, got None
Traceback (most recent call last):
  File "/some/conda/location/conda/lib/python3.7/site-packages/distributed/utils.py", line 665, in log_errors
    yield
  File "/some/conda/location/conda/lib/python3.7/site-packages/distributed/scheduler.py", line 2135, in remove_worker
    address = self.coerce_address(address)
  File "/some/conda/location/conda/lib/python3.7/site-packages/distributed/scheduler.py", line 4844, in coerce_address
    raise TypeError("addresses should be strings or tuples, got %r" % (addr,))
TypeError: addresses should be strings or tuples, got None
distributed.core - ERROR - addresses should be strings or tuples, got None
Traceback (most recent call last):
  File "/some/conda/location/conda/lib/python3.7/site-packages/distributed/core.py", line 408, in handle_comm
    result = handler(comm, **msg)
  File "/some/conda/location/conda/lib/python3.7/site-packages/distributed/scheduler.py", line 2135, in remove_worker
    address = self.coerce_address(address)
  File "/some/conda/location/conda/lib/python3.7/site-packages/distributed/scheduler.py", line 4844, in coerce_address
    raise TypeError("addresses should be strings or tuples, got %r" % (addr,))
TypeError: addresses should be strings or tuples, got None

Any idea where it is coming from? Some discussion in the Dask repo traced it back to a dependent module (dask/distributed#3389). Their suggestion is to wrap the script in if __name__=='__main__'. Do you have any experience with this?

I think I have reached some consensus about what works for me and what does not. I have managed to do a distributed bootstrap computation for 600k times 4k data points and 10k iterations. The most nodes I could use was 4 (x40 processes). Going above that always quit at some point due to some timeout at a node. In any case, your scripts and the discussion here were a big help in finishing the work.

Glad you got it working for your use case and it was useful!

Going above was always quitting at some point due to some timeout at a node.

Just to make sure I am following you: was this happening because the jobs don't start at the same time and the first job reaches its maximum time (20h on most queues) before the "total" computation (i.e. the one that is distributed) has finished? I would have thought that the Dask scheduler gets timed out first, because this is the job that starts first (inside a job on a prepost node), if I have understood your setup correctly.

I am actually wondering whether dask-mpi would be an alternative. I am not very familiar with MPI, but my understanding is that it waits until all the nodes are ready before launching the computation.

I wish using dask-jobqueue (and Dask more generally) on Jean-Zay was less cumbersome but right now this is not the case unfortunately ... suggestions on how to improve the situation more than welcome!

About the addresses should be strings or tuples, got None error, I have never seen that before, dask/distributed#3386 may be relevant as well since you seem to be getting this error in remove_worker.

I would say the if __name__ == '__main__' guard makes sense when using a LocalCluster (or client = Client(), which uses a LocalCluster behind the scenes) with multi-processing, but not so much for SLURMCluster.

I would say, as long as your computation works, you may ignore this but this may point to a problem in distributed.

By the way, if you want to contribute a small example with xarray on Jean-Zay to this doc, that would be appreciated 😉!

Hello,

dask-jobqueue is a nice tool, but it is limited when it comes to platform portability. Also, batch scheduler configuration is very platform dependent. As pointed out in an earlier comment, dask-mpi is a platform- and scheduler-agnostic solution for spinning up Dask clusters. Essentially it uses mpi4py, which in turn uses an MPI distribution, to spin up a Dask cluster. We can use this externally created cluster within any Dask workflow using dask.distributed.Client.

Here is a sample SLURM script I used on JZ:

#!/bin/bash

#SBATCH --time=00:30:00
#SBATCH -J dask-test
#SBATCH --nodes=2
#SBATCH --ntasks=3
#SBATCH --no-requeue
#SBATCH --exclusive

# Load an MPI distribution here as dask-mpi needs it at runtime
module load <OpenMPI>

# I am using my own conda environment that has dask, dask-mpi
source $HOME/.bashrc
conda activate dask

# Define scheduler file path and ports
export SCHEDULER_FILE=$WORK/$SLURM_JOB_ID.json
export SCHEDULER_PORT=8786
export DASHBOARD_PORT=8787

# Remove existing dask worker space
rm -rf $SCRATCH/dask-worker-space

# We spin up a dask cluster here and the cluster meta data will be written to 
# scheduler file at $SCHEDULER_FILE. We use that scheduler file to 
# connect to this cluster in Dask workflows. Note that by default one MPI process 
# is always reserved for scheduler in dask-mpi. So if we want 2 dask workers, we
# need to launch 3 MPI processes, where one will be scheduler and 2 will be workers
# Scheduler does not need one full node. In this configuration, we use two nodes and
# one worker per node. Scheduler resides on one of the nodes.
srun --ntasks-per-node 1 --ntasks 3  dask-mpi --name=test-worker --nanny --interface=ib0 --nthreads=1 --memory-limit=0 --local-directory=$SCRATCH --scheduler-file=$SCHEDULER_FILE --scheduler-port=$SCHEDULER_PORT --dashboard-address=$DASHBOARD_PORT &

# Just to make sure cluster is up and running
sleep 20

# We call our Dask workflow here and we use the scheduler file to connect to
# the cluster we revved up above
python3 dask_test.py $SCHEDULER_FILE | tee dask_test.log
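The rank bookkeeping described in the script comments (one MPI process for the scheduler, the rest for workers) can be sketched in a few lines. This is only an illustration of the counting, assuming the scheduler sits on rank 0 as in the default dask-mpi command-line setup; the function names are made up.

```python
def mpi_tasks_for(n_workers):
    """MPI tasks to request so that n_workers Dask workers are started,
    assuming one extra task is reserved for the scheduler."""
    if n_workers < 1:
        raise ValueError("need at least one worker")
    return n_workers + 1


def role_of_rank(rank):
    """Role played by an MPI rank under the assumed default layout."""
    return "scheduler" if rank == 0 else "worker"


n_tasks = mpi_tasks_for(2)  # 2 workers -> --ntasks=3, as in the script above
print(n_tasks, [role_of_rank(rank) for rank in range(n_tasks)])
```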

And the dask_test.py file is:

import random
import time
import sys
import os

from dask.distributed import Client

if __name__ == "__main__":

    snooze = 1.0

    def inc(x):
        time.sleep(snooze * random.random())
        return x + 1

    def dec(x):
        time.sleep(snooze * random.random())
        return x - 1

    def add(x, y):
        time.sleep(snooze * random.random())
        return x + y

    print("Starting cluster_dask_test")
    # We pass in the scheduler from the invoking script
    if len(sys.argv) > 1:
        scheduler_file = sys.argv[1]
        client = Client(scheduler_file=scheduler_file)
    else:
        client = Client()

    import dask

    inc = dask.delayed(inc)
    dec = dask.delayed(dec)
    add = dask.delayed(add)

    for _ in range(1):  # outer repeat counter, kept distinct from the inner loop variable
        zs = []
        for i in range(256):
            x = inc(i)
            y = dec(x)
            z = add(x, y)
            zs.append(z)

        zs = dask.persist(*zs)
        L = zs
        while len(L) > 1:
            new_L = []
            for i in range(0, len(L), 2):
                lazy = add(L[i], L[i + 1])  # add neighbors
                new_L.append(lazy)
            L = new_L  # swap old list for new

        result = dask.compute(L, sync=True)
        assert result[0][0] == 65536
    print("Successfully finished cluster_dask_test")

    client.wait_for_workers()
    # Shut down the scheduler and workers so the background srun step can exit
    client.shutdown()

    sys.exit(0)

This is a scheduler-agnostic approach, and it allows us to have more fine-grained control over how we reserve the resources. I have tested this on the JZ platform and it worked pretty well. Even if you have workflows based on xarray, dask.dataframe, etc., it should work. Hope that helps!!
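For anyone wondering where the 65536 in the assert of dask_test.py comes from, the same computation can be reproduced without Dask. The sketch below drops the sleeps and the dask.delayed wrappers and keeps only the arithmetic and the pairwise (tree) reduction; pairwise_reduce is a name introduced here, and it assumes the input length is a power of two, as it is with 256 values.

```python
def inc(x):
    return x + 1


def dec(x):
    return x - 1


def add(x, y):
    return x + y


def pairwise_reduce(values):
    """Add neighbours level by level until one value is left.

    Assumes len(values) is a power of two, like the 256 items in dask_test.py.
    """
    level = list(values)
    while len(level) > 1:
        level = [add(level[i], level[i + 1]) for i in range(0, len(level), 2)]
    return level[0]


zs = []
for i in range(256):
    x = inc(i)            # i + 1
    y = dec(x)            # i
    zs.append(add(x, y))  # 2 * i + 1

total = pairwise_reduce(zs)
print(total)  # sum of the first 256 odd numbers: 256 ** 2 = 65536
```

In the Dask version each add is a delayed task, so every level of the reduction can run in parallel across the workers.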

@mahendrapaipuri looks good, if you have some spare time and want to do a Pull Request to show how to use dask-mpi on Jean-Zay, please do!

Sure. Can do that. Where does it go? doc/examples/dask?

Yep that sounds good!