Fix subclass calls to handleDisconnect (#314)
* Fix subclass calls to handleDisconnect * add peer ID to nil peer debug message
This commit is contained in:
parent
5c986cf657
commit
9bbe5e4841
|
@ -36,7 +36,7 @@ method subscribeTopic*(f: FloodSub,
|
|||
|
||||
let peer = f.peers.getOrDefault(peerId)
|
||||
if peer == nil:
|
||||
debug "subscribeTopic on a nil peer!"
|
||||
debug "subscribeTopic on a nil peer!", peer = peerId
|
||||
return
|
||||
|
||||
if topic notin f.floodsub:
|
||||
|
@ -53,12 +53,15 @@ method subscribeTopic*(f: FloodSub,
|
|||
|
||||
method handleDisconnect*(f: FloodSub, peer: PubSubPeer) =
|
||||
## handle peer disconnects
|
||||
for t in toSeq(f.floodsub.keys):
|
||||
if t in f.floodsub:
|
||||
f.floodsub[t].excl(peer)
|
||||
|
||||
##
|
||||
|
||||
procCall PubSub(f).handleDisconnect(peer)
|
||||
|
||||
if not(isNil(peer)) and peer.peerInfo notin f.conns:
|
||||
for t in toSeq(f.floodsub.keys):
|
||||
if t in f.floodsub:
|
||||
f.floodsub[t].excl(peer)
|
||||
|
||||
method rpcHandler*(f: FloodSub,
|
||||
peer: PubSubPeer,
|
||||
rpcMsgs: seq[RPCMsg]) {.async.} =
|
||||
|
|
|
@ -245,27 +245,31 @@ proc heartbeat(g: GossipSub) {.async.} =
|
|||
|
||||
method handleDisconnect*(g: GossipSub, peer: PubSubPeer) =
|
||||
## handle peer disconnects
|
||||
##
|
||||
|
||||
procCall FloodSub(g).handleDisconnect(peer)
|
||||
for t in toSeq(g.gossipsub.keys):
|
||||
g.gossipsub.removePeer(t, peer)
|
||||
|
||||
when defined(libp2p_expensive_metrics):
|
||||
libp2p_gossipsub_peers_per_topic_gossipsub
|
||||
.set(g.gossipsub.peers(t).int64, labelValues = [t])
|
||||
if not(isNil(peer)) and peer.peerInfo notin g.conns:
|
||||
for t in toSeq(g.gossipsub.keys):
|
||||
g.gossipsub.removePeer(t, peer)
|
||||
|
||||
for t in toSeq(g.mesh.keys):
|
||||
g.mesh.removePeer(t, peer)
|
||||
when defined(libp2p_expensive_metrics):
|
||||
libp2p_gossipsub_peers_per_topic_gossipsub
|
||||
.set(g.gossipsub.peers(t).int64, labelValues = [t])
|
||||
|
||||
when defined(libp2p_expensive_metrics):
|
||||
libp2p_gossipsub_peers_per_topic_mesh
|
||||
.set(g.mesh.peers(t).int64, labelValues = [t])
|
||||
for t in toSeq(g.mesh.keys):
|
||||
g.mesh.removePeer(t, peer)
|
||||
|
||||
for t in toSeq(g.fanout.keys):
|
||||
g.fanout.removePeer(t, peer)
|
||||
when defined(libp2p_expensive_metrics):
|
||||
libp2p_gossipsub_peers_per_topic_mesh
|
||||
.set(g.mesh.peers(t).int64, labelValues = [t])
|
||||
|
||||
when defined(libp2p_expensive_metrics):
|
||||
libp2p_gossipsub_peers_per_topic_fanout
|
||||
.set(g.fanout.peers(t).int64, labelValues = [t])
|
||||
for t in toSeq(g.fanout.keys):
|
||||
g.fanout.removePeer(t, peer)
|
||||
|
||||
when defined(libp2p_expensive_metrics):
|
||||
libp2p_gossipsub_peers_per_topic_fanout
|
||||
.set(g.fanout.peers(t).int64, labelValues = [t])
|
||||
|
||||
method subscribePeer*(p: GossipSub,
|
||||
conn: Connection) =
|
||||
|
@ -284,7 +288,7 @@ method subscribeTopic*(g: GossipSub,
|
|||
|
||||
let peer = g.peers.getOrDefault(peerId)
|
||||
if peer == nil:
|
||||
debug "subscribeTopic on a nil peer!"
|
||||
# floodsub method logs a debug line already
|
||||
return
|
||||
|
||||
if subscribe:
|
||||
|
|
|
@ -63,6 +63,7 @@ type
|
|||
method handleDisconnect*(p: PubSub, peer: PubSubPeer) {.base.} =
|
||||
## handle peer disconnects
|
||||
##
|
||||
|
||||
if not(isNil(peer)) and peer.peerInfo notin p.conns:
|
||||
trace "deleting peer", peer = peer.id
|
||||
peer.onConnect.fire() # Make sure all pending sends are unblocked
|
||||
|
|
Loading…
Reference in New Issue