Blosc/python-blosc2

Issues with user-defined codecs

aleixalcacer opened this issue · 4 comments

I'm facing issues when creating a simple codec that just makes a copy of the data to get familiar with Blosc's registering machinery. I attach the code:

import blosc2
import numpy as np

# Create a user-defined codec (just a memcpy)

def encoder(input, output, meta, schunk: blosc2.SChunk):
    print(f"Encoder output size: {output.size}")
    output[:schunk.blocksize] = input[:schunk.blocksize]
    return schunk.blocksize

def decoder(input, output, meta, schunk: blosc2.SChunk):
    output[:schunk.blocksize] = input[:schunk.blocksize]
    return schunk.blocksize

# Register the codec
codec_id = 200
blosc2.register_codec('test1', codec_id, encoder, decoder)

# Compress this array with the new codec

shape = (100, 100)
a = np.ones(shape, dtype=np.int64)


cparams = {
    'codec': codec_id,
    'nthreads': 1,
    'filters': [],
    'splitmode': blosc2.SplitMode.NEVER_SPLIT,
}

dparams = {
    'nthreads': 1,
}

chunks = shape
blocks = (50, 50)

c_a = blosc2.asarray(a, chunks=chunks, blocks=blocks, cparams=cparams, dparams=dparams)

However, when I run the previous code, I get the following:

Encoder output size: 20000
Encoder output size: 20000
Encoder output size: 20000
Encoder output size: 19968

ValueError: could not broadcast input array from shape (20000,) into shape (19968,)

Looking at this, it appears that the last block of the chunk is smaller than the others. Do you know what is happening? Is there something I'm doing wrong?
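
For reference, the mismatch in the traceback can be reproduced with plain NumPy, without Blosc. This is only a sketch: `src` and `dst` are hypothetical stand-ins for the buffers handed to the encoder, assumed here to be 1-D byte arrays, with the sizes taken from the printed output. It shows that slicing past the end of the short buffer is silently clamped, so the two sides of the assignment end up with different lengths. The thread does not say what the missing 32 bytes are used for.

```python
import numpy as np

# Sizes taken from the output above: a (50, 50) block of int64 is
# 50 * 50 * 8 = 20000 bytes, but the last output buffer has 19968 bytes.
blocksize = 50 * 50 * np.dtype(np.int64).itemsize
src = np.ones(blocksize, dtype=np.uint8)  # hypothetical stand-in for `input`
dst = np.empty(19968, dtype=np.uint8)     # hypothetical stand-in for the short `output`

# Slicing past the end is clamped, so the target slice has only 19968 items...
target_len = dst[:blocksize].size

# ...while the source slice still has 20000, so the assignment fails.
try:
    dst[:blocksize] = src[:blocksize]
    error = None
except ValueError as exc:
    error = str(exc)

print(blocksize, target_len, blocksize - target_len)
print(error)
```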

I have had a look into this. The problem with your approach is that you were trying to copy everything without checking whether there is enough space available. This can be fixed with something like:

def encoder(input, output, meta, schunk: blosc2.SChunk):
    try:
        output[:schunk.blocksize] = input[:schunk.blocksize]
    except ValueError:
        return 0
    return schunk.blocksize
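
The same guard can also be written as an explicit size check instead of catching the exception. The following is a minimal sketch, not the library's recommended form: it assumes `input` and `output` behave like 1-D NumPy byte arrays (as the `.size` attribute and the error message suggest), and `fake_schunk` is a hypothetical stand-in used only so that the function can be exercised without Blosc.

```python
from types import SimpleNamespace

import numpy as np


def encoder(input, output, meta, schunk):
    nbytes = schunk.blocksize
    # If either buffer is too small for a full block, report
    # "could not compress" by returning 0 instead of raising.
    if output.size < nbytes or input.size < nbytes:
        return 0
    output[:nbytes] = input[:nbytes]
    return nbytes


# Hypothetical stand-in for blosc2.SChunk; only `blocksize` is used here.
fake_schunk = SimpleNamespace(blocksize=20000)
block = np.ones(20000, dtype=np.uint8)

full = encoder(block, np.empty(20000, dtype=np.uint8), 0, fake_schunk)
short = encoder(block, np.empty(19968, dtype=np.uint8), 0, fake_schunk)
print(full, short)
```

With the sizes from the report, the call with a full-size buffer returns the block size, while the call with the 19968-byte buffer returns 0.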

But the best way to get a plain copy is to just return 0; this way, Blosc will know that the codec could not compress and will copy the original buffer as-is:

def encoder(input, output, meta, schunk: blosc2.SChunk):
    # By returning 0, we are saying that we are not compressing anything
    return 0

With the next script:

import blosc2
import numpy as np
import sys

# Create a User-defined codec (just a memcpy)

def encoder(input, output, meta, schunk: blosc2.SChunk):
    #print(f"Encoder output size: {output.size, input.size, schunk.blocksize}")
    # By returning 0, we are saying that we are not compressing anything
    return 0
    # Alternative, but more time-consuming (use instead of the return above):
    # try:
    #     output[:schunk.blocksize] = input[:schunk.blocksize]
    # except ValueError:
    #     return 0
    # return schunk.blocksize

def decoder(input, output, meta, schunk: blosc2.SChunk):
    #print(f"Decoder output size: {output.size, input.size, schunk.blocksize}")
    output[:schunk.blocksize] = input[:schunk.blocksize]
    return schunk.blocksize

# Register the codec
codec_id = 200
blosc2.register_codec('test1', codec_id, encoder, decoder)

# Compress this array with the new codec

shape = (100, 100)
a = np.ones(shape, dtype=np.int64)


cparams = {
    'codec': codec_id,
    'nthreads': 1,
    'filters': [],
    'splitmode': blosc2.SplitMode.NEVER_SPLIT,
}

dparams = {
    'nthreads': 1,
}

chunks = shape
blocks = (50, 50)

c_a = blosc2.asarray(a, chunks=chunks, blocks=blocks, cparams=cparams, dparams=dparams)
print(c_a.info)
print(c_a[:])

I am getting this output:

type    : NDArray
shape   : (100, 100)
chunks  : (100, 100)
blocks  : (50, 50)
dtype   : int64
cratio  : 1.00
cparams : {
    'blocksize': 20000,
    'clevel': 1,
    'codec': 200,
    'codec_meta': 0,
    'filters': [
        <Filter.NOFILTER: 0>,
        <Filter.NOFILTER: 0>,
        <Filter.NOFILTER: 0>,
        <Filter.NOFILTER: 0>,
        <Filter.NOFILTER: 0>,
        <Filter.NOFILTER: 0>
    ],
    'filters_meta': [0, 0, 0, 0, 0, 0],
    'nthreads': 1,
    'splitmode': <SplitMode.NEVER_SPLIT: 2>,
    'typesize': 8,
    'use_dict': 0
}
dparams : {'nthreads': 1}

[[1 1 1 ... 1 1 1]
 [1 1 1 ... 1 1 1]
 [1 1 1 ... 1 1 1]
 ...
 [1 1 1 ... 1 1 1]
 [1 1 1 ... 1 1 1]
 [1 1 1 ... 1 1 1]]
Error in sys.excepthook:

Original exception was:

Which is what you want. There is still the error:

Error in sys.excepthook:

Original exception was:

and frankly, I don't know where it comes from...

FWIW, here is a working version that does not raise the sys.excepthook exception (based on pytest-dev/execnet#30):

import blosc2
import numpy as np
import sys

# Create a User-defined codec (just a memcpy)

def encoder(input, output, meta, schunk: blosc2.SChunk):
    # By returning 0, we are saying that we are not compressing anything
    return 0

def decoder(input, output, meta, schunk: blosc2.SChunk):
    output[:schunk.blocksize] = input[:schunk.blocksize]
    return schunk.blocksize

def main():
    # Register the codec
    codec_id = 200
    blosc2.register_codec('test1', codec_id, encoder, decoder)

    # Compress this array with the new codec

    shape = (100, 100)
    a = np.ones(shape, dtype=np.int64)

    cparams = {
        'codec': codec_id,
        'nthreads': 1,
        'filters': [],
        'splitmode': blosc2.SplitMode.NEVER_SPLIT,
    }

    dparams = {
        'nthreads': 1,
    }

    chunks = shape
    blocks = (50, 50)

    c_a = blosc2.asarray(a, chunks=chunks, blocks=blocks, cparams=cparams, dparams=dparams)
    print(c_a.info)
    print(c_a[:])

if __name__ == '__main__':
    try:
        sys.exit(main())
    finally:
        # This block is crucial to avoid having issues with
        # Python spitting nonsense thread exceptions. We have already
        # handled what we could, so close stderr and stdout.
        try:
            sys.stdout.close()
        except:
            pass
        try:
            sys.stderr.close()
        except:
            pass

After thinking twice, we should try to make your original code work, because we want a user-defined codec to keep working even when it cannot compress a chunk (which is not the case currently). Unfortunately, this requires some significant changes in the underlying C-Blosc2 library (which is equally affected), so it should be postponed until we have time. If you want to tackle this one, that would be great ;-)

Thanks for the response, Francesc. I think the same as you. When I have time, I will be happy to try to fix it :)