diff --git a/libp2p/protocols/pubsub/floodsub.nim b/libp2p/protocols/pubsub/floodsub.nim index e5bd542be..9b6feb772 100644 --- a/libp2p/protocols/pubsub/floodsub.nim +++ b/libp2p/protocols/pubsub/floodsub.nim @@ -36,7 +36,7 @@ method subscribeTopic*(f: FloodSub, let peer = f.peers.getOrDefault(peerId) if peer == nil: - debug "subscribeTopic on a nil peer!" + debug "subscribeTopic on a nil peer!", peer = peerId return if topic notin f.floodsub: @@ -53,12 +53,15 @@ method subscribeTopic*(f: FloodSub, method handleDisconnect*(f: FloodSub, peer: PubSubPeer) = ## handle peer disconnects - for t in toSeq(f.floodsub.keys): - if t in f.floodsub: - f.floodsub[t].excl(peer) - + ## + procCall PubSub(f).handleDisconnect(peer) + if not(isNil(peer)) and peer.peerInfo notin f.conns: + for t in toSeq(f.floodsub.keys): + if t in f.floodsub: + f.floodsub[t].excl(peer) + method rpcHandler*(f: FloodSub, peer: PubSubPeer, rpcMsgs: seq[RPCMsg]) {.async.} = diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index f0418d294..373e94ab9 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -245,27 +245,31 @@ proc heartbeat(g: GossipSub) {.async.} = method handleDisconnect*(g: GossipSub, peer: PubSubPeer) = ## handle peer disconnects + ## + procCall FloodSub(g).handleDisconnect(peer) - for t in toSeq(g.gossipsub.keys): - g.gossipsub.removePeer(t, peer) - when defined(libp2p_expensive_metrics): - libp2p_gossipsub_peers_per_topic_gossipsub - .set(g.gossipsub.peers(t).int64, labelValues = [t]) + if not(isNil(peer)) and peer.peerInfo notin g.conns: + for t in toSeq(g.gossipsub.keys): + g.gossipsub.removePeer(t, peer) - for t in toSeq(g.mesh.keys): - g.mesh.removePeer(t, peer) + when defined(libp2p_expensive_metrics): + libp2p_gossipsub_peers_per_topic_gossipsub + .set(g.gossipsub.peers(t).int64, labelValues = [t]) - when defined(libp2p_expensive_metrics): - libp2p_gossipsub_peers_per_topic_mesh - .set(g.mesh.peers(t).int64, labelValues = [t]) + for t in toSeq(g.mesh.keys): + g.mesh.removePeer(t, peer) - for t in toSeq(g.fanout.keys): - g.fanout.removePeer(t, peer) + when defined(libp2p_expensive_metrics): + libp2p_gossipsub_peers_per_topic_mesh + .set(g.mesh.peers(t).int64, labelValues = [t]) - when defined(libp2p_expensive_metrics): - libp2p_gossipsub_peers_per_topic_fanout - .set(g.fanout.peers(t).int64, labelValues = [t]) + for t in toSeq(g.fanout.keys): + g.fanout.removePeer(t, peer) + + when defined(libp2p_expensive_metrics): + libp2p_gossipsub_peers_per_topic_fanout + .set(g.fanout.peers(t).int64, labelValues = [t]) method subscribePeer*(p: GossipSub, conn: Connection) = @@ -284,7 +288,7 @@ method subscribeTopic*(g: GossipSub, let peer = g.peers.getOrDefault(peerId) if peer == nil: - debug "subscribeTopic on a nil peer!" + # floodsub method logs a debug line already return if subscribe: diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim index 0fd75dffd..0415150a1 100644 --- a/libp2p/protocols/pubsub/pubsub.nim +++ b/libp2p/protocols/pubsub/pubsub.nim @@ -63,6 +63,7 @@ type method handleDisconnect*(p: PubSub, peer: PubSubPeer) {.base.} = ## handle peer disconnects ## + if not(isNil(peer)) and peer.peerInfo notin p.conns: trace "deleting peer", peer = peer.id peer.onConnect.fire() # Make sure all pending sends are unblocked