ssec/polar2grid

workers within the geo2grid.sh command?


Hi all -- A couple of questions: I worked with the --num-workers argument to geo2grid.sh, in an attempt to speed things up. I was generating a sequence of 4 large (RadF) 5500 X 5500 natural_color images using GOES-16 ABI imagery. The imagery generated was stunning! A few interesting things popped up. I was wondering if you had any comment(s):

1.) In my simple tests, clock time decreased steadily going from 4 to 8 to 12 workers. I saw about a 20-25% decrease in clock time, overall when using 12 workers. Then the clock time improvements stagnated. The clock time remained fairly steady for 16, 20 and even 40 workers, which I found interesting.
2.) Perhaps even more interesting was watching the output from the Linux top command. No matter how many workers I specified on the geo2grid.sh command line, only one (1) process ever appeared in the top output, even with 40 workers! So I don't understand what workers actually are. Furthermore, the memory usage indicated in the VIRT and RES columns did not increase linearly with the number of workers. The 4 worker job got up to as much as 30GB RES and a little more than that for VIRT. But even the 40 worker job only got up to about 37GB RES and the mid-40s GB or so for VIRT. This surprised me a bit.
3.) On another topic -- kudos for attacking the generation of more GOES Level-2 products! FYI, I am going to be testing to see if geo2grid.sh can handle generation of GOES L2 CMIP imagery. Do you know if by chance there is a Geo2Grid reader out there that deals with GOES CMIP imagery?

Thank you for all the work you are doing with Geo2Grid!

Sincerely,

Jim

Remember, you asked for this. 😉 But really, I might give you more information than you were expecting, but hopefully it answers the follow-up questions you haven't asked yet.

Workers

Geo2Grid uses Satpy, which uses Dask to handle all the low-level parallelization of the array operations performed. Low-level not only in the sense that it handles the multi-threading/multi-processing of the operations, but also in that dask is designed to split things up at the array-chunk level, not at the entire-product level. So while a high-level solution might say "we want C01, C02, and C03; push each one to a separate worker and process them", dask says "take the first MxN pixels of C01 and process them on worker 1, MxN pixels of C02 on worker 2, and so on". As we add more dask "tasks", we take the results from the previous tasks, give them to a worker, and produce the next result. For example, "read an MxN chunk of C01 from disk" might be one task, "add 1.0" might be another, and "multiply by scale_factor" could be another. Note that tasks are split not only by operation ("add 1") but also by array chunk.
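As a rough illustration of how tasks multiply by both operation and chunk, here is a minimal sketch in plain Python. It does not use dask itself; the array shape, the chunk size, and the `chunk_slices` and `list_tasks` helpers are hypothetical and only exist to make the counting visible.

```python
from itertools import product

# The three example operations named in the text above.
OPERATIONS = ["read chunk from disk", "add 1.0", "multiply by scale_factor"]


def chunk_slices(shape, chunk):
    """Yield (row_slice, col_slice) pairs that tile a 2D array of `shape`."""
    rows, cols = shape
    for r0, c0 in product(range(0, rows, chunk), range(0, cols, chunk)):
        yield slice(r0, min(r0 + chunk, rows)), slice(c0, min(c0 + chunk, cols))


def list_tasks(shape, chunk):
    """Return one (operation, row_slice, col_slice) task per operation per chunk."""
    return [
        (op, rows, cols)
        for rows, cols in chunk_slices(shape, chunk)
        for op in OPERATIONS
    ]


# Hypothetical 4000x4000 array split into 1000x1000 chunks.
tasks = list_tasks((4000, 4000), 1000)
print(len(tasks))  # 16 chunks * 3 operations = 48 tasks
```

A real dask graph would also record which task depends on which, but the count alone shows why a single product can keep many workers busy.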

The reason I bring this up is that the performance of a dask-based library like Satpy/Geo2Grid/Polar2Grid is ultimately a balance between how many tasks need to be processed, how many chunks an array/image is broken up into, and how many workers the tasks can be split across. There are more complications to this "simple" set of variables, but we'll ignore them for the most part for this discussion.

Chunk size

So if we have a really large chunk size, say the size of the whole full disk array, and we're processing a single channel, then no matter how many workers we've allocated we'll only ever have 1 task to work on at any one time, since each task depends on the output of the previous task. For example:

a = ... load from disk ...
b = a * scale_factor
c = b + add_offset

We can't do c until b is done and we can't do b until a is done. This happens at a larger scale and with multiple dependencies for things like a true_color with Rayleigh correction or a division by cos(SZA) applied, but dask/Satpy figure it out.
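To make the dependency point concrete, here is a small sketch that mimics the a/b/c chain with Python's standard thread pool instead of dask. Everything in it (the `peak_concurrency` helper, the sleep that stands in for real work, the scale and offset values) is an assumption for illustration. It records the largest number of steps that were ever running at the same moment.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def peak_concurrency(num_chunks, num_workers, step_seconds=0.01):
    """Run a load -> scale -> offset chain per chunk; return (peak, results)."""
    active = 0
    peak = 0
    lock = threading.Lock()

    def step(value, func):
        nonlocal active, peak
        with lock:
            active += 1
            peak = max(peak, active)
        time.sleep(step_seconds)  # stand-in for real array work
        result = func(value)
        with lock:
            active -= 1
        return result

    def chain(chunk_value):
        a = step(chunk_value, lambda x: x)   # "load from disk"
        b = step(a, lambda x: x * 2.0)       # b = a * scale_factor
        c = step(b, lambda x: x + 1.0)       # c = b + add_offset
        return c

    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        results = list(pool.map(chain, range(num_chunks)))
    return peak, results


# One chunk: 8 workers are available, but only 1 step can ever run at a time.
single_peak, single_results = peak_concurrency(num_chunks=1, num_workers=8)
# Four chunks: up to 4 chains can make progress at once.
multi_peak, multi_results = peak_concurrency(num_chunks=4, num_workers=8)
print(single_peak, multi_peak)
```

With a single chunk the peak is always 1 regardless of the worker count; with more chunks the peak can rise, but never above the number of independent chains.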

You can also imagine if your chunk size is an entire full disk image (so not splitting the array into smaller pieces) that your memory usage is decently high; you're holding the entire full disk array in memory and doing calculations on it, likely producing copies of the original array.

On the other hand, if your arrays are split into many small chunks you will have a lot of tasks and not very many workers to work on them. Another downside to this is that you're asking dask to manage many more "things" and that ultimately has 10s to 100s of milliseconds overhead for each task. Small chunks mean smaller memory usage per task, but modern hardware is probably capable of processing larger amounts of data in a single go. Off the top of my head, ABI in Geo2Grid uses a chunk size of 1356x1356 pixels so that it evenly divides the full disk data arrays (and is a multiple of the on-disk NetCDF chunks - but that's another story). This is configurable, but I honestly am not sure how useful it would be to play with this number any more than I and the rest of the Satpy team already have.
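A quick check of the "evenly divides" claim, as a sketch: the full disk array sizes below (21696, 10848, and 5424 pixels per side for the 500 m, 1 km, and 2 km ABI bands) are my addition and not stated above, and `chunks_per_image` is a hypothetical helper.

```python
CHUNK = 1356  # chunk edge length in pixels, as quoted above

# Assumed ABI full disk array edge lengths per band resolution.
FULL_DISK_SIZES = {"500 m": 21696, "1 km": 10848, "2 km": 5424}


def chunks_per_image(size, chunk=CHUNK):
    """Return how many chunk x chunk tiles cover a size x size image."""
    per_side, remainder = divmod(size, chunk)
    if remainder:
        raise ValueError(f"{chunk} does not evenly divide {size}")
    return per_side * per_side


for resolution, size in FULL_DISK_SIZES.items():
    print(resolution, chunks_per_image(size))
# 500 m 256
# 1 km 64
# 2 km 16
```

So even a single 2 km band produces 16 independent chunks per operation, and the 500 m band produces 256.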

Number of workers

With the ideas mentioned above about chunk sizes, when we get to the point of how many workers are most effective we have to consider how much memory we have available, how many CPU cores we have, and how many tasks we'll be working on.

Dask, as configured by Geo2Grid, uses threads (not processes) to execute workers. It wouldn't make sense to ask for more threads than your system has logical cores, because your system would have to switch between these workloads rather than having a real physical/logical processing unit available for each one. Given the chunk size used by Geo2Grid and assuming a common "C01-C16 + true_color + other composites" product list, you'll have enough tasks to occupy a large number of workers, but your memory usage will be roughly W workers * C chunks of data per task. Where some basic operations may have 1 input and 1 output chunk of data, others might have 3 or 4 inputs and 1 output and a lot of intermediate work in between. So that's potentially W * 5C of memory being used at any one time.
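The W * 5C estimate can be turned into numbers with a short sketch. The 8-byte (float64) pixel size and the `working_set_bytes` helper are assumptions; the real data type varies by product, and this only covers chunks in flight, not everything else the process holds in memory.

```python
CHUNK = 1356            # chunk edge length in pixels, as quoted earlier
BYTES_PER_PIXEL = 8     # assumption: float64 intermediate arrays
CHUNKS_PER_TASK = 5     # the "5C" rule of thumb from the text


def working_set_bytes(num_workers,
                      chunks_per_task=CHUNKS_PER_TASK,
                      chunk=CHUNK,
                      bytes_per_pixel=BYTES_PER_PIXEL):
    """Estimate bytes held by chunks that workers are actively processing."""
    chunk_bytes = chunk * chunk * bytes_per_pixel
    return num_workers * chunks_per_task * chunk_bytes


for workers in (4, 12, 40):
    gib = working_set_bytes(workers) / 2**30
    print(f"{workers:>2} workers: ~{gib:.2f} GiB in flight")
```

The estimate grows linearly with W, but it is only one part of the process's memory, which is consistent with total usage not scaling linearly with the worker count.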

As you increase the number of workers (and you have enough memory on your system), you'll hit a point of diminishing returns. You'll have more workers, but not enough tasks that are ready to be processed to engage all those workers. So you end up with workers sitting idle while tasks wait for the outputs of the previous tasks they depend on. You can also hit cases where multiple workers are trying to write to the same disk at the same time (writing output geotiffs for example) and saturating the write bandwidth of the disk.
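One textbook way to picture this plateau is Amdahl's law, which is not something Geo2Grid computes; it is just a convenient model. The sketch below assumes a purely hypothetical workload where half of the run time can be parallelized (`parallel_fraction=0.5`) and shows how the speedup flattens as workers are added.

```python
def amdahl_speedup(parallel_fraction, num_workers):
    """Amdahl's law: speedup when only part of the work can run in parallel."""
    serial_fraction = 1.0 - parallel_fraction
    return 1.0 / (serial_fraction + parallel_fraction / num_workers)


# Hypothetical: 50% of the run time is parallelizable, the rest is serial
# (waiting on dependencies, disk writes, and so on).
PARALLEL_FRACTION = 0.5

speedups = {n: amdahl_speedup(PARALLEL_FRACTION, n) for n in (1, 4, 8, 12, 16, 20, 40)}
for workers, speedup in speedups.items():
    print(f"{workers:>2} workers: {speedup:.2f}x")
```

Under this assumption the speedup can never reach 2x no matter how many workers are added, and most of the gain arrives with the first handful of workers.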

top output

I think you need to do top -H to have top list threads rather than just processes. As mentioned above, the dask workers are threads.
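To see why plain top shows a single process, here is a minimal sketch using Python's standard thread pool rather than dask. The `run_workers` and `worker_identity` names are hypothetical. Every worker reports the same process ID but a different thread ID.

```python
import os
import threading
from concurrent.futures import ThreadPoolExecutor


def worker_identity(barrier):
    """Report which process and which thread this task ran in."""
    barrier.wait()  # hold every task until all workers are running at once
    return os.getpid(), threading.get_ident()


def run_workers(num_workers):
    barrier = threading.Barrier(num_workers)
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        futures = [pool.submit(worker_identity, barrier) for _ in range(num_workers)]
        return [future.result() for future in futures]


identities = run_workers(4)
pids = {pid for pid, _ in identities}
thread_ids = {tid for _, tid in identities}
print(f"{len(pids)} process, {len(thread_ids)} threads")  # 1 process, 4 threads
```

Because the threads share one process, they also share its memory, so VIRT and RES are reported once for the whole job rather than once per worker.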

ABI CMIP

The abi_l2_nc reader should be able to read these files and should make the datasets available as C01 - C16 just like the Level 1 files.

jpnIII commented

Hi Dave -- Thank you for your exhaustive discussion! I am digesting it now. FYI, many years ago (late 1980's -> early 1990's), I worked with DG (Data General) computer systems, and did a little bit of Fortran programming that had something where you could spawn off a separate "task" within an already-executing Fortran program. It was really pretty neat, I recall thinking at the time. You would get output from each task simultaneously (and interspersed on the screen, if I remember it correctly). I suspect in retrospect that this was probably launching a separate thread in today's world. I just checked, and DG was bought out by EMC/Dell in 1999, so DG has essentially been gone now for more than 20 years. I did not know that. Kathy Strabala and I both worked at the same company back then -- Kavouras, Inc. in MSP. IMHO, it is sad when a company gets bought out by another company. Many people can lose their jobs. I always thought that DG computers were very nice, and enjoyed working on them. I know that they were instrumental in the operations of Kavouras, in any case! I really enjoy your responses, Dave. You are a pleasure to work with, and SSEC is fortunate to have you! Thank you again for all of your help! Sincerely, Jim