From 14305c610a10e967dc9db62822478f6792d4a64e Mon Sep 17 00:00:00 2001 From: Alvaro Revuelta Date: Wed, 19 Apr 2023 16:12:00 +0200 Subject: [PATCH] feat: curate peers shared over px protocol (#1671) --- apps/wakunode2/wakunode2.nim | 3 - tests/v2/test_waku_peer_exchange.nim | 11 +-- waku/v2/node/peer_manager/peer_manager.nim | 6 +- waku/v2/node/peer_manager/waku_peer_store.nim | 3 + waku/v2/node/waku_node.nim | 16 ++-- waku/v2/utils/peers.nim | 1 + waku/v2/waku_peer_exchange/protocol.nim | 75 +++++++++---------- 7 files changed, 58 insertions(+), 57 deletions(-) diff --git a/apps/wakunode2/wakunode2.nim b/apps/wakunode2/wakunode2.nim index db61141d2..bfe6012c0 100644 --- a/apps/wakunode2/wakunode2.nim +++ b/apps/wakunode2/wakunode2.nim @@ -577,9 +577,6 @@ proc startNode(node: WakuNode, conf: WakuNodeConf, except CatchableError: return err("failed to connect to dynamic bootstrap nodes: " & getCurrentExceptionMsg()) - if conf.peerExchange: - asyncSpawn runPeerExchangeDiscv5Loop(node.wakuPeerExchange) - # retrieve px peers and add the to the peer store if conf.peerExchangeNode != "": let desiredOutDegree = node.wakuRelay.parameters.d.uint64() diff --git a/tests/v2/test_waku_peer_exchange.nim b/tests/v2/test_waku_peer_exchange.nim index 7b9b5e851..ee22d1a0a 100644 --- a/tests/v2/test_waku_peer_exchange.nim +++ b/tests/v2/test_waku_peer_exchange.nim @@ -126,15 +126,16 @@ procSuite "Waku Peer Exchange": await allFutures([node1.start(), node2.start(), node3.start()]) await allFutures([node1.startDiscv5(), node2.startDiscv5()]) + # Give disv5 some time to discover each other + await sleepAsync(5000.millis) + + # node2 can be connected, so will be returned by peer exchange + require (await node1.peerManager.connectRelay(node2.switch.peerInfo.toRemotePeerInfo())) + # Mount peer exchange await node1.mountPeerExchange() await node3.mountPeerExchange() - # Give the algorithm some time to work its magic - await sleepAsync(3000.millis) - - asyncSpawn node1.wakuPeerExchange.runPeerExchangeDiscv5Loop() - let connOpt = await node3.peerManager.dialPeer(node1.switch.peerInfo.toRemotePeerInfo(), WakuPeerExchangeCodec) check: connOpt.isSome diff --git a/waku/v2/node/peer_manager/peer_manager.nim b/waku/v2/node/peer_manager/peer_manager.nim index d5312d056..e7fe447fb 100644 --- a/waku/v2/node/peer_manager/peer_manager.nim +++ b/waku/v2/node/peer_manager/peer_manager.nim @@ -100,7 +100,7 @@ proc insertOrReplace(ps: PeerStorage, warn "failed to store peers", err = res.error waku_peers_errors.inc(labelValues = ["storage_failure"]) -proc addPeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo) = +proc addPeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo, origin = UnknownOrigin) = # Adds peer to manager for the specified protocol if remotePeerInfo.peerId == pm.switch.peerInfo.peerId: @@ -120,6 +120,10 @@ proc addPeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo) = pm.peerStore[AddressBook][remotePeerInfo.peerId] = remotePeerInfo.addrs pm.peerStore[KeyBook][remotePeerInfo.peerId] = publicKey + pm.peerStore[SourceBook][remotePeerInfo.peerId] = origin + + if remotePeerInfo.enr.isSome(): + pm.peerStore[ENRBook][remotePeerInfo.peerId] = remotePeerInfo.enr.get() # Add peer to storage. Entry will subsequently be updated with connectedness information if not pm.storage.isNil: diff --git a/waku/v2/node/peer_manager/waku_peer_store.nim b/waku/v2/node/peer_manager/waku_peer_store.nim index 28baa8cfe..8d614caed 100644 --- a/waku/v2/node/peer_manager/waku_peer_store.nim +++ b/waku/v2/node/peer_manager/waku_peer_store.nim @@ -128,3 +128,6 @@ proc getConnectedPeers*(peerStore: PeerStore): seq[RemotePeerInfo] = proc getPeersByProtocol*(peerStore: PeerStore, proto: string): seq[RemotePeerInfo] = return peerStore.peers.filterIt(it.protocols.contains(proto)) + +proc getReachablePeers*(peerStore: PeerStore): seq[RemotePeerInfo] = + return peerStore.peers.filterIt(it.connectedness == CanConnect or it.connectedness == Connected) diff --git a/waku/v2/node/waku_node.nim b/waku/v2/node/waku_node.nim index b57f1593c..dcaceb36c 100644 --- a/waku/v2/node/waku_node.nim +++ b/waku/v2/node/waku_node.nim @@ -760,11 +760,7 @@ when defined(rln): proc mountPeerExchange*(node: WakuNode) {.async, raises: [Defect, LPError].} = info "mounting waku peer exchange" - var discv5Opt: Option[WakuDiscoveryV5] - if not node.wakuDiscV5.isNil(): - discv5Opt = some(node.wakuDiscV5) - - node.wakuPeerExchange = WakuPeerExchange.new(node.peerManager, discv5Opt) + node.wakuPeerExchange = WakuPeerExchange.new(node.peerManager) if node.started: await node.wakuPeerExchange.start() @@ -780,13 +776,13 @@ proc fetchPeerExchangePeers*(node: Wakunode, amount: uint64) {.async, raises: [D let pxPeersRes = await node.wakuPeerExchange.request(amount) if pxPeersRes.isOk: var validPeers = 0 - for pi in pxPeersRes.get().peerInfos: + let peers = pxPeersRes.get().peerInfos + for pi in peers: var record: enr.Record if enr.fromBytes(record, pi.enr): - # TODO: Add source: PX - node.peerManager.addPeer(record.toRemotePeerInfo().get) + node.peerManager.addPeer(record.toRemotePeerInfo().get, PeerExcahnge) validPeers += 1 - info "Retrieved peer info via peer exchange protocol", validPeers = validPeers + info "Retrieved peer info via peer exchange protocol", validPeers = validPeers, totalPeers = peers.len else: warn "Failed to retrieve peer info via peer exchange protocol", error = pxPeersRes.error @@ -871,7 +867,7 @@ proc runDiscv5Loop(node: WakuNode) {.async.} = # Add all peers, new ones and already seen (in case their addresses changed) for peer in discoveredPeers: - node.peerManager.addPeer(peer) + node.peerManager.addPeer(peer, Discv5) # Discovery `queryRandom` can have a synchronous fast path for example # when no peers are in the routing table. Don't run it in continuous loop. diff --git a/waku/v2/utils/peers.nim b/waku/v2/utils/peers.nim index 1f0384f91..d6b4bab23 100644 --- a/waku/v2/utils/peers.nim +++ b/waku/v2/utils/peers.nim @@ -38,6 +38,7 @@ type UnknownOrigin, Discv5, Static, + PeerExcahnge, Dns PeerDirection* = enum diff --git a/waku/v2/waku_peer_exchange/protocol.nim b/waku/v2/waku_peer_exchange/protocol.nim index 8a034cd8e..230b3a3cc 100644 --- a/waku/v2/waku_peer_exchange/protocol.nim +++ b/waku/v2/waku_peer_exchange/protocol.nim @@ -10,6 +10,7 @@ import import ../node/peer_manager, ../waku_core, + ../utils/heartbeat, ../waku_discv5, ./rpc, ./rpc_codec @@ -29,8 +30,8 @@ const # We add a 64kB safety buffer for protocol overhead. # 10x-multiplier also for safety MaxRpcSize* = 10 * MaxWakuMessageSize + 64 * 1024 # TODO what is the expected size of a PX message? As currently specified, it can contain an arbitary number of ENRs... - MaxCacheSize = 1000 - CacheCleanWindow = 200 + MaxPeersCacheSize = 60 + CacheRefreshInterval = 15.minutes WakuPeerExchangeCodec* = "/vac/waku/peer-exchange/2.0.0-alpha1" @@ -47,7 +48,6 @@ type WakuPeerExchange* = ref object of LPProtocol peerManager*: PeerManager - wakuDiscv5: Option[WakuDiscoveryV5] enrCache*: seq[enr.Record] # todo: next step: ring buffer; future: implement cache satisfying https://rfc.vac.dev/spec/34/ proc request*(wpx: WakuPeerExchange, numPeers: uint64, conn: Connection): Future[WakuPeerExchangeResult[PeerExchangeResponse]] {.async, gcsafe.} = @@ -95,42 +95,42 @@ proc respond(wpx: WakuPeerExchange, enrs: seq[enr.Record], conn: Connection): Fu return ok() -proc cleanCache(wpx: WakuPeerExchange) {.gcsafe.} = - wpx.enrCache.delete(0..CacheCleanWindow-1) - -proc runPeerExchangeDiscv5Loop*(wpx: WakuPeerExchange) {.async, gcsafe.} = - ## Runs a discv5 loop adding new peers to the px peer cache - if wpx.wakuDiscv5.isNone(): - warn "Trying to run discovery v5 (for PX) while it's disabled" - return - - info "Starting peer exchange discovery v5 loop" - - while wpx.wakuDiscv5.get().listening: - trace "Running px discv5 discovery loop" - let discoveredPeers = await wpx.wakuDiscv5.get().findRandomPeers() - info "Discovered px peers via discv5", count=discoveredPeers.get().len() - if discoveredPeers.isOk(): - for dp in discoveredPeers.get(): - if dp.enr.isSome() and not wpx.enrCache.contains(dp.enr.get()): - wpx.enrCache.add(dp.enr.get()) - - if wpx.enrCache.len() >= MaxCacheSize: - wpx.cleanCache() - - ## This loop "competes" with the loop in wakunode2 - ## For the purpose of collecting px peers, 30 sec intervals should be enough - await sleepAsync(30.seconds) - proc getEnrsFromCache(wpx: WakuPeerExchange, numPeers: uint64): seq[enr.Record] {.gcsafe.} = - randomize() if wpx.enrCache.len() == 0: debug "peer exchange ENR cache is empty" return @[] - for i in 0..