pangeo-data/pangeo

Missing rectangles in zarr dataset loaded from GCS

Closed this issue · 55 comments

I have had some issues loading slices from a zarr dataset stored on GCS. I don't know where the problem could be coming from, so I am posting it here in case someone has experienced something similar.

The figure below shows what is going on. Upon slicing and loading a zarr dataset stored on GCS using xarray, I'm getting missing squares (often in the northern hemisphere). The squares change position each time I reload the same slice; sometimes they appear for a single variable, sometimes for more than one (at different locations), and sometimes they don't appear at all. The rectangle sizes match the chunking, which for this dataset corresponds to 25 degrees in lat/lon, so it looks like entire chunks are randomly failing to be transferred for some reason.
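For reference, the access pattern that triggers this looks roughly like the following (a minimal sketch; the store path and variable name match the dataset shown further down the thread, and the plotting step is only illustrative):

import xarray as xr
from fsspec import get_mapper

# Open the zarr store on GCS and pull one time step of one variable
dset = xr.open_zarr(get_mapper('gs://oceanum-era5/wind_10m.zarr'), consolidated=True)
u10 = dset.u10.isel(time=0).load()

# Missing chunks show up as NaN rectangles in the plot
u10.plot()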

Has anyone seen something similar, or would anyone have an idea about this, please?

Thanks

[Screenshot from 2019-08-12 14-12-41: loaded slice showing the missing rectangles]

Hi @rafa-guedes - could you share more information about your dataset? Like the xarray repr and the zarr .info property of one of the arrays?

Hi @rabernat, please find some more info below.

In [1]: import xarray as xr

In [2]: import zarr

In [3]: from fsspec import get_mapper

In [4]: fsmap = get_mapper('gs://oceanum-era5/wind_10m.zarr')

In [5]: dset = xr.open_zarr(fsmap, consolidated=True)

In [6]: dset
Out[6]: 
<xarray.Dataset>
Dimensions:    (latitude: 721, longitude: 1440, time: 354264)
Coordinates:
  * latitude   (latitude) float32 90.0 89.75 89.5 89.25 ... -89.5 -89.75 -90.0
  * longitude  (longitude) float32 0.0 0.25 0.5 0.75 ... 359.25 359.5 359.75
  * time       (time) datetime64[ns] 1979-01-01 ... 2019-05-31T23:00:00
Data variables:
    u10        (time, latitude, longitude) float32 dask.array<shape=(354264, 721, 1440), chunksize=(744, 100, 100)>                                                                                         
    v10        (time, latitude, longitude) float32 dask.array<shape=(354264, 721, 1440), chunksize=(744, 100, 100)>                                                                                         
Attributes:
    Conventions:  CF-1.6
    history:      2019-08-01 10:44:37 GMT by grib_to_netcdf-2.10.0: /opt/ecmw...

In [7]: g = zarr.open_group(fsmap)

In [8]: g.info
Out[8]: 
Name        : /
Type        : zarr.hierarchy.Group
Read-only   : False
Store type  : fsspec.mapping.FSMap
No. members : 5
No. arrays  : 5
No. groups  : 0
Arrays      : latitude, longitude, time, u10, v10

In [9]: g.u10.info
Out[9]: 
Name               : /u10
Type               : zarr.core.Array
Data type          : int16
Shape              : (354264, 721, 1440)
Chunk shape        : (744, 100, 100)
Order              : C
Read-only          : False
Compressor         : Blosc(cname='zstd', clevel=3, shuffle=BITSHUFFLE,
                   : blocksize=0)
Store type         : fsspec.mapping.FSMap
No. bytes          : 735622110720 (685.1G)
Chunks initialized : 57240/57240

In [10]: g.v10.info
Out[10]: 
Name               : /v10
Type               : zarr.core.Array
Data type          : int16
Shape              : (354264, 721, 1440)
Chunk shape        : (744, 100, 100)
Order              : C
Read-only          : False
Compressor         : Blosc(cname='zstd', clevel=3, shuffle=BITSHUFFLE,
                   : blocksize=0)
Store type         : fsspec.mapping.FSMap
No. bytes          : 735622110720 (685.1G)
Chunks initialized : 57240/57240

In [11]: dset.u10.encoding
Out[11]: 
{'chunks': (744, 100, 100),
 'compressor': Blosc(cname='zstd', clevel=3, shuffle=BITSHUFFLE, blocksize=0),
 'filters': None,
 '_FillValue': -32767,
 'scale_factor': 0.01,
 'dtype': dtype('int16')}

Perhaps GCS is not returning the data reliably? @martindurant might have some suggestions on how to get some debugging out of gcsfs. I seem to recall a trick to turn on HTTP logging.

You can set the level with gcsfs.core.logger.setLevel("DEBUG"), as long as you have some logging initialised (e.g. logging.basicConfig()).
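A minimal sketch of how that might look (assuming gcsfs.core.logger is a standard logging.Logger, as implied above):

import logging
import gcsfs

# Initialise logging first, then turn the gcsfs logger up to DEBUG
logging.basicConfig()
gcsfs.core.logger.setLevel("DEBUG")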

Sorry for taking so long to reply; I am only now getting back to this. Thanks for the suggestions @rabernat, @martindurant. There is nothing obvious in those logs though, and the problem still persists intermittently. I may try to recreate this zarr dataset and check again. I'd be interested to hear if someone has seen similar behaviour in the past; this looks like a fairly serious issue and I don't know where it could be coming from.

I'm seeing similar intermittent errors with data in the gs://pangeo-parcels bucket.

Here's a gist showing that calculations fail with "group not found at path" ValueErrors and that, if the computation is done with retries=10, the statistics change intermittently:

https://nbviewer.jupyter.org/gist/willirath/b7af4bc9a93a79b81772910f8ee5c630

Is there a way of forcing Zarr to retry if a chunk cannot be read rather than setting the chunk to all NaNs?

Does it happen if you set the number of threads per worker to one?

Does it happen if you set the number of threads per worker to one?

Yes, configuring KubeCluster to use --nthreads 2 does not seem to change anything.

and --nthreads 1 ?

Errr, yes, I meant and --nthreads 1.

I honestly cannot think of why this is happening - help would be appreciated in debugging when/how this happens, the exact error, etc. If it happens with one thread, it is apparently not a race condition or an effect of session sharing. If we are lucky, it is merely one particular exception from the remote backend which we don't consider retriable but should (e.g., an API rate limit).

Is there an easy way of preventing GCSFS from retrying at all?

Not super-easy: the GCSFileSystem class has an attribute .retries, so this can be set to 1 in every worker with a client.run call:

def set_gcsfs_retry():
    # A single attempt (retries=1) effectively disables retrying,
    # so any failure surfaces immediately
    import gcsfs
    gcsfs.GCSFileSystem.retries = 1

client.run(set_gcsfs_retry)  # apply on every worker

I would also run with GCSFS_DEBUG=true

gcsfs.utils.is_retriable is the place where errors are sorted.
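For context, the kind of decision being made there can be sketched as follows (illustrative only - this is not the actual gcsfs implementation, and the status codes listed are an assumption):

# Illustrative sketch, not the real gcsfs.utils.is_retriable
RETRIABLE_STATUS_CODES = {429, 500, 502, 503, 504}  # assumed set of retriable HTTP codes

def is_retriable_sketch(exception):
    # Decide whether an error from the storage backend is worth retrying
    status = getattr(exception, "code", None)
    return status in RETRIABLE_STATUS_CODES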

One possible way to debug would be to access the data using fsspec's HTTP filesystem, bypassing gcsfs completely. fsspec/filesystem_spec#144 shows how you might do this (but also reveals a bug, which has been fixed in the latest fsspec master).
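For a public bucket, that might look something like the sketch below (assuming the objects are reachable through the storage.googleapis.com HTTP endpoint and that the store has consolidated metadata, so that no directory listing is needed; the path used is the public pangeo-parcels store mentioned just below):

import xarray as xr
import fsspec

# get_mapper on an https:// URL uses fsspec's HTTP filesystem rather than gcsfs
mapper = fsspec.get_mapper(
    "https://storage.googleapis.com/pangeo-parcels/"
    "med_sea_connectivity_v2019.09.11.2/traj_data_without_stokes.zarr"
)
ds = xr.open_zarr(mapper, consolidated=True)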

Can the data be public? I can spin up my own cluster (on pangeo!) and see if I can reproduce? Any other dataset on gcs known to show this?

Can the data be public? I can spin up my own cluster (on pangeo!) and see if I can reproduce? Any other dataset on gcs known to show this?

It's public: https://nbviewer.jupyter.org/gist/willirath/b7af4bc9a93a79b81772910f8ee5c630

dataset_version = "v2019.09.11.2"
bucket = f"pangeo-parcels/med_sea_connectivity_{dataset_version}/traj_data_without_stokes.zarr"

Do we know if this is tied to dask / distributed, or can it be reproduced purely at the gcsfs / zarr level?

(I was trying 'gs://oceanum-era5/wind_10m.zarr', sorry)

Agree, @rabernat - it would be worthwhile paging through all of the pieces of a zarr dataset in a single thread or on a local 4/8-thread dask scheduler.

Here is an interesting snippet on mitigating request-rate related errors:
https://cloud.google.com/storage/docs/request-rate#ramp-up

Do we know if this is tied to dask / distributed, or can it be reproduced purely at the gcsfs / zarr level?

I started looking into this question and will post here once I get some insight.

(I was trying 'gs://oceanum-era5/wind_10m.zarr', sorry)

I could make this one public if needed for further tests; I would only need a way to ensure it is pulled from the us-central region to avoid egress costs.

@martindurant @rabernat I have done a few tests - the problem only happens for me when I load data from GCS on a dask distributed cluster.

I loaded a spatial slice (always the same slice) from my dataset under 3 different conditions: [1] no dask distributed cluster, [2] a distributed cluster scaled up to 2 workers, and [3] one scaled up to 4 workers. The testing script is here, and the machine I ran it on is described below. Each test was run 170 times.
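The testing script itself is not reproduced here, but the procedure amounts to something like the following sketch (the slice selection and the way missing values are counted are illustrative, not the actual script):

import numpy as np
import xarray as xr
from fsspec import get_mapper

dset = xr.open_zarr(get_mapper('gs://oceanum-era5/wind_10m.zarr'), consolidated=True)

missing_counts = []
for i in range(170):
    # Load the same spatial slice every time
    u10 = dset.u10.isel(time=0).load()
    # Count cells that came back as NaN even though the source data has none
    missing_counts.append(int(np.isnan(u10.values).sum()))
    print(i, missing_counts[-1])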

[1] No dask distributed cluster

There were no missing cells when reading zarr from GCS in the runs without a distributed cluster.

[2] Local dask cluster with 2 workers

from dask.distributed import Client
client = Client()
client.cluster.scale(2)

I did not observe entire chunks missing using 2 workers. However, in 3 out of 170 cases (1.8%) there were missing values within a chunk. These 3 cases are shown below; there were 2100, 2100 and 840 missing cells (out of 1036800 cells) in these 3 cases.

[Figure: missing_chunking_parts - the 3 cases with missing values within a chunk]

[3] Local dask cluster with 4 workers

When using 4 workers:

client.scale(4)

I observed entire chunks missing in 41 out of 170 runs (24%), as shown below. It is interesting to note that the missing chunks are usually in the first latitude row (this dataset has reversed coordinates, [90:-90]). This is in contrast to the missing partial chunks in [2], which always occurred in the last row.

[Figure: missing_chunks_4workers - entire chunks missing with 4 workers]

VM used to run the tests

$lscpu
Architecture:        x86_64
CPU op-mode(s):      32-bit, 64-bit
Byte Order:          Little Endian
Address sizes:       46 bits physical, 48 bits virtual
CPU(s):              8
On-line CPU(s) list: 0-7
Thread(s) per core:  2
Core(s) per socket:  4
Socket(s):           1
NUMA node(s):        1
Vendor ID:           GenuineIntel
CPU family:          6
Model:               63
Model name:          Intel(R) Xeon(R) CPU @ 2.30GHz
Stepping:            0
CPU MHz:             2300.000
BogoMIPS:            4600.00
Hypervisor vendor:   KVM
Virtualization type: full
L1d cache:           32K
L1i cache:           32K
L2 cache:            256K
L3 cache:            46080K
NUMA node0 CPU(s):   0-7
Flags:               fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush mmx fxsr sse sse2 ss ht syscall nx pdpe1gb rdtscp lm constant_tsc rep_good nopl xtopology nonstop_tsc cpuid tsc_known_freq pni pclmulqdq ssse3 fma cx16 pcid sse4_1 sse4_2 x2apic movbe popcnt aes xsave avx f16c rdrand hypervisor lahf_lm abm invpcid_single pti ssbd ibrs ibpb stibp fsgsbase tsc_adjust bmi1 avx2 smep bmi2 erms invpcid xsaveopt arat md_clear arch_capabilities

$free -h
              total        used        free      shared  buff/cache   available
Mem:           29Gi       521Mi       9.6Gi       0.0Ki        19Gi        28Gi
Swap:            0B          0B          0B

Missing pieces within a chunk is totally out-of-this-world strange. Can you tell if the NaNs form a contiguous block in the original data?

What was your worker setup, please, e.g., threads/process?

Another quick question: are there any NaNs in the whole dataset, or is any NaN coming at any point necessarily a symptom of the syndrome?

There are certainly no NaNs in the original data for the slice I am loading in these tests. There should be no NaNs in the entire dataset, which covers 40 years, but I need to check that.

I can check the cluster setup a bit later on.

What was your worker setup, please, e.g., threads/process?

The two setups looked like this:

In [1]: from dask.distributed import Client

In [2]: client = Client()

In [3]: client.cluster.scale(2)

In [4]: client
Out[4]: <Client: 'tcp://127.0.0.1:45395' processes=2 threads=4, memory=15.81 GB>

In [5]: client.cluster.scale(4)

In [6]: client
Out[6]: <Client: 'tcp://127.0.0.1:45395' processes=4 threads=8, memory=31.61 GB>

Hm, I tried it now with a middling (10 workers, 20 cores) and a small (2 workers) cluster, and got no missing data, working on the dask arrays directly. Note that the chunking I chose matches the zarr internal chunking, which is what I think xarray does automatically :|
Running it now on the smaller one 100 times.

[Screen Shot 2019-10-07 at 10 28 27]

(I get the same success with a local cluster too; all of this running in a google cluster)

I only saw these errors with fairly large kubernetes clusters. This is a fully working example that returned different counts of NaNs (none of which should be there) on ocean.pangeo.io a few minutes ago:

https://nbviewer.jupyter.org/gist/willirath/b40b0b6f281cb1a46fcf848150ca0367

In my case I was not using a kubernetes cluster but a single 8-core virtual machine with a small local dask cluster setup. Also, I was working with another dataset recently and did not notice this problem (though I did not test it extensively), so this may perhaps be related to some characteristic of the dataset - just guessing...

@martindurant I interpreted case [2] (missing intra-chunk values) in those tests wrongly. My coordinates (longitude=1440, latitude=721) are not divisible by my chunk sizes (longitude=100, latitude=100), so the last chunk row and column have fewer cells. It turns out entire chunks are also missing in those 3 cases I mentioned, see the plot. This is consistent with the missing-cell counts above: the last latitude row of chunks is only 21 cells high (721 = 7 × 100 + 21) and the last longitude column only 40 cells wide (1440 = 14 × 100 + 40), so a missing edge chunk in that row accounts for 21 × 100 = 2100 cells and the corner chunk for 21 × 40 = 840 cells.

[Screenshot from 2019-10-08 14-01-15: the 3 cases from [2], showing entire edge chunks missing]

<xarray.Dataset>
Dimensions:    (latitude: 721, longitude: 1440, time: 354264)
Coordinates:
  * latitude   (latitude) float32 90.0 89.75 89.5 89.25 ... -89.5 -89.75 -90.0
  * longitude  (longitude) float32 0.0 0.25 0.5 0.75 ... 359.25 359.5 359.75
  * time       (time) datetime64[ns] 1979-01-01 ... 2019-05-31T23:00:00
Data variables:
    u10        (time, latitude, longitude) float64 dask.array<chunksize=(744, 100, 100), meta=np.ndarray>
    v10        (time, latitude, longitude) float64 dask.array<chunksize=(744, 100, 100), meta=np.ndarray>

Just a bit more info in case it is useful.

  • From those 3 out of 170 cases where I got missing chunks on a 2-worker cluster, all missing chunks occurred in the last (thinner) chunk row as discussed above.

  • From those 41 out of 170 cases where I got missing chunks on a 4-worker cluster:

    • In 35 cases the missing chunks occurred in the first latitude row.
    • In only 1 case did a missing chunk occur in the last row (the thinner chunk row).

[Screenshot from 2019-10-08 20-02-46]

Can you please try again with fsspec and gcsfs from master?

It looks great @martindurant - no missing chunks on either a 2- or a 4-worker distributed cluster, repeating the tests 100 times. Thanks for looking into this!

hurray!

@willirath could you please check those also fix it for you?

I get KilledWorker exceptions after pip installing gcsfs and fsspec master on hub.pangeo.io.

I'll have a closer look tomorrow...

I get KilledWorker exceptions

You will need the worker and client environments to agree, which would mean remaking the image in this case, a bit of a pain...

I am running the same thing in a single process and not finding any errors so far, but I have no idea how long this will take (did not think to make a progress bar...).

Finished without a NaN:

ds_combined.distance.data.sum().compute()
2445454000000.0

(I do this via the dask array so that I don't have to worry about the interpretation of NaN - even a single one would make the result NaN.)
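A tiny illustration of why that check works - not from the notebook above, just the NaN-propagation behaviour being relied on:

import numpy as np
import dask.array as da

a = da.from_array(np.array([1.0, 2.0, np.nan]), chunks=2)

print(a.sum().compute())            # nan -> at least one value is missing
print(da.isnan(a).sum().compute())  # 1   -> explicit count of missing values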

Could we close this?

Waiting for the OK from @willirath, who may want to run with Client() (i.e., a LocalCluster of dask processes, launched from the same environment) to test.

I've just created a setup with KubeCluster on the pangeo binder. Stay tuned...

I still get NaNs.

Gist (still running, so there will be an update) is here: https://gist.github.com/willirath/b40b0b6f281cb1a46fcf848150ca0367

My impression is that the errors occur a lot less frequently. But this might be due to adapted rate limits on the GCS side?

from time import sleep

results = []
for n in range(20):

    vlist = [
        ds[vname].isnull().data.sum()
        for vname in ds.data_vars.keys()
    ]

    results.append(sum(vlist).compute(retries=10))

    print(n, results[-1])

    sleep(20)  # allow for scaling down
0 96200000
1 0
2 0
3 0
4 0
5 0
6 0
7 0
8 0
9 0
10 0
11 0
12 0
13 181433200
14 0
15 0

I repeated my tests 1000 times now on a 4-worker local cluster - no NaNs at all for me.

OK, so closing this, and maybe @willirath is facing a different issue (another rate limit?). More logging may tell.

I've raised zarr-developers/zarr-python#486 hoping to find out how to more easily trigger an Exception upon errors when loading chunks.

Zarr deciding that, where a chunk does not exist, it should be NaN is the right behaviour. The question is what error is actually coming out of gcsfs (FileNotFound instead of some permission error in the file-system layer, and/or IndexError in the mapper) and what errors would be right for zarr. Is gcsfs retrying the underlying error? This info should be in the logs, or else the logging should be improved!
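That behaviour is easy to see with an in-memory store: a chunk whose key is absent simply comes back as fill_value (a minimal sketch using a plain dict as the store, assuming zarr v2 chunk-key naming):

import numpy as np
import zarr

store = {}  # any MutableMapping works as a zarr store
z = zarr.open_array(store, mode="w", shape=(4, 4), chunks=(2, 2),
                    dtype="f8", fill_value=np.nan)
z[:] = 1.0

del store["0.0"]   # simulate a chunk that the store claims does not exist
print(z[:2, :2])   # the whole chunk silently comes back as NaN, no error raised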

Just to say we're hitting something like this currently; looking at the fsspec code, I think there may still be a possibility that transient errors from storage are getting propagated as KeyErrors to zarr, which then fills in the chunk with missing data. I have raised an issue here: fsspec/filesystem_spec#255.

We're currently using this workaround, which can be used where you know all chunks should be there for an array, i.e., you never expect a KeyError to be raised from the store for any reason. A sketch of the idea is below.
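The linked workaround is not reproduced here, but the idea can be sketched as a thin wrapper around the store that turns an unexpected KeyError into a hard failure instead of letting zarr fill the chunk with fill_value (illustrative only; the class name and details are made up, and it is only safe when every chunk of the array is known to exist):

from collections.abc import MutableMapping

class RaiseOnMissingChunk(MutableMapping):
    # Wrap a zarr store so that a missing chunk key fails loudly instead of
    # silently becoming fill_value.

    def __init__(self, mapper):
        self._m = mapper

    def __getitem__(self, key):
        try:
            return self._m[key]
        except KeyError:
            # Metadata keys (.zarray, .zattrs, ...) may legitimately be absent;
            # only treat missing chunk keys as fatal.
            if key.rsplit("/", 1)[-1].startswith("."):
                raise
            raise RuntimeError(f"chunk {key!r} unexpectedly missing from store") from None

    def __contains__(self, key):
        return key in self._m

    def __setitem__(self, key, value):
        self._m[key] = value

    def __delitem__(self, key):
        del self._m[key]

    def __iter__(self):
        return iter(self._m)

    def __len__(self):
        return len(self._m)

It can then be passed anywhere a mapping store is accepted, e.g. xr.open_zarr(RaiseOnMissingChunk(get_mapper("gs://...")), consolidated=True).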

For information, changes have been merged upstream in fsspec that should protect against this happening: fsspec/filesystem_spec#259

Thanks for tracking this down @alimanfoo!

Over in pydata/xarray#3831, we have been discussing how to set up ecosystem-wide integration testing for all of the interconnected packages one needs to use xarray + zarr on the cloud. I'd love to get your thoughts on how best to do this.