Blosc/python-blosc2

Multithreading issues with postfilters

FrancescAlted opened this issue · 0 comments

We are facing issues when using multithreading in combination with postfilters (and probably with prefilters too). The following simple script hangs forever (probably due to some pthread lock that is waiting to be unlocked):

from time import time
import blosc2
import numpy as np


# Size and dtype of super-chunks
# Larger configuration, kept commented out; the active values below use a
# single small chunk.
# nchunks = 20_000
# chunkshape = 50_000
nchunks = 1
# chunkshape is the number of elements per chunk; chunksize is the same
# amount expressed in bytes.
chunkshape = 20_000
dtype = np.dtype(np.int32)
chunksize = chunkshape * dtype.itemsize

# Set the compression and decompression parameters
# Both sides request 8 threads; the hang reported in this issue shows up when
# multithreading is combined with a postfilter.
cparams = {"clevel": 1, "codec": blosc2.Codec.LZ4HC, "typesize": 4, "nthreads": 8}
dparams = {"nthreads": 8}
storage = {"cparams": cparams, "dparams": dparams}

schunk = blosc2.SChunk(chunksize=chunksize, **storage)

# Append the same all-zeros array nchunks times, timing the whole loop.
data = np.zeros(chunkshape, dtype=dtype)
t0 = time()
for i in range(nchunks):
    schunk.append_data(data)
print(f"time append: {time() - t0:.2f}s")
print(f"cratio: {schunk.cratio:.2f}x")

# Associate a postfilter to schunk
# With this registration in place the read loop that follows never finishes;
# with it commented out the script completes.
# NOTE(review): presumably blosc2 invokes the postfilter while decompressing
# data from schunk -- confirm against the blosc2.postfilter documentation.
@blosc2.postfilter(schunk, np.dtype(dtype))
def py_postfilter(input, output, offset):
    """Write ``input + 1`` into ``output``.

    ``offset`` is accepted but not used by this postfilter.
    """
    output[:] = input + 1
    #pass


# Read every chunk back and add up its elements, timing the whole loop.
t0 = time()
# Accumulate in `total` rather than `sum` so the builtin sum() is not shadowed.
total = 0
for chunk in schunk.iterchunks(dtype):
    total += chunk.sum()
print(f"time sum (postfilter): {time() - t0:.2f}s")
print(total)

The output is:

time append: 0.00s
cratio: 2500.00x

Process finished with exit code 143 (interrupted by signal 15: SIGTERM)

Above, the time for the sum is not printed because I had to kill the process (killall python). However, if we comment out the postfilter registration, the script finishes correctly:

time append: 0.00s
cratio: 2500.00x
time sum (postfilter): 0.00s
0

Furthermore, I have been able to create a C program doing exactly the same thing and I cannot reproduce the blocking:

#include <stdio.h>
#include <assert.h>
#include <blosc2.h>

/* Size units; not referenced by the rest of this program. */
#define KB  1024.
#define MB  (1024*KB)
#define GB  (1024*MB)

/* Number of int32_t elements per chunk (an element count, not a byte count). */
#define CHUNKSIZE (20 * 1000)
/* Number of chunks appended to the super-chunk; larger alternative kept below. */
//#define NCHUNKS 1000
#define NCHUNKS 1
/* Thread count used for both compression and decompression. */
#define NTHREADS 8


/*
 * Postfilter callback: treats the input and output buffers as arrays of
 * int32_t and stores each input element plus one into the output.
 *
 * The element count is the buffer size divided by the type size, both taken
 * from postparams.  Always returns 0.
 */
int postfilter_func(blosc2_postfilter_params *postparams) {
  const int32_t *src = (const int32_t *)(postparams->in);
  int32_t *dst = (int32_t *)(postparams->out);
  int count = postparams->size / postparams->typesize;
  int idx = 0;

  while (idx < count) {
    dst[idx] = src[idx] + 1;
    idx++;
  }
  return 0;
}


/*
 * Round-trip NCHUNKS chunks through a super-chunk that has a postfilter
 * attached to its decompression parameters.
 *
 * Each chunk holds CHUNKSIZE consecutive int32_t values.  After
 * decompression every element is expected to be its original value plus one,
 * which is what postfilter_func adds.
 *
 * Returns 0 on success, or the negative code returned by
 * blosc2_schunk_decompress_chunk on a decompression error.
 */
int main(void) {

  blosc2_init();

  static int32_t data[CHUNKSIZE];
  static int32_t data_dest[CHUNKSIZE];
  int32_t isize = CHUNKSIZE * sizeof(int32_t);
  int i, nchunk;
  int64_t nchunks;

  printf("Blosc version info: %s (%s)\n",
         BLOSC2_VERSION_STRING, BLOSC2_VERSION_DATE);

  /* Create a super-chunk container */
  blosc2_cparams cparams = BLOSC2_CPARAMS_DEFAULTS;
  cparams.typesize = sizeof(int32_t);
  cparams.compcode = BLOSC_LZ4HC;
  cparams.clevel = 1;
  cparams.nthreads = NTHREADS;
  blosc2_dparams dparams = BLOSC2_DPARAMS_DEFAULTS;
  dparams.nthreads = NTHREADS;
  // Set some postfilter parameters and function
  dparams.postfilter = (blosc2_postfilter_fn)postfilter_func;
  // We need to zero the contents of the postparams
  blosc2_postfilter_params postparams = {0};
  // In this case we are not passing any additional input in postparams
  dparams.postparams = &postparams;

  blosc2_storage storage = {.cparams=&cparams, .dparams=&dparams};
  blosc2_schunk* schunk = blosc2_schunk_new(&storage);

  // Add some data: chunk number nchunk holds the values
  // nchunk * CHUNKSIZE ... (nchunk + 1) * CHUNKSIZE - 1
  for (nchunk = 0; nchunk < NCHUNKS; nchunk++) {
    for (i = 0; i < CHUNKSIZE; i++) {
      data[i] = i + nchunk * CHUNKSIZE;
    }
    nchunks = blosc2_schunk_append_buffer(schunk, data, isize);
    assert(nchunks == nchunk + 1);
  }

  /* Retrieve and decompress the chunks from the super-chunks and compare values */
  for (nchunk = 0; nchunk < NCHUNKS; nchunk++) {
    int32_t dsize = blosc2_schunk_decompress_chunk(schunk, nchunk, data_dest, isize);
    if (dsize < 0) {
      printf("Decompression error in schunk.  Error code: %d\n", dsize);
      /* Release everything acquired so far before bailing out */
      blosc2_schunk_free(schunk);
      blosc2_destroy();
      return dsize;
    }
    /* Check integrity of this chunk: the postfilter adds 1 to every element */
    for (i = 0; i < CHUNKSIZE; i++) {
      assert (data_dest[i] == 1 + i + nchunk * CHUNKSIZE);
    }
  }
  /* Message text kept verbatim so it matches the output quoted in the report */
  printf("Successful roundtrip schunk <-> frame <-> fileframe !\n");

  /* Free resources */
  blosc2_schunk_free(schunk);
  blosc2_destroy();

  return 0;
}

This runs perfectly well (even with 1000 chunks):

Blosc version info: 2.4.4.dev ($Date:: 2022-10-23 #$)
Successful roundtrip schunk <-> frame <-> fileframe !

Process finished with exit code 0

So the issue should be in python-blosc2. I strongly suspect that we are not correctly holding the GIL during the invocation of the postfilter, or that, for some reason, calling Python code from multithreaded code is not safe. This needs some investigation. One possibility is to disable multithreading completely for pre-/post-filters.