diff --git a/tests/all_tests_v2.nim b/tests/all_tests_v2.nim
index e2fe3a57c..b543228fb 100644
--- a/tests/all_tests_v2.nim
+++ b/tests/all_tests_v2.nim
@@ -30,6 +30,7 @@ import
   ./v2/test_waku_filter,
   ./v2/test_wakunode_filter,
   ./v2/test_waku_peer_exchange,
+  ./v2/test_peer_store_extended,
   ./v2/test_waku_payload,
   ./v2/test_waku_swap,
   ./v2/test_utils_peers,
diff --git a/tests/v2/test_peer_store_extended.nim b/tests/v2/test_peer_store_extended.nim
index 2fd8d2567..431a51ca5 100644
--- a/tests/v2/test_peer_store_extended.nim
+++ b/tests/v2/test_peer_store_extended.nim
@@ -44,6 +44,7 @@ suite "Extended nim-libp2p Peer Store":
     peerStore[ConnectionBook][p1] = Connected
     peerStore[DisconnectBook][p1] = 0
     peerStore[SourceBook][p1] = Discv5
+    peerStore[DirectionBook][p1] = Inbound
 
     # Peer2: Connected
     peerStore[AddressBook][p2] = @[MultiAddress.init("/ip4/127.0.0.1/tcp/2").tryGet()]
@@ -54,6 +55,7 @@ suite "Extended nim-libp2p Peer Store":
     peerStore[ConnectionBook][p2] = Connected
     peerStore[DisconnectBook][p2] = 0
     peerStore[SourceBook][p2] = Discv5
+    peerStore[DirectionBook][p2] = Inbound
 
     # Peer3: Connected
     peerStore[AddressBook][p3] = @[MultiAddress.init("/ip4/127.0.0.1/tcp/3").tryGet()]
@@ -64,6 +66,7 @@ suite "Extended nim-libp2p Peer Store":
     peerStore[ConnectionBook][p3] = Connected
     peerStore[DisconnectBook][p3] = 0
     peerStore[SourceBook][p3] = Discv5
+    peerStore[DirectionBook][p3] = Inbound
 
     # Peer4: Added but never connected
     peerStore[AddressBook][p4] = @[MultiAddress.init("/ip4/127.0.0.1/tcp/4").tryGet()]
@@ -74,6 +77,7 @@ suite "Extended nim-libp2p Peer Store":
     peerStore[ConnectionBook][p4] = NotConnected
     peerStore[DisconnectBook][p4] = 0
     peerStore[SourceBook][p4] = Discv5
+    peerStore[DirectionBook][p4] = Inbound
 
     # Peer5: Connected in the past
     peerStore[AddressBook][p5] = @[MultiAddress.init("/ip4/127.0.0.1/tcp/5").tryGet()]
@@ -84,6 +88,7 @@ suite "Extended nim-libp2p Peer Store":
     peerStore[ConnectionBook][p5] = CanConnect
     peerStore[DisconnectBook][p5] = 1000
     peerStore[SourceBook][p5] = Discv5
+    peerStore[DirectionBook][p5] = Outbound
 
   test "get() returns the correct StoredInfo for a given PeerId":
     # When
@@ -113,7 +118,7 @@ suite "Extended nim-libp2p Peer Store":
       storedInfoPeer6.protoVersion == ""
       storedInfoPeer6.connectedness == NotConnected
       storedInfoPeer6.disconnectTime == 0
-      storedInfoPeer6.origin == Unknown
+      storedInfoPeer6.origin == UnknownOrigin
 
   test "peers() returns all StoredInfo of the PeerStore":
     # When
@@ -254,3 +259,24 @@ suite "Extended nim-libp2p Peer Store":
       swapPeer.isSome()
       swapPeer.get().peerId == p5
       swapPeer.get().protocols == @["/vac/waku/swap/2.0.0", "/vac/waku/store/2.0.0-beta2"]
+
+  test "getPeersByDirection()":
+    # When
+    let inPeers = peerStore.getPeersByDirection(Inbound)
+    let outPeers = peerStore.getPeersByDirection(Outbound)
+
+    # Then
+    check:
+      inPeers.len == 4
+      outPeers.len == 1
+
+  test "getNotConnectedPeers()":
+    # When
+    let disconnectedPeers = peerStore.getNotConnectedPeers()
+
+    # Then
+    check:
+      disconnectedPeers.len == 2
+      disconnectedPeers.anyIt(it.peerId == p4)
+      disconnectedPeers.anyIt(it.peerId == p5)
+      not disconnectedPeers.anyIt(it.connectedness == Connected)
diff --git a/waku/v2/node/peer_manager/peer_manager.nim b/waku/v2/node/peer_manager/peer_manager.nim
index 68f69e65a..087962b59 100644
--- a/waku/v2/node/peer_manager/peer_manager.nim
+++ b/waku/v2/node/peer_manager/peer_manager.nim
@@ -5,13 +5,14 @@ else:
 
 import
-  std/[options, sets, sequtils, times],
+  std/[options, sets, sequtils, times, random],
   chronos,
   chronicles,
   metrics,
   libp2p/multistream
 import
   ../../utils/peers,
+  ../../protocol/waku_relay,
   ./peer_store/peer_storage,
   ./waku_peer_store
@@ -36,6 +37,12 @@ const
   # TODO: Make configurable
   DefaultDialTimeout = chronos.seconds(10)
 
+  # Limit the number of parallel dials
+  MaxParallelDials = 10
+
+  # Delay between consecutive relayConnectivityLoop runs
+  ConnectivityLoopInterval = chronos.seconds(30)
+
 ####################
 # Helper functions #
 ####################
@@ -179,19 +186,21 @@ proc addPeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo, proto: string) =
     # Do not attempt to manage our unmanageable self
     return
 
-  debug "Adding peer to manager", peerId = remotePeerInfo.peerId, addr = remotePeerInfo.addrs[0], proto = proto
-
-  # ...known addresses
-  for multiaddr in remotePeerInfo.addrs:
-    pm.peerStore[AddressBook][remotePeerInfo.peerId] = pm.peerStore[AddressBook][remotePeerInfo.peerId] & multiaddr
-
   # ...public key
   var publicKey: PublicKey
   discard remotePeerInfo.peerId.extractPublicKey(publicKey)
 
+  if pm.peerStore[AddressBook][remotePeerInfo.peerId] == remotePeerInfo.addrs and
+     pm.peerStore[KeyBook][remotePeerInfo.peerId] == publicKey:
+    # Peer already managed
+    return
+
+  debug "Adding peer to manager", peerId = remotePeerInfo.peerId, addresses = remotePeerInfo.addrs, proto = proto
+
+  pm.peerStore[AddressBook][remotePeerInfo.peerId] = remotePeerInfo.addrs
   pm.peerStore[KeyBook][remotePeerInfo.peerId] = publicKey
 
-  # nim-libp2p identify overrides this
+  # TODO: Remove this once service slots are ready
   pm.peerStore[ProtoBook][remotePeerInfo.peerId] = pm.peerStore[ProtoBook][remotePeerInfo.peerId] & proto
 
   # Add peer to storage. Entry will subsequently be updated with connectedness information
@@ -284,3 +293,34 @@ proc connectToNodes*(pm: PeerManager,
   # This issue was known to Dmitriy on nim-libp2p and may be resolvable
   # later.
   await sleepAsync(chronos.seconds(5))
+
+# Ensures a healthy number of connected relay peers
+proc relayConnectivityLoop*(pm: PeerManager) {.async.} =
+  while true:
+
+    let maxConnections = pm.switch.connManager.inSema.size
+    let numInPeers = pm.switch.connectedPeers(lpstream.Direction.In).len
+    let numOutPeers = pm.switch.connectedPeers(lpstream.Direction.Out).len
+    let numConPeers = numInPeers + numOutPeers
+
+    # TODO: Enforce a given in/out peers ratio
+
+    # Leave some room for service peers
+    if numConPeers >= (maxConnections - 5):
+      await sleepAsync(ConnectivityLoopInterval)
+      continue
+
+    # TODO: Respect backoff before attempting to connect a relay peer
+    var disconnectedPeers = pm.peerStore.getNotConnectedPeers().mapIt(RemotePeerInfo.init(it.peerId, it.addrs))
+    shuffle(disconnectedPeers)
+
+    let numPeersToConnect = min(min(maxConnections - numConPeers, disconnectedPeers.len), MaxParallelDials)
+
+    info "Relay connectivity loop",
+      connectedPeers = numConPeers,
+      targetConnectedPeers = maxConnections,
+      availableDisconnectedPeers = disconnectedPeers.len
+
+    await pm.connectToNodes(disconnectedPeers[0..<numPeersToConnect])
+
+    await sleepAsync(ConnectivityLoopInterval)
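For reference, the numPeersToConnect computation in relayConnectivityLoop above caps concurrent dials at MaxParallelDials while never exceeding the remaining connection budget. A small self-contained Nim illustration with hypothetical numbers follows; none of these values come from the diff.

# Hypothetical numbers illustrating the dial budget formula used above:
# budget = min(min(free connection slots, known disconnected peers), MaxParallelDials)
const MaxParallelDials = 10      # mirrors the constant added in peer_manager.nim
let
  maxConnections = 50            # assumed connection manager limit
  numConPeers = 12               # assumed currently connected peers (in + out)
  numDisconnected = 80           # assumed known-but-disconnected peers
  numPeersToConnect = min(min(maxConnections - numConPeers, numDisconnected), MaxParallelDials)
assert numPeersToConnect == 10   # capped by MaxParallelDials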