mirror of
https://github.com/waku-org/nwaku.git
synced 2025-02-28 23:10:54 +00:00
feat: discovery peer filtering for relay shard (#1804)
Add discv6 predicate that filter peer by static shard. Co-authored-by: Hanno Cornelius <68783915+jm-clius@users.noreply.github.com>
This commit is contained in:
parent
5d4fa3ce73
commit
a4da87bb8c
@ -251,7 +251,7 @@ procSuite "Waku Discovery v5":
|
|||||||
await allFutures(node1.start(), node2.start(), node3.start(), node4.start())
|
await allFutures(node1.start(), node2.start(), node3.start(), node4.start())
|
||||||
|
|
||||||
## Given
|
## Given
|
||||||
let recordPredicate = proc(record: waku_enr.Record): bool =
|
let recordPredicate: WakuDiscv5Predicate = proc(record: waku_enr.Record): bool =
|
||||||
let typedRecord = record.toTyped()
|
let typedRecord = record.toTyped()
|
||||||
if typedRecord.isErr():
|
if typedRecord.isErr():
|
||||||
return false
|
return false
|
||||||
@ -270,7 +270,7 @@ procSuite "Waku Discovery v5":
|
|||||||
# for peer in await node1.wakuDiscv5.findRandomPeers(pred=recordPredicate):
|
# for peer in await node1.wakuDiscv5.findRandomPeers(pred=recordPredicate):
|
||||||
# peers.incl(peer)
|
# peers.incl(peer)
|
||||||
await sleepAsync(5.seconds) # Wait for discv5 discvery loop to run
|
await sleepAsync(5.seconds) # Wait for discv5 discvery loop to run
|
||||||
let peers = await node1.wakuDiscv5.findRandomPeers(pred=recordPredicate)
|
let peers = await node1.wakuDiscv5.findRandomPeers(some(recordPredicate))
|
||||||
|
|
||||||
## Then
|
## Then
|
||||||
check:
|
check:
|
||||||
@ -309,3 +309,78 @@ procSuite "Waku Discovery v5":
|
|||||||
assert emptyRes.isOk(), emptyRes.error
|
assert emptyRes.isOk(), emptyRes.error
|
||||||
assert emptyRes.value.isNone(), $emptyRes.value
|
assert emptyRes.value.isNone(), $emptyRes.value
|
||||||
|
|
||||||
|
asyncTest "filter peer per static shard":
|
||||||
|
## Given
|
||||||
|
let recordCluster21 = block:
|
||||||
|
let
|
||||||
|
enrSeqNum = 1u64
|
||||||
|
enrPrivKey = generatesecp256k1key()
|
||||||
|
|
||||||
|
let
|
||||||
|
shardCluster: uint16 = 21
|
||||||
|
shardIndices: seq[uint16] = @[1u16, 2u16, 5u16, 7u16, 9u16, 11u16]
|
||||||
|
|
||||||
|
let shards = RelayShards.init(shardCluster, shardIndices)
|
||||||
|
|
||||||
|
var builder = EnrBuilder.init(enrPrivKey, seqNum = enrSeqNum)
|
||||||
|
require builder.withWakuRelaySharding(shards).isOk()
|
||||||
|
|
||||||
|
let recordRes = builder.build()
|
||||||
|
require recordRes.isOk()
|
||||||
|
recordRes.tryGet()
|
||||||
|
|
||||||
|
let recordCluster22Indices1 = block:
|
||||||
|
let
|
||||||
|
enrSeqNum = 1u64
|
||||||
|
enrPrivKey = generatesecp256k1key()
|
||||||
|
|
||||||
|
let
|
||||||
|
shardCluster: uint16 = 22
|
||||||
|
shardIndices: seq[uint16] = @[2u16, 4u16, 5u16, 8u16, 10u16, 12u16]
|
||||||
|
|
||||||
|
let shards = RelayShards.init(shardCluster, shardIndices)
|
||||||
|
|
||||||
|
var builder = EnrBuilder.init(enrPrivKey, seqNum = enrSeqNum)
|
||||||
|
require builder.withWakuRelaySharding(shards).isOk()
|
||||||
|
|
||||||
|
let recordRes = builder.build()
|
||||||
|
require recordRes.isOk()
|
||||||
|
recordRes.tryGet()
|
||||||
|
|
||||||
|
let recordCluster22Indices2 = block:
|
||||||
|
let
|
||||||
|
enrSeqNum = 1u64
|
||||||
|
enrPrivKey = generatesecp256k1key()
|
||||||
|
|
||||||
|
let
|
||||||
|
shardCluster: uint16 = 22
|
||||||
|
shardIndices: seq[uint16] = @[1u16, 3u16, 6u16, 7u16, 9u16, 11u16]
|
||||||
|
|
||||||
|
let shards = RelayShards.init(shardCluster, shardIndices)
|
||||||
|
|
||||||
|
var builder = EnrBuilder.init(enrPrivKey, seqNum = enrSeqNum)
|
||||||
|
require builder.withWakuRelaySharding(shards).isOk()
|
||||||
|
|
||||||
|
let recordRes = builder.build()
|
||||||
|
require recordRes.isOk()
|
||||||
|
recordRes.tryGet()
|
||||||
|
|
||||||
|
## When
|
||||||
|
let predicateCluster21Op = shardingPredicate(recordCluster21)
|
||||||
|
require predicateCluster21Op.isSome()
|
||||||
|
let predicateCluster21 = predicateCluster21Op.get()
|
||||||
|
|
||||||
|
let predicateCluster22Op = shardingPredicate(recordCluster22Indices1)
|
||||||
|
require predicateCluster22Op.isSome()
|
||||||
|
let predicateCluster22 = predicateCluster22Op.get()
|
||||||
|
|
||||||
|
## Then
|
||||||
|
check:
|
||||||
|
predicateCluster21(recordCluster21) == true
|
||||||
|
predicateCluster21(recordCluster22Indices1) == false
|
||||||
|
predicateCluster21(recordCluster22Indices2) == false
|
||||||
|
predicateCluster22(recordCluster21) == false
|
||||||
|
predicateCluster22(recordCluster22Indices1) == true
|
||||||
|
predicateCluster22(recordCluster22Indices2) == false
|
||||||
|
|
||||||
|
|
||||||
|
@ -844,9 +844,11 @@ proc runDiscv5Loop(node: WakuNode) {.async.} =
|
|||||||
|
|
||||||
info "starting discv5 discovery loop"
|
info "starting discv5 discovery loop"
|
||||||
|
|
||||||
|
let shardPredOp = shardingPredicate(node.enr)
|
||||||
|
|
||||||
while node.wakuDiscv5.listening:
|
while node.wakuDiscv5.listening:
|
||||||
trace "running discv5 discovery loop"
|
trace "running discv5 discovery loop"
|
||||||
let discoveredRecords = await node.wakuDiscv5.findRandomPeers()
|
let discoveredRecords = await node.wakuDiscv5.findRandomPeers(shardPredOp)
|
||||||
let discoveredPeers = discoveredRecords.mapIt(it.toRemotePeerInfo()).filterIt(it.isOk()).mapIt(it.value)
|
let discoveredPeers = discoveredRecords.mapIt(it.toRemotePeerInfo()).filterIt(it.isOk()).mapIt(it.value)
|
||||||
|
|
||||||
for peer in discoveredPeers:
|
for peer in discoveredPeers:
|
||||||
|
@ -170,15 +170,39 @@ proc closeWait*(wd: WakuDiscoveryV5) {.async.} =
|
|||||||
wd.listening = false
|
wd.listening = false
|
||||||
await wd.protocol.closeWait()
|
await wd.protocol.closeWait()
|
||||||
|
|
||||||
proc findRandomPeers*(wd: WakuDiscoveryV5, pred: WakuDiscv5Predicate = nil): Future[seq[waku_enr.Record]] {.async.} =
|
proc shardingPredicate*(record: Record): Option[WakuDiscv5Predicate] =
|
||||||
|
## Filter peers based on relay sharding information
|
||||||
|
|
||||||
|
let typeRecordRes = record.toTyped()
|
||||||
|
let typedRecord =
|
||||||
|
if typeRecordRes.isErr():
|
||||||
|
debug "peer filtering failed", reason= $typeRecordRes.error
|
||||||
|
return none(WakuDiscv5Predicate)
|
||||||
|
else: typeRecordRes.get()
|
||||||
|
|
||||||
|
let nodeShardOp = typedRecord.relaySharding()
|
||||||
|
let nodeShard =
|
||||||
|
if nodeShardOp.isNone():
|
||||||
|
debug "no relay sharding information, peer filtering disabled"
|
||||||
|
return none(WakuDiscv5Predicate)
|
||||||
|
else: nodeShardOp.get()
|
||||||
|
|
||||||
|
debug "peer filtering enabled"
|
||||||
|
|
||||||
|
let predicate = proc(record: waku_enr.Record): bool =
|
||||||
|
nodeShard.indices.anyIt(record.containsShard(nodeShard.cluster, it))
|
||||||
|
|
||||||
|
return some(predicate)
|
||||||
|
|
||||||
|
proc findRandomPeers*(wd: WakuDiscoveryV5, pred = none(WakuDiscv5Predicate)): Future[seq[waku_enr.Record]] {.async.} =
|
||||||
## Find random peers to connect to using Discovery v5
|
## Find random peers to connect to using Discovery v5
|
||||||
let discoveredNodes = await wd.protocol.queryRandom()
|
let discoveredNodes = await wd.protocol.queryRandom()
|
||||||
|
|
||||||
var discoveredRecords = discoveredNodes.mapIt(it.record)
|
var discoveredRecords = discoveredNodes.mapIt(it.record)
|
||||||
|
|
||||||
# Filter out nodes that do not match the predicate
|
# Filter out nodes that do not match the predicate
|
||||||
if not pred.isNil():
|
if pred.isSome():
|
||||||
discoveredRecords = discoveredRecords.filter(pred)
|
discoveredRecords = discoveredRecords.filter(pred.get())
|
||||||
|
|
||||||
return discoveredRecords
|
return discoveredRecords
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user