diff --git a/apps/wakunode2/wakunode2.nim b/apps/wakunode2/wakunode2.nim index 21edc6633..4b58f5905 100644 --- a/apps/wakunode2/wakunode2.nim +++ b/apps/wakunode2/wakunode2.nim @@ -40,6 +40,9 @@ import ../../waku/v2/protocol/waku_archive/retention_policy, ../../waku/v2/protocol/waku_archive/retention_policy/retention_policy_capacity, ../../waku/v2/protocol/waku_archive/retention_policy/retention_policy_time, + ../../waku/v2/protocol/waku_store, + ../../waku/v2/protocol/waku_filter, + ../../waku/v2/protocol/waku_lightpush, ../../waku/v2/protocol/waku_peer_exchange, ../../waku/v2/utils/peers, ../../waku/v2/utils/wakuenr, @@ -401,7 +404,8 @@ proc setupProtocols(node: WakuNode, conf: WakuNodeConf, if conf.storenode != "": try: mountStoreClient(node) - setStorePeer(node, conf.storenode) + let storenode = parseRemotePeerInfo(conf.storenode) + node.peerManager.addServicePeer(storenode, WakuStoreCodec) except: return err("failed to set node waku store peer: " & getCurrentExceptionMsg()) @@ -415,7 +419,8 @@ proc setupProtocols(node: WakuNode, conf: WakuNodeConf, if conf.lightpushnode != "": try: mountLightPushClient(node) - setLightPushPeer(node, conf.lightpushnode) + let lightpushnode = parseRemotePeerInfo(conf.lightpushnode) + node.peerManager.addServicePeer(lightpushnode, WakuLightPushCodec) except: return err("failed to set node waku lightpush peer: " & getCurrentExceptionMsg()) @@ -429,7 +434,8 @@ proc setupProtocols(node: WakuNode, conf: WakuNodeConf, if conf.filternode != "": try: await mountFilterClient(node) - setFilterPeer(node, conf.filternode) + let filternode = parseRemotePeerInfo(conf.filternode) + node.peerManager.addServicePeer(filternode, WakuFilterCodec) except: return err("failed to set node waku filter peer: " & getCurrentExceptionMsg()) @@ -442,7 +448,8 @@ proc setupProtocols(node: WakuNode, conf: WakuNodeConf, if conf.peerExchangeNode != "": try: - setPeerExchangePeer(node, conf.peerExchangeNode) + let peerExchangeNode = parseRemotePeerInfo(conf.peerExchangeNode) + node.peerManager.addServicePeer(peerExchangeNode, WakuPeerExchangeCodec) except: return err("failed to set node waku peer-exchange peer: " & getCurrentExceptionMsg()) @@ -498,6 +505,10 @@ proc startNode(node: WakuNode, conf: WakuNodeConf, if conf.keepAlive: node.startKeepalive() + # Maintain relay connections + if conf.relay: + node.peerManager.start() + return ok() when defined(waku_exp_store_resume): diff --git a/tests/v2/test_peer_manager.nim b/tests/v2/test_peer_manager.nim index 9d85cf06b..efbb593a1 100644 --- a/tests/v2/test_peer_manager.nim +++ b/tests/v2/test_peer_manager.nim @@ -15,7 +15,8 @@ import libp2p/stream/[bufferstream, connection], libp2p/crypto/crypto, libp2p/protocols/pubsub/pubsub, - libp2p/protocols/pubsub/rpc/message + libp2p/protocols/pubsub/rpc/message, + libp2p/builders import ../../waku/common/sqlite, ../../waku/v2/node/peer_manager/peer_manager, @@ -24,6 +25,8 @@ import ../../waku/v2/protocol/waku_relay, ../../waku/v2/protocol/waku_store, ../../waku/v2/protocol/waku_filter, + ../../waku/v2/protocol/waku_lightpush, + ../../waku/v2/protocol/waku_peer_exchange, ../../waku/v2/protocol/waku_swap/waku_swap, ../test_helpers, ./testlib/testutils @@ -401,3 +404,93 @@ procSuite "Peer Manager": nodes[3].peerManager.peerStore[DirectionBook][nodes[0].switch.peerInfo.peerId] == Outbound await allFutures(nodes.mapIt(it.stop())) + + asyncTest "Peer store addServicePeer() stores service peers": + # Valid peer id missing the last digit + let basePeerId = "16Uiu2HAm7QGEZKujdSbbo1aaQyfDPQ6Bw3ybQnj6fruH5Dxwd7D" + + let + nodeKey = crypto.PrivateKey.random(Secp256k1, rng[])[] + node = WakuNode.new(nodeKey, ValidIpAddress.init("0.0.0.0"), Port(60932)) + peer1 = parseRemotePeerInfo("/ip4/0.0.0.0/tcp/30300/p2p/" & basePeerId & "1") + peer2 = parseRemotePeerInfo("/ip4/0.0.0.0/tcp/30301/p2p/" & basePeerId & "2") + peer3 = parseRemotePeerInfo("/ip4/0.0.0.0/tcp/30302/p2p/" & basePeerId & "3") + peer4 = parseRemotePeerInfo("/ip4/0.0.0.0/tcp/30303/p2p/" & basePeerId & "4") + peer5 = parseRemotePeerInfo("/ip4/0.0.0.0/tcp/30303/p2p/" & basePeerId & "5") + + # service peers + node.peerManager.addServicePeer(peer1, WakuStoreCodec) + node.peerManager.addServicePeer(peer2, WakuFilterCodec) + node.peerManager.addServicePeer(peer3, WakuLightPushCodec) + node.peerManager.addServicePeer(peer4, WakuPeerExchangeCodec) + + # relay peers (should not be added) + node.peerManager.addServicePeer(peer5, WakuRelayCodec) + + # all peers are stored in the peerstore + check: + node.peerManager.peerStore.peers().anyIt(it.peerId == peer1.peerId) + node.peerManager.peerStore.peers().anyIt(it.peerId == peer2.peerId) + node.peerManager.peerStore.peers().anyIt(it.peerId == peer3.peerId) + node.peerManager.peerStore.peers().anyIt(it.peerId == peer4.peerId) + + # but the relay peer is not + node.peerManager.peerStore.peers().anyIt(it.peerId == peer5.peerId) == false + + # all service peers are added to its service slot + check: + node.peerManager.serviceSlots[WakuStoreCodec].peerId == peer1.peerId + node.peerManager.serviceSlots[WakuFilterCodec].peerId == peer2.peerId + node.peerManager.serviceSlots[WakuLightPushCodec].peerId == peer3.peerId + node.peerManager.serviceSlots[WakuPeerExchangeCodec].peerId == peer4.peerId + + # but the relay peer is not + node.peerManager.serviceSlots.hasKey(WakuRelayCodec) == false + + test "selectPeer() returns the correct peer": + # Valid peer id missing the last digit + let basePeerId = "16Uiu2HAm7QGEZKujdSbbo1aaQyfDPQ6Bw3ybQnj6fruH5Dxwd7D" + + # Create peer manager + let pm = PeerManager.new( + switch = SwitchBuilder.new().withRng(rng).withMplex().withNoise().build(), + storage = nil) + + # Create 3 peer infos + let peers = toSeq(1..3).mapIt(parseRemotePeerInfo("/ip4/0.0.0.0/tcp/30300/p2p/" & basePeerId & $it)) + + # Add a peer[0] to the peerstore + pm.peerStore[AddressBook][peers[0].peerId] = peers[0].addrs + pm.peerStore[ProtoBook][peers[0].peerId] = @[WakuRelayCodec, WakuStoreCodec, WakuFilterCodec] + + # When no service peers, we get one from the peerstore + let selectedPeer1 = pm.selectPeer(WakuStoreCodec) + check: + selectedPeer1.isSome() == true + selectedPeer1.get().peerId == peers[0].peerId + + # Same for other protocol + let selectedPeer2 = pm.selectPeer(WakuFilterCodec) + check: + selectedPeer2.isSome() == true + selectedPeer2.get().peerId == peers[0].peerId + + # And return none if we dont have any peer for that protocol + let selectedPeer3 = pm.selectPeer(WakuLightPushCodec) + check: + selectedPeer3.isSome() == false + + # Now we add service peers for different protocols peer[1..3] + pm.addServicePeer(peers[1], WakuStoreCodec) + pm.addServicePeer(peers[2], WakuLightPushCodec) + + # We no longer get one from the peerstore. Slots are being used instead. + let selectedPeer4 = pm.selectPeer(WakuStoreCodec) + check: + selectedPeer4.isSome() == true + selectedPeer4.get().peerId == peers[1].peerId + + let selectedPeer5 = pm.selectPeer(WakuLightPushCodec) + check: + selectedPeer5.isSome() == true + selectedPeer5.get().peerId == peers[2].peerId diff --git a/tests/v2/test_peer_store_extended.nim b/tests/v2/test_peer_store_extended.nim index 708f3919b..c4618fca2 100644 --- a/tests/v2/test_peer_store_extended.nim +++ b/tests/v2/test_peer_store_extended.nim @@ -267,16 +267,6 @@ suite "Extended nim-libp2p Peer Store": peerStore.hasPeers(protocolMatcher("/vac/waku/store/2.0.0")) not peerStore.hasPeers(protocolMatcher("/vac/waku/does-not-exist/2.0.0")) - test "selectPeer() returns if a peer supports a given protocol": - # When - let swapPeer = peerStore.selectPeer("/vac/waku/swap/2.0.0") - - # Then - check: - swapPeer.isSome() - swapPeer.get().peerId == p5 - swapPeer.get().protocols == @["/vac/waku/swap/2.0.0", "/vac/waku/store/2.0.0-beta2"] - test "getPeersByDirection()": # When let inPeers = peerStore.getPeersByDirection(Inbound) diff --git a/waku/v2/node/jsonrpc/store_api.nim b/waku/v2/node/jsonrpc/store_api.nim index ccea0e23b..cb027c8e4 100644 --- a/waku/v2/node/jsonrpc/store_api.nim +++ b/waku/v2/node/jsonrpc/store_api.nim @@ -13,7 +13,7 @@ import ../../utils/time, ../waku_node, ../peer_manager/peer_manager, - ./jsonrpc_types, + ./jsonrpc_types, ./jsonrpc_utils export jsonrpc_types @@ -30,7 +30,7 @@ proc installStoreApiHandlers*(node: WakuNode, rpcsrv: RpcServer) = ## Returns history for a list of content topics with optional paging debug "get_waku_v2_store_v1_messages" - let peerOpt = node.peerManager.peerStore.selectPeer(WakuStoreCodec) + let peerOpt = node.peerManager.selectPeer(WakuStoreCodec) if peerOpt.isNone(): raise newException(ValueError, "no suitable remote store peers") @@ -52,7 +52,7 @@ proc installStoreApiHandlers*(node: WakuNode, rpcsrv: RpcServer) = if not await queryFut.withTimeout(futTimeout): raise newException(ValueError, "No history response received (timeout)") - + let res = queryFut.read() if res.isErr(): raise newException(ValueError, $res.error) diff --git a/waku/v2/node/peer_manager/peer_manager.nim b/waku/v2/node/peer_manager/peer_manager.nim index 9f70b197f..7a930258e 100644 --- a/waku/v2/node/peer_manager/peer_manager.nim +++ b/waku/v2/node/peer_manager/peer_manager.nim @@ -40,10 +40,10 @@ const InitialBackoffInSec = 120 BackoffFactor = 4 - # limit the amount of paralel dials + # Limit the amount of paralel dials MaxParalelDials = 10 - # delay between consecutive relayConnectivityLoop runs + # Delay between consecutive relayConnectivityLoop runs ConnectivityLoopInterval = chronos.seconds(30) type @@ -54,6 +54,8 @@ type backoffFactor*: int maxFailedAttempts*: int storage: PeerStorage + serviceSlots*: Table[string, RemotePeerInfo] + started: bool #################### # Helper functions # @@ -105,7 +107,10 @@ proc dialPeer(pm: PeerManager, peerId: PeerID, pm.peerStore[LastFailedConnBook][peerId] = Moment.init(getTime().toUnix, Second) pm.peerStore[ConnectionBook][peerId] = CannotConnect - debug "Dialing peer failed", peerId = peerId, reason = reasonFailed, failedAttempts=failedAttempts + debug "Dialing peer failed", + peerId = peerId, + reason = reasonFailed, + failedAttempts = pm.peerStore[NumberFailedConnBook][peerId] waku_peers_dials.inc(labelValues = [reasonFailed]) # Update storage @@ -192,13 +197,14 @@ proc new*(T: type PeerManager, initialBackoffInSec: initialBackoffInSec, backoffFactor: backoffFactor, maxFailedAttempts: maxFailedAttempts) - proc peerHook(peerId: PeerID, event: ConnEvent): Future[void] {.gcsafe.} = onConnEvent(pm, peerId, event) pm.switch.addConnEventHandler(peerHook, ConnEventKind.Connected) pm.switch.addConnEventHandler(peerHook, ConnEventKind.Disconnected) + pm.serviceSlots = initTable[string, RemotePeerInfo]() + if not storage.isNil(): debug "found persistent peer storage" pm.loadFromStorage() # Load previously managed peers. @@ -239,6 +245,20 @@ proc addPeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo, proto: string) = if not pm.storage.isNil: pm.storage.insertOrReplace(remotePeerInfo.peerId, pm.peerStore.get(remotePeerInfo.peerId), NotConnected) +proc addServicePeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo, proto: string) = + # Do not add relay peers + if proto == WakuRelayCodec: + warn "Can't add relay peer to service peers slots" + return + + info "Adding peer to service slots", peerId = remotePeerInfo.peerId, addr = remotePeerInfo.addrs[0], service = proto + + # Set peer for service slot + pm.serviceSlots[proto] = remotePeerInfo + + # TODO: Remove proto once fully refactored + pm.addPeer(remotePeerInfo, proto) + proc reconnectPeers*(pm: PeerManager, proto: string, protocolMatcher: Matcher, @@ -335,7 +355,8 @@ proc connectToNodes*(pm: PeerManager, # Ensures a healthy amount of connected relay peers proc relayConnectivityLoop*(pm: PeerManager) {.async.} = - while true: + debug "Starting relay connectivity loop" + while pm.started: let maxConnections = pm.switch.connManager.inSema.size let numInPeers = pm.switch.connectedPeers(lpstream.Direction.In).len @@ -364,3 +385,37 @@ proc relayConnectivityLoop*(pm: PeerManager) {.async.} = await pm.connectToNodes(outsideBackoffPeers[0.. 0: + debug "Got peer from peerstore", peerId=peers[0].peerId, multi=peers[0].addrs[0], protocol=proto + return some(peers[0].toRemotePeerInfo()) + debug "No peer found for protocol", protocol=proto + return none(RemotePeerInfo) + + # For other protocols, we select the peer that is slotted for the given protocol + pm.serviceSlots.withValue(proto, serviceSlot): + debug "Got peer from service slots", peerId=serviceSlot[].peerId, multi=serviceSlot[].addrs[0], protocol=proto + return some(serviceSlot[]) + + # If not slotted, we select a random peer for the given protocol + if peers.len > 0: + debug "Got peer from peerstore", peerId=peers[0].peerId, multi=peers[0].addrs[0], protocol=proto + return some(peers[0].toRemotePeerInfo()) + debug "No peer found for protocol", protocol=proto + return none(RemotePeerInfo) + +proc start*(pm: PeerManager) = + pm.started = true + asyncSpawn pm.relayConnectivityLoop() + +proc stop*(pm: PeerManager) = + pm.started = false diff --git a/waku/v2/node/peer_manager/waku_peer_store.nim b/waku/v2/node/peer_manager/waku_peer_store.nim index d7097ed3f..d8d026ad6 100644 --- a/waku/v2/node/peer_manager/waku_peer_store.nim +++ b/waku/v2/node/peer_manager/waku_peer_store.nim @@ -152,6 +152,10 @@ proc connectedness*(peerStore: PeerStore, peerId: PeerID): Connectedness = # TODO: richer return than just bool, e.g. add enum "CanConnect", "CannotConnect", etc. based on recent connection attempts return peerStore[ConnectionBook].book.getOrDefault(peerId, NotConnected) +proc isConnected*(peerStore: PeerStore, peerId: PeerID): bool = + # Returns `true` if the peer is connected + peerStore.connectedness(peerId) == Connected + proc hasPeer*(peerStore: PeerStore, peerId: PeerID, proto: string): bool = # Returns `true` if peer is included in manager for the specified protocol # TODO: What if peer does not exist in the peerStore? @@ -165,20 +169,11 @@ proc hasPeers*(peerStore: PeerStore, protocolMatcher: Matcher): bool = # Returns `true` if the peerstore has any peer matching the protocolMatcher toSeq(peerStore[ProtoBook].book.values()).anyIt(it.anyIt(protocolMatcher(it))) -proc selectPeer*(peerStore: PeerStore, proto: string): Option[RemotePeerInfo] = - # Selects the best peer for a given protocol - let peers = peerStore.peers().filterIt(it.protos.contains(proto)) - - if peers.len >= 1: - # TODO: proper heuristic here that compares peer scores and selects "best" one. For now the first peer for the given protocol is returned - let peerStored = peers[0] - - return some(peerStored.toRemotePeerInfo()) - else: - return none(RemotePeerInfo) - proc getPeersByDirection*(peerStore: PeerStore, direction: PeerDirection): seq[StoredInfo] = return peerStore.peers.filterIt(it.direction == direction) proc getNotConnectedPeers*(peerStore: PeerStore): seq[StoredInfo] = return peerStore.peers.filterIt(it.connectedness != Connected) + +proc getPeersByProtocol*(peerStore: PeerStore, proto: string): seq[StoredInfo] = + return peerStore.peers.filterIt(it.protos.contains(proto)) diff --git a/waku/v2/node/waku_node.nim b/waku/v2/node/waku_node.nim index 6b0d2e42a..96f16b650 100644 --- a/waku/v2/node/waku_node.nim +++ b/waku/v2/node/waku_node.nim @@ -391,9 +391,6 @@ proc startRelay*(node: WakuNode) {.async.} = protocolMatcher(WakuRelayCodec), backoffPeriod) - # Maintain relay connections - asyncSpawn node.peerManager.relayConnectivityLoop() - # Start the WakuRelay protocol await node.wakuRelay.start() @@ -529,7 +526,7 @@ proc subscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics: Content error "cannot register filter subscription to topic", error="waku filter client is nil" return - let peerOpt = node.peerManager.peerStore.selectPeer(WakuFilterCodec) + let peerOpt = node.peerManager.selectPeer(WakuFilterCodec) if peerOpt.isNone(): error "cannot register filter subscription to topic", error="no suitable remote peers" return @@ -544,7 +541,7 @@ proc unsubscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics: Conte error "cannot unregister filter subscription to content", error="waku filter client is nil" return - let peerOpt = node.peerManager.peerStore.selectPeer(WakuFilterCodec) + let peerOpt = node.peerManager.selectPeer(WakuFilterCodec) if peerOpt.isNone(): error "cannot register filter subscription to topic", error="no suitable remote peers" return @@ -702,7 +699,7 @@ proc query*(node: WakuNode, query: HistoryQuery): Future[WakuStoreResult[History if node.wakuStoreClient.isNil(): return err("waku store client is nil") - let peerOpt = node.peerManager.peerStore.selectPeer(WakuStoreCodec) + let peerOpt = node.peerManager.selectPeer(WakuStoreCodec) if peerOpt.isNone(): error "no suitable remote peers" return err("peer_not_found_failure") @@ -791,7 +788,7 @@ proc lightpushPublish*(node: WakuNode, pubsubTopic: PubsubTopic, message: WakuMe error "failed to publish message", error="waku lightpush client is nil" return - let peerOpt = node.peerManager.peerStore.selectPeer(WakuLightPushCodec) + let peerOpt = node.peerManager.selectPeer(WakuLightPushCodec) if peerOpt.isNone(): error "failed to publish message", error="no suitable remote peers" return @@ -1008,5 +1005,6 @@ proc stop*(node: WakuNode) {.async.} = discard await node.stopDiscv5() await node.switch.stop() + node.peerManager.stop() node.started = false diff --git a/waku/v2/protocol/waku_peer_exchange/protocol.nim b/waku/v2/protocol/waku_peer_exchange/protocol.nim index 14fea22b4..ff33f1ffe 100644 --- a/waku/v2/protocol/waku_peer_exchange/protocol.nim +++ b/waku/v2/protocol/waku_peer_exchange/protocol.nim @@ -77,7 +77,7 @@ proc request(wpx: WakuPeerExchange, numPeers: uint64, peer: RemotePeerInfo): Fut return ok() proc request*(wpx: WakuPeerExchange, numPeers: uint64): Future[WakuPeerExchangeResult[void]] {.async, gcsafe.} = - let peerOpt = wpx.peerManager.peerStore.selectPeer(WakuPeerExchangeCodec) + let peerOpt = wpx.peerManager.selectPeer(WakuPeerExchangeCodec) if peerOpt.isNone(): waku_px_errors.inc(labelValues = [peerNotFoundFailure]) return err(peerNotFoundFailure) @@ -106,7 +106,7 @@ proc respond(wpx: WakuPeerExchange, enrs: seq[enr.Record], peer: RemotePeerInfo return ok() proc respond(wpx: WakuPeerExchange, enrs: seq[enr.Record]): Future[WakuPeerExchangeResult[void]] {.async, gcsafe.} = - let peerOpt = wpx.peerManager.peerStore.selectPeer(WakuPeerExchangeCodec) + let peerOpt = wpx.peerManager.selectPeer(WakuPeerExchangeCodec) if peerOpt.isNone(): waku_px_errors.inc(labelValues = [peerNotFoundFailure]) return err(peerNotFoundFailure) diff --git a/waku/v2/protocol/waku_store/client.nim b/waku/v2/protocol/waku_store/client.nim index b9f3549dd..f3a9cdf78 100644 --- a/waku/v2/protocol/waku_store/client.nim +++ b/waku/v2/protocol/waku_store/client.nim @@ -210,7 +210,7 @@ when defined(waku_exp_store_resume): else: debug "no candidate list is provided, selecting a random peer" # if no peerList is set then query from one of the peers stored in the peer manager - let peerOpt = w.peerManager.peerStore.selectPeer(WakuStoreCodec) + let peerOpt = w.peerManager.selectPeer(WakuStoreCodec) if peerOpt.isNone(): warn "no suitable remote peers" waku_store_errors.inc(labelValues = [peerNotFoundFailure])