segregate peer-exchange client and service implementation (#3523)

This commit is contained in:
Prem Chaitanya Prathi 2025-08-13 12:04:01 +05:30 committed by GitHub
parent e4358c9718
commit 4379f9ec50
11 changed files with 192 additions and 154 deletions

View File

@ -78,11 +78,11 @@ suite "Waku Peer Exchange":
check:
node.peerManager.switch.peerStore.peers.len == 0
res.error.status_code == SERVICE_UNAVAILABLE
res.error.status_desc == some("PeerExchange is not mounted")
res.error.status_desc == some("PeerExchangeClient is not mounted")
asyncTest "Node fetches with mounted peer exchange, but no peers":
# Given a node with peer exchange mounted
await node.mountPeerExchange()
await node.mountPeerExchangeClient()
# When a node fetches peers
let res = await node.fetchPeerExchangePeers(1)
@ -95,7 +95,7 @@ suite "Waku Peer Exchange":
asyncTest "Node succesfully exchanges px peers with faked discv5":
# Given both nodes mount peer exchange
await allFutures([node.mountPeerExchange(), node2.mountPeerExchange()])
await allFutures([node.mountPeerExchangeClient(), node2.mountPeerExchange()])
check node.peerManager.switch.peerStore.peers.len == 0
# Mock that we discovered a node (to avoid running discv5)
@ -271,6 +271,7 @@ suite "Waku Peer Exchange with discv5":
# Mount peer exchange
await node1.mountPeerExchange()
await node3.mountPeerExchange()
await node3.mountPeerExchangeClient()
let dialResponse =
await node3.dialForPeerExchange(node1.switch.peerInfo.toRemotePeerInfo())

View File

@ -16,6 +16,7 @@ import
waku_peer_exchange/rpc,
waku_peer_exchange/rpc_codec,
waku_peer_exchange/protocol,
waku_peer_exchange/client,
node/peer_manager,
waku_core,
common/enr/builder,
@ -145,7 +146,7 @@ suite "Waku Peer Exchange":
# Start and mount peer exchange
await allFutures([node1.start(), node2.start()])
await allFutures([node1.mountPeerExchange(), node2.mountPeerExchange()])
await allFutures([node1.mountPeerExchange(), node2.mountPeerExchangeClient()])
# Create connection
let connOpt = await node2.peerManager.dialPeer(
@ -168,10 +169,10 @@ suite "Waku Peer Exchange":
node1.wakuPeerExchange.enrCache.add(enr2)
# Request 2 peer from px. Test all request variants
let response1 = await node2.wakuPeerExchange.request(2)
let response1 = await node2.wakuPeerExchangeClient.request(2)
let response2 =
await node2.wakuPeerExchange.request(2, node1.peerInfo.toRemotePeerInfo())
let response3 = await node2.wakuPeerExchange.request(2, connOpt.get())
await node2.wakuPeerExchangeClient.request(2, node1.peerInfo.toRemotePeerInfo())
let response3 = await node2.wakuPeerExchangeClient.request(2, connOpt.get())
# Check the response or dont even continue
require:
@ -213,7 +214,7 @@ suite "Waku Peer Exchange":
await connOpt.get().close()
# Request 2 peer from px
let response = await node1.wakuPeerExchange.request(2, connOpt.get())
let response = await node1.wakuPeerExchangeClient.request(2, connOpt.get())
# Check that it failed gracefully
check:
@ -225,10 +226,10 @@ suite "Waku Peer Exchange":
let
switch = newTestSwitch()
peerManager = PeerManager.new(switch)
peerExchange = WakuPeerExchange.new(peerManager)
peerExchangeClient = WakuPeerExchangeClient.new(peerManager)
# When requesting 0 peers
let response = await peerExchange.request(0)
let response = await peerExchangeClient.request(0)
# Then the response should be an error
check:
@ -278,7 +279,7 @@ suite "Waku Peer Exchange":
# Start and mount peer exchange
await allFutures([node1.start(), node2.start()])
await allFutures([node1.mountPeerExchange(), node2.mountPeerExchange()])
await allFutures([node1.mountPeerExchange(), node2.mountPeerExchangeClient()])
# Connect the nodes
let dialResponse = await node2.peerManager.dialPeer(
@ -294,7 +295,7 @@ suite "Waku Peer Exchange":
node1.wakuPeerExchange.enrCache.add(record)
# When requesting 0 peers
let response = await node1.wakuPeerExchange.request(0)
let response = await node2.wakuPeerExchangeClient.request(0)
# Then the response should be empty
assertResultOk(response)
@ -310,19 +311,19 @@ suite "Waku Peer Exchange":
# Start and mount peer exchange
await allFutures([node1.start(), node2.start()])
await allFutures([node1.mountPeerExchange(), node2.mountPeerExchange()])
await allFutures([node1.mountPeerExchangeClient(), node2.mountPeerExchange()])
# Mock that we have discovered one enr
var record = enr.Record()
check record.fromUri(
"enr:-Iu4QGNuTvNRulF3A4Kb9YHiIXLr0z_CpvWkWjWKU-o95zUPR_In02AWek4nsSk7G_-YDcaT4bDRPzt5JIWvFqkXSNcBgmlkgnY0gmlwhE0WsGeJc2VjcDI1NmsxoQKp9VzU2FAh7fwOwSpg1M_Ekz4zzl0Fpbg6po2ZwgVwQYN0Y3CC6mCFd2FrdTIB"
)
node1.wakuPeerExchange.enrCache.add(record)
node2.wakuPeerExchange.enrCache.add(record)
# When making any request with an invalid peer info
var remotePeerInfo2 = node2.peerInfo.toRemotePeerInfo()
remotePeerInfo2.peerId.data.add(255.byte)
let response = await node1.wakuPeerExchange.request(1, remotePeerInfo2)
let response = await node1.wakuPeerExchangeClient.request(1, remotePeerInfo2)
# Then the response should be an error
check:
@ -337,10 +338,11 @@ suite "Waku Peer Exchange":
await allFutures(nodes.mapIt(it.start()))
await allFutures(nodes.mapIt(it.mountPeerExchange()))
await allFutures(nodes.mapIt(it.mountPeerExchangeClient()))
# Multiple nodes request to node 0
for i in 1 ..< 3:
let resp = await nodes[i].wakuPeerExchange.request(
let resp = await nodes[i].wakuPeerExchangeClient.request(
2, nodes[0].switch.peerInfo.toRemotePeerInfo()
)
require resp.isOk
@ -409,7 +411,7 @@ suite "Waku Peer Exchange":
await allFutures(
[
node1.mountPeerExchange(rateLimit = (1, 150.milliseconds)),
node2.mountPeerExchange(),
node2.mountPeerExchangeClient(),
]
)
@ -436,19 +438,19 @@ suite "Waku Peer Exchange":
await sleepAsync(150.milliseconds)
# Request 2 peer from px. Test all request variants
let response1 = await node2.wakuPeerExchange.request(1)
let response1 = await node2.wakuPeerExchangeClient.request(1)
check:
response1.isOk
response1.get().peerInfos.len == 1
let response2 =
await node2.wakuPeerExchange.request(1, node1.peerInfo.toRemotePeerInfo())
await node2.wakuPeerExchangeClient.request(1, node1.peerInfo.toRemotePeerInfo())
check:
response2.isErr
response2.error().status_code == PeerExchangeResponseStatusCode.TOO_MANY_REQUESTS
await sleepAsync(150.milliseconds)
let response3 = await node2.wakuPeerExchange.request(1, connOpt.get())
let response3 = await node2.wakuPeerExchangeClient.request(1, connOpt.get())
check:
response3.isOk
response3.get().peerInfos.len == 1

View File

@ -17,6 +17,7 @@ import
waku_peer_exchange,
waku_peer_exchange/rpc,
waku_peer_exchange/protocol,
waku_peer_exchange/client,
node/peer_manager,
waku_core,
],
@ -40,7 +41,8 @@ proc dialForPeerExchange*(
require connOpt.isSome()
await sleepAsync(FUTURE_TIMEOUT_SHORT)
let response = await client.wakuPeerExchange.request(requestedPeers, connOpt.get())
let response =
await client.wakuPeerExchangeClient.request(requestedPeers, connOpt.get())
assertResultOk(response)
if uint64(response.get().peerInfos.len) > minimumPeers:

View File

@ -615,8 +615,10 @@ proc build*(
protectedShards: protectedShards,
relay: relay,
lightPush: lightPush,
peerExchange: peerExchange,
peerExchangeService: peerExchange,
rendezvous: rendezvous,
peerExchangeDiscovery: true,
# enabling peer exchange client by default for quicker bootstrapping
remoteStoreNode: builder.remoteStoreNode,
remoteLightPushNode: builder.remoteLightPushNode,
remoteFilterNode: builder.remoteFilterNode,

View File

@ -412,7 +412,7 @@ proc setupProtocols(
return err("failed to set node waku filter peer: " & filterNode.error)
# waku peer exchange setup
if conf.peerExchange:
if conf.peerExchangeService:
try:
await mountPeerExchange(
node, some(conf.clusterId), node.rateLimitSettings.getSetting(PEEREXCHG)
@ -429,6 +429,8 @@ proc setupProtocols(
return
err("failed to set node waku peer-exchange peer: " & peerExchangeNode.error)
if conf.peerExchangeDiscovery:
await node.mountPeerExchangeClient()
return ok()
## Start node
@ -473,7 +475,7 @@ proc startNode*(
#
# Use px to periodically get peers if discv5 is disabled, as discv5 nodes have their own
# periodic loop to find peers and px returned peers actually come from discv5
if conf.peerExchange and not conf.discv5Conf.isSome():
if conf.peerExchangeDiscovery and not conf.discv5Conf.isSome():
node.startPeerExchangeLoop()
# Maintain relay connections

View File

@ -85,7 +85,8 @@ type WakuConf* {.requiresInit.} = ref object
relay*: bool
lightPush*: bool
peerExchange*: bool
peerExchangeService*: bool
peerExchangeDiscovery*: bool
# TODO: remove relay peer exchange
relayPeerExchange*: bool
@ -145,7 +146,7 @@ proc logConf*(conf: WakuConf) =
store = conf.storeServiceConf.isSome(),
filter = conf.filterServiceConf.isSome(),
lightPush = conf.lightPush,
peerExchange = conf.peerExchange
peerExchange = conf.peerExchangeService
info "Configuration. Network", cluster = conf.clusterId

View File

@ -111,6 +111,7 @@ type
wakuLightPush*: WakuLightPush
wakuLightpushClient*: WakuLightPushClient
wakuPeerExchange*: WakuPeerExchange
wakuPeerExchangeClient*: WakuPeerExchangeClient
wakuMetadata*: WakuMetadata
wakuAutoSharding*: Option[Sharding]
enr*: enr.Record
@ -1281,21 +1282,26 @@ proc mountPeerExchange*(
except LPError:
error "failed to mount wakuPeerExchange", error = getCurrentExceptionMsg()
proc mountPeerExchangeClient*(node: WakuNode) {.async: (raises: []).} =
  ## Attaches a peer exchange client to `node`.
  ##
  ## The "mounting" message is logged on every call; a new
  ## `WakuPeerExchangeClient` (backed by the node's peer manager) is only
  ## created when the node does not have one yet, so an existing client is
  ## left untouched.
  info "mounting waku peer exchange client"

  if not node.wakuPeerExchangeClient.isNil():
    return

  node.wakuPeerExchangeClient = WakuPeerExchangeClient.new(node.peerManager)
proc fetchPeerExchangePeers*(
node: Wakunode, amount = DefaultPXNumPeersReq
): Future[Result[int, PeerExchangeResponseStatus]] {.async: (raises: []).} =
if node.wakuPeerExchange.isNil():
error "could not get peers from px, waku peer-exchange is nil"
if node.wakuPeerExchangeClient.isNil():
error "could not get peers from px, waku peer-exchange-client is nil"
return err(
(
status_code: PeerExchangeResponseStatusCode.SERVICE_UNAVAILABLE,
status_desc: some("PeerExchange is not mounted"),
status_desc: some("PeerExchangeClient is not mounted"),
)
)
info "Retrieving peer info via peer exchange protocol", amount
let pxPeersRes = await node.wakuPeerExchange.request(amount)
if pxPeersRes.isOk:
let pxPeersRes = await node.wakuPeerExchangeClient.request(amount)
if pxPeersRes.isOk():
var validPeers = 0
let peers = pxPeersRes.get().peerInfos
for pi in peers:
@ -1313,17 +1319,19 @@ proc fetchPeerExchangePeers*(
proc peerExchangeLoop(node: WakuNode) {.async.} =
while true:
await sleepAsync(1.minutes)
if not node.started:
await sleepAsync(5.seconds)
continue
(await node.fetchPeerExchangePeers()).isOkOr:
warn "Cannot fetch peers from peer exchange", cause = error
await sleepAsync(1.minutes)
proc startPeerExchangeLoop*(node: WakuNode) =
if node.wakuPeerExchange.isNil():
if node.wakuPeerExchangeClient.isNil():
error "startPeerExchangeLoop: Peer Exchange is not mounted"
return
node.wakuPeerExchange.pxLoopHandle = node.peerExchangeLoop()
info "Starting peer exchange loop"
node.wakuPeerExchangeClient.pxLoopHandle = node.peerExchangeLoop()
# TODO: Move to application module (e.g., wakunode2.nim)
proc setPeerExchangePeer*(
@ -1561,6 +1569,10 @@ proc stop*(node: WakuNode) {.async.} =
if not node.wakuPeerExchange.isNil() and not node.wakuPeerExchange.pxLoopHandle.isNil():
await node.wakuPeerExchange.pxLoopHandle.cancelAndWait()
if not node.wakuPeerExchangeClient.isNil() and
not node.wakuPeerExchangeClient.pxLoopHandle.isNil():
await node.wakuPeerExchangeClient.pxLoopHandle.cancelAndWait()
if not node.wakuRendezvous.isNil():
await node.wakuRendezvous.stopWait()

View File

@ -1,5 +1,5 @@
{.push raises: [].}
import ./waku_peer_exchange/[protocol, rpc]
import ./waku_peer_exchange/[protocol, rpc, common, client]
export protocol, rpc
export protocol, rpc, common, client

View File

@ -0,0 +1,102 @@
import std/options, results, chronicles, chronos, metrics
import ./common, ./rpc, ./rpc_codec, ../node/peer_manager
from ../waku_core/codecs import WakuPeerExchangeCodec
# Metrics exposed by the peer exchange client module.
# NOTE(review): `waku_px_peers_received_total` is declared here but is not
# updated by the request procs in this module - presumably callers set it.
declarePublicGauge(
  waku_px_peers_received_total, "number of ENRs received via peer exchange"
)

# Error counter; the "type" label carries the failure category.
declarePublicCounter(
  waku_px_client_errors, "number of peer exchange errors", ["type"]
)

logScope:
  topics = "waku peer_exchange client"
type
  WakuPeerExchangeClient* = ref object
    ## Client side of the Waku peer exchange protocol: issues requests to
    ## remote peers and returns their responses.
    peerManager*: PeerManager
      ## Used to select a peer that supports the peer exchange codec and to
      ## dial it.
    pxLoopHandle*: Future[void]
      ## Handle of the periodic peer exchange loop; nil until a loop has been
      ## started for this client.
proc new*(T: type WakuPeerExchangeClient, peerManager: PeerManager): T =
  ## Creates a peer exchange client bound to `peerManager`.
  ## `pxLoopHandle` is left at its default (nil) value.
  result = WakuPeerExchangeClient(peerManager: peerManager)
proc request*(
    wpx: WakuPeerExchangeClient, numPeers = DefaultPXNumPeersReq, conn: Connection
): Future[WakuPeerExchangeResult[PeerExchangeResponse]] {.async: (raises: []).} =
  ## Sends a peer exchange request for `numPeers` peers over the already
  ## established connection `conn` and returns the decoded response.
  ##
  ## The connection is always closed (with EOF) once the exchange finished,
  ## whether it succeeded or not.
  ##
  ## Errors:
  ## * `SERVICE_UNAVAILABLE` - writing the request or reading the response
  ##   raised; `status_desc` carries the exception message.
  ## * `BAD_RESPONSE` - the received bytes could not be decoded.
  ## * any non-`SUCCESS` status reported by the remote peer is passed through
  ##   unchanged.
  ##
  ## Local failures (transport and decoding) are counted in
  ## `waku_px_client_errors`.
  let rpc = PeerExchangeRpc.makeRequest(numPeers)

  var buffer: seq[byte]
  # The outcome of the I/O phase is recorded here instead of returning from
  # inside the `try`, so that the `finally` clean-up always runs first.
  var callResult =
    (status_code: PeerExchangeResponseStatusCode.SUCCESS, status_desc: none(string))
  try:
    await conn.writeLP(rpc.encode().buffer)
    buffer = await conn.readLp(DefaultMaxRpcSize.int)
  except CatchableError as exc:
    error "exception when handling peer exchange request", error = exc.msg
    waku_px_client_errors.inc(labelValues = ["error_sending_or_receiving_px_req"])
    callResult = (
      status_code: PeerExchangeResponseStatusCode.SERVICE_UNAVAILABLE,
      status_desc: some(exc.msg),
    )
  finally:
    # close, no more data is expected
    await conn.closeWithEof()

  if callResult.status_code != PeerExchangeResponseStatusCode.SUCCESS:
    error "peer exchange request failed", status_code = callResult.status_code
    return err(callResult)

  let decoded = PeerExchangeRpc.decode(buffer).valueOr:
    # Count undecodable responses as well, so that every local failure of the
    # request shows up in the error metric.
    waku_px_client_errors.inc(labelValues = [decodeRpcFailure])
    error "peer exchange request error decoding buffer", error = $error
    return err(
      (
        status_code: PeerExchangeResponseStatusCode.BAD_RESPONSE,
        status_desc: some($error),
      )
    )

  if decoded.response.status_code != PeerExchangeResponseStatusCode.SUCCESS:
    # The remote peer answered, but with an error status: forward it as is.
    error "peer exchange request error", status_code = decoded.response.status_code
    return err(
      (
        status_code: decoded.response.status_code,
        status_desc: decoded.response.status_desc,
      )
    )

  return ok(decoded.response)
proc request*(
    wpx: WakuPeerExchangeClient, numPeers = DefaultPXNumPeersReq, peer: RemotePeerInfo
): Future[WakuPeerExchangeResult[PeerExchangeResponse]] {.async: (raises: []).} =
  ## Dials `peer` on the peer exchange codec and requests `numPeers` peers
  ## from it over the resulting connection.
  ##
  ## Errors:
  ## * `DIAL_FAILURE` - the peer manager returned no connection.
  ## * `BAD_RESPONSE` - dialing raised; `status_desc` carries the exception
  ##   message.
  ## * any error produced by the connection-based `request` overload.
  ##
  ## Both dial failure paths are counted in `waku_px_client_errors` under the
  ## `dialFailure` label.
  try:
    let connOpt = await wpx.peerManager.dialPeer(peer, WakuPeerExchangeCodec)
    if connOpt.isNone():
      waku_px_client_errors.inc(labelValues = [dialFailure])
      error "error in request connOpt is none"
      return err(
        (
          status_code: PeerExchangeResponseStatusCode.DIAL_FAILURE,
          status_desc: some(dialFailure),
        )
      )

    return await wpx.request(numPeers, connOpt.get())
  except CatchableError as exc:
    waku_px_client_errors.inc(labelValues = [dialFailure])
    error "peer exchange request exception", error = exc.msg
    # NOTE(review): the status code is kept as BAD_RESPONSE for backward
    # compatibility, although the failure happens while dialing.
    return err(
      (
        status_code: PeerExchangeResponseStatusCode.BAD_RESPONSE,
        status_desc: some("exception dialing peer: " & exc.msg),
      )
    )
proc request*(
    wpx: WakuPeerExchangeClient, numPeers = DefaultPXNumPeersReq
): Future[WakuPeerExchangeResult[PeerExchangeResponse]] {.async: (raises: []).} =
  ## Requests `numPeers` peers from a peer chosen by the peer manager among
  ## those supporting the peer exchange codec.
  ##
  ## Returns `SERVICE_UNAVAILABLE` (and counts the failure in
  ## `waku_px_client_errors`) when no such peer is available; otherwise the
  ## result of the peer-based `request` overload.
  let candidate = wpx.peerManager.selectPeer(WakuPeerExchangeCodec)
  if candidate.isSome():
    return await wpx.request(numPeers, candidate.get())

  # No peer to ask: report it and bail out.
  waku_px_client_errors.inc(labelValues = [peerNotFoundFailure])
  error "peer exchange error peerOpt is none"
  let unavailable = (
    status_code: PeerExchangeResponseStatusCode.SERVICE_UNAVAILABLE,
    status_desc: some(peerNotFoundFailure),
  )
  return err(unavailable)

View File

@ -0,0 +1,21 @@
import results, metrics, chronos
import ./rpc, ../waku_core
const
  # Size limit for a peer exchange RPC: ten times the default maximum Waku
  # message size (multiplier chosen for safety) plus a 64kB safety buffer for
  # protocol overhead.
  DefaultMaxRpcSize* = 10 * DefaultMaxWakuMessageSize + 64 * 1024
    # TODO what is the expected size of a PX message? As currently specified, it can contain an arbitrary number of ENRs...

  # NOTE(review): presumably the capacity and refresh period of the service's
  # ENR cache - confirm against the protocol module.
  MaxPeersCacheSize* = 60
  CacheRefreshInterval* = minutes(10)

  # Number of peers asked for when a request does not specify an amount.
  DefaultPXNumPeersReq* = uint64(5)

  # Error types (metric label values)
  dialFailure* = "dial_failure"
  peerNotFoundFailure* = "peer_not_found_failure"
  decodeRpcFailure* = "decode_rpc_failure"
  retrievePeersDiscv5Error* = "retrieve_peers_discv5_failure"
  pxFailure* = "px_failure"

type
  WakuPeerExchangeResult*[T] = Result[T, PeerExchangeResponseStatus]
    ## Result of a peer exchange operation: a `T` on success, otherwise the
    ## status describing the failure.

View File

@ -14,136 +14,29 @@ import
../discovery/waku_discv5,
./rpc,
./rpc_codec,
../common/rate_limit/request_limiter
../common/rate_limit/request_limiter,
./common
from ../waku_core/codecs import WakuPeerExchangeCodec
export WakuPeerExchangeCodec
declarePublicGauge waku_px_peers_received_total,
"number of ENRs received via peer exchange"
declarePublicGauge waku_px_peers_received_unknown,
"number of previously unknown ENRs received via peer exchange"
declarePublicCounter waku_px_peers_sent,
"number of ENRs sent to peer exchange requesters"
declarePublicGauge waku_px_peers_cached, "number of peer exchange peer ENRs cached"
declarePublicCounter waku_px_errors, "number of peer exchange errors", ["type"]
declarePublicCounter waku_px_peers_sent,
"number of ENRs sent to peer exchange requesters"
logScope:
topics = "waku peer_exchange"
const
# We add a 64kB safety buffer for protocol overhead.
# 10x-multiplier also for safety
DefaultMaxRpcSize* = 10 * DefaultMaxWakuMessageSize + 64 * 1024
# TODO what is the expected size of a PX message? As currently specified, it can contain an arbitary number of ENRs...
MaxPeersCacheSize = 60
CacheRefreshInterval = 10.minutes
DefaultPXNumPeersReq* = 5.uint64()
# Error types (metric label values)
const
dialFailure = "dial_failure"
peerNotFoundFailure = "peer_not_found_failure"
decodeRpcFailure = "decode_rpc_failure"
retrievePeersDiscv5Error = "retrieve_peers_discv5_failure"
pxFailure = "px_failure"
type
WakuPeerExchangeResult*[T] = Result[T, PeerExchangeResponseStatus]
WakuPeerExchange* = ref object of LPProtocol
peerManager*: PeerManager
enrCache*: seq[enr.Record]
cluster*: Option[uint16]
# todo: next step: ring buffer; future: implement cache satisfying https://rfc.vac.dev/spec/34/
requestRateLimiter*: RequestRateLimiter
pxLoopHandle*: Future[void]
proc request*(
wpx: WakuPeerExchange, numPeers = DefaultPXNumPeersReq, conn: Connection
): Future[WakuPeerExchangeResult[PeerExchangeResponse]] {.async: (raises: []).} =
let rpc = PeerExchangeRpc.makeRequest(numPeers)
var buffer: seq[byte]
var callResult =
(status_code: PeerExchangeResponseStatusCode.SUCCESS, status_desc: none(string))
try:
await conn.writeLP(rpc.encode().buffer)
buffer = await conn.readLp(DefaultMaxRpcSize.int)
except CatchableError as exc:
error "exception when handling peer exchange request",
error = getCurrentExceptionMsg()
waku_px_errors.inc(labelValues = [exc.msg])
callResult = (
status_code: PeerExchangeResponseStatusCode.SERVICE_UNAVAILABLE,
status_desc: some($exc.msg),
)
finally:
# close, no more data is expected
await conn.closeWithEof()
if callResult.status_code != PeerExchangeResponseStatusCode.SUCCESS:
error "peer exchange request failed", status_code = callResult.status_code
return err(callResult)
let decodedBuff = PeerExchangeRpc.decode(buffer)
if decodedBuff.isErr():
error "peer exchange request error decoding buffer", error = $decodedBuff.error
return err(
(
status_code: PeerExchangeResponseStatusCode.BAD_RESPONSE,
status_desc: some($decodedBuff.error),
)
)
if decodedBuff.get().response.status_code != PeerExchangeResponseStatusCode.SUCCESS:
error "peer exchange request error",
status_code = decodedBuff.get().response.status_code
return err(
(
status_code: decodedBuff.get().response.status_code,
status_desc: decodedBuff.get().response.status_desc,
)
)
return ok(decodedBuff.get().response)
proc request*(
wpx: WakuPeerExchange, numPeers = DefaultPXNumPeersReq, peer: RemotePeerInfo
): Future[WakuPeerExchangeResult[PeerExchangeResponse]] {.async: (raises: []).} =
try:
let connOpt = await wpx.peerManager.dialPeer(peer, WakuPeerExchangeCodec)
if connOpt.isNone():
error "error in request connOpt is none"
return err(
(
status_code: PeerExchangeResponseStatusCode.DIAL_FAILURE,
status_desc: some(dialFailure),
)
)
return await wpx.request(numPeers, connOpt.get())
except CatchableError:
error "peer exchange request exception", error = getCurrentExceptionMsg()
return err(
(
status_code: PeerExchangeResponseStatusCode.BAD_RESPONSE,
status_desc: some("exception dialing peer: " & getCurrentExceptionMsg()),
)
)
proc request*(
wpx: WakuPeerExchange, numPeers = DefaultPXNumPeersReq
): Future[WakuPeerExchangeResult[PeerExchangeResponse]] {.async: (raises: []).} =
let peerOpt = wpx.peerManager.selectPeer(WakuPeerExchangeCodec)
if peerOpt.isNone():
waku_px_errors.inc(labelValues = [peerNotFoundFailure])
error "peer exchange error peerOpt is none"
return err(
(
status_code: PeerExchangeResponseStatusCode.SERVICE_UNAVAILABLE,
status_desc: some(peerNotFoundFailure),
)
)
return await wpx.request(numPeers, peerOpt.get())
type WakuPeerExchange* = ref object of LPProtocol
peerManager*: PeerManager
enrCache*: seq[enr.Record]
cluster*: Option[uint16]
# todo: next step: ring buffer; future: implement cache satisfying https://rfc.vac.dev/spec/34/
requestRateLimiter*: RequestRateLimiter
pxLoopHandle*: Future[void]
proc respond(
wpx: WakuPeerExchange, enrs: seq[enr.Record], conn: Connection