refactor(peer-exchange): move peer management to waku_node module

This commit is contained in:
Lorenzo Delgado 2022-11-02 09:45:21 +01:00 committed by GitHub
parent cd73029a0c
commit 1c46b61402
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 45 additions and 62 deletions

View File

@ -433,7 +433,7 @@ proc setupProtocols(node: WakuNode, conf: WakuNodeConf,
# waku peer exchange setup
if (conf.peerExchangeNode != "") or (conf.peerExchange):
try:
await mountWakuPeerExchange(node)
await mountPeerExchange(node)
except:
return err("failed to mount waku peer-exchange protocol: " & getCurrentExceptionMsg())

View File

@ -124,14 +124,14 @@ procSuite "Waku Peer Exchange":
await allFutures([node1.startDiscv5(), node2.startDiscv5()])
# Mount peer exchange
await node1.mountWakuPeerExchange()
await node3.mountWakuPeerExchange()
await node1.mountPeerExchange()
await node3.mountPeerExchange()
await sleepAsync(3000.millis) # Give the algorithm some time to work its magic
asyncSpawn node1.wakuPeerExchange.runPeerExchangeDiscv5Loop()
node3.wakuPeerExchange.setPeer(node1.switch.peerInfo.toRemotePeerInfo())
node3.setPeerExchangePeer(node1.peerInfo.toRemotePeerInfo())
## When
discard waitFor node3.wakuPeerExchange.request(1)

View File

@ -41,6 +41,7 @@ declarePublicGauge waku_node_filters, "number of content filter subscriptions"
declarePublicGauge waku_node_errors, "number of wakunode errors", ["type"]
declarePublicGauge waku_lightpush_peers, "number of lightpush peers"
declarePublicGauge waku_store_peers, "number of store peers"
declarePublicGauge waku_px_peers, "number of peers (in the node's peerManager) supporting the peer exchange protocol"
logScope:
@ -697,20 +698,21 @@ proc lightpushPublish*(node: WakuNode, pubsubTopic: PubsubTopic, message: WakuMe
## Waku peer-exchange
proc mountWakuPeerExchange*(node: WakuNode) {.async, raises: [Defect, LPError].} =
proc mountPeerExchange*(node: WakuNode) {.async, raises: [Defect, LPError].} =
info "mounting waku peer exchange"
var discv5Opt: Option[WakuDiscoveryV5]
if not node.wakuDiscV5.isNil():
discv5Opt = some(node.wakuDiscV5)
node.wakuPeerExchange = WakuPeerExchange.init(node.peerManager, discv5Opt)
node.wakuPeerExchange = WakuPeerExchange.new(node.peerManager, discv5Opt)
if node.started:
# Node has started already. Let's start Waku peer exchange too.
await node.wakuPeerExchange.start()
node.switch.mount(node.wakuPeerExchange, protocolMatcher(WakuPeerExchangeCodec))
# TODO: Move to application module (e.g., wakunode2.nim)
proc setPeerExchangePeer*(node: WakuNode, peer: RemotePeerInfo|string) {.raises: [Defect, ValueError, LPError].} =
if node.wakuPeerExchange.isNil():
error "could not set peer, waku peer-exchange is nil"
@ -720,7 +722,8 @@ proc setPeerExchangePeer*(node: WakuNode, peer: RemotePeerInfo|string) {.raises:
let remotePeer = when peer is string: parseRemotePeerInfo(peer)
else: peer
node.wakuPeerExchange.setPeer(remotePeer)
node.peerManager.addPeer(remotePeer, WakuPeerExchangeCodec)
waku_px_peers.inc()
## Other protocols

View File

@ -3,11 +3,8 @@
import
./waku_peer_exchange/rpc,
./waku_peer_exchange/rpc_codec,
./waku_peer_exchange/protocol,
./waku_peer_exchange/client
./waku_peer_exchange/protocol
export
rpc,
rpc_codec,
protocol,
client
protocol

View File

@ -1,9 +0,0 @@
{.push raises: [Defect].}
import
std/[tables, sequtils],
chronicles
import
../waku_message,
./rpc

View File

@ -17,7 +17,6 @@ import
./rpc_codec
declarePublicGauge waku_px_peers, "number of peers (in the node's peerManager) supporting the peer exchange protocol"
declarePublicGauge waku_px_peers_received_total, "number of ENRs received via peer exchange"
declarePublicGauge waku_px_peers_received_unknown, "number of previously unknown ENRs received via peer exchange"
declarePublicGauge waku_px_peers_sent, "number of ENRs sent to peer exchange requesters"
@ -107,7 +106,7 @@ proc respond(wpx: WakuPeerExchange, enrs: seq[enr.Record], peer: RemotePeerInfo
return ok()
proc respond*(wpx: WakuPeerExchange, enrs: seq[enr.Record]): Future[WakuPeerExchangeResult[void]] {.async, gcsafe.} =
proc respond(wpx: WakuPeerExchange, enrs: seq[enr.Record]): Future[WakuPeerExchangeResult[void]] {.async, gcsafe.} =
let peerOpt = wpx.peerManager.selectPeer(WakuPeerExchangeCodec)
if peerOpt.isNone():
waku_px_errors.inc(labelValues = [peerNotFoundFailure])
@ -115,48 +114,47 @@ proc respond*(wpx: WakuPeerExchange, enrs: seq[enr.Record]): Future[WakuPeerExch
return await wpx.respond(enrs, peerOpt.get())
proc cleanCache(px: WakuPeerExchange) {.gcsafe.} =
px.enrCache.delete(0, CacheCleanWindow-1)
proc cleanCache(wpx: WakuPeerExchange) {.gcsafe.} =
wpx.enrCache.delete(0..CacheCleanWindow-1)
proc runPeerExchangeDiscv5Loop*(px: WakuPeerExchange) {.async, gcsafe.} =
proc runPeerExchangeDiscv5Loop*(wpx: WakuPeerExchange) {.async, gcsafe.} =
## Runs a discv5 loop adding new peers to the px peer cache
if px.wakuDiscv5.isNone():
if wpx.wakuDiscv5.isNone():
warn "Trying to run discovery v5 (for PX) while it's disabled"
return
info "Starting peer exchange discovery v5 loop"
while px.wakuDiscv5.get().listening:
while wpx.wakuDiscv5.get().listening:
trace "Running px discv5 discovery loop"
let discoveredPeers = await px.wakuDiscv5.get().findRandomPeers()
let discoveredPeers = await wpx.wakuDiscv5.get().findRandomPeers()
info "Discovered px peers via discv5", count=discoveredPeers.get().len()
if discoveredPeers.isOk:
if discoveredPeers.isOk():
for dp in discoveredPeers.get():
if dp.enr.isSome() and not px.enrCache.contains(dp.enr.get()):
px.enrCache.add(dp.enr.get())
if dp.enr.isSome() and not wpx.enrCache.contains(dp.enr.get()):
wpx.enrCache.add(dp.enr.get())
if px.enrCache.len() >= MaxCacheSize:
px.cleanCache()
if wpx.enrCache.len() >= MaxCacheSize:
wpx.cleanCache()
## This loop "competes" with the loop in wakunode2
## For the purpose of collecting px peers, 30 sec intervals should be enough
await sleepAsync(30.seconds)
proc getEnrsFromCache(px: WakuPeerExchange, numPeers: uint64): seq[enr.Record] {.gcsafe.} =
proc getEnrsFromCache(wpx: WakuPeerExchange, numPeers: uint64): seq[enr.Record] {.gcsafe.} =
randomize()
if px.enrCache.len() == 0:
if wpx.enrCache.len() == 0:
debug "peer exchange ENR cache is empty"
return @[]
for i in 0..<min(numPeers, px.enrCache.len().uint64()):
let ri = rand(0..<px.enrCache.len())
result.add(px.enrCache[ri])
proc initProtocolHandler*(px: WakuPeerExchange) =
for i in 0..<min(numPeers, wpx.enrCache.len().uint64()):
let ri = rand(0..<wpx.enrCache.len())
result.add(wpx.enrCache[ri])
proc initProtocolHandler(wpx: WakuPeerExchange) =
proc handler(conn: Connection, proto: string) {.async, gcsafe, closure.} =
let message = await conn.readLp(MaxRpcSize.int)
let buff = await conn.readLp(MaxRpcSize.int)
let res = PeerExchangeRpc.init(message)
let res = PeerExchangeRpc.init(buff)
if res.isErr():
waku_px_errors.inc(labelValues = [decodeRpcFailure])
return
@ -166,8 +164,8 @@ proc initProtocolHandler*(px: WakuPeerExchange) =
# handle peer exchange request
if rpc.request != PeerExchangeRequest():
trace "peer exchange request received"
let enrs = px.getEnrsFromCache(rpc.request.numPeers)
discard await px.respond(enrs, conn.peerId)
let enrs = wpx.getEnrsFromCache(rpc.request.numPeers)
discard await wpx.respond(enrs, conn.peerId)
waku_px_peers_sent.inc(enrs.len().int64())
# handle peer exchange response
@ -182,28 +180,22 @@ proc initProtocolHandler*(px: WakuPeerExchange) =
remotePeerInfoList.add(record.toRemotePeerInfo().get)
let newPeers = remotePeerInfoList.filterIt(
not px.peerManager.switch.isConnected(it.peerId))
not wpx.peerManager.switch.isConnected(it.peerId))
if newPeers.len() > 0:
waku_px_peers_received_unknown.inc(newPeers.len().int64())
debug "Connecting to newly discovered peers", count=newPeers.len()
await px.peerManager.connectToNodes(newPeers, WakuRelayCodec, source = "peer exchange")
await wpx.peerManager.connectToNodes(newPeers, WakuRelayCodec, source = "peer exchange")
px.handler = handler
px.codec = WakuPeerExchangeCodec
wpx.handler = handler
wpx.codec = WakuPeerExchangeCodec
proc init*(T: type WakuPeerExchange,
peerManager: PeerManager,
wakuDiscv5: Option[WakuDiscoveryV5] = none(WakuDiscoveryV5)
): T =
let px = WakuPeerExchange(
proc new*(T: type WakuPeerExchange,
peerManager: PeerManager,
wakuDiscv5: Option[WakuDiscoveryV5] = none(WakuDiscoveryV5)): T =
let wpx = WakuPeerExchange(
peerManager: peerManager,
wakuDiscv5: wakuDiscv5
)
px.initProtocolHandler()
return px
proc setPeer*(wpx: WakuPeerExchange, peer: RemotePeerInfo) =
wpx.peerManager.addPeer(peer, WakuPeerExchangeCodec)
waku_px_peers.inc()
wpx.initProtocolHandler()
return wpx