Raw integer as key causes Topic.publish_message crash
iblislin opened this issue · 9 comments
I suspect the keysize
calculation is problematic for the raw integer key.
Line 412 in 9a1cf92
The key could be deserialized correctly with Serdes.Integer()
in Java.
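For context, Kafka's Java IntegerSerializer writes the key as 4 bytes in big-endian order, so a raw key from such a producer can be decoded on the Python side roughly like this. The helper name decode_int32_key is made up for illustration and is not part of faust:

```python
import struct


def decode_int32_key(raw: bytes) -> int:
    """Decode a 4-byte big-endian signed integer key.

    This is the byte layout written by Kafka's Java IntegerSerializer.
    The function itself is a hypothetical helper, not a faust API.
    """
    if len(raw) != 4:
        raise ValueError(f"expected 4 bytes, got {len(raw)}")
    (value,) = struct.unpack(">i", raw)
    return value


# Example: the bytes 00 00 00 2a decode to the integer 42.
example = decode_int32_key(b"\x00\x00\x00\x2a")
```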
I tried to consume the message with faust, invoking group_by, and got this error:
topic = app.topic('topic', key_serializer='raw', key_type=bytes)

@app.agent(topic)
async def f(records):
    async for i in records.group_by(...):
        print(i)
Traceback (most recent call last):
File "/home/iblis/venv/lib/python3.11/site-packages/faust/agents/agent.py", line 674, in _execute_actor
await coro
File "/home/iblis/git/./src/app.py", line 64, in f
async for i in records.group_by(...):
File "/home/iblis/venv/lib/python3.11/site-packages/faust/streams.py", line 1061, in _c_aiter
value, sensor_state = await it.next() # noqa: B305
^^^^^^^^^^^^^^^
File "faust/_cython/streams.pyx", line 90, in next
File "/home/iblis/venv/lib/python3.11/site-packages/faust/channels.py", line 529, in __anext__
return await self.queue.get()
^^^^^^^^^^^^^^^^^^^^^^
File "/home/iblis/venv/lib/python3.11/site-packages/mode/utils/queues.py", line 214, in get
return await super().get()
^^^^^^^^^^^^^^^^^^^
File "/home/iblis/venv/lib/python3.11/site-packages/mode/utils/queues.py", line 176, in get
return cast(_T, await super().get())
^^^^^^^^^^^^^^^^^^^
File "/usr/lib/python3.11/asyncio/queues.py", line 158, in get
await getter
File "/home/iblis/venv/lib/python3.11/site-packages/faust/streams.py", line 756, in _passive_drainer
async for item in self: # pragma: no cover
File "/home/iblis/venv/lib/python3.11/site-packages/faust/streams.py", line 1061, in _c_aiter
value, sensor_state = await it.next() # noqa: B305
^^^^^^^^^^^^^^^
File "faust/_cython/streams.pyx", line 97, in next
File "/home/iblis/venv/lib/python3.11/site-packages/mode/utils/futures.py", line 138, in maybe_async
return await res
^^^^^^^^^
File "/home/iblis/venv/lib/python3.11/site-packages/faust/streams.py", line 889, in repartition
await event.forward(channel, key=new_key)
File "/home/iblis/venv/lib/python3.11/site-packages/faust/events.py", line 190, in forward
return await self._send(
^^^^^^^^^^^^^^^^^
File "/home/iblis/venv/lib/python3.11/site-packages/faust/events.py", line 218, in _send
return await cast(_App, self.app)._attachments.maybe_put(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/iblis/venv/lib/python3.11/site-packages/faust/app/_attached.py", line 130, in maybe_put
return await send(
^^^^^^^^^^^
File "/home/iblis/venv/lib/python3.11/site-packages/faust/app/base.py", line 1537, in send
return await chan.send(
^^^^^^^^^^^^^^^^
File "/home/iblis/venv/lib/python3.11/site-packages/faust/topics.py", line 189, in send
return await self._send_now(
^^^^^^^^^^^^^^^^^^^^^
File "/home/iblis/venv/lib/python3.11/site-packages/faust/channels.py", line 314, in _send_now
return await self.publish_message(
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/iblis/venv/lib/python3.11/site-packages/faust/topics.py", line 413, in publish_message
keysize=len(key) if key else 0,
^^^^^^^^
TypeError: object of type 'int' has no len()
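The last frame can be reproduced without Kafka. The sketch below copies the expression from the traceback into a standalone function and adds a guarded variant for comparison. Both function names are hypothetical, and guarded_keysize is only an illustration, not faust code or a proposed patch:

```python
def keysize_as_in_traceback(key):
    # Same expression as the failing line in the traceback above.
    return len(key) if key else 0


def guarded_keysize(key):
    # Hypothetical variant: only measure keys that are already bytes.
    return len(key) if isinstance(key, (bytes, bytearray)) else 0


try:
    keysize_as_in_traceback(1)
    error = None
except TypeError as exc:
    # A non-zero int is truthy, so len() is called on it and fails.
    error = str(exc)
```

Note that a key of 0 is falsy, so the original expression returns 0 for it instead of raising, which means the crash depends on the key's value as well as its type.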
Versions
- Python version: 3.11
- Faust version: 0.10.13
- Operating system:
  % uname -a
  Linux gaebolg 6.1.27-1-lts #1 SMP PREEMPT_DYNAMIC Mon, 01 May 2023 18:18:27 +0000 x86_64 GNU/Linux
- Kafka version: 3.4.1
- RocksDB version (if applicable)
Using integers as topic keys is uncommon but not unheard of. I believe this could work if you set key_serializer="pickle" as specified in the existing codecs: https://faust-streaming.github.io/faust/userguide/models.html#codecs
I should probably test this myself to be sure.
Yup, it works:
#!/usr/bin/env python
import faust

app = faust.App(
    'hello-world',
    broker='kafka://localhost:9092',
)

greetings_topic = app.topic('greetings', value_type=str, key_serializer='pickle')

@app.agent(greetings_topic)
async def print_greetings(greetings):
    async for greeting in greetings:
        print(greeting)

@app.timer(5)
async def produce():
    for i in range(100):
        await greetings_topic.send(value=f'hello {i}', key=1)

if __name__ == '__main__':
    app.main()
Just a serialization issue. I'll go ahead and close this.
Hi @wbarnha ,
I get a KeyDecodeError if I use the group_by method:
import faust

app = faust.App(
    'hello-world',
    broker='kafka://localhost:9092',
)

class Foo(faust.Record):
    cate: str

greetings_topic = app.topic('greetings', value_type=Foo, key_serializer='pickle')

@app.agent(greetings_topic)
async def print_greetings(greetings: faust.Stream):
    # Repartition the stream by the record's `cate` field.
    async for i in greetings.group_by(Foo.cate):
        print(i)

@app.timer(5)
async def produce():
    for i in range(100):
        # `cate` is a required field, so pass it to the constructor.
        foo = Foo(cate='bar')
        await greetings_topic.send(value=foo, key=i)

if __name__ == '__main__':
    app.main()
Can you provide a traceback? I'm unavailable to test at the moment.
sure,
[2023-06-08 10:10:59,752] [1869829] [ERROR] [^----Agent*: __main__.f]: Crashed reason=KeyDecodeError('Invalid base64-encoded string: number of data characters (1) cannot be 1 more than a multiple of 4')
Traceback (most recent call last):
File "/home/iblis/venv/lib/python3.11/site-packages/faust/serializers/registry.py", line 56, in loads_key
payload = self._loads(serializer, key)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/iblis/venv/lib/python3.11/site-packages/faust/serializers/registry.py", line 64, in _loads
return loads(serializer, data)
^^^^^^^^^^^^^^^^^^^^^^^
File "/home/iblis/venv/lib/python3.11/site-packages/faust/serializers/codecs.py", line 364, in loads
return get_codec(codec).loads(s) if codec else s
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/iblis/venv/lib/python3.11/site-packages/faust/serializers/codecs.py", line 231, in loads
s = cast(Codec, node)._loads(s)
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/iblis/venv/lib/python3.11/site-packages/faust/serializers/codecs.py", line 297, in _loads
return b64decode(s)
^^^^^^^^^^^^
File "/usr/lib/python3.11/base64.py", line 88, in b64decode
return binascii.a2b_base64(s, strict_mode=validate)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
binascii.Error: Invalid base64-encoded string: number of data characters (1) cannot be 1 more than a multiple of 4
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/home/iblis/venv/lib/python3.11/site-packages/faust/agents/agent.py", line 674, in _execute_actor
await coro
File "/home/iblis/git/py/./app.py", line 64, in f
async for i in visit_records.group_by(VisitRecord.VisitNo):
File "/home/iblis/venv/lib/python3.11/site-packages/faust/streams.py", line 1061, in _c_aiter
value, sensor_state = await it.next() # noqa: B305
^^^^^^^^^^^^^^^
File "faust/_cython/streams.pyx", line 90, in next
File "/home/iblis/venv/lib/python3.11/site-packages/faust/channels.py", line 529, in __anext__
return await self.queue.get()
^^^^^^^^^^^^^^^^^^^^^^
File "/home/iblis/venv/lib/python3.11/site-packages/mode/utils/queues.py", line 214, in get
return await super().get()
^^^^^^^^^^^^^^^^^^^
File "/home/iblis/venv/lib/python3.11/site-packages/mode/utils/queues.py", line 176, in get
return cast(_T, await super().get())
^^^^^^^^^^^^^^^^^^^
File "/usr/lib/python3.11/asyncio/queues.py", line 158, in get
await getter
File "/home/iblis/venv/lib/python3.11/site-packages/faust/streams.py", line 756, in _passive_drainer
async for item in self: # pragma: no cover
File "/home/iblis/venv/lib/python3.11/site-packages/faust/streams.py", line 1061, in _c_aiter
value, sensor_state = await it.next() # noqa: B305
^^^^^^^^^^^^^^^
File "faust/_cython/streams.pyx", line 90, in next
File "/home/iblis/venv/lib/python3.11/site-packages/faust/channels.py", line 529, in __anext__
return await self.queue.get()
^^^^^^^^^^^^^^^^^^^^^^
File "/home/iblis/venv/lib/python3.11/site-packages/mode/utils/queues.py", line 214, in get
return await super().get()
^^^^^^^^^^^^^^^^^^^
File "/home/iblis/venv/lib/python3.11/site-packages/mode/utils/queues.py", line 176, in get
return cast(_T, await super().get())
^^^^^^^^^^^^^^^^^^^
File "/usr/lib/python3.11/asyncio/queues.py", line 158, in get
await getter
File "faust/transport/_cython/conductor.pyx", line 73, in faust.transport._cython.conductor.ConductorHandler.__call__
File "/home/iblis/venv/lib/python3.11/site-packages/faust/serializers/schemas.py", line 174, in decode
k: K = schema_loads_key(app, message, loads=loads_key)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/iblis/venv/lib/python3.11/site-packages/faust/serializers/schemas.py", line 88, in loads_key
loads(
File "/home/iblis/venv/lib/python3.11/site-packages/faust/serializers/registry.py", line 61, in loads_key
raise KeyDecodeError(str(exc)).with_traceback(sys.exc_info()[2]) from exc
File "/home/iblis/venv/lib/python3.11/site-packages/faust/serializers/registry.py", line 56, in loads_key
payload = self._loads(serializer, key)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/iblis/venv/lib/python3.11/site-packages/faust/serializers/registry.py", line 64, in _loads
return loads(serializer, data)
^^^^^^^^^^^^^^^^^^^^^^^
File "/home/iblis/venv/lib/python3.11/site-packages/faust/serializers/codecs.py", line 364, in loads
return get_codec(codec).loads(s) if codec else s
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/iblis/venv/lib/python3.11/site-packages/faust/serializers/codecs.py", line 231, in loads
s = cast(Codec, node)._loads(s)
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/iblis/venv/lib/python3.11/site-packages/faust/serializers/codecs.py", line 297, in _loads
return b64decode(s)
^^^^^^^^^^^^
File "/usr/lib/python3.11/base64.py", line 88, in b64decode
return binascii.a2b_base64(s, strict_mode=validate)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
faust.exceptions.KeyDecodeError: Invalid base64-encoded string: number of data characters (1) cannot be 1 more than a multiple of 4
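The innermost frames show the key bytes being passed to b64decode. A one-character payload fails the same way in plain Python, which suggests that the key reached the decoder without having been base64-encoded first. That is an inference from the error message, not something verified against faust internals. The helper try_b64decode below is hypothetical and only wraps the standard library call:

```python
import base64
import binascii


def try_b64decode(payload: bytes):
    """Return the decoded bytes, or the binascii.Error if decoding fails."""
    try:
        return base64.b64decode(payload)
    except binascii.Error as exc:
        return exc


# A single data character is never valid base64.
failed = try_b64decode(b"1")

# The same byte decodes fine once it has actually been base64-encoded.
roundtrip = try_b64decode(base64.b64encode(b"1"))
```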
Interesting, I forgot to consider usage with group_by. To be honest, I wouldn't recommend using an integer as a message key. A string would be much easier to work with.
Ah, okay. In my case, though, I cannot change the type of the message keys, since the producer programs are not controlled by me, and there are tons of streams using Int32 and Int64.
Could this issue be reopened?
Sure, sorry I can't help further at the moment. Also, please accept my condolences for having to deal with that constraint beyond your control. I know that pain all too well.
As a workaround, could you emit to a new topic where you convert the int key to a string, and then group by on the new topic?
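A minimal sketch of the key conversion such a workaround would need, assuming the upstream producers write Int32 and Int64 keys as big-endian bytes (the layout used by Kafka's Java integer and long serializers). int_key_to_str is a hypothetical helper, and the agent that forwards records to the new topic is left out:

```python
import struct

# Struct formats for big-endian signed integers, selected by key length.
_FORMATS = {4: ">i", 8: ">q"}


def int_key_to_str(raw: bytes) -> str:
    """Convert a raw Int32 or Int64 key into its decimal string form.

    Assumes big-endian encoding; any other key length is rejected
    rather than guessed at.
    """
    fmt = _FORMATS.get(len(raw))
    if fmt is None:
        raise ValueError(f"unsupported key length: {len(raw)}")
    (value,) = struct.unpack(fmt, raw)
    return str(value)
```

The resulting string could then be used as the key when sending each record to the intermediate topic, so that group_by only ever sees string keys.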