How to use custom messages (custom serialization per message): fix_unpack_dictionary receives an empty serialized_dictionary
magowiz opened this issue · 2 comments
Hi,
I'm trying to extend the overlay tutorial by adding a new kind of message that carries a dictionary of data. I used the example provided here: https://py-ipv8.readthedocs.io/en/latest/reference/serialization.html#custom-serialization-per-message, in particular the dataclass variant. In a separate module I have:
```python
import json

from ipv8.messaging.payload_dataclass import dataclass


@dataclass(msg_id=2)
class TextMessage:
    dictionary: str

    def fix_pack_dictionary(self, the_dictionary: dict) -> str:
        return json.dumps(the_dictionary)

    @classmethod
    def fix_unpack_dictionary(cls,
                              serialized_dictionary: str) -> dict:
        return json.loads(serialized_dictionary)
```

In the main module I implemented another method to send this kind of message:

```python
async def send_message_to_all(self) -> None:
    for peer in self.get_peers():
        sentence = gen_random_sentence()
        sentence = {'dictionary': 'hello'}
        self.ez_send(peer, TextMessage(sentence))
```
which I call here, in the `started` method of the community:

```python
def started(self) -> None:
    async def start_communication() -> None:
        if not self.lamport_clock:
            for p in self.get_peers():
                self.ez_send(p, MyMessage(self.lamport_clock))
        else:
            self.cancel_pending_task("start_communication")
            await self.send_message_to_all()

    self.register_task("start_communication", start_communication, interval=10.0, delay=0)
```
The problem is that when `fix_unpack_dictionary` gets called from my `TextMessage` class, I always receive an empty string as the `serialized_dictionary` parameter, so a `JSONDecodeError` occurs.
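For reference, the error itself can be reproduced without IPv8. The sketch below uses only the standard `json` module; `pack_dictionary` and `unpack_dictionary` are hypothetical stand-ins for the two `fix_*` methods above, not IPv8 functions. It suggests that the JSON round trip is fine for a real dictionary and that the failure comes purely from the empty string being passed in.

```python
import json


def pack_dictionary(the_dictionary: dict) -> str:
    # Hypothetical stand-in for fix_pack_dictionary.
    return json.dumps(the_dictionary)


def unpack_dictionary(serialized_dictionary: str) -> dict:
    # Hypothetical stand-in for fix_unpack_dictionary.
    return json.loads(serialized_dictionary)


original = {'dictionary': 'hello'}
wire_value = pack_dictionary(original)

# A properly serialized dictionary survives the round trip.
round_tripped = unpack_dictionary(wire_value)
print(round_tripped == original)

# An empty string is not valid JSON, so decoding it raises JSONDecodeError.
empty_input_failed = False
try:
    unpack_dictionary("")
except json.JSONDecodeError as error:
    empty_input_failed = True
    print(f"empty input fails: {error}")
```

So the thing to track down is why the receiving side ends up with an empty string, not the serialization code itself.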
Here is my `on_message` handler as well:

```python
@lazy_wrapper(TextMessage)
def on_message(self, peer: Peer, p_load: TextMessage) -> None:
    if p_load.msg_id == 2:
        sentence = p_load.sentence.decode('utf-8')
        print(sentence)
        print(f"message from {peer}: {p_load.sentence}")
```
What am I doing wrong? My goal is to be able to send arbitrary dict/JSON content from one peer to another.
I'm using:
pyipv8 2.12.0
python: 3.11.6
Please tell me if you need further info.
Hi. Your code is a bit hard to read but I assume this is what you are trying to achieve:
```python
import json
import os
from asyncio import run

from ipv8.community import Community, CommunitySettings
from ipv8.configuration import ConfigBuilder, WalkerDefinition, Strategy, default_bootstrap_defs
from ipv8.lazy_community import lazy_wrapper
from ipv8.messaging.payload_dataclass import dataclass
from ipv8.peer import Peer
from ipv8.util import run_forever
from ipv8_service import IPv8


@dataclass(msg_id=2)
class TextMessage:
    dictionary: str

    def fix_pack_dictionary(self, the_dictionary: dict) -> str:
        return json.dumps(the_dictionary)

    @classmethod
    def fix_unpack_dictionary(cls,
                              serialized_dictionary: str) -> dict:
        return json.loads(serialized_dictionary)


class MyCommunity(Community):
    community_id = os.urandom(20)

    def __init__(self, settings: CommunitySettings) -> None:
        super().__init__(settings)
        self.add_message_handler(TextMessage, self.on_message)

    def send_message_to_all(self) -> None:
        for peer in self.get_peers():
            self.ez_send(peer, TextMessage({'dictionary': 'hello'}))

    def started(self) -> None:
        self.register_task("start_communication", self.send_message_to_all, interval=10.0, delay=0)

    @lazy_wrapper(TextMessage)
    def on_message(self, peer: Peer, p_load: TextMessage) -> None:
        sentence = p_load.dictionary['dictionary']
        print(f"message from {peer}: {sentence}")


async def start_communities() -> None:
    for i in [1, 2, 3]:
        builder = ConfigBuilder().clear_keys().clear_overlays()
        builder.add_key("my peer", "medium", f"ec{i}.pem")
        builder.add_overlay("MyCommunity", "my peer",
                            [WalkerDefinition(Strategy.RandomWalk,
                                              10, {'timeout': 3.0})],
                            default_bootstrap_defs, {}, [('started',)])
        await IPv8(builder.finalize(),
                   extra_communities={'MyCommunity': MyCommunity}).start()
    await run_forever()


run(start_communities())
```
As a quick note: at `delay=0` nobody will (probably) be connected yet, so you'll have to wait 10 seconds for the first messages to come in.
@qstokkink thank you very much. Your complete example made clear what I was missing and what I did wrong. In my example I tried not to post the whole code, and I understand that reading fragmented code is harder. It also wasn't working, so the changes I tried along the way may have complicated things further.
What I missed and changed:
- I had not registered my message handler: `self.add_message_handler(TextMessage, self.on_text_message)`
- I changed the delay from 0 to 10, as you suggested
- I renamed the handler so it has its own method name (I wasn't sure whether it was a good idea to let it collide with the Lamport clock handler) and fixed it to use your logic
Thank you very much for this fine example. I think my problem basically was that I forgot to register a new message handler.
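To illustrate why a missing registration matters, here is a toy dispatcher, assuming handlers are looked up by message id. It is a simplified model and not IPv8's actual code: `ToyOverlay`, `register` and `on_packet` are made-up names, and the message ids are arbitrary.

```python
from typing import Callable, Dict, List


class ToyOverlay:
    """Hypothetical stand-in for an overlay; this is not IPv8's code."""

    def __init__(self) -> None:
        # Maps a message id to the function that should process it.
        self.handlers: Dict[int, Callable[[str], None]] = {}

    def register(self, msg_id: int, handler: Callable[[str], None]) -> None:
        self.handlers[msg_id] = handler

    def on_packet(self, msg_id: int, body: str) -> bool:
        """Dispatch a packet; return False if nobody handles this id."""
        handler = self.handlers.get(msg_id)
        if handler is None:
            return False
        handler(body)
        return True


seen: List[str] = []
overlay = ToyOverlay()

# At first only id 1 has a handler (an arbitrary id for an existing message).
overlay.register(1, lambda body: seen.append(f"clock: {body}"))
handled_before = overlay.on_packet(2, '{"dictionary": "hello"}')

# After registering a handler for id 2, the same packet is delivered.
overlay.register(2, lambda body: seen.append(f"text: {body}"))
handled_after = overlay.on_packet(2, '{"dictionary": "hello"}')

print(handled_before, handled_after, seen)
```

In this model the packet with id 2 never reaches a handler until one is registered for it, which is the same kind of gap that the `add_message_handler` call closes in the real community.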