diff --git a/apps/wakunode2/app.nim b/apps/wakunode2/app.nim index 4170039cf..85e8a3b4d 100644 --- a/apps/wakunode2/app.nim +++ b/apps/wakunode2/app.nim @@ -127,6 +127,21 @@ proc init*(T: type App, rng: ref HmacDrbgContext, conf: WakuNodeConf): T = quit(QuitFailure) else: recordRes.get() + # Check the ENR sharding info for matching config cluster id + if conf.clusterId != 0: + let res = record.toTyped() + if res.isErr(): + error "ENR setup failed", error = $res.get() + quit(QuitFailure) + + let relayShard = res.get().relaySharding().valueOr: + error "no sharding info" + quit(QuitFailure) + + if conf.clusterId != relayShard.clusterId: + error "cluster id mismatch" + quit(QuitFailure) + App( version: git_version, conf: conf, @@ -234,7 +249,13 @@ proc setupDiscoveryV5*(app: App): WakuDiscoveryV5 = autoupdateRecord: app.conf.discv5EnrAutoUpdate, ) - WakuDiscoveryV5.new(app.rng, discv5Conf, some(app.record)) + WakuDiscoveryV5.new( + app.rng, + discv5Conf, + some(app.record), + some(app.node.peerManager), + app.node.topicSubscriptionQueue, + ) ## Init waku node instance @@ -286,11 +307,6 @@ proc initNode(conf: WakuNodeConf, ok(node) proc setupWakuApp*(app: var App): AppResult[void] = - - ## Discv5 - if app.conf.discv5Discovery: - app.wakuDiscV5 = some(app.setupDiscoveryV5()) - ## Waku node let initNodeRes = initNode(app.conf, app.netConf, app.rng, app.key, app.record, app.peerStore, app.dynamicBootstrapNodes) if initNodeRes.isErr(): @@ -298,6 +314,10 @@ proc setupWakuApp*(app: var App): AppResult[void] = app.node = initNodeRes.get() + ## Discv5 + if app.conf.discv5Discovery: + app.wakuDiscV5 = some(app.setupDiscoveryV5()) + ok() proc getPorts(listenAddrs: seq[MultiAddress]): @@ -341,7 +361,17 @@ proc updateNetConfig(app: var App): AppResult[void] = proc updateEnr(app: var App): AppResult[void] = let record = enrConfiguration(app.conf, app.netConf, app.key).valueOr: - return err(error) + return err("ENR setup failed: " & error) + + if app.conf.clusterId != 0: + let tRecord = record.toTyped().valueOr: + return err("ENR setup failed: " & $error) + + let relayShard = tRecord.relaySharding().valueOr: + return err("ENR setup failed: no sharding info") + + if app.conf.clusterId != relayShard.clusterId: + return err("ENR setup failed: cluster id mismatch") app.record = record app.node.enr = record @@ -377,6 +407,9 @@ proc setupProtocols(node: WakuNode, ## Optionally include persistent message storage. ## No protocols are started yet. + node.mountMetadata(conf.clusterId).isOkOr: + return err("failed to mount waku metadata protocol: " & error) + # Mount relay on all nodes var peerExchangeHandler = none(RoutingRecordsHandler) if conf.relayPeerExchange: @@ -587,11 +620,12 @@ proc startNode(node: WakuNode, conf: WakuNodeConf, proc startApp*(app: var App): AppResult[void] = - try: - (waitFor startNode(app.node,app.conf,app.dynamicBootstrapNodes)).isOkOr: - return err(error) - except CatchableError: - return err("exception starting node: " & getCurrentExceptionMsg()) + let nodeRes = catch: (waitFor startNode(app.node,app.conf,app.dynamicBootstrapNodes)) + if nodeRes.isErr(): + return err("exception starting node: " & nodeRes.error.msg) + + nodeRes.get().isOkOr: + return err("exception starting node: " & error) # Update app data that is set dynamically on node start app.updateApp().isOkOr: @@ -599,13 +633,12 @@ proc startApp*(app: var App): AppResult[void] = if app.wakuDiscv5.isSome(): let wakuDiscv5 = app.wakuDiscv5.get() + let catchRes = catch: (waitFor wakuDiscv5.start()) + let startRes = catchRes.valueOr: + return err("failed to start waku discovery v5: " & catchRes.error.msg) - let res = wakuDiscv5.start() - if res.isErr(): - return err("failed to start waku discovery v5: " & $res.error) - - asyncSpawn wakuDiscv5.searchLoop(app.node.peerManager) - asyncSpawn wakuDiscv5.subscriptionsListener(app.node.topicSubscriptionQueue) + startRes.isOkOr: + return err("failed to start waku discovery v5: " & error) return ok() diff --git a/examples/publisher.nim b/examples/publisher.nim index da18d4e7e..df8286795 100644 --- a/examples/publisher.nim +++ b/examples/publisher.nim @@ -51,31 +51,32 @@ proc setupAndPublish(rng: ref HmacDrbgContext) {.async.} = var bootstrapNodeEnr: enr.Record discard bootstrapNodeEnr.fromURI(bootstrapNode) + let discv5Conf = WakuDiscoveryV5Config( + discv5Config: none(DiscoveryConfig), + address: ip, + port: Port(discv5Port), + privateKey: keys.PrivateKey(nodeKey.skkey), + bootstrapRecords: @[bootstrapNodeEnr], + autoupdateRecord: true, + ) + # assumes behind a firewall, so not care about being discoverable let wakuDiscv5 = WakuDiscoveryV5.new( - extIp= none(ValidIpAddress), - extTcpPort = none(Port), - extUdpPort = none(Port), - bindIP = ip, - discv5UdpPort = Port(discv5Port), - bootstrapEnrs = @[bootstrapNodeEnr], - privateKey = keys.PrivateKey(nodeKey.skkey), - flags = flags, - rng = node.rng, - topics = @[], - ) + node.rng, + discv5Conf, + some(node.enr), + some(node.peerManager), + node.topicSubscriptionQueue, + ) await node.start() await node.mountRelay() node.peerManager.start() - let discv5Res = wakuDiscv5.start() - if discv5Res.isErr(): - error "failed to start discv5", error= discv5Res.error + (await wakuDiscv5.start()).isOkOr: + error "failed to start discv5", error = error quit(1) - asyncSpawn wakuDiscv5.searchLoop(node.peerManager) - # wait for a minimum of peers to be connected, otherwise messages wont be gossiped while true: let numConnectedPeers = node.peerManager.peerStore[ConnectionBook].book.values().countIt(it == Connected) diff --git a/examples/subscriber.nim b/examples/subscriber.nim index 946b4809b..85cf69a12 100644 --- a/examples/subscriber.nim +++ b/examples/subscriber.nim @@ -46,31 +46,32 @@ proc setupAndSubscribe(rng: ref HmacDrbgContext) {.async.} = var bootstrapNodeEnr: enr.Record discard bootstrapNodeEnr.fromURI(bootstrapNode) + let discv5Conf = WakuDiscoveryV5Config( + discv5Config: none(DiscoveryConfig), + address: ip, + port: Port(discv5Port), + privateKey: keys.PrivateKey(nodeKey.skkey), + bootstrapRecords: @[bootstrapNodeEnr], + autoupdateRecord: true, + ) + # assumes behind a firewall, so not care about being discoverable let wakuDiscv5 = WakuDiscoveryV5.new( - extIp= none(ValidIpAddress), - extTcpPort = none(Port), - extUdpPort = none(Port), - bindIP = ip, - discv5UdpPort = Port(discv5Port), - bootstrapEnrs = @[bootstrapNodeEnr], - privateKey = keys.PrivateKey(nodeKey.skkey), - flags = flags, - rng = node.rng, - topics = @[], - ) + node.rng, + discv5Conf, + some(node.enr), + some(node.peerManager), + node.topicSubscriptionQueue, + ) await node.start() await node.mountRelay() node.peerManager.start() - let discv5Res = wakuDiscv5.start() - if discv5Res.isErr(): - error "failed to start discv5", error = discv5Res.error + (await wakuDiscv5.start()).isOkOr: + error "failed to start discv5", error = error quit(1) - asyncSpawn wakuDiscv5.searchLoop(node.peerManager) - # wait for a minimum of peers to be connected, otherwise messages wont be gossiped while true: let numConnectedPeers = node.peerManager.peerStore[ConnectionBook].book.values().countIt(it == Connected) diff --git a/tests/test_peer_manager.nim b/tests/test_peer_manager.nim index b3ef488d8..8576f2725 100644 --- a/tests/test_peer_manager.nim +++ b/tests/test_peer_manager.nim @@ -268,16 +268,38 @@ procSuite "Peer Manager": await allFutures([node1.stop(), node2.stop(), node3.stop()]) asyncTest "Peer manager drops conections to peers on different networks": - let clusterId1 = 1.uint32 - let clusterId2 = 2.uint32 + let clusterId3 = 3.uint32 + let clusterId4 = 4.uint32 let # different network - node1 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0), clusterId = clusterId1) + node1 = newTestWakuNode( + generateSecp256k1Key(), + ValidIpAddress.init("0.0.0.0"), + Port(0), + clusterId = clusterId3, + topics = @["/waku/2/rs/3/0"], + ) # same network - node2 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0), clusterId = clusterId2) - node3 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0), clusterId = clusterId2) + node2 = newTestWakuNode( + generateSecp256k1Key(), + ValidIpAddress.init("0.0.0.0"), + Port(0), + clusterId = clusterId4, + topics = @["/waku/2/rs/4/0"], + ) + node3 = newTestWakuNode( + generateSecp256k1Key(), + ValidIpAddress.init("0.0.0.0"), + Port(0), + clusterId = clusterId4, + topics = @["/waku/2/rs/4/0"], + ) + + discard node1.mountMetadata(clusterId3) + discard node2.mountMetadata(clusterId4) + discard node3.mountMetadata(clusterId4) # Start nodes await allFutures([node1.start(), node2.start(), node3.start()]) diff --git a/tests/test_waku_discv5.nim b/tests/test_waku_discv5.nim index 9a7a49a90..dbf18d4a1 100644 --- a/tests/test_waku_discv5.nim +++ b/tests/test_waku_discv5.nim @@ -34,10 +34,13 @@ proc newTestEnrRecord(privKey: libp2p_keys.PrivateKey, builder.build().tryGet() -proc newTestDiscv5(privKey: libp2p_keys.PrivateKey, - bindIp: string, tcpPort: uint16, udpPort: uint16, - record: waku_enr.Record, - bootstrapRecords = newSeq[waku_enr.Record]()): WakuDiscoveryV5 = +proc newTestDiscv5( + privKey: libp2p_keys.PrivateKey, + bindIp: string, tcpPort: uint16, udpPort: uint16, + record: waku_enr.Record, + bootstrapRecords = newSeq[waku_enr.Record](), + queue = newAsyncEventQueue[SubscriptionEvent](30), + ): WakuDiscoveryV5 = let config = WakuDiscoveryV5Config( privateKey: eth_keys.PrivateKey(privKey.skkey), address: ValidIpAddress.init(bindIp), @@ -45,7 +48,12 @@ proc newTestDiscv5(privKey: libp2p_keys.PrivateKey, bootstrapRecords: bootstrapRecords, ) - let discv5 = WakuDiscoveryV5.new(rng(), config, some(record)) + let discv5 = WakuDiscoveryV5.new( + rng = rng(), + conf = config, + record = some(record), + queue = queue, + ) return discv5 @@ -122,13 +130,13 @@ procSuite "Waku Discovery v5": bootstrapRecords = @[record1, record2] ) - let res1 = node1.start() + let res1 = await node1.start() assert res1.isOk(), res1.error - let res2 = node2.start() + let res2 = await node2.start() assert res2.isOk(), res2.error - let res3 = node3.start() + let res3 = await node3.start() assert res3.isOk(), res3.error ## When @@ -240,16 +248,16 @@ procSuite "Waku Discovery v5": ) # Start nodes' discoveryV5 protocols - let res1 = node1.start() + let res1 = await node1.start() assert res1.isOk(), res1.error - let res2 = node2.start() + let res2 = await node2.start() assert res2.isOk(), res2.error - let res3 = node3.start() + let res3 = await node3.start() assert res3.isOk(), res3.error - let res4 = node4.start() + let res4 = await node4.start() assert res4.isOk(), res4.error ## Given @@ -401,22 +409,20 @@ procSuite "Waku Discovery v5": udpPort = udpPort, ) + let queue = newAsyncEventQueue[SubscriptionEvent](30) + let node = newTestDiscv5( privKey = privKey, bindIp = bindIp, tcpPort = tcpPort, udpPort = udpPort, - record = record + record = record, + queue = queue, ) - let res = node.start() + let res = await node.start() assert res.isOk(), res.error - let queue = newAsyncEventQueue[SubscriptionEvent](0) - - ## When - asyncSpawn node.subscriptionsListener(queue) - ## Then queue.emit((kind: PubsubSub, topic: shard1)) queue.emit((kind: PubsubSub, topic: shard2)) @@ -442,14 +448,13 @@ procSuite "Waku Discovery v5": queue.emit((kind: PubsubUnsub, topic: shard1)) queue.emit((kind: PubsubUnsub, topic: shard2)) - queue.emit((kind: PubsubUnsub, topic: shard3)) await sleepAsync(1.seconds) check: node.protocol.localNode.record.containsShard(shard1) == false node.protocol.localNode.record.containsShard(shard2) == false - node.protocol.localNode.record.containsShard(shard3) == false + node.protocol.localNode.record.containsShard(shard3) == true ## Cleanup await node.stop() diff --git a/tests/test_waku_peer_exchange.nim b/tests/test_waku_peer_exchange.nim index f08949d1c..d4159dd8f 100644 --- a/tests/test_waku_peer_exchange.nim +++ b/tests/test_waku_peer_exchange.nim @@ -131,7 +131,8 @@ procSuite "Waku Peer Exchange": let disc1 = WakuDiscoveryV5.new( node1.rng, conf1, - some(node1.enr) + some(node1.enr), + some(node1.peerManager), ) let conf2 = WakuDiscoveryV5Config( @@ -146,17 +147,15 @@ procSuite "Waku Peer Exchange": let disc2 = WakuDiscoveryV5.new( node2.rng, conf2, - some(node2.enr) + some(node2.enr), + some(node2.peerManager), ) - await allFutures(node1.start(), node2.start(), node3.start()) - let resultDisc1StartRes = disc1.start() + let resultDisc1StartRes = await disc1.start() assert resultDisc1StartRes.isOk(), resultDisc1StartRes.error - let resultDisc2StartRes = disc2.start() + let resultDisc2StartRes = await disc2.start() assert resultDisc2StartRes.isOk(), resultDisc2StartRes.error - asyncSpawn disc1.searchLoop(node1.peerManager) - asyncSpawn disc2.searchLoop(node2.peerManager) ## When var attempts = 10 diff --git a/tests/testlib/wakunode.nim b/tests/testlib/wakunode.nim index 1ba6a0f81..f614b272b 100644 --- a/tests/testlib/wakunode.nim +++ b/tests/testlib/wakunode.nim @@ -32,7 +32,7 @@ proc defaultTestWakuNodeConf*(): WakuNodeConf = dnsAddrsNameServers: @[ValidIpAddress.init("1.1.1.1"), ValidIpAddress.init("1.0.0.1")], nat: "any", maxConnections: 50, - topics: @["/waku/2/default-waku/proto"], + topics: @[], relay: true ) @@ -55,28 +55,27 @@ proc newTestWakuNode*(nodeKey: crypto.PrivateKey, dns4DomainName = none(string), discv5UdpPort = none(Port), agentString = none(string), - clusterId: uint32 = 0.uint32, + clusterId: uint32 = 2.uint32, + topics: seq[string] = @["/waku/2/rs/2/0"], peerStoreCapacity = none(int)): WakuNode = var resolvedExtIp = extIp # Update extPort to default value if it's missing and there's an extIp or a DNS domain - let extPort = if (extIp.isSome() or dns4DomainName.isSome()) and - extPort.isNone(): - some(Port(60000)) - else: - extPort + let extPort = + if (extIp.isSome() or dns4DomainName.isSome()) and extPort.isNone(): some(Port(60000)) + else: extPort + + let conf = defaultTestWakuNodeConf() if dns4DomainName.isSome() and extIp.isNone(): - let conf = defaultTestWakuNodeConf() # If there's an error resolving the IP, an exception is thrown and test fails - let dnsRes = waitFor dnsResolve(dns4DomainName.get(), conf) - if dnsRes.isErr(): - raise newException(Defect, $dnsRes.error) - else: - resolvedExtIp = some(ValidIpAddress.init(dnsRes.get())) + let dns = (waitFor dnsResolve(dns4DomainName.get(), conf)).valueOr: + raise newException(Defect, error) + + resolvedExtIp = some(ValidIpAddress.init(dns)) - let netConfigRes = NetConfig.init( + let netConf = NetConfig.init( bindIp = bindIp, clusterId = clusterId, bindPort = bindPort, @@ -89,36 +88,33 @@ proc newTestWakuNode*(nodeKey: crypto.PrivateKey, wakuFlags = wakuFlags, dns4DomainName = dns4DomainName, discv5UdpPort = discv5UdpPort, - ) - let netConf = - if netConfigRes.isErr(): - raise newException(Defect, "Invalid network configuration: " & $netConfigRes.error) - else: - netConfigRes.get() + ).valueOr: + raise newException(Defect, "Invalid network configuration: " & error) var enrBuilder = EnrBuilder.init(nodeKey) + enrBuilder.withShardedTopics(topics).isOkOr: + raise newException(Defect, "Invalid record: " & error) + enrBuilder.withIpAddressAndPorts( ipAddr = netConf.enrIp, tcpPort = netConf.enrPort, udpPort = netConf.discv5UdpPort, ) - if netConf.wakuFlags.isSome(): - enrBuilder.withWakuCapabilities(netConf.wakuFlags.get()) + enrBuilder.withMultiaddrs(netConf.enrMultiaddrs) - let recordRes = enrBuilder.build() - let record = - if recordRes.isErr(): - raise newException(Defect, "Invalid record: " & $recordRes.error) - else: - recordRes.get() + if netConf.wakuFlags.isSome(): + enrBuilder.withWakuCapabilities(netConf.wakuFlags.get()) + + let record = enrBuilder.build().valueOr: + raise newException(Defect, "Invalid record: " & $error) var builder = WakuNodeBuilder.init() builder.withRng(rng()) builder.withNodeKey(nodeKey) builder.withRecord(record) - builder.withNetworkConfiguration(netConfigRes.get()) + builder.withNetworkConfiguration(netConf) builder.withPeerStorage(peerStorage, capacity = peerStoreCapacity) builder.withSwitchConfiguration( maxConnections = some(maxConnections), diff --git a/tests/wakunode2/test_app.nim b/tests/wakunode2/test_app.nim index f87d264f8..71fcd009d 100644 --- a/tests/wakunode2/test_app.nim +++ b/tests/wakunode2/test_app.nim @@ -51,12 +51,24 @@ suite "Wakunode2 - App initialization": ## When var wakunode2 = App.init(rng(), conf) - require wakunode2.setupPeerPersistence().isOk() - require wakunode2.setupDyamicBootstrapNodes().isOk() - require wakunode2.setupWakuApp().isOk() - require isOk(waitFor wakunode2.setupAndMountProtocols()) - require isOk(wakunode2.startApp()) - require wakunode2.setupMonitoringAndExternalInterfaces().isOk() + + let persRes = wakunode2.setupPeerPersistence() + assert persRes.isOk(), persRes.error + + let bootRes = wakunode2.setupDyamicBootstrapNodes() + assert bootRes.isOk(), bootRes.error + + let setupRes = wakunode2.setupWakuApp() + assert setupRes.isOk(), setupRes.error + + let mountRes = waitFor wakunode2.setupAndMountProtocols() + assert mountRes.isOk(), mountRes.error + + let startRes = wakunode2.startApp() + assert startRes.isOk(), startRes.error + + let monitorRes = wakunode2.setupMonitoringAndExternalInterfaces() + assert monitorRes.isOk(), monitorRes.error ## Then let node = wakunode2.node diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index d26829a40..51362e50d 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -155,12 +155,6 @@ proc new*(T: type WakuNode, topicSubscriptionQueue: queue ) - # mount metadata protocol - let metadata = WakuMetadata.new(netConfig.clusterId, queue) - node.switch.mount(metadata, protocolMatcher(WakuMetadataCodec)) - node.wakuMetadata = metadata - peerManager.wakuMetadata = metadata - return node proc peerInfo*(node: WakuNode): PeerInfo = @@ -189,6 +183,22 @@ proc connectToNodes*(node: WakuNode, nodes: seq[RemotePeerInfo] | seq[string], s # NOTE Connects to the node without a give protocol, which automatically creates streams for relay await peer_manager.connectToNodes(node.peerManager, nodes, source=source) +## Waku Metadata + +proc mountMetadata*(node: WakuNode, clusterId: uint32): Result[void, string] = + if not node.wakuMetadata.isNil(): + return err("Waku metadata already mounted, skipping") + + let metadata = WakuMetadata.new(clusterId, node.enr, node.topicSubscriptionQueue) + + node.wakuMetadata = metadata + node.peerManager.wakuMetadata = metadata + + let catchRes = catch: node.switch.mount(node.wakuMetadata, protocolMatcher(WakuMetadataCodec)) + if catchRes.isErr(): + return err(catchRes.error.msg) + + return ok() ## Waku relay @@ -1124,6 +1134,9 @@ proc start*(node: WakuNode) {.async.} = if not node.wakuRelay.isNil(): await node.startRelay() + if not node.wakuMetadata.isNil(): + node.wakuMetadata.start() + ## The switch uses this mapper to update peer info addrs ## with announced addrs after start let addressMapper = @@ -1136,8 +1149,6 @@ proc start*(node: WakuNode) {.async.} = node.started = true - node.wakuMetadata.start() - if not zeroPortPresent: printNodeNetworkInfo(node) else: @@ -1149,6 +1160,9 @@ proc stop*(node: WakuNode) {.async.} = if not node.wakuRelay.isNil(): await node.wakuRelay.stop() + if not node.wakuMetadata.isNil(): + node.wakuMetadata.stop() + await node.switch.stop() node.peerManager.stop() diff --git a/waku/waku_discv5.nim b/waku/waku_discv5.nim index 8456cab42..cb1216f77 100644 --- a/waku/waku_discv5.nim +++ b/waku/waku_discv5.nim @@ -49,6 +49,8 @@ type WakuDiscoveryV5* = ref object protocol*: protocol.Protocol listening*: bool predicate: Option[WakuDiscv5Predicate] + peerManager: Option[PeerManager] + topicSubscriptionQueue: AsyncEventQueue[SubscriptionEvent] proc shardingPredicate*(record: Record): Option[WakuDiscv5Predicate] = ## Filter peers based on relay sharding information @@ -72,7 +74,9 @@ proc new*( T: type WakuDiscoveryV5, rng: ref HmacDrbgContext, conf: WakuDiscoveryV5Config, - record: Option[waku_enr.Record] + record: Option[waku_enr.Record], + peerManager: Option[PeerManager] = none(PeerManager), + queue: AsyncEventQueue[SubscriptionEvent] = newAsyncEventQueue[SubscriptionEvent](30), ): T = let shardPredOp = if record.isSome(): shardingPredicate(record.get()) @@ -101,82 +105,26 @@ proc new*( enrUdpPort = none(Port), ) - WakuDiscoveryV5(conf: conf, protocol: protocol, listening: false, predicate: shardPredOp) - -proc new*(T: type WakuDiscoveryV5, - extIp: Option[ValidIpAddress], - extTcpPort: Option[Port], - extUdpPort: Option[Port], - bindIP: ValidIpAddress, - discv5UdpPort: Port, - bootstrapEnrs = newSeq[enr.Record](), - enrAutoUpdate = false, - privateKey: eth_keys.PrivateKey, - flags: CapabilitiesBitfield, - multiaddrs = newSeq[MultiAddress](), - rng: ref HmacDrbgContext, - topics: seq[string], - discv5Config: protocol.DiscoveryConfig = protocol.defaultDiscoveryConfig - ): T {. - deprecated: "use the config and record proc variant instead".}= - - let relayShardsRes = topicsToRelayShards(topics) - - let relayShard = - if relayShardsRes.isErr(): - debug "pubsub topic parsing error", reason = relayShardsRes.error - none(RelayShards) - else: relayShardsRes.get() - - let record = block: - var builder = EnrBuilder.init(privateKey) - builder.withIpAddressAndPorts( - ipAddr = extIp, - tcpPort = extTcpPort, - udpPort = extUdpPort, - ) - builder.withWakuCapabilities(flags) - builder.withMultiaddrs(multiaddrs) - - if relayShard.isSome(): - let res = builder.withWakuRelaySharding(relayShard.get()) - - if res.isErr(): - debug "building ENR with relay sharding failed", reason = res.error - else: - debug "building ENR with relay sharding information", clusterId = $relayShard.get().clusterId(), shards = $relayShard.get().shardIds() - - builder.build().expect("Record within size limits") - - let conf = WakuDiscoveryV5Config( - discv5Config: some(discv5Config), - address: bindIP, - port: discv5UdpPort, - privateKey: privateKey, - bootstrapRecords: bootstrapEnrs, - autoupdateRecord: enrAutoUpdate, - ) - - WakuDiscoveryV5.new(rng, conf, some(record)) + WakuDiscoveryV5( + conf: conf, + protocol: protocol, + listening: false, + predicate: shardPredOp, + peerManager: peerManager, + topicSubscriptionQueue: queue, + ) proc updateENRShards(wd: WakuDiscoveryV5, newTopics: seq[PubsubTopic], add: bool): Result[void, string] = ## Add or remove shards from the Discv5 ENR + let newShardOp = topicsToRelayShards(newTopics).valueOr: + return err("ENR update failed: " & error) + + let newShard = newShardOp.valueOr: + return ok() - let newShardOp = ?topicsToRelayShards(newTopics) - - let newShard = - if newShardOp.isSome(): - newShardOp.get() - else: - return ok() - - let typedRecordRes = wd.protocol.localNode.record.toTyped() - let typedRecord = - if typedRecordRes.isErr(): - return err($typedRecordRes.error) - else: - typedRecordRes.get() + let typedRecord = wd.protocol.localNode.record.toTyped().valueOr: + return err("ENR update failed: " & $error) let currentShardsOp = typedRecord.relaySharding() @@ -185,14 +133,16 @@ proc updateENRShards(wd: WakuDiscoveryV5, let currentShard = currentShardsOp.get() if currentShard.clusterId != newShard.clusterId: - return err("ENR are limited to one clusterId id") + return err("ENR update failed: clusterId id mismatch") + + RelayShards.init(currentShard.clusterId, currentShard.shardIds & newShard.shardIds).valueOr: + return err("ENR update failed: " & error) - ?RelayShards.init(currentShard.clusterId, currentShard.shardIds & newShard.shardIds) elif not add and currentShardsOp.isSome(): let currentShard = currentShardsOp.get() if currentShard.clusterId != newShard.clusterId: - return err("ENR are limited to one clusterId id") + return err("ENR update failed: clusterId id mismatch") let currentSet = toHashSet(currentShard.shardIds) let newSet = toHashSet(newShard.shardIds) @@ -200,16 +150,11 @@ proc updateENRShards(wd: WakuDiscoveryV5, let indices = toSeq(currentSet - newSet) if indices.len == 0: - # Can't create RelayShard with no indices so update then return - let (field, value) = (ShardingIndicesListEnrField, newSeq[byte](3)) + return err("ENR update failed: cannot remove all shards") - let res = wd.protocol.updateRecord([(field, value)]) - if res.isErr(): - return err($res.error) + RelayShards.init(currentShard.clusterId, indices).valueOr: + return err("ENR update failed: " & error) - return ok() - - ?RelayShards.init(currentShard.clusterId, indices) elif add and currentShardsOp.isNone(): newShard else: return ok() @@ -217,18 +162,13 @@ proc updateENRShards(wd: WakuDiscoveryV5, if resultShard.shardIds.len >= ShardingIndicesListMaxLength: (ShardingBitVectorEnrField, resultShard.toBitVector()) else: - let listRes = resultShard.toIndicesList() - let list = - if listRes.isErr(): - return err($listRes.error) - else: - listRes.get() + let list = resultShard.toIndicesList().valueOr: + return err("ENR update failed: " & $error) (ShardingIndicesListEnrField, list) - let res = wd.protocol.updateRecord([(field, value)]) - if res.isErr(): - return err($res.error) + wd.protocol.updateRecord([(field, value)]).isOkOr: + return err("ENR update failed: " & $error) return ok() @@ -246,10 +186,12 @@ proc findRandomPeers*(wd: WakuDiscoveryV5, overridePred = none(WakuDiscv5Predica return discoveredRecords -#TODO abstract away PeerManager -proc searchLoop*(wd: WakuDiscoveryV5, peerManager: PeerManager) {.async.} = +proc searchLoop(wd: WakuDiscoveryV5) {.async.} = ## Continuously add newly discovered nodes + let peerManager = wd.peerManager.valueOr: + return + info "Starting discovery v5 search" while wd.listening: @@ -267,50 +209,13 @@ proc searchLoop*(wd: WakuDiscoveryV5, peerManager: PeerManager) {.async.} = # Also, give some time to dial the discovered nodes and update stats, etc. await sleepAsync(5.seconds) -proc start*(wd: WakuDiscoveryV5): Result[void, string] = - if wd.listening: - return err("already listening") - - info "Starting discovery v5 service" - - debug "start listening on udp port", address = $wd.conf.address, port = $wd.conf.port - try: - wd.protocol.open() - except CatchableError: - return err("failed to open udp port: " & getCurrentExceptionMsg()) - - wd.listening = true - - trace "start discv5 service" - wd.protocol.start() - - debug "Successfully started discovery v5 service" - info "Discv5: discoverable ENR ", enr = wd.protocol.localNode.record.toUri() - - ok() - -proc stop*(wd: WakuDiscoveryV5): Future[void] {.async.} = - if not wd.listening: - return - - info "Stopping discovery v5 service" - - wd.listening = false - trace "Stop listening on discv5 port" - await wd.protocol.closeWait() - - debug "Successfully stopped discovery v5 service" - -proc subscriptionsListener*( - wd: WakuDiscoveryV5, - topicSubscriptionQueue: AsyncEventQueue[SubscriptionEvent] - ) {.async.} = +proc subscriptionsListener(wd: WakuDiscoveryV5) {.async.} = ## Listen for pubsub topics subscriptions changes - let key = topicSubscriptionQueue.register() + let key = wd.topicSubscriptionQueue.register() while wd.listening: - let events = await topicSubscriptionQueue.waitEvents(key) + let events = await wd.topicSubscriptionQueue.waitEvents(key) # Since we don't know the events we will receive we have to anticipate. @@ -336,7 +241,44 @@ proc subscriptionsListener*( wd.predicate = shardingPredicate(wd.protocol.localNode.record) - topicSubscriptionQueue.unregister(key) + wd.topicSubscriptionQueue.unregister(key) + +proc start*(wd: WakuDiscoveryV5): Future[Result[void, string]] {.async.} = + if wd.listening: + return err("already listening") + + info "Starting discovery v5 service" + + debug "start listening on udp port", address = $wd.conf.address, port = $wd.conf.port + try: + wd.protocol.open() + except CatchableError: + return err("failed to open udp port: " & getCurrentExceptionMsg()) + + wd.listening = true + + trace "start discv5 service" + wd.protocol.start() + + asyncSpawn wd.searchLoop() + asyncSpawn wd.subscriptionsListener() + + debug "Successfully started discovery v5 service" + info "Discv5: discoverable ENR ", enr = wd.protocol.localNode.record.toUri() + + ok() + +proc stop*(wd: WakuDiscoveryV5): Future[void] {.async.} = + if not wd.listening: + return + + info "Stopping discovery v5 service" + + wd.listening = false + trace "Stop listening on discv5 port" + await wd.protocol.closeWait() + + debug "Successfully stopped discovery v5 service" ## Helper functions diff --git a/waku/waku_metadata/protocol.nim b/waku/waku_metadata/protocol.nim index eefcf5b82..78573650a 100644 --- a/waku/waku_metadata/protocol.nim +++ b/waku/waku_metadata/protocol.nim @@ -4,7 +4,7 @@ else: {.push raises: [].} import - std/[options, sequtils, random, sets], + std/[options, sequtils, sets], stew/results, chronicles, chronos, @@ -15,7 +15,9 @@ import eth/p2p/discoveryv5/enr import ../common/nimchronos, + ../common/enr, ../waku_core, + ../waku_enr, ./rpc logScope: @@ -93,13 +95,24 @@ proc initProtocolHandler*(m: WakuMetadata) = proc new*(T: type WakuMetadata, clusterId: uint32, + enr: Record, queue: AsyncEventQueue[SubscriptionEvent], ): T = - let wm = WakuMetadata(clusterId: clusterId, topicSubscriptionQueue: queue) + var (cluster, shards) = (clusterId, initHashSet[uint32]()) + + let enrRes = enr.toTyped() + if enrRes.isOk(): + let shardingRes = enrRes.get().relaySharding() + if shardingRes.isSome(): + let relayShard = shardingRes.get() + cluster = uint32(relayShard.clusterId) + shards = toHashSet(relayShard.shardIds.mapIt(uint32(it))) + + let wm = WakuMetadata(clusterId: cluster, shards: shards, topicSubscriptionQueue: queue) wm.initProtocolHandler() - info "Created WakuMetadata protocol", clusterId=clusterId + info "Created WakuMetadata protocol", clusterId=cluster return wm