From 4539dfc76109382f321080fdfbf92e44531c6de8 Mon Sep 17 00:00:00 2001 From: Simon-Pierre Vivier Date: Wed, 23 Aug 2023 11:50:59 -0400 Subject: [PATCH] feat(discv5): topic subscriptions update discv5 filter predicate (#1918) --- apps/wakunode2/app.nim | 2 +- examples/publisher.nim | 2 +- examples/subscriber.nim | 2 +- tests/test_waku_peer_exchange.nim | 4 +- waku/waku_discv5.nim | 86 +++++++++++++++++-------------- 5 files changed, 53 insertions(+), 43 deletions(-) diff --git a/apps/wakunode2/app.nim b/apps/wakunode2/app.nim index 1abc200a2..478ebc592 100644 --- a/apps/wakunode2/app.nim +++ b/apps/wakunode2/app.nim @@ -547,7 +547,7 @@ proc startApp*(app: App): Future[AppResult[void]] {.async.} = if res.isErr(): return err("failed to start waku discovery v5: " & $res.error) - asyncSpawn wakuDiscv5.searchLoop(app.node.peerManager, some(app.record)) + asyncSpawn wakuDiscv5.searchLoop(app.node.peerManager) asyncSpawn wakuDiscv5.subscriptionsListener(app.node.topicSubscriptionQueue) return await startNode( diff --git a/examples/publisher.nim b/examples/publisher.nim index 596dea068..1c2e9d145 100644 --- a/examples/publisher.nim +++ b/examples/publisher.nim @@ -74,7 +74,7 @@ proc setupAndPublish(rng: ref HmacDrbgContext) {.async.} = error "failed to start discv5", error= discv5Res.error quit(1) - asyncSpawn wakuDiscv5.searchLoop(node.peerManager, some(node.enr)) + asyncSpawn wakuDiscv5.searchLoop(node.peerManager) # wait for a minimum of peers to be connected, otherwise messages wont be gossiped while true: diff --git a/examples/subscriber.nim b/examples/subscriber.nim index 63e8f04d0..73e89b237 100644 --- a/examples/subscriber.nim +++ b/examples/subscriber.nim @@ -69,7 +69,7 @@ proc setupAndSubscribe(rng: ref HmacDrbgContext) {.async.} = error "failed to start discv5", error = discv5Res.error quit(1) - asyncSpawn wakuDiscv5.searchLoop(node.peerManager, some(node.enr)) + asyncSpawn wakuDiscv5.searchLoop(node.peerManager) # wait for a minimum of peers to be connected, otherwise messages wont be gossiped while true: diff --git a/tests/test_waku_peer_exchange.nim b/tests/test_waku_peer_exchange.nim index d179ca080..f08949d1c 100644 --- a/tests/test_waku_peer_exchange.nim +++ b/tests/test_waku_peer_exchange.nim @@ -155,8 +155,8 @@ procSuite "Waku Peer Exchange": assert resultDisc1StartRes.isOk(), resultDisc1StartRes.error let resultDisc2StartRes = disc2.start() assert resultDisc2StartRes.isOk(), resultDisc2StartRes.error - asyncSpawn disc1.searchLoop(node1.peerManager, none(enr.Record)) - asyncSpawn disc2.searchLoop(node2.peerManager, none(enr.Record)) + asyncSpawn disc1.searchLoop(node1.peerManager) + asyncSpawn disc2.searchLoop(node2.peerManager) ## When var attempts = 10 diff --git a/waku/waku_discv5.nim b/waku/waku_discv5.nim index 6f6a880d9..52bf267b8 100644 --- a/waku/waku_discv5.nim +++ b/waku/waku_discv5.nim @@ -48,6 +48,31 @@ type WakuDiscoveryV5* = ref object conf: WakuDiscoveryV5Config protocol*: protocol.Protocol listening*: bool + predicate: Option[WakuDiscv5Predicate] + +proc shardingPredicate*(record: Record): Option[WakuDiscv5Predicate] = + ## Filter peers based on relay sharding information + + let typeRecordRes = record.toTyped() + let typedRecord = + if typeRecordRes.isErr(): + debug "peer filtering failed", reason= $typeRecordRes.error + return none(WakuDiscv5Predicate) + else: typeRecordRes.get() + + let nodeShardOp = typedRecord.relaySharding() + let nodeShard = + if nodeShardOp.isNone(): + debug "no relay sharding information, peer filtering disabled" + return none(WakuDiscv5Predicate) + else: nodeShardOp.get() + + debug "peer filtering updated" + + let predicate = proc(record: waku_enr.Record): bool = + nodeShard.indices.anyIt(record.containsShard(nodeShard.cluster, it)) + + return some(predicate) proc new*(T: type WakuDiscoveryV5, rng: ref HmacDrbgContext, conf: WakuDiscoveryV5Config, record: Option[waku_enr.Record]): T = let protocol = newProtocol( @@ -64,7 +89,13 @@ proc new*(T: type WakuDiscoveryV5, rng: ref HmacDrbgContext, conf: WakuDiscovery enrUdpPort = none(Port), ) - WakuDiscoveryV5(conf: conf, protocol: protocol, listening: false) + let shardPredOp = + if record.isSome(): + shardingPredicate(record.get()) + else: + none(WakuDiscv5Predicate) + + WakuDiscoveryV5(conf: conf, protocol: protocol, listening: false, predicate: shardPredOp) proc new*(T: type WakuDiscoveryV5, extIp: Option[ValidIpAddress], @@ -195,57 +226,29 @@ proc updateENRShards(wd: WakuDiscoveryV5, return ok() -proc shardingPredicate*(record: Record): Option[WakuDiscv5Predicate] = - ## Filter peers based on relay sharding information - - let typeRecordRes = record.toTyped() - let typedRecord = - if typeRecordRes.isErr(): - debug "peer filtering failed", reason= $typeRecordRes.error - return none(WakuDiscv5Predicate) - else: typeRecordRes.get() - - let nodeShardOp = typedRecord.relaySharding() - let nodeShard = - if nodeShardOp.isNone(): - debug "no relay sharding information, peer filtering disabled" - return none(WakuDiscv5Predicate) - else: nodeShardOp.get() - - debug "peer filtering enabled" - - let predicate = proc(record: waku_enr.Record): bool = - nodeShard.indices.anyIt(record.containsShard(nodeShard.cluster, it)) - - return some(predicate) - -proc findRandomPeers*(wd: WakuDiscoveryV5, pred = none(WakuDiscv5Predicate)): Future[seq[waku_enr.Record]] {.async.} = +proc findRandomPeers*(wd: WakuDiscoveryV5, overridePred = none(WakuDiscv5Predicate)): Future[seq[waku_enr.Record]] {.async.} = ## Find random peers to connect to using Discovery v5 let discoveredNodes = await wd.protocol.queryRandom() var discoveredRecords = discoveredNodes.mapIt(it.record) # Filter out nodes that do not match the predicate - if pred.isSome(): - discoveredRecords = discoveredRecords.filter(pred.get()) + if overridePred.isSome(): + discoveredRecords = discoveredRecords.filter(overridePred.get()) + elif wd.predicate.isSome(): + discoveredRecords = discoveredRecords.filter(wd.predicate.get()) return discoveredRecords #TODO abstract away PeerManager -proc searchLoop*(wd: WakuDiscoveryV5, peerManager: PeerManager, record: Option[enr.Record]) {.async.} = +proc searchLoop*(wd: WakuDiscoveryV5, peerManager: PeerManager) {.async.} = ## Continuously add newly discovered nodes info "Starting discovery v5 search" - let shardPredOp = - if record.isSome(): - shardingPredicate(record.get()) - else: - none(WakuDiscv5Predicate) - while wd.listening: trace "running discv5 discovery loop" - let discoveredRecords = await wd.findRandomPeers(shardPredOp) + let discoveredRecords = await wd.findRandomPeers() let discoveredPeers = discoveredRecords.mapIt(it.toRemotePeerInfo()).filterIt(it.isOk()).mapIt(it.value) for peer in discoveredPeers: @@ -305,6 +308,9 @@ proc subscriptionsListener*(wd: WakuDiscoveryV5, topicSubscriptionQueue: AsyncEv let subs = events.filterIt(it.kind == SubscriptionKind.PubsubSub).mapIt(it.pubsubSub) let unsubs = events.filterIt(it.kind == SubscriptionKind.PubsubUnsub).mapIt(it.pubsubUnsub) + if subs.len == 0 and unsubs.len == 0: + continue + let unsubRes = wd.updateENRShards(unsubs, false) let subRes = wd.updateENRShards(subs, true) @@ -314,8 +320,12 @@ proc subscriptionsListener*(wd: WakuDiscoveryV5, topicSubscriptionQueue: AsyncEv if unsubRes.isErr(): debug "ENR shard removal failed", reason= $unsubRes.error - if subRes.isOk() and unsubRes.isOk(): - debug "ENR updated successfully" + if subRes.isErr() and unsubRes.isErr(): + continue + + debug "ENR updated successfully" + + wd.predicate = shardingPredicate(wd.protocol.localNode.record) topicSubscriptionQueue.unregister(key)