refactor(px): refactor peer exchange + tests (#1527)

This commit is contained in:
Alvaro Revuelta 2023-02-09 16:59:29 +01:00 committed by GitHub
parent cdc09aeeb4
commit 9c3e5f5bc5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 224 additions and 94 deletions

View File

@ -41,6 +41,7 @@ import
../../waku/v2/protocol/waku_archive/retention_policy/retention_policy_capacity,
../../waku/v2/protocol/waku_archive/retention_policy/retention_policy_time,
../../waku/v2/protocol/waku_store,
../../waku/v2/protocol/waku_relay,
../../waku/v2/protocol/waku_filter,
../../waku/v2/protocol/waku_lightpush,
../../waku/v2/protocol/waku_peer_exchange,
@ -525,14 +526,10 @@ proc startNode(node: WakuNode, conf: WakuNodeConf,
if conf.peerExchange:
asyncSpawn runPeerExchangeDiscv5Loop(node.wakuPeerExchange)
# retrieve and connect to peer exchange peers
# retrieve px peers and add the to the peer store
if conf.peerExchangeNode != "":
info "Retrieving peer info via peer exchange protocol"
let desiredOutDegree = node.wakuRelay.parameters.d.uint64()
try:
discard await node.wakuPeerExchange.request(desiredOutDegree)
except:
return err("failed to retrieve peer info via peer exchange protocol: " & getCurrentExceptionMsg())
await node.fetchPeerExchangePeers(desiredOutDegree)
# Start keepalive, if enabled
if conf.keepAlive:

View File

@ -1,12 +1,13 @@
{.used.}
import
std/options,
std/[options, sequtils],
testutils/unittests,
chronos,
chronicles,
stew/shims/net,
libp2p/switch,
libp2p/peerId,
libp2p/crypto/crypto,
eth/keys,
eth/p2p/discoveryv5/enr
@ -18,7 +19,8 @@ import
../../waku/v2/protocol/waku_peer_exchange/rpc,
../../waku/v2/protocol/waku_peer_exchange/rpc_codec,
../test_helpers,
./utils
./utils,
./testlib/testutils
# TODO: Extend test coverage
@ -30,8 +32,8 @@ procSuite "Waku Peer Exchange":
enr1 = enr.Record(seqNum: 0, raw: @[])
enr2 = enr.Record(seqNum: 0, raw: @[])
discard enr1.fromUri("enr:-JK4QPmO-sE2ELiWr8qVFs1kaY4jQZQpNaHvSPRmKiKcaDoqYRdki2c1BKSliImsxFeOD_UHnkddNL2l0XT9wlsP0WEBgmlkgnY0gmlwhH8AAAGJc2VjcDI1NmsxoQIMwKqlOl3zpwnrsKRKHuWPSuFzit1Cl6IZvL2uzBRe8oN0Y3CC6mKDdWRwgiMqhXdha3UyDw")
discard enr2.fromUri("enr:-Iu4QK_T7kzAmewG92u1pr7o6St3sBqXaiIaWIsFNW53_maJEaOtGLSN2FUbm6LmVxSfb1WfC7Eyk-nFYI7Gs3SlchwBgmlkgnY0gmlwhI5d6VKJc2VjcDI1NmsxoQLPYQDvrrFdCrhqw3JuFaGD71I8PtPfk6e7TJ3pg_vFQYN0Y3CC6mKDdWRwgiMq")
check enr1.fromUri("enr:-JK4QPmO-sE2ELiWr8qVFs1kaY4jQZQpNaHvSPRmKiKcaDoqYRdki2c1BKSliImsxFeOD_UHnkddNL2l0XT9wlsP0WEBgmlkgnY0gmlwhH8AAAGJc2VjcDI1NmsxoQIMwKqlOl3zpwnrsKRKHuWPSuFzit1Cl6IZvL2uzBRe8oN0Y3CC6mKDdWRwgiMqhXdha3UyDw")
check enr2.fromUri("enr:-Iu4QK_T7kzAmewG92u1pr7o6St3sBqXaiIaWIsFNW53_maJEaOtGLSN2FUbm6LmVxSfb1WfC7Eyk-nFYI7Gs3SlchwBgmlkgnY0gmlwhI5d6VKJc2VjcDI1NmsxoQLPYQDvrrFdCrhqw3JuFaGD71I8PtPfk6e7TJ3pg_vFQYN0Y3CC6mKDdWRwgiMq")
let peerInfos = @[
PeerExchangePeerInfo(enr: enr1.raw),
@ -127,20 +129,127 @@ procSuite "Waku Peer Exchange":
await node1.mountPeerExchange()
await node3.mountPeerExchange()
await sleepAsync(3000.millis) # Give the algorithm some time to work its magic
# Give the algorithm some time to work its magic
await sleepAsync(3000.millis)
asyncSpawn node1.wakuPeerExchange.runPeerExchangeDiscv5Loop()
node3.setPeerExchangePeer(node1.peerInfo.toRemotePeerInfo())
let connOpt = await node3.peerManager.dialPeer(node1.switch.peerInfo.toRemotePeerInfo(), WakuPeerExchangeCodec)
check:
connOpt.isSome
# Give the algorithm some time to work its magic
await sleepAsync(2000.millis)
## When
discard waitFor node3.wakuPeerExchange.request(1)
await sleepAsync(2000.millis) # Give the algorithm some time to work its magic
let response = await node3.wakuPeerExchange.request(1, connOpt.get())
## Then
check:
response.isOk
response.get().peerInfos.len == 1
node1.wakuDiscv5.protocol.nodesDiscovered > 0
node3.switch.peerStore[AddressBook].contains(node2.switch.peerInfo.peerId)
await allFutures([node1.stop(), node2.stop(), node3.stop()])
asyncTest "peer exchange request functions returns some discovered peers":
let
node1 = WakuNode.new(generateKey(), ValidIpAddress.init("0.0.0.0"), Port(0))
node2 = WakuNode.new(generateKey(), ValidIpAddress.init("0.0.0.0"), Port(0))
# Start and mount peer exchange
await allFutures([node1.start(), node2.start()])
await allFutures([node1.mountPeerExchange(), node2.mountPeerExchange()])
# Create connection
let connOpt = await node2.peerManager.dialPeer(node1.switch.peerInfo.toRemotePeerInfo(), WakuPeerExchangeCodec)
require:
connOpt.isSome
# Create some enr and add to peer exchange (sumilating disv5)
var enr1, enr2 = enr.Record()
check enr1.fromUri("enr:-Iu4QGNuTvNRulF3A4Kb9YHiIXLr0z_CpvWkWjWKU-o95zUPR_In02AWek4nsSk7G_-YDcaT4bDRPzt5JIWvFqkXSNcBgmlkgnY0gmlwhE0WsGeJc2VjcDI1NmsxoQKp9VzU2FAh7fwOwSpg1M_Ekz4zzl0Fpbg6po2ZwgVwQYN0Y3CC6mCFd2FrdTIB")
check enr2.fromUri("enr:-Iu4QGJllOWlviPIh_SGR-VVm55nhnBIU5L-s3ran7ARz_4oDdtJPtUs3Bc5aqZHCiPQX6qzNYF2ARHER0JPX97TFbEBgmlkgnY0gmlwhE0WsGeJc2VjcDI1NmsxoQP3ULycvday4EkvtVu0VqbBdmOkbfVLJx8fPe0lE_dRkIN0Y3CC6mCFd2FrdTIB")
# Mock that we have discovered these enrs
node1.wakuPeerExchange.enrCache.add(enr1)
node1.wakuPeerExchange.enrCache.add(enr2)
# Request 2 peer from px. Test all request variants
let response1 = await node2.wakuPeerExchange.request(2)
let response2 = await node2.wakuPeerExchange.request(2, node1.peerInfo.toRemotePeerInfo())
let response3 = await node2.wakuPeerExchange.request(2, connOpt.get())
# Check the response or dont even continue
require:
response1.isOk
response2.isOk
response3.isOk
check:
response1.get().peerInfos.len == 2
response2.get().peerInfos.len == 2
response3.get().peerInfos.len == 2
# Since it can return duplicates test that at least one of the enrs is in the response
response1.get().peerInfos.anyIt(it.enr == enr1.raw) or response1.get().peerInfos.anyIt(it.enr == enr2.raw)
response2.get().peerInfos.anyIt(it.enr == enr1.raw) or response2.get().peerInfos.anyIt(it.enr == enr2.raw)
response3.get().peerInfos.anyIt(it.enr == enr1.raw) or response3.get().peerInfos.anyIt(it.enr == enr2.raw)
asyncTest "peer exchange handler works as expected":
let
node1 = WakuNode.new(generateKey(), ValidIpAddress.init("0.0.0.0"), Port(0))
node2 = WakuNode.new(generateKey(), ValidIpAddress.init("0.0.0.0"), Port(0))
# Start and mount peer exchange
await allFutures([node1.start(), node2.start()])
await allFutures([node1.mountPeerExchange(), node2.mountPeerExchange()])
# Mock that we have discovered these enrs
var enr1 = enr.Record()
check enr1.fromUri("enr:-Iu4QGNuTvNRulF3A4Kb9YHiIXLr0z_CpvWkWjWKU-o95zUPR_In02AWek4nsSk7G_-YDcaT4bDRPzt5JIWvFqkXSNcBgmlkgnY0gmlwhE0WsGeJc2VjcDI1NmsxoQKp9VzU2FAh7fwOwSpg1M_Ekz4zzl0Fpbg6po2ZwgVwQYN0Y3CC6mCFd2FrdTIB")
node1.wakuPeerExchange.enrCache.add(enr1)
# Create connection
let connOpt = await node2.peerManager.dialPeer(node1.switch.peerInfo.toRemotePeerInfo(), WakuPeerExchangeCodec)
require connOpt.isSome
let conn = connOpt.get()
# Send bytes so that they directly hit the handler
let rpc = PeerExchangeRpc(
request: PeerExchangeRequest(numPeers: 1))
var buffer: seq[byte]
await conn.writeLP(rpc.encode().buffer)
buffer = await conn.readLp(MaxRpcSize.int)
# Decode the response
let decodedBuff = PeerExchangeRpc.decode(buffer)
require decodedBuff.isOk
# Check we got back the enr we mocked
check:
decodedBuff.get().response.peerInfos.len == 1
decodedBuff.get().response.peerInfos[0].enr == enr1.raw
asyncTest "peer exchange request fails gracefully":
let
node1 = WakuNode.new(generateKey(), ValidIpAddress.init("0.0.0.0"), Port(0))
node2 = WakuNode.new(generateKey(), ValidIpAddress.init("0.0.0.0"), Port(0))
# Start and mount peer exchange
await allFutures([node1.start(), node2.start()])
await allFutures([node1.mountPeerExchange(), node2.mountPeerExchange()])
# Create connection
let connOpt = await node2.peerManager.dialPeer(node1.switch.peerInfo.toRemotePeerInfo(), WakuPeerExchangeCodec)
require connOpt.isSome
# Force closing the connection to simulate a failed peer
await connOpt.get().close()
# Request 2 peer from px
let response = await node1.wakuPeerExchange.request(2, connOpt.get())
# Check that it failed gracefully
check: response.isErr

View File

@ -1,6 +1,7 @@
{.used.}
import
std/sequtils,
stew/byteutils,
stew/shims/net as stewNet,
testutils/unittests,
@ -13,13 +14,17 @@ import
libp2p/protocols/pubsub/rpc/messages,
libp2p/protocols/pubsub/pubsub,
libp2p/protocols/pubsub/gossipsub,
libp2p/nameresolving/mockresolver
libp2p/nameresolving/mockresolver,
eth/p2p/discoveryv5/enr
import
../../waku/v2/node/waku_node,
../../waku/v2/node/peer_manager,
../../waku/v2/protocol/waku_message,
../../waku/v2/protocol/waku_relay,
../../waku/v2/utils/peers
../../waku/v2/protocol/waku_peer_exchange,
../../waku/v2/utils/peers,
./testlib/testutils,
../test_helpers
procSuite "WakuNode":
@ -284,3 +289,29 @@ procSuite "WakuNode":
node1MultiAddrs.contains(expectedMultiaddress1)
await allFutures(node1.stop(), node2.stop())
asyncTest "Function fetchPeerExchangePeers succesfully exchanges px peers":
let
node1 = WakuNode.new(generateKey(), ValidIpAddress.init("0.0.0.0"), Port(0))
node2 = WakuNode.new(generateKey(), ValidIpAddress.init("0.0.0.0"), Port(0))
# Start and mount peer exchange
await allFutures([node1.start(), node2.start()])
await allFutures([node1.mountPeerExchange(), node2.mountPeerExchange()])
# Mock that we discovered a node (to avoid running discv5)
var enr = enr.Record()
require enr.fromUri("enr:-Iu4QGNuTvNRulF3A4Kb9YHiIXLr0z_CpvWkWjWKU-o95zUPR_In02AWek4nsSk7G_-YDcaT4bDRPzt5JIWvFqkXSNcBgmlkgnY0gmlwhE0WsGeJc2VjcDI1NmsxoQKp9VzU2FAh7fwOwSpg1M_Ekz4zzl0Fpbg6po2ZwgVwQYN0Y3CC6mCFd2FrdTIB")
node2.wakuPeerExchange.enrCache.add(enr)
# Set node2 as service peer (default one) for px protocol
node1.peerManager.addServicePeer(node2.peerInfo.toRemotePeerInfo(), WakuPeerExchangeCodec)
# Request 1 peer from peer exchange protocol
await node1.fetchPeerExchangePeers(1)
# Check that the peer ended up in the peerstore
let rpInfo = enr.toRemotePeerInfo.get()
check:
node1.peerManager.peerStore.peers.anyIt(it.peerId == rpInfo.peerId)
node1.peerManager.peerStore.peers.anyIt(it.addrs == rpInfo.addrs)

View File

@ -175,5 +175,8 @@ proc getPeersByDirection*(peerStore: PeerStore, direction: PeerDirection): seq[S
proc getNotConnectedPeers*(peerStore: PeerStore): seq[StoredInfo] =
return peerStore.peers.filterIt(it.connectedness != Connected)
proc getConnectedPeers*(peerStore: PeerStore): seq[StoredInfo] =
return peerStore.peers.filterIt(it.connectedness == Connected)
proc getPeersByProtocol*(peerStore: PeerStore, proto: string): seq[StoredInfo] =
return peerStore.peers.filterIt(it.protos.contains(proto))

View File

@ -960,6 +960,25 @@ proc mountPeerExchange*(node: WakuNode) {.async, raises: [Defect, LPError].} =
node.switch.mount(node.wakuPeerExchange, protocolMatcher(WakuPeerExchangeCodec))
proc fetchPeerExchangePeers*(node: Wakunode, amount: uint64) {.async, raises: [Defect].} =
if node.wakuPeerExchange.isNil():
error "could not get peers from px, waku peer-exchange is nil"
return
info "Retrieving peer info via peer exchange protocol"
let pxPeersRes = await node.wakuPeerExchange.request(amount)
if pxPeersRes.isOk:
var validPeers = 0
for pi in pxPeersRes.get().peerInfos:
var record: enr.Record
if enr.fromBytes(record, pi.enr):
# TODO: Add source: PX
node.peerManager.addPeer(record.toRemotePeerInfo().get, WakuRelayCodec)
validPeers += 1
info "Retrieved peer info via peer exchange protocol", validPeers = validPeers
else:
warn "Failed to retrieve peer info via peer exchange protocol", error = pxPeersRes.error
# TODO: Move to application module (e.g., wakunode2.nim)
proc setPeerExchangePeer*(node: WakuNode, peer: RemotePeerInfo|string) {.raises: [Defect, ValueError, LPError].} =
if node.wakuPeerExchange.isNil():

View File

@ -29,7 +29,7 @@ logScope:
const
# We add a 64kB safety buffer for protocol overhead.
# 10x-multiplier also for safety
MaxRpcSize = 10 * MaxWakuMessageSize + 64 * 1024 # TODO what is the expected size of a PX message? As currently specified, it can contain an arbitary number of ENRs...
MaxRpcSize* = 10 * MaxWakuMessageSize + 64 * 1024 # TODO what is the expected size of a PX message? As currently specified, it can contain an arbitary number of ENRs...
MaxCacheSize = 1000
CacheCleanWindow = 200
@ -49,70 +49,53 @@ type
WakuPeerExchange* = ref object of LPProtocol
peerManager*: PeerManager
wakuDiscv5: Option[WakuDiscoveryV5]
enrCache: seq[enr.Record] # todo: next step: ring buffer; future: implement cache satisfying https://rfc.vac.dev/spec/34/
enrCache*: seq[enr.Record] # todo: next step: ring buffer; future: implement cache satisfying https://rfc.vac.dev/spec/34/
proc sendPeerExchangeRpcToPeer(wpx: WakuPeerExchange, rpc: PeerExchangeRpc, peer: RemotePeerInfo | PeerId): Future[WakuPeerExchangeResult[void]] {.async, gcsafe.} =
proc request*(wpx: WakuPeerExchange, numPeers: uint64, conn: Connection): Future[WakuPeerExchangeResult[PeerExchangeResponse]] {.async, gcsafe.} =
let rpc = PeerExchangeRpc(
request: PeerExchangeRequest(numPeers: numPeers))
var buffer: seq[byte]
try:
await conn.writeLP(rpc.encode().buffer)
buffer = await conn.readLp(MaxRpcSize.int)
except CatchableError as exc:
waku_px_errors.inc(labelValues = [exc.msg])
return err("write/read failed: " & $exc.msg)
let decodedBuff = PeerExchangeRpc.decode(buffer)
if decodedBuff.isErr():
return err("decode failed: " & $decodedBuff.error)
return ok(decodedBuff.get().response)
proc request*(wpx: WakuPeerExchange, numPeers: uint64, peer: RemotePeerInfo): Future[WakuPeerExchangeResult[PeerExchangeResponse]] {.async, gcsafe.} =
let connOpt = await wpx.peerManager.dialPeer(peer, WakuPeerExchangeCodec)
if connOpt.isNone():
return err(dialFailure)
return await wpx.request(numPeers, connOpt.get())
let connection = connOpt.get()
await connection.writeLP(rpc.encode().buffer)
return ok()
proc request(wpx: WakuPeerExchange, numPeers: uint64, peer: RemotePeerInfo): Future[WakuPeerExchangeResult[void]] {.async, gcsafe.} =
let rpc = PeerExchangeRpc(
request: PeerExchangeRequest(
numPeers: numPeers
)
)
let res = await wpx.sendPeerExchangeRpcToPeer(rpc, peer)
if res.isErr():
waku_px_errors.inc(labelValues = [res.error()])
return err(res.error())
return ok()
proc request*(wpx: WakuPeerExchange, numPeers: uint64): Future[WakuPeerExchangeResult[void]] {.async, gcsafe.} =
proc request*(wpx: WakuPeerExchange, numPeers: uint64): Future[WakuPeerExchangeResult[PeerExchangeResponse]] {.async, gcsafe.} =
let peerOpt = wpx.peerManager.selectPeer(WakuPeerExchangeCodec)
if peerOpt.isNone():
waku_px_errors.inc(labelValues = [peerNotFoundFailure])
return err(peerNotFoundFailure)
return await wpx.request(numPeers, peerOpt.get())
proc respond(wpx: WakuPeerExchange, enrs: seq[enr.Record], peer: RemotePeerInfo | PeerId): Future[WakuPeerExchangeResult[void]] {.async, gcsafe.} =
var peerInfos: seq[PeerExchangePeerInfo] = @[]
for e in enrs:
let pi = PeerExchangePeerInfo(
enr: e.raw
)
peerInfos.add(pi)
proc respond(wpx: WakuPeerExchange, enrs: seq[enr.Record], conn: Connection): Future[WakuPeerExchangeResult[void]] {.async, gcsafe.} =
let rpc = PeerExchangeRpc(
response: PeerExchangeResponse(
peerInfos: peerInfos
peerInfos: enrs.mapIt(PeerExchangePeerInfo(enr: it.raw))
)
)
let res = await wpx.sendPeerExchangeRpcToPeer(rpc, peer)
if res.isErr():
waku_px_errors.inc(labelValues = [res.error()])
return err(res.error())
try:
await conn.writeLP(rpc.encode().buffer)
except CatchableError as exc:
waku_px_errors.inc(labelValues = [exc.msg])
return err(exc.msg)
return ok()
proc respond(wpx: WakuPeerExchange, enrs: seq[enr.Record]): Future[WakuPeerExchangeResult[void]] {.async, gcsafe.} =
let peerOpt = wpx.peerManager.selectPeer(WakuPeerExchangeCodec)
if peerOpt.isNone():
waku_px_errors.inc(labelValues = [peerNotFoundFailure])
return err(peerNotFoundFailure)
return await wpx.respond(enrs, peerOpt.get())
proc cleanCache(wpx: WakuPeerExchange) {.gcsafe.} =
wpx.enrCache.delete(0..CacheCleanWindow-1)
@ -147,45 +130,32 @@ proc getEnrsFromCache(wpx: WakuPeerExchange, numPeers: uint64): seq[enr.Record]
return @[]
for i in 0..<min(numPeers, wpx.enrCache.len().uint64()):
let ri = rand(0..<wpx.enrCache.len())
# TODO: Note that duplicated peers can be returned here
result.add(wpx.enrCache[ri])
proc initProtocolHandler(wpx: WakuPeerExchange) =
proc handler(conn: Connection, proto: string) {.async, gcsafe, closure.} =
let buff = await conn.readLp(MaxRpcSize.int)
var buffer: seq[byte]
try:
buffer = await conn.readLp(MaxRpcSize.int)
except CatchableError as exc:
waku_px_errors.inc(labelValues = [exc.msg])
return
let res = PeerExchangeRpc.decode(buff)
if res.isErr():
let decBuf = PeerExchangeRpc.decode(buffer)
if decBuf.isErr():
waku_px_errors.inc(labelValues = [decodeRpcFailure])
return
let rpc = res.get()
# handle peer exchange request
if rpc.request != PeerExchangeRequest():
trace "peer exchange request received"
let enrs = wpx.getEnrsFromCache(rpc.request.numPeers)
discard await wpx.respond(enrs, conn.peerId)
let rpc = decBuf.get()
trace "peer exchange request received"
let enrs = wpx.getEnrsFromCache(rpc.request.numPeers)
let res = await wpx.respond(enrs, conn)
if res.isErr:
waku_px_errors.inc(labelValues = [res.error])
else:
waku_px_peers_sent.inc(enrs.len().int64())
# handle peer exchange response
if rpc.response != PeerExchangeResponse():
# todo: error handling
trace "peer exchange response received"
var record: enr.Record
var remotePeerInfoList: seq[RemotePeerInfo]
waku_px_peers_received_total.inc(rpc.response.peerInfos.len().int64())
for pi in rpc.response.peerInfos:
discard enr.fromBytes(record, pi.enr)
remotePeerInfoList.add(record.toRemotePeerInfo().get)
let newPeers = remotePeerInfoList.filterIt(
not wpx.peerManager.switch.isConnected(it.peerId))
if newPeers.len() > 0:
waku_px_peers_received_unknown.inc(newPeers.len().int64())
debug "Connecting to newly discovered peers", count=newPeers.len()
await wpx.peerManager.connectToNodes(newPeers, WakuRelayCodec, source = "peer exchange")
wpx.handler = handler
wpx.codec = WakuPeerExchangeCodec

View File

@ -83,7 +83,8 @@ proc decode*(T: type PeerExchangeRpc, buffer: seq[byte]): ProtoResult[T] =
var rpc = PeerExchangeRpc()
var requestBuffer: seq[byte]
discard ?pb.getField(1, requestBuffer)
if not ?pb.getField(1, requestBuffer):
return err(ProtoError.RequiredFieldMissing)
rpc.request = ?PeerExchangeRequest.decode(requestBuffer)
var responseBuffer: seq[byte]