Tribler/py-ipv8

how to use custom messages, Custom serialization per message: on fix_unpack_dictionary empty serialized_dictionary

magowiz opened this issue · 2 comments

Hi,
I'm trying to extend the overlay tutorial adding a new kind of message, which should be a dictionary containing data, so I used the example provided here: https://py-ipv8.readthedocs.io/en/latest/reference/serialization.html#custom-serialization-per-message, and in particular the dataclass variant, so in a separate module I have:
`
import json
from ipv8.messaging.payload_dataclass import dataclass

@DataClass(msg_id=2)
class TextMessage:
dictionary: str

def fix_pack_dictionary(self, the_dictionary: dict) -> str:
    return json.dumps(the_dictionary)

@classmethod
def fix_unpack_dictionary(cls,
                          serialized_dictionary: str) -> dict:
    return json.loads(serialized_dictionary)

In main module I implemented another method to send this kind of messages:
async def send_message_to_all(self) -> None:
for peer in self.get_peers():
sentence = gen_random_sentence()
sentence = {'dictionary': 'hello'}
self.ez_send(peer, TextMessage(sentence))
`

which I call here, in started method of community:
`
def started(self) -> None:
async def start_communication() -> None:
if not self.lamport_clock:
for p in self.get_peers():
self.ez_send(p, MyMessage(self.lamport_clock))
else:
self.cancel_pending_task("start_communication")
await self.send_message_to_all()

    self.register_task("start_communication", start_communication, interval=10.0, delay=0)

`

the problem is that when fix_unpack_dictionary gets called from mine TextMessage class, I always receive an empty string as serialized_dictionary parameter` and so a JSONDecodeError occurs.

I also show you mine on_message handler
@lazy_wrapper(TextMessage) def on_message(self, peer: Peer, p_load: TextMessage) -> None: if p_load.msg_id == 2: sentence = p_load.sentence.decode('utf-8') print(sentence) print(f"message from {peer}: {p_load.sentence}")

What I am doing wrong? My purpose is to be able to send an arbitrary dict/json content from a peer to another

I'm using:
pyipv8 2.12.0
python: 3.11.6

Please tell me if you need further info.

Hi. Your code is a bit hard to read but I assume this is what you are trying to achieve:

import json
import os
from asyncio import run

from ipv8.community import Community, CommunitySettings
from ipv8.configuration import ConfigBuilder, WalkerDefinition, Strategy, default_bootstrap_defs
from ipv8.lazy_community import lazy_wrapper
from ipv8.messaging.payload_dataclass import dataclass
from ipv8.peer import Peer
from ipv8.util import run_forever
from ipv8_service import IPv8


@dataclass(msg_id=2)
class TextMessage:
    dictionary: str

    def fix_pack_dictionary(self, the_dictionary: dict) -> str:
        return json.dumps(the_dictionary)

    @classmethod
    def fix_unpack_dictionary(cls,
                              serialized_dictionary: str) -> dict:
        return json.loads(serialized_dictionary)


class MyCommunity(Community):
    community_id = os.urandom(20)


    def __init__(self, settings: CommunitySettings) -> None:
        super().__init__(settings)
        self.add_message_handler(TextMessage, self.on_message)

    def send_message_to_all(self) -> None:
        for peer in self.get_peers():
            self.ez_send(peer, TextMessage({'dictionary': 'hello'}))

    def started(self) -> None:
        self.register_task("start_communication", self.send_message_to_all, interval=10.0, delay=0)

    @lazy_wrapper(TextMessage)
    def on_message(self, peer: Peer, p_load: TextMessage) -> None:
        sentence = p_load.dictionary['dictionary']
        print(f"message from {peer}: {sentence}")


async def start_communities() -> None:
    for i in [1, 2, 3]:
        builder = ConfigBuilder().clear_keys().clear_overlays()
        builder.add_key("my peer", "medium", f"ec{i}.pem")
        builder.add_overlay("MyCommunity", "my peer",
                            [WalkerDefinition(Strategy.RandomWalk,
                                              10, {'timeout': 3.0})],
                            default_bootstrap_defs, {}, [('started',)])
        await IPv8(builder.finalize(),
                   extra_communities={'MyCommunity': MyCommunity}).start()
    await run_forever()

run(start_communities())

As a quick note: at delay=0 nobody will (probably) be connected yet, so you'll have to wait 10 seconds for the first messages to come in.

@qstokkink thank you very much, your complete example clarified better what I miss and what I did wrong, in mine example I tried to provide not the whole code and of course reading a segmented code is harder, I understand, and also the fact wasn't working and I tried some changes, may have complicated this.

What I missed:

  • to register mine message handler : self.add_message_handler(TextMessage, self.on_text_message)
  • I changed the delay from 0 to 10 like you suggested me
  • rewrote the handler to have also another method name (I don't know if it is a good thing if collides with lamport clock ones) and fix it to use your logic

thank you very much for this fine example, I think my problem basically was I forgot to register a new message handler.