libp2p/go-libp2p-kad-dht

Question: Understanding Peer Removal and Synchronization from Kademlia DHT Routing Table

teslashibe opened this issue · 7 comments

Hello,

I'm currently working with the github.com/libp2p/go-libp2p-kad-dht package and I'm seeing some unexpected behavior that I can't explain. Specifically, I've observed that a peer is added to the DHT and then immediately removed. I've added logging to track when peers are added and removed, and I've also checked the usefulness of a peer before and after it's added to the routing table using the UsefulNewPeer function.

One hypothesis I have is that the peer is being removed because my node doesn't establish its own connection to the peer, even though it adds the peer to the routing table when a new stream is received from that peer. I've tried to mitigate this by manually establishing a connection to the peer when a new stream is received, but I'm still seeing the same behavior.
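For reference, the mitigation looks roughly like this (a trimmed sketch rather than the exact code from the repo; the protocol ID is a placeholder and `ctx` comes from the surrounding setup):

```go
import (
	"context"
	"fmt"

	dht "github.com/libp2p/go-libp2p-kad-dht"
	"github.com/libp2p/go-libp2p/core/host"
	"github.com/libp2p/go-libp2p/core/network"
	"github.com/libp2p/go-libp2p/core/peer"
)

func setStreamHandler(ctx context.Context, h host.Host, kadDHT *dht.IpfsDHT) {
	h.SetStreamHandler("/masa/1.0.0", func(s network.Stream) { // placeholder protocol ID
		remote := s.Conn().RemotePeer()

		// Dial back so this node holds its own connection to the peer,
		// not just the inbound one that carried the stream. The peer's
		// addresses are already in the peerstore from the live connection.
		if err := h.Connect(ctx, peer.AddrInfo{ID: remote}); err != nil {
			fmt.Println("connect back failed:", err)
			return
		}

		// Check whether the routing table would consider this peer useful.
		fmt.Printf("Is peer %s useful: %v\n", remote, kadDHT.RoutingTable().UsefulNewPeer(remote))
	})
}
```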

I am also not sure why the nodes don't synchronize their DHT so that they discover new nodes that join through the boot node.

I have a repository with a minimal, runnable reproduction of the behavior:

https://github.com/masa-finance/masa-node-kdht

I'm wondering if anyone has any insights into why a peer might be removed immediately after being added to the DHT. Are there any other conditions under which a peer might be considered "not useful" and removed from the routing table? Any advice or suggestions would be greatly appreciated.

Thank you!

I'm the Masa dev for this and will be tracking this and working on the POC. Thanks so much!

Added a console export here:

masa-node-kdht % ./masa-node-kdht /ip4/127.0.0.1/tcp/58598/p2p/16Uiu2HAmRkmSZRN5d6fSz7VUk5FybiYUhFnYw9nqNapuNiR5iNjf 
time="2023-11-16T16:03:03-08:00" level=info msg="arg size is 2"
time="2023-11-16T16:03:03-08:00" level=info msg="found arg: /ip4/127.0.0.1/tcp/58598/p2p/16Uiu2HAmRkmSZRN5d6fSz7VUk5FybiYUhFnYw9nqNapuNiR5iNjf"
time="2023-11-16T16:03:03-08:00" level=info msg="Starting node with ID: /ip4/127.0.0.1/tcp/58629/p2p/16Uiu2HAmLic5RYCrUHoFR4WaAM6kqu2JZHY5xiYyPsasznzs2WhC"
time="2023-11-16T16:03:03-08:00" level=info msg="Getting bootnodes from /ip4/127.0.0.1/tcp/58598/p2p/16Uiu2HAmRkmSZRN5d6fSz7VUk5FybiYUhFnYw9nqNapuNiR5iNjf"
time="2023-11-16T16:03:03-08:00" level=info msg="Peer added to DHT: 16Uiu2HAmRkmSZRN5d6fSz7VUk5FybiYUhFnYw9nqNapuNiR5iNjf"
time="2023-11-16T16:03:03-08:00" level=info msg="Successfully added bootstrap peer 16Uiu2HAmRkmSZRN5d6fSz7VUk5FybiYUhFnYw9nqNapuNiR5iNjf to DHT"
time="2023-11-16T16:03:03-08:00" level=info msg="Peer Event for:{{16Uiu2HAmRkmSZRN5d6fSz7VUk5FybiYUhFnYw9nqNapuNiR5iNjf: []} PeerAdded kdht}, Action:PeerAdded"
time="2023-11-16T16:03:03-08:00" level=info msg="Connection established with node:{16Uiu2HAmRkmSZRN5d6fSz7VUk5FybiYUhFnYw9nqNapuNiR5iNjf: [/ip4/127.0.0.1/tcp/58598]}"
kdht: readData: StreamHandler: writeData: Hello from /ip4/127.0.0.1/tcp/58598/p2p/16Uiu2HAmRkmSZRN5d6fSz7VUk5FybiYUhFnYw9nqNapuNiR5iNjf
> time="2023-11-16T16:03:03-08:00" level=info msg="Got a new stream!"
time="2023-11-16T16:03:03-08:00" level=warning msg="Failed to add peer 16Uiu2HAmRkmSZRN5d6fSz7VUk5FybiYUhFnYw9nqNapuNiR5iNjf to routing table"
time="2023-11-16T16:03:03-08:00" level=info msg="Is peer 16Uiu2HAmRkmSZRN5d6fSz7VUk5FybiYUhFnYw9nqNapuNiR5iNjf useful after adding: false"
time="2023-11-16T16:03:03-08:00" level=info msg="Routing table size: 1"
StreamHandler: readData: kdht: writeData: Hello from /ip4/127.0.0.1/tcp/58598/p2p/16Uiu2HAmRkmSZRN5d6fSz7VUk5FybiYUhFnYw9nqNapuNiR5iNjf
> kdht: readData: StreamHandler: writeData: Hello from /ip4/127.0.0.1/tcp/58598/p2p/16Uiu2HAmRkmSZRN5d6fSz7VUk5FybiYUhFnYw9nqNapuNiR5iNjf
> StreamHandler: readData: kdht: writeData: Hello from /ip4/127.0.0.1/tcp/58598/p2p/16Uiu2HAmRkmSZRN5d6fSz7VUk5FybiYUhFnYw9nqNapuNiR5iNjf

@teslashibe we don't open a connection when we find a new peer. AFAIK the only thing there is

```go
err := dht.lookupCheck(livelinessCtx, p)
```

which executes a get-closest-peers lookup:

```go
peerids, err := dht.protoMessenger.GetClosestPeers(ctx, p, p)
```

which might open a new stream on the existing connection.

Your question does not make a lot of sense to me: the logs you show are from your own peer, and AFAICT it's your own routing table that the peer isn't being added to. This should be easy to debug since the logs and the failure are happening entirely on your machine (printf or delve).
The kbucket docs say:

A return value of false with error=nil indicates that the peer ALREADY exists in the Routing Table.

Could it be that?

Or am I missing something here?

Also, I wouldn't go poking around the internal routing table buckets of the DHT; go-libp2p-kad-dht already maintains its peers using a go-libp2p notify. Read-only access is fine, but I would not write: if someone connects to you and supports the same DHT protocol you do, they will be added to your routing table magically.
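For example, read-only inspection can be as simple as this (a sketch, assuming `kadDHT` is your `*dht.IpfsDHT`):

```go
rt := kadDHT.RoutingTable() // reading is fine; writing is not

fmt.Println("routing table size:", rt.Size())
for _, p := range rt.ListPeers() {
	fmt.Println("bucket peer:", p)
}
```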

I guess that:

  • the connection is opened
  • go-libp2p-kad-dht gets a new-connection notification and adds the peer to its routing table
  • you then receive your stream and try to add the peer yourself, but that returns added=false, err=nil because the peer is already in the routing table

That would explain Routing table size: 1 in your logs.
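If you do call into kbucket yourself, you can tell a real failure apart from the already-present case (a sketch against go-libp2p-kbucket's TryAddPeer, with `rt` and `p` assumed in scope):

```go
added, err := rt.TryAddPeer(p, true, false)
switch {
case err != nil:
	// A real rejection, e.g. kbucket.ErrPeerRejectedNoCapacity.
	fmt.Printf("failed to add %s: %s\n", p, err)
case !added:
	// added == false with err == nil: the peer is ALREADY in the
	// routing table, so logging this as a failure is misleading.
	fmt.Printf("%s is already in the routing table\n", p)
default:
	fmt.Printf("added %s\n", p)
}
```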

I am also not sure why the nodes don't synchronize their DHT so that they discover new nodes that join through the boot node.

This smells like an XY problem: what are you trying to do? What's your original issue?
If you need to call dht.RoutingTable(), that's a failure on our part.
go-libp2p-kad-dht should handle all your DHT needs for you. (We use it once in kubo, for the ipfs stats dht endpoint, which lets us pretty-print the buckets for debugging purposes.)
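Concretely, after you've connected to your bootnodes, the only call you should need is dht.Bootstrap (a minimal sketch, assuming `kadDHT` and `ctx` are in scope):

```go
// Kick off the DHT's internal refresh loop once; from then on it
// keeps the routing table populated without any manual bookkeeping.
if err := kadDHT.Bootstrap(ctx); err != nil {
	panic(err)
}
```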

@Jorropo, thanks for the response here - we found the issue. I will have @restevens402 post the fix here so it can help the community. It was something simple that we missed in the documentation and found in the examples. Thanks so much.

@Jorropo Thank you so much.
I was struggling to figure out why the DHT wasn't automatically updating with new peers as expected. I didn't know what it was that I didn't know, so in trying to figure it out I added every call I could find for interacting with the DHT and logged it all.

I figured it out Friday evening. Brendan shared the pubsub chat example from the example repo, which sets the ProtocolPrefix in the DHT options. I added a prefix value and everything started working beautifully. I was able to remove all of that code interacting with the DHT, and I found the internal loop that handles the synchronization.

I did think it was interesting that it works perfectly with the prefix set. It looks like, when no prefix is specified, it uses the default "/ipfs". I'm not clear on why it didn't work with the default, but I'm so happy it works now.
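For anyone who hits the same thing, the fix amounted to a single option when constructing the DHT (a sketch; "/masa" stands in for whatever prefix your network uses):

```go
import (
	dht "github.com/libp2p/go-libp2p-kad-dht"
)

// Before: dht.New(ctx, host) defaulted to the public "/ipfs" protocols.
// Giving the DHT our own prefix puts every node on the same protocol ID,
// so they discover and synchronize with each other automatically.
kadDHT, err := dht.New(ctx, host,
	dht.ProtocolPrefix("/masa"), // placeholder prefix; use your network's own
)
if err != nil {
	panic(err)
}
```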