JaneliaSciComp/bigstream

dask variables do not fit LocalCluster

Opened this issue · 3 comments

Here you set the variables for dask distributed, but they only fit
janelia_lsf_cluster and not local_cluster
(from dask.distributed import Client, LocalCluster).

e.g. ncpus/threads/min_workers are not accepted variables for LocalCluster - https://distributed.dask.org/en/latest/api.html#cluster
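For example, LocalCluster's documented arguments are things like n_workers, threads_per_worker, and memory_limit. A quick sketch (the values here are arbitrary):

from dask.distributed import Client, LocalCluster

# LocalCluster takes its own arguments rather than LSF-specific
# keywords like ncpus or min_workers:
cluster = LocalCluster(n_workers=4, threads_per_worker=2, memory_limit='4GB')
client = Client(cluster)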

Hi again Oren!

Yes, this part of the code is very poorly documented. It's always been a work in progress in my mind, so I haven't committed to the engineering and thus haven't documented it sufficiently. Here's an explanation of this particular issue that I wrote out for someone else a few days ago:

There is a very simple solution to this issue - the code isn't bugged or anything - the major problem is that the documentation for the dask cluster aspects of the code is very incomplete. I'm really sorry if this has caused any frustration or delay. It's simply a matter of time: it's really hard to write code that works, and in some ways even harder (and more time-consuming) to describe how it works in plain language.

So the cluster_kwargs that you've used in the past eventually get passed to this function in my little ClusterWrap package: https://github.com/GFleishman/ClusterWrap/blob/0975096acda228e1443f398c715ba1b316a88d41/ClusterWrap/clusters.py#L64. You'll recognize that a lot of the arguments are briefly described in the docstring there. Any argument that goes in cluster_kwargs which is not present in that docstring would be described here instead (since kwargs get passed to the dask_jobqueue.LSFCluster object): https://jobqueue.dask.org/en/latest/generated/dask_jobqueue.LSFCluster.html#dask_jobqueue.LSFCluster
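To make that concrete, here is a sketch of what cluster_kwargs might look like on an LSF cluster. The first three keys are the ClusterWrap arguments mentioned in this issue; the queue value is hypothetical, so check the linked LSFCluster docs for your site:

cluster_kwargs = {
    'ncpus': 4,           # ClusterWrap: cores requested per worker
    'threads': 4,         # ClusterWrap: threads per worker
    'min_workers': 10,    # ClusterWrap: minimum number of workers
    'queue': 'normal',    # hypothetical pass-through to dask_jobqueue.LSFCluster
}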

HOWEVER - all of that only applies when you're working on an LSF cluster, like the Janelia cluster. When you're working locally you don't want to use an LSFCluster object; you want to use a LocalCluster object. Bigstream works this out for you, but it doesn't (though it should) tell you that you need a different set of parameters to work with a LocalCluster object.

Here is the function that gets called to build the LocalCluster when you're working locally: https://github.com/GFleishman/ClusterWrap/blob/0975096acda228e1443f398c715ba1b316a88d41/ClusterWrap/clusters.py#L237. You'll see that function doesn't take many arguments of its own; it's just a very thin wrapper around this object: https://distributed.dask.org/en/stable/api.html#distributed.LocalCluster, so those are the parameters you have to play with locally. You'll need to understand a little bit about how resources differ on a cluster vs. your workstation. Locally you have far fewer CPU cores to use (maybe 4, 8, 10, or 16 maximum) and far less RAM (typically between 16GB and 256GB depending on what kind of machine you're using). Let's say I had a machine with 8 CPU cores and 32GB of memory. I might set up my LocalCluster like this:

cluster_kwargs = {
    'n_workers': 3,           # three alignments can run in parallel
    'threads_per_worker': 2,  # some multithreading within each alignment
    'memory_limit': '8GB',    # per worker, so 24GB total
}

This would create three different dask workers (allowing three alignments to run in parallel at a time), each worker having 2 threads (allowing some multithreading within each alignment, which speeds up the registrations). Note memory_limit is per worker, so we're saying each worker can use 8GB RAM maximum. This configuration uses about 75% of the local machine for the bigstream job and leaves 25% of the resources untouched, to make sure you can still use your browser, Fiji, or whatever else you might be using the same machine for at the time.
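If you want to derive a similar configuration for your own machine, here is a small sketch. The 75% target and the per-worker memory limit are assumptions to adjust for your workstation:

import os

# Size the LocalCluster to roughly 75% of the machine, leaving
# headroom for other applications.
total_cores = os.cpu_count()  # e.g. 8
threads_per_worker = 2
n_workers = max(1, int(total_cores * 0.75) // threads_per_worker)

cluster_kwargs = {
    'n_workers': n_workers,
    'threads_per_worker': threads_per_worker,
    'memory_limit': '8GB',  # per worker; keep n_workers * limit below ~75% of RAM
}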

Ok - I hope this is still relevant. A lot of people have been asking for the same information, so I really need to get better documentation on this into the package somehow (probably I'll just copy+paste this explanation on github, at least for now).

Let me know if that's sufficient information to consider the issue addressed for now so I can close; otherwise I'm happy to discuss more.

Thanks a lot @GFleishman, it might be worth adding this to the examples, as most people will not have access to an LSF cluster and will use it on their desktop (at least for the first time).
This is the change I've made in my code:
c = {'processes': 20, 'threads_per_worker': 1, 'n_workers': 20}
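One note on that dict: in dask.distributed, LocalCluster's processes argument is a boolean choosing process-based vs. thread-based workers, not a count; the worker count comes from n_workers. A sketch of the equivalent explicit setup, using only documented LocalCluster parameters:

from dask.distributed import Client, LocalCluster

# processes=True selects process-based workers; the number of
# workers is set by n_workers, not by the processes argument.
cluster = LocalCluster(n_workers=20, threads_per_worker=1, processes=True)
client = Client(cluster)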