mirror of https://github.com/waku-org/nwaku.git
chore: add peer filtering by cluster for waku peer exchange (#2932)
This commit is contained in:
parent
e4e01fabfe
commit
b4618f98ed
|
@ -23,6 +23,8 @@ import
|
|||
waku_relay,
|
||||
waku_core,
|
||||
waku_core/message/codec,
|
||||
common/enr/builder,
|
||||
waku_enr/sharding,
|
||||
],
|
||||
../testlib/[wakucore, wakunode, simple_mock, assertions],
|
||||
./utils.nim
|
||||
|
@ -237,6 +239,39 @@ suite "Waku Peer Exchange":
|
|||
response.isErr
|
||||
response.error == "peer_not_found_failure"
|
||||
|
||||
asyncTest "Pool filtering":
|
||||
let
|
||||
key1 = generateSecp256k1Key()
|
||||
key2 = generateSecp256k1Key()
|
||||
cluster: Option[uint16] = some(uint16(16))
|
||||
bindIp = parseIpAddress("0.0.0.0")
|
||||
nodeTcpPort = Port(64010)
|
||||
nodeUdpPort = Port(9000)
|
||||
|
||||
var
|
||||
builder1 = EnrBuilder.init(key1)
|
||||
builder2 = EnrBuilder.init(key2)
|
||||
|
||||
builder1.withIpAddressAndPorts(some(bindIp), some(nodeTcpPort), some(nodeUdpPort))
|
||||
builder2.withIpAddressAndPorts(some(bindIp), some(nodeTcpPort), some(nodeUdpPort))
|
||||
builder1.withShardedTopics(@["/waku/2/rs/1/7"]).expect("valid topic")
|
||||
builder2.withShardedTopics(@["/waku/2/rs/16/32"]).expect("valid topic")
|
||||
|
||||
let
|
||||
enr1 = builder1.build().expect("valid ENR")
|
||||
enr2 = builder2.build().expect("valid ENR")
|
||||
|
||||
var
|
||||
peerInfo1 = enr1.toRemotePeerInfo().expect("valid PeerInfo")
|
||||
peerInfo2 = enr2.toRemotePeerInfo().expect("valid PeerInfo")
|
||||
|
||||
peerInfo1.origin = PeerOrigin.Discv5
|
||||
peerInfo2.origin = PeerOrigin.Discv5
|
||||
|
||||
check:
|
||||
not poolFilter(cluster, peerInfo1)
|
||||
poolFilter(cluster, peerInfo2)
|
||||
|
||||
asyncTest "Request 0 peers, with 1 peer in PeerExchange":
|
||||
# Given two valid nodes with PeerExchange
|
||||
let
|
||||
|
|
|
@ -341,7 +341,7 @@ proc setupProtocols(
|
|||
# waku peer exchange setup
|
||||
if conf.peerExchangeNode != "" or conf.peerExchange:
|
||||
try:
|
||||
await mountPeerExchange(node)
|
||||
await mountPeerExchange(node, some(conf.clusterId))
|
||||
except CatchableError:
|
||||
return
|
||||
err("failed to mount waku peer-exchange protocol: " & getCurrentExceptionMsg())
|
||||
|
|
|
@ -1101,10 +1101,12 @@ proc mountRlnRelay*(
|
|||
|
||||
## Waku peer-exchange
|
||||
|
||||
proc mountPeerExchange*(node: WakuNode) {.async: (raises: []).} =
|
||||
proc mountPeerExchange*(
|
||||
node: WakuNode, cluster: Option[uint16] = none(uint16)
|
||||
) {.async: (raises: []).} =
|
||||
info "mounting waku peer exchange"
|
||||
|
||||
node.wakuPeerExchange = WakuPeerExchange.new(node.peerManager)
|
||||
node.wakuPeerExchange = WakuPeerExchange.new(node.peerManager, cluster)
|
||||
|
||||
if node.started:
|
||||
try:
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
import
|
||||
std/[options, sequtils, random],
|
||||
std/[options, sequtils, random, sugar],
|
||||
results,
|
||||
chronicles,
|
||||
chronos,
|
||||
|
@ -50,6 +50,7 @@ type
|
|||
WakuPeerExchange* = ref object of LPProtocol
|
||||
peerManager*: PeerManager
|
||||
enrCache*: seq[enr.Record]
|
||||
cluster*: Option[uint16]
|
||||
# todo: next step: ring buffer; future: implement cache satisfying https://rfc.vac.dev/spec/34/
|
||||
|
||||
proc request*(
|
||||
|
@ -128,12 +129,25 @@ proc getEnrsFromCache(
|
|||
# return numPeers or less if cache is smaller
|
||||
return shuffledCache[0 ..< min(shuffledCache.len.int, numPeers.int)]
|
||||
|
||||
proc poolFilter*(cluster: Option[uint16], peer: RemotePeerInfo): bool =
|
||||
if peer.origin != Discv5:
|
||||
trace "peer not from discv5", peer = $peer, origin = $peer.origin
|
||||
return false
|
||||
|
||||
if peer.enr.isNone():
|
||||
trace "peer has no ENR", peer = $peer
|
||||
return false
|
||||
|
||||
if cluster.isSome() and peer.enr.get().isClusterMismatched(cluster.get()):
|
||||
trace "peer has mismatching cluster", peer = $peer
|
||||
return false
|
||||
|
||||
return true
|
||||
|
||||
proc populateEnrCache(wpx: WakuPeerExchange) =
|
||||
# share only peers that i) are reachable ii) come from discv5
|
||||
let withEnr = wpx.peerManager.peerStore
|
||||
.getReachablePeers()
|
||||
.filterIt(it.origin == Discv5)
|
||||
.filterIt(it.enr.isSome)
|
||||
# share only peers that i) are reachable ii) come from discv5 iii) share cluster
|
||||
let withEnr =
|
||||
wpx.peerManager.peerStore.getReachablePeers().filterIt(poolFilter(wpx.cluster, it))
|
||||
|
||||
# either what we have or max cache size
|
||||
var newEnrCache = newSeq[enr.Record](0)
|
||||
|
@ -181,8 +195,12 @@ proc initProtocolHandler(wpx: WakuPeerExchange) =
|
|||
wpx.handler = handler
|
||||
wpx.codec = WakuPeerExchangeCodec
|
||||
|
||||
proc new*(T: type WakuPeerExchange, peerManager: PeerManager): T =
|
||||
let wpx = WakuPeerExchange(peerManager: peerManager)
|
||||
proc new*(
|
||||
T: type WakuPeerExchange,
|
||||
peerManager: PeerManager,
|
||||
cluster: Option[uint16] = none(uint16),
|
||||
): T =
|
||||
let wpx = WakuPeerExchange(peerManager: peerManager, cluster: cluster)
|
||||
wpx.initProtocolHandler()
|
||||
asyncSpawn wpx.updatePxEnrCache()
|
||||
return wpx
|
||||
|
|
Loading…
Reference in New Issue