pangeo-data/rechunker

workers exceeding max_mem setting

rsignell-usgs opened this issue · 20 comments

A colleague and I were struggling to rechunk a zarr dataset using this workflow: https://nbviewer.jupyter.org/gist/rsignell-usgs/89b61c3dc53d5107e70cf5574fc3c833

After much trial and error, we discovered that we needed to increase the worker size to 8GB and decrease max_mem to 3GB to avoid workers running out of memory and the cluster dying with "killed_worker".

Watching the dask dashboard shows a number of the workers spiking over 5GB, despite setting max_mem to 3GB:
[screenshot of the Dask dashboard (2021-10-04) showing several workers above 5 GB]

When we looked at the worker logs we saw tons of these warnings:

distributed.worker - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see https://distributed.dask.org/en/latest/worker.html#memtrim for more information. -- Unmanaged memory: 5.66 GiB -- Worker memory limit: 8.00 GiB

Is this expected behavior?

Thanks for reporting Rich!

This ability to track unmanaged memory is new for Dask. So overall this is progress. At least we know that rechunker is not explicitly using this memory, as it is designed not to!

I would try with the trick discussed here: dask/distributed#2602 (comment)

Set {"MALLOC_TRIM_THRESHOLD_": "0"} in the environment variables on your dask workers. See if that improves things.

Thanks @rabernat for the idea.

I'm not sure I enabled it correctly, however.

I added this line to my notebook just after I imported dask, before I created the cluster:

dask.config.set({"MALLOC_TRIM_THRESHOLD_": "0"})

Didn't seem to have any impact -- I'm getting the same behavior as before, with memory going way above 3GB and lots of the same unmanaged memory warnings in the logs.

Do I need a dask worker plugin or something?

Yeah that's not right. You need to set an environment variable on the workers. The way you do this depends on how you are creating your dask cluster.

How are you creating your dask cluster? Dask gateway?

@rabernat, yes, Dask Gateway. But the Dask Gateway on Qhub for some reason is not configured to take a dict of environment variables on cluster creation (right @dharhas?)

So this time I created the cluster and then did client.run() on the workers to set that environment variable on the Dask workers:

def set_env(k, v):
    # sets the variable inside each already-running worker process
    import os
    os.environ[k] = v

client.run(set_env,'MALLOC_TRIM_THRESHOLD_','0')

but in the worker logs I still see lots of:

distributed.worker - INFO - Run out-of-band function 'set_env'

distributed.worker - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see https://distributed.dask.org/en/latest/worker.html#memtrim for more information. -- Unmanaged memory: 5.68 GiB -- Worker memory limit: 8.00 GiB

BTW, the workflow should be reproducible, as we are reading from an S3 requester-pays bucket. One would need to supply one's own S3 bucket for writing, of course.

client.run(set_env,'MALLOC_TRIM_THRESHOLD_','0')

Still not the right way to do it. Setting os.environ inside an already-running worker has no effect, because glibc only reads MALLOC_TRIM_THRESHOLD_ when the process starts; the variable needs to be in the environment before the worker process is launched. You need to use cluster options (as in the notebook linked from the issue linked above).

from dask_gateway import Gateway
g = Gateway()
options = g.cluster_options()
# which options are exposed here depends on how the gateway deployment is configured
options.environment = {"MALLOC_TRIM_THRESHOLD_": "0"}
cluster = g.new_cluster(options)
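
Once the cluster is up, you could double-check that the variable actually reached the workers with something like this (untested sketch, using the same client.run mechanism as above; it assumes the client is created from this cluster object):

import os
from dask.distributed import Client

client = Client(cluster)
# ask every worker what it sees; each should report '0'
client.run(os.getenv, "MALLOC_TRIM_THRESHOLD_")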

@rsignell-usgs you can set environment variables on dask_gateway in QHub via the environment_vars kwarg, but it looks like you need to upgrade to qhub 0.3.12

Would it be fair to say that max_mem, which defines the upper limit of a chunk's memory footprint, is expected to be much smaller than the actual peak memory usage? There will always be temporary arrays and bytes objects allocated during remote writes.

Would it be fair to say that max_mem, which defines the upper limit of a chunk's memory footprint, is expected to be much smaller than the actual peak memory usage? There will always be temporary arrays and bytes objects allocated during remote writes.

Yes, I think that's correct. But here I think we have a bigger problem related to garbage collection.
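
To make the headroom idea concrete, here is roughly what that looks like for a rechunk call (an illustrative sketch, not the exact workflow from Rich's notebook; the store paths, variable name, and chunk sizes are made up). max_mem only bounds the intermediate chunks rechunker plans for, so it should sit well below the dask worker memory limit to leave room for temporary copies and write buffers:

import zarr
from rechunker import rechunk

# hypothetical source array in a zarr store
source = zarr.open("s3://example-bucket/source.zarr")["temperature"]

# workers have an 8 GB limit; keep rechunker's own budget well below that
plan = rechunk(
    source,
    target_chunks=(1, 720, 1440),   # made-up target chunking
    max_mem="3GB",                  # bound on rechunker's intermediate chunk size
    target_store="s3://my-bucket/target.zarr",
    temp_store="s3://my-bucket/tmp.zarr",
)
plan.execute()  # runs as a dask computation on the cluster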

Quansight is releasing a new version of Qhub later this week, at which point I will upgrade the ESIP qhub, and we will have a much easier way to set environment variables on the workers, which will facilitate trying out the {"MALLOC_TRIM_THRESHOLD_": "0"} idea.

At the moment I'm struggling a lot with "distributed.worker - WARNING - Unmanaged memory use is high. This may indicate a memory leak".

Before I report the issue in more detail, I wanted to try this MALLOC_TRIM_THRESHOLD_ method - does someone know how I can set it on a LocalCluster instance? The dask doc is not particularly helpful, and I don't use dask_gateway as in your examples @rabernat (which, by the way, also seem to fail on recent dask_gateway, as far as I could tell with a local gateway server).

Thanks a lot!

The details of how I was setting MALLOC_TRIM_THRESHOLD_: dask/distributed#2602 (comment)

It really does seem to be a magic flag for Dask. I don't know how to set it on a LocalCluster.

Thanks Ryan - I've asked on Dask Discourse and will report back.

Thanks for your help. I'll add my voice to the issues raised there. I wanted to showcase dask for my advanced programming class but this really won't help. Even the simplest of computations (that would fit in memory) are killing my local cluster: https://nbviewer.ipython.org/gist/fmaussion/5212e3155256e84e53d033e61085ca30

Try chunking inside the open_*dataset calls by passing chunks. Otherwise, xarray will read the whole file into memory and then chunk and distribute it.

I guess we should say this here.
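
For example, something like this (sketch; the file pattern and chunk sizes are made up):

import xarray as xr

# passing chunks here means each file is opened as dask arrays with
# reasonably sized chunks from the start
ds = xr.open_mfdataset("data/*.nc", chunks={"time": 100})

# whereas chunking only afterwards means each file is first handled as one
# big array (as described above) and only then re-chunked
# ds = xr.open_mfdataset("data/*.nc").chunk({"time": 100})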

Try chunking inside the open_*dataset calls by passing chunks. Otherwise, xarray will read the whole file into memory and then chunk and distribute it.

Well this is embarrassing - I did not know that. This saved my class if not my dignity ;-)

I guess we should say this here.

I will open a PR immediately. This should be priority number one for mf datasets. I guess this problem does not occur with single file open_datasets?

I guess this problem does not occur with single file open_datasets?

I think it still does. "chunk as early as possible, and avoid rechunking as much as possible" is the principle in my experience.

I think it still does

Why? By default the variables are lazily loaded and are not dask arrays, and a subsequent call to .chunk() will convert them?

"chunk as early as possible, and avoid rechunking as much as possible" is the principle in my experience.

This just made it to the docs ;-)

Why? By default the variables are lazily loaded and are not dask arrays, and a subsequent call to .chunk() will convert them?

makes sense!

I am following up on this issue (although I am not sure whether this is the correct place -- maybe this is more dask-related).
I am still struggling with this error: unmanaged memory getting very high, raising warnings and errors and eventually causing the workers to be killed. I am usually using a LocalCluster or PBSCluster. Is setting MALLOC_TRIM_THRESHOLD_ to 0 still the way to go? If so, how? Beyond the issue mentioned by Ryan above, I have been going through various Dask issues (this one, seemingly fixed by that one) and parts of the docs, but I have not managed to reach a conclusion that gets me through this.
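
In case it is useful, here is one pattern that gets the variable into the environment before the worker process starts (a sketch only; it assumes dask-jobqueue's env_extra parameter, and the core/memory values are made up):

from dask_jobqueue import PBSCluster
from dask.distributed import Client

# env_extra injects lines into the generated job script, so the variable is
# exported before the dask worker process is launched
cluster = PBSCluster(
    cores=4,
    memory="16GB",
    env_extra=["export MALLOC_TRIM_THRESHOLD_=0"],
)
cluster.scale(jobs=2)  # submit two worker jobs
client = Client(cluster)

For a LocalCluster I am less sure; setting os.environ["MALLOC_TRIM_THRESHOLD_"] = "0" in the parent process before creating the cluster may be enough, since spawned worker processes inherit that environment, but I have not verified it.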