mirror of https://github.com/waku-org/nwaku.git
feat: curate peers shared over px protocol (#1671)
This commit is contained in:
parent
d5ef9331e7
commit
14305c610a
|
@ -577,9 +577,6 @@ proc startNode(node: WakuNode, conf: WakuNodeConf,
|
|||
except CatchableError:
|
||||
return err("failed to connect to dynamic bootstrap nodes: " & getCurrentExceptionMsg())
|
||||
|
||||
if conf.peerExchange:
|
||||
asyncSpawn runPeerExchangeDiscv5Loop(node.wakuPeerExchange)
|
||||
|
||||
# retrieve px peers and add the to the peer store
|
||||
if conf.peerExchangeNode != "":
|
||||
let desiredOutDegree = node.wakuRelay.parameters.d.uint64()
|
||||
|
|
|
@ -126,15 +126,16 @@ procSuite "Waku Peer Exchange":
|
|||
await allFutures([node1.start(), node2.start(), node3.start()])
|
||||
await allFutures([node1.startDiscv5(), node2.startDiscv5()])
|
||||
|
||||
# Give disv5 some time to discover each other
|
||||
await sleepAsync(5000.millis)
|
||||
|
||||
# node2 can be connected, so will be returned by peer exchange
|
||||
require (await node1.peerManager.connectRelay(node2.switch.peerInfo.toRemotePeerInfo()))
|
||||
|
||||
# Mount peer exchange
|
||||
await node1.mountPeerExchange()
|
||||
await node3.mountPeerExchange()
|
||||
|
||||
# Give the algorithm some time to work its magic
|
||||
await sleepAsync(3000.millis)
|
||||
|
||||
asyncSpawn node1.wakuPeerExchange.runPeerExchangeDiscv5Loop()
|
||||
|
||||
let connOpt = await node3.peerManager.dialPeer(node1.switch.peerInfo.toRemotePeerInfo(), WakuPeerExchangeCodec)
|
||||
check:
|
||||
connOpt.isSome
|
||||
|
|
|
@ -100,7 +100,7 @@ proc insertOrReplace(ps: PeerStorage,
|
|||
warn "failed to store peers", err = res.error
|
||||
waku_peers_errors.inc(labelValues = ["storage_failure"])
|
||||
|
||||
proc addPeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo) =
|
||||
proc addPeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo, origin = UnknownOrigin) =
|
||||
# Adds peer to manager for the specified protocol
|
||||
|
||||
if remotePeerInfo.peerId == pm.switch.peerInfo.peerId:
|
||||
|
@ -120,6 +120,10 @@ proc addPeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo) =
|
|||
|
||||
pm.peerStore[AddressBook][remotePeerInfo.peerId] = remotePeerInfo.addrs
|
||||
pm.peerStore[KeyBook][remotePeerInfo.peerId] = publicKey
|
||||
pm.peerStore[SourceBook][remotePeerInfo.peerId] = origin
|
||||
|
||||
if remotePeerInfo.enr.isSome():
|
||||
pm.peerStore[ENRBook][remotePeerInfo.peerId] = remotePeerInfo.enr.get()
|
||||
|
||||
# Add peer to storage. Entry will subsequently be updated with connectedness information
|
||||
if not pm.storage.isNil:
|
||||
|
|
|
@ -128,3 +128,6 @@ proc getConnectedPeers*(peerStore: PeerStore): seq[RemotePeerInfo] =
|
|||
|
||||
proc getPeersByProtocol*(peerStore: PeerStore, proto: string): seq[RemotePeerInfo] =
|
||||
return peerStore.peers.filterIt(it.protocols.contains(proto))
|
||||
|
||||
proc getReachablePeers*(peerStore: PeerStore): seq[RemotePeerInfo] =
|
||||
return peerStore.peers.filterIt(it.connectedness == CanConnect or it.connectedness == Connected)
|
||||
|
|
|
@ -760,11 +760,7 @@ when defined(rln):
|
|||
proc mountPeerExchange*(node: WakuNode) {.async, raises: [Defect, LPError].} =
|
||||
info "mounting waku peer exchange"
|
||||
|
||||
var discv5Opt: Option[WakuDiscoveryV5]
|
||||
if not node.wakuDiscV5.isNil():
|
||||
discv5Opt = some(node.wakuDiscV5)
|
||||
|
||||
node.wakuPeerExchange = WakuPeerExchange.new(node.peerManager, discv5Opt)
|
||||
node.wakuPeerExchange = WakuPeerExchange.new(node.peerManager)
|
||||
|
||||
if node.started:
|
||||
await node.wakuPeerExchange.start()
|
||||
|
@ -780,13 +776,13 @@ proc fetchPeerExchangePeers*(node: Wakunode, amount: uint64) {.async, raises: [D
|
|||
let pxPeersRes = await node.wakuPeerExchange.request(amount)
|
||||
if pxPeersRes.isOk:
|
||||
var validPeers = 0
|
||||
for pi in pxPeersRes.get().peerInfos:
|
||||
let peers = pxPeersRes.get().peerInfos
|
||||
for pi in peers:
|
||||
var record: enr.Record
|
||||
if enr.fromBytes(record, pi.enr):
|
||||
# TODO: Add source: PX
|
||||
node.peerManager.addPeer(record.toRemotePeerInfo().get)
|
||||
node.peerManager.addPeer(record.toRemotePeerInfo().get, PeerExcahnge)
|
||||
validPeers += 1
|
||||
info "Retrieved peer info via peer exchange protocol", validPeers = validPeers
|
||||
info "Retrieved peer info via peer exchange protocol", validPeers = validPeers, totalPeers = peers.len
|
||||
else:
|
||||
warn "Failed to retrieve peer info via peer exchange protocol", error = pxPeersRes.error
|
||||
|
||||
|
@ -871,7 +867,7 @@ proc runDiscv5Loop(node: WakuNode) {.async.} =
|
|||
|
||||
# Add all peers, new ones and already seen (in case their addresses changed)
|
||||
for peer in discoveredPeers:
|
||||
node.peerManager.addPeer(peer)
|
||||
node.peerManager.addPeer(peer, Discv5)
|
||||
|
||||
# Discovery `queryRandom` can have a synchronous fast path for example
|
||||
# when no peers are in the routing table. Don't run it in continuous loop.
|
||||
|
|
|
@ -38,6 +38,7 @@ type
|
|||
UnknownOrigin,
|
||||
Discv5,
|
||||
Static,
|
||||
PeerExcahnge,
|
||||
Dns
|
||||
|
||||
PeerDirection* = enum
|
||||
|
|
|
@ -10,6 +10,7 @@ import
|
|||
import
|
||||
../node/peer_manager,
|
||||
../waku_core,
|
||||
../utils/heartbeat,
|
||||
../waku_discv5,
|
||||
./rpc,
|
||||
./rpc_codec
|
||||
|
@ -29,8 +30,8 @@ const
|
|||
# We add a 64kB safety buffer for protocol overhead.
|
||||
# 10x-multiplier also for safety
|
||||
MaxRpcSize* = 10 * MaxWakuMessageSize + 64 * 1024 # TODO what is the expected size of a PX message? As currently specified, it can contain an arbitary number of ENRs...
|
||||
MaxCacheSize = 1000
|
||||
CacheCleanWindow = 200
|
||||
MaxPeersCacheSize = 60
|
||||
CacheRefreshInterval = 15.minutes
|
||||
|
||||
WakuPeerExchangeCodec* = "/vac/waku/peer-exchange/2.0.0-alpha1"
|
||||
|
||||
|
@ -47,7 +48,6 @@ type
|
|||
|
||||
WakuPeerExchange* = ref object of LPProtocol
|
||||
peerManager*: PeerManager
|
||||
wakuDiscv5: Option[WakuDiscoveryV5]
|
||||
enrCache*: seq[enr.Record] # todo: next step: ring buffer; future: implement cache satisfying https://rfc.vac.dev/spec/34/
|
||||
|
||||
proc request*(wpx: WakuPeerExchange, numPeers: uint64, conn: Connection): Future[WakuPeerExchangeResult[PeerExchangeResponse]] {.async, gcsafe.} =
|
||||
|
@ -95,42 +95,42 @@ proc respond(wpx: WakuPeerExchange, enrs: seq[enr.Record], conn: Connection): Fu
|
|||
|
||||
return ok()
|
||||
|
||||
proc cleanCache(wpx: WakuPeerExchange) {.gcsafe.} =
|
||||
wpx.enrCache.delete(0..CacheCleanWindow-1)
|
||||
|
||||
proc runPeerExchangeDiscv5Loop*(wpx: WakuPeerExchange) {.async, gcsafe.} =
|
||||
## Runs a discv5 loop adding new peers to the px peer cache
|
||||
if wpx.wakuDiscv5.isNone():
|
||||
warn "Trying to run discovery v5 (for PX) while it's disabled"
|
||||
return
|
||||
|
||||
info "Starting peer exchange discovery v5 loop"
|
||||
|
||||
while wpx.wakuDiscv5.get().listening:
|
||||
trace "Running px discv5 discovery loop"
|
||||
let discoveredPeers = await wpx.wakuDiscv5.get().findRandomPeers()
|
||||
info "Discovered px peers via discv5", count=discoveredPeers.get().len()
|
||||
if discoveredPeers.isOk():
|
||||
for dp in discoveredPeers.get():
|
||||
if dp.enr.isSome() and not wpx.enrCache.contains(dp.enr.get()):
|
||||
wpx.enrCache.add(dp.enr.get())
|
||||
|
||||
if wpx.enrCache.len() >= MaxCacheSize:
|
||||
wpx.cleanCache()
|
||||
|
||||
## This loop "competes" with the loop in wakunode2
|
||||
## For the purpose of collecting px peers, 30 sec intervals should be enough
|
||||
await sleepAsync(30.seconds)
|
||||
|
||||
proc getEnrsFromCache(wpx: WakuPeerExchange, numPeers: uint64): seq[enr.Record] {.gcsafe.} =
|
||||
randomize()
|
||||
if wpx.enrCache.len() == 0:
|
||||
debug "peer exchange ENR cache is empty"
|
||||
return @[]
|
||||
for i in 0..<min(numPeers, wpx.enrCache.len().uint64()):
|
||||
let ri = rand(0..<wpx.enrCache.len())
|
||||
# TODO: Note that duplicated peers can be returned here
|
||||
result.add(wpx.enrCache[ri])
|
||||
|
||||
# copy and shuffle
|
||||
randomize()
|
||||
var shuffledCache = wpx.enrCache
|
||||
shuffledCache.shuffle()
|
||||
|
||||
# return numPeers or less if cache is smaller
|
||||
return shuffledCache[0..<min(shuffledCache.len.int, numPeers.int)]
|
||||
|
||||
proc populateEnrCache(wpx: WakuPeerExchange) =
|
||||
# share only peers that i) are reachable ii) come from discv5
|
||||
let withEnr = wpx.peerManager.peerStore
|
||||
.getReachablePeers()
|
||||
.filterIt(it.origin == Discv5)
|
||||
.filterIt(it.enr.isSome)
|
||||
|
||||
# either what we have or max cache size
|
||||
var newEnrCache = newSeq[enr.Record](0)
|
||||
for i in 0..<min(withEnr.len, MaxPeersCacheSize):
|
||||
newEnrCache.add(withEnr[i].enr.get())
|
||||
|
||||
# swap cache for new
|
||||
wpx.enrCache = newEnrCache
|
||||
|
||||
proc updatePxEnrCache(wpx: WakuPeerExchange) {.async.} =
|
||||
# try more aggressively to fill the cache at startup
|
||||
while wpx.enrCache.len < MaxPeersCacheSize:
|
||||
wpx.populateEnrCache()
|
||||
await sleepAsync(5.seconds)
|
||||
|
||||
heartbeat "Updating px enr cache", CacheRefreshInterval:
|
||||
wpx.populateEnrCache()
|
||||
|
||||
proc initProtocolHandler(wpx: WakuPeerExchange) =
|
||||
proc handler(conn: Connection, proto: string) {.async, gcsafe, closure.} =
|
||||
|
@ -159,11 +159,10 @@ proc initProtocolHandler(wpx: WakuPeerExchange) =
|
|||
wpx.codec = WakuPeerExchangeCodec
|
||||
|
||||
proc new*(T: type WakuPeerExchange,
|
||||
peerManager: PeerManager,
|
||||
wakuDiscv5: Option[WakuDiscoveryV5] = none(WakuDiscoveryV5)): T =
|
||||
peerManager: PeerManager): T =
|
||||
let wpx = WakuPeerExchange(
|
||||
peerManager: peerManager,
|
||||
wakuDiscv5: wakuDiscv5
|
||||
)
|
||||
wpx.initProtocolHandler()
|
||||
asyncSpawn wpx.updatePxEnrCache()
|
||||
return wpx
|
||||
|
|
Loading…
Reference in New Issue