Issues with user-defined codecs
aleixalcacer opened this issue · 4 comments
I'm facing issues when creating a simple codec that just makes a copy of the data to get familiar with Blosc's registering machinery. I attach the code:
import blosc2
import numpy as np
# Create an User-defined codec (just a memcpy)
def encoder(input, output, meta, schunk: blosc2.SChunk):
print(f"Encoder output size: {output.size}")
output[:schunk.blocksize] = input[:schunk.blocksize]
return schunk.blocksize
def decoder(input, output, meta, schunk: blosc2.SChunk):
output[:schunk.blocksize] = input[:schunk.blocksize]
return schunk.blocksize
# Register the codec
codec_id = 200
blosc2.register_codec('test1', codec_id, encoder, decoder)
# Compress this array with the new codec
shape = (100, 100)
a = np.ones(shape, dtype=np.int64)
cparams = {
'codec': codec_id,
'nthreads': 1,
'filters': [],
'splitmode': blosc2.SplitMode.NEVER_SPLIT,
}
dparams = {
'nthreads': 1,
}
chunks = shape
blocks = (50, 50)
c_a = blosc2.asarray(a, chunks=chunks, blocks=blocks, cparams=cparams, dparams=dparams)
However, when I run the previous code, I get the following:
Encoder output size: 20000
Encoder output size: 20000
Encoder output size: 20000
Encoder output size: 19968
ValueError: could not broadcast input array from shape (20000,) into shape (19968,)
Looking at this, it appears that the last block of the chunk is smaller than the others. Do you know what is happening? Is there something I'm doing wrong?
I have had a look into this. The problem in your approach is that you was trying to copy everything without testing whether there is available space. This can be fixed with something like:
def encoder(input, output, meta, schunk: blosc2.SChunk):
try:
output[:schunk.blocksize] = input[:schunk.blocksize]
except:
return 0
return schunk.blocksize
But the best solution to copy is to just return 0
; this way, blosc will know that the codec could not compress and it will copy the original buffer as-is:
def encoder(input, output, meta, schunk: blosc2.SChunk):
# By returning 0, we are saying that we are not compressing anything
return 0
With the next script:
import blosc2
import numpy as np
import sys
# Create a User-defined codec (just a memcpy)
def encoder(input, output, meta, schunk: blosc2.SChunk):
#print(f"Encoder output size: {output.size, input.size, schunk.blocksize}")
# By returning 0, we are saying that we are not compressing anything
return 0
# Alternative, but more time-consuming:
try:
output[:schunk.blocksize] = input[:schunk.blocksize]
except:
return 0
return schunk.blocksize
def decoder(input, output, meta, schunk: blosc2.SChunk):
#print(f"Decoder output size: {output.size, input.size, schunk.blocksize}")
output[:schunk.blocksize] = input[:schunk.blocksize]
return schunk.blocksize
# Register the codec
codec_id = 200
blosc2.register_codec('test1', codec_id, encoder, decoder)
# Compress this array with the new codec
shape = (100, 100)
a = np.ones(shape, dtype=np.int64)
cparams = {
'codec': codec_id,
'nthreads': 1,
'filters': [],
'splitmode': blosc2.SplitMode.NEVER_SPLIT,
}
dparams = {
'nthreads': 1,
}
chunks = shape
blocks = (50, 50)
c_a = blosc2.asarray(a, chunks=chunks, blocks=blocks, cparams=cparams, dparams=dparams)
print(c_a.info)
print(c_a[:])
I am getting this output:
type : NDArray
shape : (100, 100)
chunks : (100, 100)
blocks : (50, 50)
dtype : int64
cratio : 1.00
cparams : {
'blocksize': 20000,
'clevel': 1,
'codec': 200,
'codec_meta': 0,
'filters': [
<Filter.NOFILTER: 0>,
<Filter.NOFILTER: 0>,
<Filter.NOFILTER: 0>,
<Filter.NOFILTER: 0>,
<Filter.NOFILTER: 0>,
<Filter.NOFILTER: 0>
],
'filters_meta': [0, 0, 0, 0, 0, 0],
'nthreads': 1,
'splitmode': <SplitMode.NEVER_SPLIT: 2>,
'typesize': 8,
'use_dict': 0
}
dparams : {'nthreads': 1}
[[1 1 1 ... 1 1 1]
[1 1 1 ... 1 1 1]
[1 1 1 ... 1 1 1]
...
[1 1 1 ... 1 1 1]
[1 1 1 ... 1 1 1]
[1 1 1 ... 1 1 1]]
Error in sys.excepthook:
Original exception was:
Which is what you want. There is still the error:
Error in sys.excepthook:
Original exception was:
that frankly, I don't know where it comes from...
FWIW, here it is a working version that does not raise the sys.excepthook
exception (based on pytest-dev/execnet#30):
import blosc2
import numpy as np
import sys
# Create a User-defined codec (just a memcpy)
def encoder(input, output, meta, schunk: blosc2.SChunk):
# By returning 0, we are saying that we are not compressing anything
return 0
def decoder(input, output, meta, schunk: blosc2.SChunk):
output[:schunk.blocksize] = input[:schunk.blocksize]
return schunk.blocksize
def main():
# Register the codec
codec_id = 200
blosc2.register_codec('test1', codec_id, encoder, decoder)
# Compress this array with the new codec
shape = (100, 100)
a = np.ones(shape, dtype=np.int64)
cparams = {
'codec': codec_id,
'nthreads': 1,
'filters': [],
'splitmode': blosc2.SplitMode.NEVER_SPLIT,
}
dparams = {
'nthreads': 1,
}
chunks = shape
blocks = (50, 50)
c_a = blosc2.asarray(a, chunks=chunks, blocks=blocks, cparams=cparams, dparams=dparams)
print(c_a.info)
print(c_a[:])
if __name__ == '__main__':
try:
sys.exit(main())
finally:
# This block is crucial to avoid having issues with
# Python spitting non-sense thread exceptions. We have already
# handled what we could, so close stderr and stdout.
try:
sys.stdout.close()
except:
pass
try:
sys.stderr.close()
except:
pass
After thinking twice, we should try to make your original code working, because we want a user-defined codec to continue working even when it cannot compress a chunk (which is not the case currently). Unfortunately, this requires some important changes in the underlying C-Blosc2 library (which is equally affected), and should be postponed til we have time. If you want to tackle this one, that would be great ;-)
Thanks for the response Francesc, I think the same that you. when I have time I will be happy to try to fix it :)