diff --git a/tests/v2/test_peer_manager.nim b/tests/v2/test_peer_manager.nim index 30dc4e405..75ae2d913 100644 --- a/tests/v2/test_peer_manager.nim +++ b/tests/v2/test_peer_manager.nim @@ -314,3 +314,64 @@ procSuite "Peer Manager": nodes[0].peerManager.peerStore[ConnectionBook][nodes[3].switch.peerInfo.peerId] == Connected await allFutures(nodes.mapIt(it.stop())) + + asyncTest "Peer store keeps track of incoming connections": + # Create 4 nodes + var nodes: seq[WakuNode] + for i in 0..<4: + let nodeKey = crypto.PrivateKey.random(Secp256k1, rng[])[] + let node = WakuNode.new(nodeKey, ValidIpAddress.init("0.0.0.0"), Port(60865 + i)) + nodes &= node + + # Start them + await allFutures(nodes.mapIt(it.start())) + await allFutures(nodes.mapIt(it.mountRelay())) + + # Get all peer infos + let peerInfos = nodes.mapIt(it.switch.peerInfo.toRemotePeerInfo()) + + # all nodes connect to peer 0 + discard await nodes[1].peerManager.dialPeer(peerInfos[0], WakuRelayCodec, 2.seconds) + discard await nodes[2].peerManager.dialPeer(peerInfos[0], WakuRelayCodec, 2.seconds) + discard await nodes[3].peerManager.dialPeer(peerInfos[0], WakuRelayCodec, 2.seconds) + + check: + # Peerstore track all three peers + nodes[0].peerManager.peerStore.peers().len == 3 + + # Inbound/Outbound number of peers match + nodes[0].peerManager.peerStore.getPeersByDirection(Inbound).len == 3 + nodes[0].peerManager.peerStore.getPeersByDirection(Outbound).len == 0 + nodes[1].peerManager.peerStore.getPeersByDirection(Inbound).len == 0 + nodes[1].peerManager.peerStore.getPeersByDirection(Outbound).len == 1 + nodes[2].peerManager.peerStore.getPeersByDirection(Inbound).len == 0 + nodes[2].peerManager.peerStore.getPeersByDirection(Outbound).len == 1 + nodes[3].peerManager.peerStore.getPeersByDirection(Inbound).len == 0 + nodes[3].peerManager.peerStore.getPeersByDirection(Outbound).len == 1 + + # All peer ids are correct + nodes[0].peerManager.peerStore.peers().anyIt(it.peerId == nodes[1].switch.peerInfo.peerId) + nodes[0].peerManager.peerStore.peers().anyIt(it.peerId == nodes[2].switch.peerInfo.peerId) + nodes[0].peerManager.peerStore.peers().anyIt(it.peerId == nodes[3].switch.peerInfo.peerId) + + # All peers support the relay protocol + nodes[0].peerManager.peerStore[ProtoBook][nodes[1].switch.peerInfo.peerId].contains(WakuRelayCodec) + nodes[0].peerManager.peerStore[ProtoBook][nodes[2].switch.peerInfo.peerId].contains(WakuRelayCodec) + nodes[0].peerManager.peerStore[ProtoBook][nodes[3].switch.peerInfo.peerId].contains(WakuRelayCodec) + + # All peers are connected + nodes[0].peerManager.peerStore[ConnectionBook][nodes[1].switch.peerInfo.peerId] == Connected + nodes[0].peerManager.peerStore[ConnectionBook][nodes[2].switch.peerInfo.peerId] == Connected + nodes[0].peerManager.peerStore[ConnectionBook][nodes[3].switch.peerInfo.peerId] == Connected + + # All peers are Inbound in peer 0 + nodes[0].peerManager.peerStore[DirectionBook][nodes[1].switch.peerInfo.peerId] == Inbound + nodes[0].peerManager.peerStore[DirectionBook][nodes[2].switch.peerInfo.peerId] == Inbound + nodes[0].peerManager.peerStore[DirectionBook][nodes[3].switch.peerInfo.peerId] == Inbound + + # All peers have an Outbound connection with peer 0 + nodes[1].peerManager.peerStore[DirectionBook][nodes[0].switch.peerInfo.peerId] == Outbound + nodes[2].peerManager.peerStore[DirectionBook][nodes[0].switch.peerInfo.peerId] == Outbound + nodes[3].peerManager.peerStore[DirectionBook][nodes[0].switch.peerInfo.peerId] == Outbound + + await allFutures(nodes.mapIt(it.stop())) diff --git a/waku/v2/node/peer_manager/peer_manager.nim b/waku/v2/node/peer_manager/peer_manager.nim index ca9f928de..2a5bc1aab 100644 --- a/waku/v2/node/peer_manager/peer_manager.nim +++ b/waku/v2/node/peer_manager/peer_manager.nim @@ -17,11 +17,10 @@ import export waku_peer_store, peer_storage, peers - declareCounter waku_peers_dials, "Number of peer dials", ["outcome"] declarePublicCounter waku_node_conns_initiated, "Number of connections initiated", ["source"] declarePublicGauge waku_peers_errors, "Number of peer manager errors", ["type"] - +declarePublicGauge waku_connected_peers, "Number of connected peers per direction: inbound|outbound", ["direction"] logScope: topics = "waku node peer_manager" @@ -118,18 +117,22 @@ proc loadFromStorage(pm: PeerManager) = ################## proc onConnEvent(pm: PeerManager, peerId: PeerID, event: ConnEvent) {.async.} = - if not pm.peerStore[AddressBook].contains(peerId): - ## We only consider connection events if we - ## already track some addresses for this peer - return case event.kind of ConnEventKind.Connected: + let direction = if event.incoming: Inbound else: Outbound pm.peerStore[ConnectionBook][peerId] = Connected + pm.peerStore[DirectionBook][peerId] = direction + + waku_connected_peers.inc(1, labelValues=[$direction]) + if not pm.storage.isNil: pm.storage.insertOrReplace(peerId, pm.peerStore.get(peerId), Connected) return of ConnEventKind.Disconnected: + waku_connected_peers.dec(1, labelValues=[$pm.peerStore[DirectionBook][peerId]]) + + pm.peerStore[DirectionBook][peerId] = UnknownDirection pm.peerStore[ConnectionBook][peerId] = CanConnect if not pm.storage.isNil: pm.storage.insertOrReplace(peerId, pm.peerStore.get(peerId), CanConnect, getTime().toUnix) diff --git a/waku/v2/node/peer_manager/waku_peer_store.nim b/waku/v2/node/peer_manager/waku_peer_store.nim index 41e5da88d..646dca7cc 100644 --- a/waku/v2/node/peer_manager/waku_peer_store.nim +++ b/waku/v2/node/peer_manager/waku_peer_store.nim @@ -27,11 +27,16 @@ type Connected PeerOrigin* = enum - Unknown, + UnknownOrigin, Discv5, Static, Dns + Direction* = enum + UnknownDirection, + Inbound, + Outbound + # Keeps track of the Connectedness state of a peer ConnectionBook* = ref object of PeerBook[Connectedness] @@ -41,6 +46,9 @@ type # Keeps track of the origin of a peer SourceBook* = ref object of PeerBook[PeerOrigin] + # Direction + DirectionBook* = ref object of PeerBook[Direction] + StoredInfo* = object # Taken from nim-libp2 peerId*: PeerId @@ -54,6 +62,7 @@ type connectedness*: Connectedness disconnectTime*: int64 origin*: PeerOrigin + direction*: Direction ################## # Peer Store API # @@ -75,8 +84,10 @@ proc get*(peerStore: PeerStore, connectedness: peerStore[ConnectionBook][peerId], disconnectTime: peerStore[DisconnectBook][peerId], origin: peerStore[SourceBook][peerId], + direction: peerStore[DirectionBook][peerId], ) +# TODO: Rename peers() to getPeersByProtocol() proc peers*(peerStore: PeerStore): seq[StoredInfo] = ## Get all the stored information of every peer. let allKeys = concat(toSeq(peerStore[AddressBook].book.keys()), @@ -129,3 +140,6 @@ proc selectPeer*(peerStore: PeerStore, proto: string): Option[RemotePeerInfo] = return some(peerStored.toRemotePeerInfo()) else: return none(RemotePeerInfo) + +proc getPeersByDirection*(peerStore: PeerStore, direction: Direction): seq[StoredInfo] = + return peerStore.peers().filterIt(it.direction == direction) diff --git a/waku/v2/node/wakuswitch.nim b/waku/v2/node/wakuswitch.nim index ca11abcf1..13e8cda5f 100644 --- a/waku/v2/node/wakuswitch.nim +++ b/waku/v2/node/wakuswitch.nim @@ -14,6 +14,9 @@ import libp2p/builders, libp2p/transports/[transport, tcptransport, wstransport] +# override nim-libp2p default value (which is also 1) +const MaxConnectionsPerPeer* = 1 + proc withWsTransport*(b: SwitchBuilder): SwitchBuilder = b.withTransport(proc(upgr: Upgrade): Transport = WsTransport.new(upgr))