diff --git a/tests/v2/test_peer_manager.nim b/tests/v2/test_peer_manager.nim
index 85371d458..0151cca5a 100644
--- a/tests/v2/test_peer_manager.nim
+++ b/tests/v2/test_peer_manager.nim
@@ -727,3 +727,40 @@ procSuite "Peer Manager":
         .build(),
       maxFailedAttempts = 5,
       storage = nil)
+
+  asyncTest "colocationLimit is enforced by pruneConnsByIp()":
+    # Create 5 nodes
+    let nodes = toSeq(0..<5).mapIt(newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)))
+
+    # Start them with relay + filter
+    await allFutures(nodes.mapIt(it.start()))
+    await allFutures(nodes.mapIt(it.mountRelay()))
+
+    let pInfos = nodes.mapIt(it.switch.peerInfo.toRemotePeerInfo())
+
+    # 2 in connections
+    discard await nodes[1].peerManager.connectRelay(pInfos[0])
+    discard await nodes[2].peerManager.connectRelay(pInfos[0])
+
+    # 2 out connections
+    discard await nodes[0].peerManager.connectRelay(pInfos[3])
+    discard await nodes[0].peerManager.connectRelay(pInfos[4])
+
+    # force max 1 conn per ip
+    nodes[0].peerManager.colocationLimit = 1
+    nodes[0].peerManager.updateIpTable()
+
+    # table is updated and we have 4 conns (2in 2out)
+    check:
+      nodes[0].peerManager.ipTable["127.0.0.1"].len == 4
+      nodes[0].peerManager.switch.connManager.getConnections().len == 4
+      nodes[0].peerManager.peerStore.peers().len == 4
+
+    await nodes[0].peerManager.pruneConnsByIp()
+
+    # peers are pruned, max 1 conn per ip
+    nodes[0].peerManager.updateIpTable()
+    check:
+      nodes[0].peerManager.ipTable["127.0.0.1"].len == 1
+      nodes[0].peerManager.switch.connManager.getConnections().len == 1
+      nodes[0].peerManager.peerStore.peers().len == 1
diff --git a/waku/v2/node/peer_manager/peer_manager.nim b/waku/v2/node/peer_manager/peer_manager.nim
index 06b8a2cf0..d79400891 100644
--- a/waku/v2/node/peer_manager/peer_manager.nim
+++ b/waku/v2/node/peer_manager/peer_manager.nim
@@ -10,7 +10,8 @@ import
   chronicles,
   metrics,
   libp2p/multistream,
-  libp2p/muxers/muxer
+  libp2p/muxers/muxer,
+  libp2p/nameresolving/nameresolver
 import
   ../../../common/nimchronos,
   ../../waku_core,
@@ -54,11 +55,14 @@ const
   # How often the peer store is pruned
   PrunePeerStoreInterval = chronos.minutes(5)
 
-  # How often the peer store is updated with metrics
-  UpdateMetricsInterval = chronos.seconds(15)
+  # How often metrics and logs are shown/updated
+  LogAndMetricsInterval = chronos.seconds(60)
 
-  # How often to log peer manager metrics
-  LogSummaryInterval = chronos.seconds(60)
+  # Prune by ip interval
+  PruneByIpInterval = chronos.seconds(30)
+
+  # Max peers that we allow from the same IP
+  ColocationLimit = 5
 
 type
   PeerManager* = ref object of RootObj
@@ -70,6 +74,8 @@ type
     storage: PeerStorage
     serviceSlots*: Table[string, RemotePeerInfo]
    outPeersTarget*: int
+    ipTable*: Table[string, seq[PeerId]]
+    colocationLimit*: int
     started: bool
 
 proc protocolMatcher*(codec: string): Matcher =
@@ -278,13 +284,8 @@ proc canBeConnected*(pm: PeerManager,
 # Initialisation #
 ##################
 
-# currently disabled. note that peer connection state connected/disconnected
-# cant be tracked using this handler when more than one conn is allowed and
-# when using autonat. eg if a peer has 2 conns and one is disconnected we cant
-# assume that the peer is disconnected, because the other one might still be active.
-# note that even with maxconn = 1, autonat forces more than one connection.
+# called when a connection i) is created or ii) is closed
 proc onConnEvent(pm: PeerManager, peerId: PeerID, event: ConnEvent) {.async.} =
-
   case event.kind
   of ConnEventKind.Connected:
     let direction = if event.incoming: Inbound else: Outbound
@@ -292,30 +293,51 @@ proc onConnEvent(pm: PeerManager, peerId: PeerID, event: ConnEvent) {.async.} =
   of ConnEventKind.Disconnected:
     discard
 
+# called when a peer i) first connects to us ii) disconnects all connections from us
 proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} =
+  var direction: PeerDirection
+  var connectedness: Connectedness
+
   if event.kind == PeerEventKind.Joined:
-    let direction = if event.initiator: Outbound else: Inbound
-    pm.peerStore[ConnectionBook][peerId] = Connected
-    pm.peerStore[DirectionBook][peerId] = direction
-
-    if not pm.storage.isNil:
-      pm.storage.insertOrReplace(peerId, pm.peerStore.get(peerId), Connected)
-    return
-
+    direction = if event.initiator: Outbound else: Inbound
+    connectedness = Connected
   elif event.kind == PeerEventKind.Left:
-    pm.peerStore[DirectionBook][peerId] = UnknownDirection
-    pm.peerStore[ConnectionBook][peerId] = CanConnect
+    direction = UnknownDirection
+    connectedness = CanConnect
+
+  pm.peerStore[ConnectionBook][peerId] = connectedness
+  pm.peerStore[DirectionBook][peerId] = direction
+  if not pm.storage.isNil:
+    pm.storage.insertOrReplace(peerId, pm.peerStore.get(peerId), connectedness, getTime().toUnix)
+
+proc updateIpTable*(pm: PeerManager) =
+  # clean table
+  pm.ipTable = initTable[string, seq[PeerId]]()
+
+  # populate ip->peerIds from existing out/in connections
+  for peerId, conn in pm.switch.connManager.getConnections():
+    if conn.len == 0:
+      continue
+
+    # we may want to enable it only in inbound peers
+    #if conn[0].connection.transportDir != In:
+    #  continue
+
+    # assumes just one physical connection per peer
+    let observedAddr = conn[0].connection.observedAddr
+    if observedAddr.isSome:
+      # TODO: think if circuit relay ips should be handled differently
+      let ip = observedAddr.get.getHostname()
+      pm.ipTable.mgetOrPut(ip, newSeq[PeerId]()).add(peerId)
 
-    if not pm.storage.isNil:
-      pm.storage.insertOrReplace(peerId, pm.peerStore.get(peerId), CanConnect, getTime().toUnix)
-    return
 
 proc new*(T: type PeerManager,
           switch: Switch,
           storage: PeerStorage = nil,
           initialBackoffInSec = InitialBackoffInSec,
           backoffFactor = BackoffFactor,
-          maxFailedAttempts = MaxFailedAttempts,): PeerManager =
+          maxFailedAttempts = MaxFailedAttempts,
+          colocationLimit = ColocationLimit,): PeerManager =
 
   let capacity = switch.peerStore.capacity
   let maxConnections = switch.connManager.inSema.size
@@ -338,7 +360,8 @@ proc new*(T: type PeerManager,
                        initialBackoffInSec: initialBackoffInSec,
                        backoffFactor: backoffFactor,
                        outPeersTarget: max(maxConnections div 10, 10),
-                       maxFailedAttempts: maxFailedAttempts)
+                       maxFailedAttempts: maxFailedAttempts,
+                       colocationLimit: colocationLimit)
 
   proc connHook(peerId: PeerID, event: ConnEvent): Future[void] {.gcsafe.} =
     onConnEvent(pm, peerId, event)
@@ -360,6 +383,7 @@ proc new*(T: type PeerManager,
   pm.peerStore[AddressBook].addHandler(peerStoreChanged)
 
   pm.serviceSlots = initTable[string, RemotePeerInfo]()
+  pm.ipTable = initTable[string, seq[PeerId]]()
 
   if not storage.isNil():
     debug "found persistent peer storage"
@@ -520,6 +544,23 @@ proc pruneInRelayConns(pm: PeerManager, amount: int) {.async.} =
   for p in inRelayPeers[0..<amount]:
     asyncSpawn(pm.switch.disconnect(p))
 
+proc pruneConnsByIp*(pm: PeerManager) {.async.} =
+  # update the table tracking ip and the connected peers
+  pm.updateIpTable()
+
+  # trigger disconnections based on colocationLimit
+  for ip, peersInIp in pm.ipTable.pairs:
+    if peersInIp.len > pm.colocationLimit:
+      let connsToPrune = peersInIp.len - pm.colocationLimit
+      for peerId in peersInIp[0..<connsToPrune]:
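The last hunk introduces pruneConnsByIp(), which walks ipTable and disconnects whatever peers exceed colocationLimit for each ip. The rule can be exercised in isolation as a minimal, self-contained Nim sketch. This is not nwaku code: PeerId is stubbed as a string, and selectPeersToPrune is a hypothetical helper that only selects the victims instead of disconnecting them.

import std/tables

type PeerId = string

proc selectPeersToPrune(ipTable: Table[string, seq[PeerId]],
                        colocationLimit: int): seq[PeerId] =
  # for every ip hosting more peers than colocationLimit, pick the first
  # entries in excess of the limit (mirrors peersInIp[0..<connsToPrune])
  for ip, peersInIp in ipTable.pairs:
    if peersInIp.len > colocationLimit:
      let connsToPrune = peersInIp.len - colocationLimit
      result.add(peersInIp[0..<connsToPrune])

when isMainModule:
  # three peers behind 127.0.0.1 and a limit of 1: two get pruned,
  # matching the asyncTest above
  let ipTable = {"127.0.0.1": @["p1", "p2", "p3"],
                 "10.0.0.2": @["p4"]}.toTable
  doAssert selectPeersToPrune(ipTable, 1).len == 2

Which peers are dropped first is a policy choice; this sketch, like the diff, simply takes the leading entries of each ip bucket. The commented-out transportDir check in updateIpTable() suggests the limit may eventually be applied to inbound connections only.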