avoid leaking in peersInIP, don't depend on sendConn
This commit is contained in:
parent
67d0926e89
commit
7b2727d930
|
@ -196,11 +196,11 @@ method unsubscribePeer*(g: GossipSub, peer: PeerID) =
|
|||
return
|
||||
|
||||
# remove from peer IPs collection too
|
||||
if pubSubPeer.sendConn != nil:
|
||||
g.peersInIP.withValue(pubSubPeer.sendConn.observedAddr, s):
|
||||
if pubSubPeer.address.isSome():
|
||||
g.peersInIP.withValue(pubSubPeer.address.get(), s):
|
||||
s[].excl(pubSubPeer.peerId)
|
||||
if s[].len == 0:
|
||||
g.peersInIP.del(pubSubPeer.sendConn.observedAddr)
|
||||
g.peersInIP.del(pubSubPeer.address.get())
|
||||
|
||||
for t in toSeq(g.gossipsub.keys):
|
||||
g.gossipsub.removePeer(t, pubSubPeer)
|
||||
|
|
|
@ -7,7 +7,7 @@
|
|||
## This file may not be copied, modified, or distributed except according to
|
||||
## those terms.
|
||||
|
||||
import std/[tables, strutils, sets, algorithm]
|
||||
import std/[tables, strutils, sets, algorithm, options]
|
||||
import chronos, chronicles, metrics
|
||||
import "."/[types]
|
||||
import ".."/[pubsubpeer]
|
||||
|
@ -39,18 +39,15 @@ func `/`(a, b: Duration): float64 =
|
|||
func byScore*(x,y: PubSubPeer): int = system.cmp(x.score, y.score)
|
||||
|
||||
proc colocationFactor(g: GossipSub, peer: PubSubPeer): float64 =
|
||||
if peer.sendConn == nil:
|
||||
trace "colocationFactor, no connection", peer
|
||||
if peer.address.isNone():
|
||||
0.0
|
||||
else:
|
||||
let
|
||||
address = peer.sendConn.observedAddr
|
||||
|
||||
address = peer.address.get()
|
||||
g.peersInIP.mgetOrPut(address, initHashSet[PeerID]()).incl(peer.peerId)
|
||||
let
|
||||
ipPeers = g.peersInIP[address]
|
||||
len = ipPeers.len.float64
|
||||
|
||||
if len > g.parameters.ipColocationFactorThreshold:
|
||||
trace "colocationFactor over threshold", peer, address, len
|
||||
let over = len - g.parameters.ipColocationFactorThreshold
|
||||
|
|
|
@ -48,6 +48,7 @@ type
|
|||
onEvent*: OnEvent # Connectivity updates for peer
|
||||
codec*: string # the protocol that this peer joined from
|
||||
sendConn*: Connection # cached send connection
|
||||
address*: Option[MultiAddress]
|
||||
peerId*: PeerID
|
||||
handler*: RPCHandler
|
||||
observers*: ref seq[PubSubObserver] # ref as in smart_ptr
|
||||
|
@ -159,6 +160,7 @@ proc connectOnce(p: PubSubPeer): Future[void] {.async.} =
|
|||
|
||||
trace "Get new send connection", p, newConn
|
||||
p.sendConn = newConn
|
||||
p.address = some(p.sendConn.observedAddr)
|
||||
|
||||
if p.onEvent != nil:
|
||||
p.onEvent(p, PubsubPeerEvent(kind: PubSubPeerEventKind.Connected))
|
||||
|
@ -170,6 +172,7 @@ proc connectOnce(p: PubSubPeer): Future[void] {.async.} =
|
|||
await p.sendConn.close()
|
||||
|
||||
p.sendConn = nil
|
||||
p.address = none(MultiAddress)
|
||||
if p.onEvent != nil:
|
||||
p.onEvent(p, PubsubPeerEvent(kind: PubSubPeerEventKind.Disconnected))
|
||||
|
||||
|
|
Loading…
Reference in New Issue