From 1c46b61402ec2a04fba5fc11b054d67c0e9d01a3 Mon Sep 17 00:00:00 2001 From: Lorenzo Delgado Date: Wed, 2 Nov 2022 09:45:21 +0100 Subject: [PATCH] refactor(peer-exchange): move peer management to waku_node module --- apps/wakunode2/wakunode2.nim | 2 +- tests/v2/test_waku_peer_exchange.nim | 6 +- waku/v2/node/waku_node.nim | 11 +-- waku/v2/protocol/waku_peer_exchange.nim | 7 +- .../v2/protocol/waku_peer_exchange/client.nim | 9 --- .../protocol/waku_peer_exchange/protocol.nim | 72 +++++++++---------- 6 files changed, 45 insertions(+), 62 deletions(-) delete mode 100644 waku/v2/protocol/waku_peer_exchange/client.nim diff --git a/apps/wakunode2/wakunode2.nim b/apps/wakunode2/wakunode2.nim index e7c6d1903..66064448d 100644 --- a/apps/wakunode2/wakunode2.nim +++ b/apps/wakunode2/wakunode2.nim @@ -433,7 +433,7 @@ proc setupProtocols(node: WakuNode, conf: WakuNodeConf, # waku peer exchange setup if (conf.peerExchangeNode != "") or (conf.peerExchange): try: - await mountWakuPeerExchange(node) + await mountPeerExchange(node) except: return err("failed to mount waku peer-exchange protocol: " & getCurrentExceptionMsg()) diff --git a/tests/v2/test_waku_peer_exchange.nim b/tests/v2/test_waku_peer_exchange.nim index 0594682a2..8b072d226 100644 --- a/tests/v2/test_waku_peer_exchange.nim +++ b/tests/v2/test_waku_peer_exchange.nim @@ -124,14 +124,14 @@ procSuite "Waku Peer Exchange": await allFutures([node1.startDiscv5(), node2.startDiscv5()]) # Mount peer exchange - await node1.mountWakuPeerExchange() - await node3.mountWakuPeerExchange() + await node1.mountPeerExchange() + await node3.mountPeerExchange() await sleepAsync(3000.millis) # Give the algorithm some time to work its magic asyncSpawn node1.wakuPeerExchange.runPeerExchangeDiscv5Loop() - node3.wakuPeerExchange.setPeer(node1.switch.peerInfo.toRemotePeerInfo()) + node3.setPeerExchangePeer(node1.peerInfo.toRemotePeerInfo()) ## When discard waitFor node3.wakuPeerExchange.request(1) diff --git a/waku/v2/node/waku_node.nim b/waku/v2/node/waku_node.nim index 9fc039a69..c3fdee770 100644 --- a/waku/v2/node/waku_node.nim +++ b/waku/v2/node/waku_node.nim @@ -41,6 +41,7 @@ declarePublicGauge waku_node_filters, "number of content filter subscriptions" declarePublicGauge waku_node_errors, "number of wakunode errors", ["type"] declarePublicGauge waku_lightpush_peers, "number of lightpush peers" declarePublicGauge waku_store_peers, "number of store peers" +declarePublicGauge waku_px_peers, "number of peers (in the node's peerManager) supporting the peer exchange protocol" logScope: @@ -697,20 +698,21 @@ proc lightpushPublish*(node: WakuNode, pubsubTopic: PubsubTopic, message: WakuMe ## Waku peer-exchange -proc mountWakuPeerExchange*(node: WakuNode) {.async, raises: [Defect, LPError].} = +proc mountPeerExchange*(node: WakuNode) {.async, raises: [Defect, LPError].} = info "mounting waku peer exchange" var discv5Opt: Option[WakuDiscoveryV5] if not node.wakuDiscV5.isNil(): discv5Opt = some(node.wakuDiscV5) - node.wakuPeerExchange = WakuPeerExchange.init(node.peerManager, discv5Opt) + + node.wakuPeerExchange = WakuPeerExchange.new(node.peerManager, discv5Opt) if node.started: - # Node has started already. Let's start Waku peer exchange too. await node.wakuPeerExchange.start() node.switch.mount(node.wakuPeerExchange, protocolMatcher(WakuPeerExchangeCodec)) +# TODO: Move to application module (e.g., wakunode2.nim) proc setPeerExchangePeer*(node: WakuNode, peer: RemotePeerInfo|string) {.raises: [Defect, ValueError, LPError].} = if node.wakuPeerExchange.isNil(): error "could not set peer, waku peer-exchange is nil" @@ -720,7 +722,8 @@ proc setPeerExchangePeer*(node: WakuNode, peer: RemotePeerInfo|string) {.raises: let remotePeer = when peer is string: parseRemotePeerInfo(peer) else: peer - node.wakuPeerExchange.setPeer(remotePeer) + node.peerManager.addPeer(remotePeer, WakuPeerExchangeCodec) + waku_px_peers.inc() ## Other protocols diff --git a/waku/v2/protocol/waku_peer_exchange.nim b/waku/v2/protocol/waku_peer_exchange.nim index 3cb1f9af0..e8b61e25b 100644 --- a/waku/v2/protocol/waku_peer_exchange.nim +++ b/waku/v2/protocol/waku_peer_exchange.nim @@ -3,11 +3,8 @@ import ./waku_peer_exchange/rpc, ./waku_peer_exchange/rpc_codec, - ./waku_peer_exchange/protocol, - ./waku_peer_exchange/client - + ./waku_peer_exchange/protocol export rpc, rpc_codec, - protocol, - client + protocol diff --git a/waku/v2/protocol/waku_peer_exchange/client.nim b/waku/v2/protocol/waku_peer_exchange/client.nim deleted file mode 100644 index 6de8dad67..000000000 --- a/waku/v2/protocol/waku_peer_exchange/client.nim +++ /dev/null @@ -1,9 +0,0 @@ -{.push raises: [Defect].} - -import - std/[tables, sequtils], - chronicles -import - ../waku_message, - ./rpc - diff --git a/waku/v2/protocol/waku_peer_exchange/protocol.nim b/waku/v2/protocol/waku_peer_exchange/protocol.nim index 719097f12..55c5a4db7 100644 --- a/waku/v2/protocol/waku_peer_exchange/protocol.nim +++ b/waku/v2/protocol/waku_peer_exchange/protocol.nim @@ -17,7 +17,6 @@ import ./rpc_codec -declarePublicGauge waku_px_peers, "number of peers (in the node's peerManager) supporting the peer exchange protocol" declarePublicGauge waku_px_peers_received_total, "number of ENRs received via peer exchange" declarePublicGauge waku_px_peers_received_unknown, "number of previously unknown ENRs received via peer exchange" declarePublicGauge waku_px_peers_sent, "number of ENRs sent to peer exchange requesters" @@ -107,7 +106,7 @@ proc respond(wpx: WakuPeerExchange, enrs: seq[enr.Record], peer: RemotePeerInfo return ok() -proc respond*(wpx: WakuPeerExchange, enrs: seq[enr.Record]): Future[WakuPeerExchangeResult[void]] {.async, gcsafe.} = +proc respond(wpx: WakuPeerExchange, enrs: seq[enr.Record]): Future[WakuPeerExchangeResult[void]] {.async, gcsafe.} = let peerOpt = wpx.peerManager.selectPeer(WakuPeerExchangeCodec) if peerOpt.isNone(): waku_px_errors.inc(labelValues = [peerNotFoundFailure]) @@ -115,48 +114,47 @@ proc respond*(wpx: WakuPeerExchange, enrs: seq[enr.Record]): Future[WakuPeerExch return await wpx.respond(enrs, peerOpt.get()) -proc cleanCache(px: WakuPeerExchange) {.gcsafe.} = - px.enrCache.delete(0, CacheCleanWindow-1) +proc cleanCache(wpx: WakuPeerExchange) {.gcsafe.} = + wpx.enrCache.delete(0..CacheCleanWindow-1) -proc runPeerExchangeDiscv5Loop*(px: WakuPeerExchange) {.async, gcsafe.} = +proc runPeerExchangeDiscv5Loop*(wpx: WakuPeerExchange) {.async, gcsafe.} = ## Runs a discv5 loop adding new peers to the px peer cache - if px.wakuDiscv5.isNone(): + if wpx.wakuDiscv5.isNone(): warn "Trying to run discovery v5 (for PX) while it's disabled" return info "Starting peer exchange discovery v5 loop" - while px.wakuDiscv5.get().listening: + while wpx.wakuDiscv5.get().listening: trace "Running px discv5 discovery loop" - let discoveredPeers = await px.wakuDiscv5.get().findRandomPeers() + let discoveredPeers = await wpx.wakuDiscv5.get().findRandomPeers() info "Discovered px peers via discv5", count=discoveredPeers.get().len() - if discoveredPeers.isOk: + if discoveredPeers.isOk(): for dp in discoveredPeers.get(): - if dp.enr.isSome() and not px.enrCache.contains(dp.enr.get()): - px.enrCache.add(dp.enr.get()) + if dp.enr.isSome() and not wpx.enrCache.contains(dp.enr.get()): + wpx.enrCache.add(dp.enr.get()) - if px.enrCache.len() >= MaxCacheSize: - px.cleanCache() + if wpx.enrCache.len() >= MaxCacheSize: + wpx.cleanCache() ## This loop "competes" with the loop in wakunode2 ## For the purpose of collecting px peers, 30 sec intervals should be enough await sleepAsync(30.seconds) -proc getEnrsFromCache(px: WakuPeerExchange, numPeers: uint64): seq[enr.Record] {.gcsafe.} = +proc getEnrsFromCache(wpx: WakuPeerExchange, numPeers: uint64): seq[enr.Record] {.gcsafe.} = randomize() - if px.enrCache.len() == 0: + if wpx.enrCache.len() == 0: debug "peer exchange ENR cache is empty" return @[] - for i in 0.. 0: waku_px_peers_received_unknown.inc(newPeers.len().int64()) debug "Connecting to newly discovered peers", count=newPeers.len() - await px.peerManager.connectToNodes(newPeers, WakuRelayCodec, source = "peer exchange") + await wpx.peerManager.connectToNodes(newPeers, WakuRelayCodec, source = "peer exchange") - px.handler = handler - px.codec = WakuPeerExchangeCodec + wpx.handler = handler + wpx.codec = WakuPeerExchangeCodec -proc init*(T: type WakuPeerExchange, - peerManager: PeerManager, - wakuDiscv5: Option[WakuDiscoveryV5] = none(WakuDiscoveryV5) - ): T = - let px = WakuPeerExchange( +proc new*(T: type WakuPeerExchange, + peerManager: PeerManager, + wakuDiscv5: Option[WakuDiscoveryV5] = none(WakuDiscoveryV5)): T = + let wpx = WakuPeerExchange( peerManager: peerManager, wakuDiscv5: wakuDiscv5 ) - px.initProtocolHandler() - return px - -proc setPeer*(wpx: WakuPeerExchange, peer: RemotePeerInfo) = - wpx.peerManager.addPeer(peer, WakuPeerExchangeCodec) - waku_px_peers.inc() - + wpx.initProtocolHandler() + return wpx