chore(networking): disconnect due to colocation ip in conn handler (#1821)

Repository: https://github.com/waku-org/nwaku.git
parent add294a9b1
commit e12c979c4e
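In short, this commit moves IP-colocation enforcement out of a periodic pruning loop and into the connection event handler, so peers exceeding the per-IP limit are disconnected as soon as they join. As a minimal standalone sketch of the rule the diff applies (the proc below is illustrative, not part of the commit; `colocationLimit` and the ip table mirror fields that appear in the diff):

import std/tables

# Illustrative sketch of the colocation rule used throughout this diff:
# given ip -> connected peer ids and a per-ip limit, return the
# earliest-added peers over the limit, i.e. the ones to prune.
proc peersToPrune(ipTable: Table[string, seq[string]],
                  colocationLimit: int): seq[string] =
  for ip, peers in ipTable.pairs:
    if peers.len > colocationLimit:
      # keep the most recent `colocationLimit` peers for this ip
      result.add(peers[0 ..< peers.len - colocationLimit])

when isMainModule:
  let conns = {"127.0.0.1": @["p1", "p2", "p3"]}.toTable
  doAssert peersToPrune(conns, 1) == @["p1", "p2"]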
@@ -743,29 +743,28 @@ procSuite "Peer Manager":
     let pInfos = nodes.mapIt(it.switch.peerInfo.toRemotePeerInfo())
 
+    # force max 1 conn per ip
+    nodes[0].peerManager.colocationLimit = 1
+
     # 2 in connections
     discard await nodes[1].peerManager.connectRelay(pInfos[0])
     discard await nodes[2].peerManager.connectRelay(pInfos[0])
 
+    # but one is pruned
+    check nodes[0].peerManager.switch.connManager.getConnections().len == 1
+
     # 2 out connections
     discard await nodes[0].peerManager.connectRelay(pInfos[3])
     discard await nodes[0].peerManager.connectRelay(pInfos[4])
 
-    # force max 1 conn per ip
-    nodes[0].peerManager.colocationLimit = 1
-    nodes[0].peerManager.updateIpTable()
+    # they are also pruned
+    check nodes[0].peerManager.switch.connManager.getConnections().len == 1
 
-    # table is updated and we have 4 conns (2in 2out)
-    check:
-      nodes[0].peerManager.ipTable["127.0.0.1"].len == 4
-      nodes[0].peerManager.switch.connManager.getConnections().len == 4
-      nodes[0].peerManager.peerStore.peers().len == 4
-
-    await nodes[0].peerManager.pruneConnsByIp()
-
-    # peers are pruned, max 1 conn per ip
-    nodes[0].peerManager.updateIpTable()
+    # we should have 4 peers (2 in / 2 out) but, due to the colocation limit,
+    # they are pruned to max 1
     check:
       nodes[0].peerManager.ipTable["127.0.0.1"].len == 1
       nodes[0].peerManager.switch.connManager.getConnections().len == 1
       nodes[0].peerManager.peerStore.peers().len == 1
 
     await allFutures(nodes.mapIt(it.stop()))
@@ -58,9 +58,6 @@ const
   # How often metrics and logs are shown/updated
   LogAndMetricsInterval = chronos.seconds(60)
 
-  # Prune by ip interval
-  PruneByIpInterval = chronos.seconds(30)
-
   # Max peers that we allow from the same IP
   ColocationLimit = 5
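Note that ColocationLimit is only the compile-time default; it ends up in a colocationLimit field on the PeerManager, which is how the test above tightens it to 1 at runtime. A tiny sketch of that const-default / mutable-field pattern (the Manager type and newManager proc are illustrative, not the commit's API):

const ColocationLimit = 5  # compile-time default, as in the diff

type Manager = ref object
  colocationLimit*: int

proc newManager(): Manager =
  Manager(colocationLimit: ColocationLimit)

when isMainModule:
  let m = newManager()
  doAssert m.colocationLimit == 5
  m.colocationLimit = 1  # tests override the field directly
  doAssert m.colocationLimit == 1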
@@ -285,6 +282,18 @@ proc canBeConnected*(pm: PeerManager,
   # Initialisation #
   ##################
 
+proc getPeerIp(pm: PeerManager, peerId: PeerId): Option[string] =
+  if pm.switch.connManager.getConnections().hasKey(peerId):
+    let conns = pm.switch.connManager.getConnections().getOrDefault(peerId)
+    if conns.len != 0:
+      let observedAddr = conns[0].connection.observedAddr
+      if observedAddr.isSome:
+        # TODO: think if circuit relay ips should be handled differently
+        let ip = observedAddr.get.getHostname()
+        return some(ip)
+  return none(string)
+
 # called when a connection i) is created or ii) is closed
 proc onConnEvent(pm: PeerManager, peerId: PeerID, event: ConnEvent) {.async.} =
   case event.kind
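getPeerIp returns an ip only when the connection actually reports an observed address: unwrapping an empty Option with .get raises, so the isSome guard must come before the unwrap. A standalone reminder of the pattern (std/options only; the values are illustrative):

import std/options

let observed = none(string)
# observed.get would raise UnpackDefect here, so getPeerIp checks
# isSome before unwrapping and falls back to none(string) otherwise
let ip = if observed.isSome: observed.get else: "unknown"
doAssert ip == "unknown"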
@@ -302,36 +311,36 @@ proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} =
   if event.kind == PeerEventKind.Joined:
     direction = if event.initiator: Outbound else: Inbound
     connectedness = Connected
+
+    let ip = pm.getPeerIp(peerId)
+    if ip.isSome:
+      pm.ipTable.mgetOrPut(ip.get, newSeq[PeerId]()).add(peerId)
+
+      let peersBehindIp = pm.ipTable[ip.get]
+      if peersBehindIp.len > pm.colocationLimit:
+        # in theory this should always be one, but just in case
+        for peerId in peersBehindIp[0..<(peersBehindIp.len - pm.colocationLimit)]:
+          debug "Pruning connection due to ip colocation", peerId = peerId, ip = ip
+          asyncSpawn(pm.switch.disconnect(peerId))
+          pm.peerStore.delete(peerId)
+
   elif event.kind == PeerEventKind.Left:
     direction = UnknownDirection
     connectedness = CanConnect
+
+    # note we can't access the peer's ip here as the connection was already closed
+    for ip, peerIds in pm.ipTable.pairs:
+      if peerIds.contains(peerId):
+        pm.ipTable[ip] = pm.ipTable[ip].filterIt(it != peerId)
+        if pm.ipTable[ip].len == 0:
+          pm.ipTable.del(ip)
+        break
+
   pm.peerStore[ConnectionBook][peerId] = connectedness
   pm.peerStore[DirectionBook][peerId] = direction
   if not pm.storage.isNil:
     pm.storage.insertOrReplace(peerId, pm.peerStore.get(peerId), connectedness, getTime().toUnix)
 
-proc updateIpTable*(pm: PeerManager) =
-  # clean table
-  pm.ipTable = initTable[string, seq[PeerId]]()
-
-  # populate ip->peerIds from existing out/in connections
-  for peerId, conn in pm.switch.connManager.getConnections():
-    if conn.len == 0:
-      continue
-
-    # we may want to enable it only for inbound peers
-    #if conn[0].connection.transportDir != In:
-    #  continue
-
-    # assumes just one physical connection per peer
-    let observedAddr = conn[0].connection.observedAddr
-    if observedAddr.isSome:
-      # TODO: think if circuit relay ips should be handled differently
-      let ip = observedAddr.get.getHostname()
-      pm.ipTable.mgetOrPut(ip, newSeq[PeerId]()).add(peerId)
-
 proc new*(T: type PeerManager,
           switch: Switch,
           maxRelayPeers: int = 50,
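The Joined branch keys peers by ip with tables.mgetOrPut, which returns the value for the key, inserting the provided default first if the key is missing; the Left branch removes the peer wherever it appears and drops ips left empty. A standalone sketch of that bookkeeping, with plain strings standing in for PeerId:

import std/[sequtils, tables]

var ipTable = initTable[string, seq[string]]()

# Joined: append the peer under its observed ip, creating the seq on
# first use via mgetOrPut
ipTable.mgetOrPut("127.0.0.1", newSeq[string]()).add("p1")
ipTable.mgetOrPut("127.0.0.1", newSeq[string]()).add("p2")
doAssert ipTable["127.0.0.1"] == @["p1", "p2"]

# Left: remove the peer wherever it appears and drop ips left empty
for ip, peers in ipTable.pairs:
  if peers.contains("p1"):
    ipTable[ip] = ipTable[ip].filterIt(it != "p1")
    if ipTable[ip].len == 0:
      ipTable.del(ip)
    break

doAssert ipTable["127.0.0.1"] == @["p2"]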
@@ -556,24 +565,7 @@ proc pruneInRelayConns(pm: PeerManager, amount: int) {.async.} =
   let connsToPrune = min(amount, inRelayPeers.len)
 
   for p in inRelayPeers[0..<connsToPrune]:
-    await pm.switch.disconnect(p)
-
-proc pruneConnsByIp*(pm: PeerManager) {.async.} =
-  ## prunes connections based on ip colocation, allowing no more
-  ## than ColocationLimit inbound connections from same ip
-  ##
-
-  # update the table tracking ip and the connected peers
-  pm.updateIpTable()
-
-  # trigger disconnections based on colocationLimit
-  for ip, peersInIp in pm.ipTable.pairs:
-    if peersInIp.len > pm.colocationLimit:
-      let connsToPrune = peersInIp.len - pm.colocationLimit
-      for peerId in peersInIp[0..<connsToPrune]:
-        debug "Pruning connection due to ip colocation", peerId = peerId, ip = ip
-        await pm.switch.disconnect(peerId)
-        pm.peerStore.delete(peerId)
+    asyncSpawn(pm.switch.disconnect(p))
 
 proc connectToRelayPeers*(pm: PeerManager) {.async.} =
   let (inRelayPeers, outRelayPeers) = pm.connectedPeers(WakuRelayCodec)
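Besides deleting pruneConnsByIp, this hunk switches pruneInRelayConns from await to asyncSpawn for the disconnect call, so pruning fires the disconnects without blocking on each peer's teardown. A minimal chronos sketch of the difference (assumes the chronos package; teardown is an illustrative stand-in for switch.disconnect):

import chronos

proc teardown(name: string) {.async.} =
  await sleepAsync(10.milliseconds)
  echo "closed ", name

proc pruneAll() {.async.} =
  for name in ["p1", "p2", "p3"]:
    # fire-and-forget: unlike `await teardown(name)`, the loop does not
    # wait for each disconnect to finish before starting the next one
    asyncSpawn teardown(name)
  # give the spawned futures time to run before the demo exits
  await sleepAsync(50.milliseconds)

waitFor pruneAll()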
@@ -658,12 +650,6 @@ proc selectPeer*(pm: PeerManager, proto: string): Option[RemotePeerInfo] =
   debug "No peer found for protocol", protocol=proto
   return none(RemotePeerInfo)
 
-proc pruneConnsByIpLoop(pm: PeerManager) {.async.} =
-  debug "Starting prune peer by ip loop"
-  while pm.started:
-    await pm.pruneConnsByIp()
-    await sleepAsync(PruneByIpInterval)
-
 # Prunes peers from peerstore to remove old/stale ones
 proc prunePeerStoreLoop(pm: PeerManager) {.async.} =
   debug "Starting prune peerstore loop"
@@ -709,7 +695,6 @@ proc start*(pm: PeerManager) =
   pm.started = true
   asyncSpawn pm.relayConnectivityLoop()
   asyncSpawn pm.prunePeerStoreLoop()
-  asyncSpawn pm.pruneConnsByIpLoop()
   asyncSpawn pm.logAndMetrics()
 
 proc stop*(pm: PeerManager) =