diff --git a/tests/v2/test_peer_manager.nim b/tests/v2/test_peer_manager.nim index f3ab93118..f6f26f5af 100644 --- a/tests/v2/test_peer_manager.nim +++ b/tests/v2/test_peer_manager.nim @@ -743,29 +743,28 @@ procSuite "Peer Manager": let pInfos = nodes.mapIt(it.switch.peerInfo.toRemotePeerInfo()) + # force max 1 conn per ip + nodes[0].peerManager.colocationLimit = 1 + # 2 in connections discard await nodes[1].peerManager.connectRelay(pInfos[0]) discard await nodes[2].peerManager.connectRelay(pInfos[0]) + # but one is pruned + check nodes[0].peerManager.switch.connManager.getConnections().len == 1 + # 2 out connections discard await nodes[0].peerManager.connectRelay(pInfos[3]) discard await nodes[0].peerManager.connectRelay(pInfos[4]) - # force max 1 conn per ip - nodes[0].peerManager.colocationLimit = 1 - nodes[0].peerManager.updateIpTable() + # they are also prunned  + check nodes[0].peerManager.switch.connManager.getConnections().len == 1 - # table is updated and we have 4 conns (2in 2out) - check: - nodes[0].peerManager.ipTable["127.0.0.1"].len == 4 - nodes[0].peerManager.switch.connManager.getConnections().len == 4 - nodes[0].peerManager.peerStore.peers().len == 4 - - await nodes[0].peerManager.pruneConnsByIp() - - # peers are pruned, max 1 conn per ip - nodes[0].peerManager.updateIpTable() + # we should have 4 peers (2in/2out) but due to collocation limit + # they are pruned to max 1 check: nodes[0].peerManager.ipTable["127.0.0.1"].len == 1 nodes[0].peerManager.switch.connManager.getConnections().len == 1 nodes[0].peerManager.peerStore.peers().len == 1 + + await allFutures(nodes.mapIt(it.stop())) diff --git a/waku/v2/node/peer_manager/peer_manager.nim b/waku/v2/node/peer_manager/peer_manager.nim index dfc626f14..6289d8071 100644 --- a/waku/v2/node/peer_manager/peer_manager.nim +++ b/waku/v2/node/peer_manager/peer_manager.nim @@ -58,9 +58,6 @@ const # How often metrics and logs are shown/updated LogAndMetricsInterval = chronos.seconds(60) - # Prune by ip interval - PruneByIpInterval = chronos.seconds(30) - # Max peers that we allow from the same IP ColocationLimit = 5 @@ -285,6 +282,18 @@ proc canBeConnected*(pm: PeerManager, # Initialisation # ################## +proc getPeerIp(pm: PeerManager, peerId: PeerId): Option[string] = + if pm.switch.connManager.getConnections().hasKey(peerId): + let conns = pm.switch.connManager.getConnections().getOrDefault(peerId) + if conns.len != 0: + let observedAddr = conns[0].connection.observedAddr + let ip = observedAddr.get.getHostname() + if observedAddr.isSome: + # TODO: think if circuit relay ips should be handled differently + let ip = observedAddr.get.getHostname() + return some(ip) + return none(string) + # called when a connection i) is created or ii) is closed proc onConnEvent(pm: PeerManager, peerId: PeerID, event: ConnEvent) {.async.} = case event.kind @@ -302,36 +311,36 @@ proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} = if event.kind == PeerEventKind.Joined: direction = if event.initiator: Outbound else: Inbound connectedness = Connected + + let ip = pm.getPeerIp(peerId) + if ip.isSome: + pm.ipTable.mgetOrPut(ip.get, newSeq[PeerId]()).add(peerId) + + let peersBehindIp = pm.ipTable[ip.get] + if peersBehindIp.len > pm.colocationLimit: + # in theory this should always be one, but just in case + for peerId in peersBehindIp[0..<(peersBehindIp.len - pm.colocationLimit)]: + debug "Pruning connection due to ip colocation", peerId = peerId, ip = ip + asyncSpawn(pm.switch.disconnect(peerId)) + pm.peerStore.delete(peerId) + elif event.kind == PeerEventKind.Left: direction = UnknownDirection connectedness = CanConnect + # note we cant access the peerId ip here as the connection was already closed + for ip, peerIds in pm.ipTable.pairs: + if peerIds.contains(peerId): + pm.ipTable[ip] = pm.ipTable[ip].filterIt(it != peerId) + if pm.ipTable[ip].len == 0: + pm.ipTable.del(ip) + break + pm.peerStore[ConnectionBook][peerId] = connectedness pm.peerStore[DirectionBook][peerId] = direction if not pm.storage.isNil: pm.storage.insertOrReplace(peerId, pm.peerStore.get(peerId), connectedness, getTime().toUnix) -proc updateIpTable*(pm: PeerManager) = - # clean table - pm.ipTable = initTable[string, seq[PeerId]]() - - # populate ip->peerIds from existing out/in connections - for peerId, conn in pm.switch.connManager.getConnections(): - if conn.len == 0: - continue - - # we may want to enable it only in inbound peers - #if conn[0].connection.transportDir != In: - # continue - - # assumes just one physical connection per peer - let observedAddr = conn[0].connection.observedAddr - if observedAddr.isSome: - # TODO: think if circuit relay ips should be handled differently - let ip = observedAddr.get.getHostname() - pm.ipTable.mgetOrPut(ip, newSeq[PeerId]()).add(peerId) - - proc new*(T: type PeerManager, switch: Switch, maxRelayPeers: int = 50, @@ -556,24 +565,7 @@ proc pruneInRelayConns(pm: PeerManager, amount: int) {.async.} = let connsToPrune = min(amount, inRelayPeers.len) for p in inRelayPeers[0.. pm.colocationLimit: - let connsToPrune = peersInIp.len - pm.colocationLimit - for peerId in peersInIp[0..