diff --git a/.github/workflows/container-image.yml b/.github/workflows/container-image.yml index d8de9e254..b5953f263 100644 --- a/.github/workflows/container-image.yml +++ b/.github/workflows/container-image.yml @@ -59,7 +59,7 @@ jobs: echo "commit_hash=$(git rev-parse HEAD)" >> $GITHUB_OUTPUT docker login -u ${QUAY_USER} -p ${QUAY_PASSWORD} quay.io - docker build -t ${IMAGE} -f docker/binaries/Dockerfile.bn.amd64 --label quay.expires-after=7d . + docker build -t ${IMAGE} -f docker/binaries/Dockerfile.bn.amd64 --label quay.expires-after=30d . docker push ${IMAGE} env: QUAY_PASSWORD: ${{ secrets.QUAY_PASSWORD }} diff --git a/CHANGELOG.md b/CHANGELOG.md index ceb66d14e..467c9aac2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,26 @@ +## v0.21.1 (2023-10-26) + +This patch release addresses the following issues: +- WSS connections being suddenly terminated under rare conditions +- Ability for the user to control announced multiaddresses + +### Changes + +- adding ext-multiaddr-only CLI flag ([#2141](https://github.com/waku-org/nwaku/issues/2141)) ([e2dfc2ed](https://github.com/waku-org/nwaku/commit/e2dfc2ed)) +- bumping nim-libp2p to include WSS fix ([#2150](https://github.com/waku-org/nwaku/issues/2150)) ([18b5149a](https://github.com/waku-org/nwaku/commit/18b5149a)) + +This is a patch release that is fully backwards-compatible with release `v0.21.0`. + +It supports the same [libp2p protocols](https://docs.libp2p.io/concepts/protocols/): +| Protocol | Spec status | Protocol id | +| ---: | :---: | :--- | +| [`11/WAKU2-RELAY`](https://rfc.vac.dev/spec/11/) | `stable` | `/vac/waku/relay/2.0.0` | +| [`12/WAKU2-FILTER`](https://rfc.vac.dev/spec/12/) | `draft` | `/vac/waku/filter/2.0.0-beta1`
`/vac/waku/filter-subscribe/2.0.0-beta1`
`/vac/waku/filter-push/2.0.0-beta1` | +| [`13/WAKU2-STORE`](https://rfc.vac.dev/spec/13/) | `draft` | `/vac/waku/store/2.0.0-beta4` | +| [`19/WAKU2-LIGHTPUSH`](https://rfc.vac.dev/spec/19/) | `draft` | `/vac/waku/lightpush/2.0.0-beta1` | + +The Waku v1 implementation has been removed from this repository and can be found in a separate [Waku Legacy](https://github.com/waku-org/waku-legacy) repository. + ## v0.21.0 (2023-10-18) > Note: This is the last release supporting the `--topic` option. It is being deprecated in favor of a more specific options `--pubsub-topic` & `--content-topic` diff --git a/apps/wakunode2/app.nim b/apps/wakunode2/app.nim index f103acb17..4170039cf 100644 --- a/apps/wakunode2/app.nim +++ b/apps/wakunode2/app.nim @@ -8,6 +8,8 @@ import stew/results, chronicles, chronos, + libp2p/wire, + libp2p/multicodec, libp2p/crypto/crypto, libp2p/nameresolving/dnsresolver, libp2p/protocols/pubsub/gossipsub, @@ -29,7 +31,7 @@ import ../../waku/node/peer_manager/peer_store/waku_peer_storage, ../../waku/node/peer_manager/peer_store/migrations as peer_store_sqlite_migrations, ../../waku/waku_api/message_cache, - ../../waku/waku_api/cache_handlers, + ../../waku/waku_api/handlers, ../../waku/waku_api/rest/server, ../../waku/waku_api/rest/debug/handlers as rest_debug_api, ../../waku/waku_api/rest/relay/handlers as rest_relay_api, @@ -117,39 +119,8 @@ proc init*(T: type App, rng: ref HmacDrbgContext, conf: WakuNodeConf): T = quit(QuitFailure) else: netConfigRes.get() - var enrBuilder = EnrBuilder.init(key) + let recordRes = enrConfiguration(conf, netConfig, key) - enrBuilder.withIpAddressAndPorts( - netConfig.enrIp, - netConfig.enrPort, - netConfig.discv5UdpPort - ) - - if netConfig.wakuFlags.isSome(): - enrBuilder.withWakuCapabilities(netConfig.wakuFlags.get()) - - enrBuilder.withMultiaddrs(netConfig.enrMultiaddrs) - - let topics = - if conf.pubsubTopics.len > 0 or conf.contentTopics.len > 0: - let shardsRes = conf.contentTopics.mapIt(getShard(it)) - for res in shardsRes: - if res.isErr(): - error "failed to shard content topic", error=res.error - quit(QuitFailure) - - let shards = shardsRes.mapIt(it.get()) - - conf.pubsubTopics & shards - else: - conf.topics - - let addShardedTopics = enrBuilder.withShardedTopics(topics) - if addShardedTopics.isErr(): - error "failed to add sharded topics to ENR", error=addShardedTopics.error - quit(QuitFailure) - - let recordRes = enrBuilder.build() let record = if recordRes.isErr(): error "failed to create record", error=recordRes.error @@ -329,6 +300,72 @@ proc setupWakuApp*(app: var App): AppResult[void] = ok() +proc getPorts(listenAddrs: seq[MultiAddress]): + AppResult[tuple[tcpPort, websocketPort: Option[Port]]] = + + var tcpPort, websocketPort = none(Port) + + for a in listenAddrs: + if a.isWsAddress(): + if websocketPort.isNone(): + let wsAddress = initTAddress(a).valueOr: + return err("getPorts wsAddr error:" & $error) + websocketPort = some(wsAddress.port) + elif tcpPort.isNone(): + let tcpAddress = initTAddress(a).valueOr: + return err("getPorts tcpAddr error:" & $error) + tcpPort = some(tcpAddress.port) + + return ok((tcpPort: tcpPort, websocketPort: websocketPort)) + +proc updateNetConfig(app: var App): AppResult[void] = + + var conf = app.conf + let (tcpPort, websocketPort) = getPorts(app.node.switch.peerInfo.listenAddrs).valueOr: + return err("Could not retrieve ports " & error) + + if tcpPort.isSome(): + conf.tcpPort = tcpPort.get() + + if websocketPort.isSome(): + conf.websocketPort = websocketPort.get() + + # Rebuild NetConfig with bound port values + let netConf = networkConfiguration(conf, clientId).valueOr: + return err("Could not update NetConfig: " & error) + + app.netConf = netConf + + return ok() + +proc updateEnr(app: var App): AppResult[void] = + + let record = enrConfiguration(app.conf, app.netConf, app.key).valueOr: + return err(error) + + app.record = record + app.node.enr = record + + if app.conf.discv5Discovery: + app.wakuDiscV5 = some(app.setupDiscoveryV5()) + + return ok() + +proc updateApp(app: var App): AppResult[void] = + + if app.conf.tcpPort == Port(0) or app.conf.websocketPort == Port(0): + + updateNetConfig(app).isOkOr: + return err("error calling updateNetConfig: " & $error) + + updateEnr(app).isOkOr: + return err("error calling updateEnr: " & $error) + + app.node.announcedAddresses = app.netConf.announcedAddresses + + printNodeNetworkInfo(app.node) + + return ok() ## Mount protocols @@ -548,7 +585,18 @@ proc startNode(node: WakuNode, conf: WakuNodeConf, return ok() -proc startApp*(app: App): Future[AppResult[void]] {.async.} = +proc startApp*(app: var App): AppResult[void] = + + try: + (waitFor startNode(app.node,app.conf,app.dynamicBootstrapNodes)).isOkOr: + return err(error) + except CatchableError: + return err("exception starting node: " & getCurrentExceptionMsg()) + + # Update app data that is set dynamically on node start + app.updateApp().isOkOr: + return err("Error in updateApp: " & $error) + if app.wakuDiscv5.isSome(): let wakuDiscv5 = app.wakuDiscv5.get() @@ -559,17 +607,40 @@ proc startApp*(app: App): Future[AppResult[void]] {.async.} = asyncSpawn wakuDiscv5.searchLoop(app.node.peerManager) asyncSpawn wakuDiscv5.subscriptionsListener(app.node.topicSubscriptionQueue) - return await startNode( - app.node, - app.conf, - app.dynamicBootstrapNodes - ) + return ok() + ## Monitoring and external interfaces proc startRestServer(app: App, address: ValidIpAddress, port: Port, conf: WakuNodeConf): AppResult[RestServerRef] = - let server = ? newRestHttpServer(address, port) + + # Used to register api endpoints that are not currently installed as keys, + # values are holding error messages to be returned to the client + var notInstalledTab: Table[string, string] = initTable[string, string]() + + proc requestErrorHandler(error: RestRequestError, + request: HttpRequestRef): + Future[HttpResponseRef] {.async.} = + case error + of RestRequestError.Invalid: + return await request.respond(Http400, "Invalid request", HttpTable.init()) + of RestRequestError.NotFound: + let rootPath = request.rawPath.split("/")[1] + if notInstalledTab.hasKey(rootPath): + return await request.respond(Http404, notInstalledTab[rootPath], HttpTable.init()) + else: + return await request.respond(Http400, "Bad request initiated. Invalid path or method used.", HttpTable.init()) + of RestRequestError.InvalidContentBody: + return await request.respond(Http400, "Invalid content body", HttpTable.init()) + of RestRequestError.InvalidContentType: + return await request.respond(Http400, "Invalid content type", HttpTable.init()) + of RestRequestError.Unexpected: + return defaultResponse() + + return defaultResponse() + + let server = ? newRestHttpServer(address, port, requestErrorHandler = requestErrorHandler) ## Admin REST API installAdminApiHandlers(server.router, app.node) @@ -596,6 +667,8 @@ proc startRestServer(app: App, address: ValidIpAddress, port: Port, conf: WakuNo app.node.subscribe((kind: ContentSub, topic: contentTopic), some(autoHandler)) installRelayApiHandlers(server.router, app.node, cache) + else: + notInstalledTab["relay"] = "/relay endpoints are not available. Please check your configuration: --relay" ## Filter REST API if conf.filternode != "" and @@ -606,15 +679,40 @@ proc startRestServer(app: App, address: ValidIpAddress, port: Port, conf: WakuNo rest_legacy_filter_api.installLegacyFilterRestApiHandlers(server.router, app.node, legacyFilterCache) let filterCache = rest_filter_api.MessageCache.init() - rest_filter_api.installFilterRestApiHandlers(server.router, app.node, filterCache) + + let filterDiscoHandler = + if app.wakuDiscv5.isSome(): + some(defaultDiscoveryHandler(app.wakuDiscv5.get(), Filter)) + else: none(DiscoveryHandler) + + rest_filter_api.installFilterRestApiHandlers( + server.router, + app.node, + filterCache, + filterDiscoHandler, + ) + else: + notInstalledTab["filter"] = "/filter endpoints are not available. Please check your configuration: --filternode" ## Store REST API - installStoreApiHandlers(server.router, app.node) + let storeDiscoHandler = + if app.wakuDiscv5.isSome(): + some(defaultDiscoveryHandler(app.wakuDiscv5.get(), Store)) + else: none(DiscoveryHandler) + + installStoreApiHandlers(server.router, app.node, storeDiscoHandler) ## Light push API if conf.lightpushnode != "" and app.node.wakuLightpushClient != nil: - rest_lightpush_api.installLightPushRequestHandler(server.router, app.node) + let lightDiscoHandler = + if app.wakuDiscv5.isSome(): + some(defaultDiscoveryHandler(app.wakuDiscv5.get(), Lightpush)) + else: none(DiscoveryHandler) + + rest_lightpush_api.installLightPushRequestHandler(server.router, app.node, lightDiscoHandler) + else: + notInstalledTab["lightpush"] = "/lightpush endpoints are not available. Please check your configuration: --lightpushnode" server.start() info "Starting REST HTTP server", url = "http://" & $address & ":" & $port & "/" diff --git a/apps/wakunode2/external_config.nim b/apps/wakunode2/external_config.nim index 0ec5c1fb0..f18c4b159 100644 --- a/apps/wakunode2/external_config.nim +++ b/apps/wakunode2/external_config.nim @@ -92,6 +92,11 @@ type desc: "External multiaddresses to advertise to the network. Argument may be repeated." name: "ext-multiaddr" }: seq[string] + extMultiAddrsOnly* {. + desc: "Only announce external multiaddresses", + defaultValue: false, + name: "ext-multiaddr-only" }: bool + maxConnections* {. desc: "Maximum allowed number of libp2p connections." defaultValue: 50 diff --git a/apps/wakunode2/internal_config.nim b/apps/wakunode2/internal_config.nim index 06fc659bb..39826fdfe 100644 --- a/apps/wakunode2/internal_config.nim +++ b/apps/wakunode2/internal_config.nim @@ -4,15 +4,61 @@ import libp2p/crypto/crypto, libp2p/multiaddress, libp2p/nameresolving/dnsresolver, - std/options, + std/[options, sequtils], stew/results, stew/shims/net import ../../waku/common/utils/nat, ../../waku/node/config, ../../waku/waku_enr/capabilities, + ../../waku/waku_enr, + ../../waku/waku_core, ./external_config +proc enrConfiguration*(conf: WakuNodeConf, netConfig: NetConfig, key: crypto.PrivateKey): + Result[enr.Record, string] = + + var enrBuilder = EnrBuilder.init(key) + + enrBuilder.withIpAddressAndPorts( + netConfig.enrIp, + netConfig.enrPort, + netConfig.discv5UdpPort + ) + + if netConfig.wakuFlags.isSome(): + enrBuilder.withWakuCapabilities(netConfig.wakuFlags.get()) + + enrBuilder.withMultiaddrs(netConfig.enrMultiaddrs) + + let topics = + if conf.pubsubTopics.len > 0 or conf.contentTopics.len > 0: + let shardsRes = conf.contentTopics.mapIt(getShard(it)) + for res in shardsRes: + if res.isErr(): + error "failed to shard content topic", error=res.error + return err($res.error) + + let shards = shardsRes.mapIt(it.get()) + + conf.pubsubTopics & shards + else: + conf.topics + + let addShardedTopics = enrBuilder.withShardedTopics(topics) + if addShardedTopics.isErr(): + error "failed to add sharded topics to ENR", error=addShardedTopics.error + return err($addShardedTopics.error) + + let recordRes = enrBuilder.build() + let record = + if recordRes.isErr(): + error "failed to create record", error=recordRes.error + return err($recordRes.error) + else: recordRes.get() + + return ok(record) + proc validateExtMultiAddrs*(vals: seq[string]): Result[seq[MultiAddress], string] = var multiaddrs: seq[MultiAddress] @@ -111,6 +157,7 @@ proc networkConfiguration*(conf: WakuNodeConf, extIp = extIp, extPort = extPort, extMultiAddrs = extMultiAddrs, + extMultiAddrsOnly = conf.extMultiAddrsOnly, wsBindPort = Port(uint16(conf.websocketPort) + conf.portsShift), wsEnabled = conf.websocketSupport, wssEnabled = conf.websocketSecureSupport, diff --git a/apps/wakunode2/wakunode2.nim b/apps/wakunode2/wakunode2.nim index 30a281f33..cf05b5d67 100644 --- a/apps/wakunode2/wakunode2.nim +++ b/apps/wakunode2/wakunode2.nim @@ -87,7 +87,7 @@ when isMainModule: debug "5/7 Starting node and mounted protocols" - let res6 = waitFor wakunode2.startApp() + let res6 = wakunode2.startApp() if res6.isErr(): error "5/7 Starting node and protocols failed", error=res6.error quit(QuitFailure) diff --git a/docs/contributors/release-process.md b/docs/contributors/release-process.md index 795ab816c..ebcf47e60 100644 --- a/docs/contributors/release-process.md +++ b/docs/contributors/release-process.md @@ -71,7 +71,7 @@ Ensure all items in this list are ticked: ### After the release 1. Announce the release on Twitter, Discord and other channels. -2. Deploy the release image to [Dockerhub](https://hub.docker.com/layers/wakuorg/nwaku/a5f8b9/images/sha256-88691a8f82bd6a4242fa99053a65b7fc4762b23a2b4e879d0f8b578c798a0e09?context=explore) by triggering [the manual Jenkins deployment job](https://ci.infra.status.im/job/nim-waku/job/manual/build). +2. Deploy the release image to [Dockerhub](https://hub.docker.com/r/wakuorg/nwaku) by triggering [the manual Jenkins deployment job](https://ci.infra.status.im/job/nim-waku/job/docker-manual/). > Ensure the following build parameters are set: > - `MAKE_TARGET`: `wakunode2` > - `IMAGE_TAG`: the release tag (e.g. `v0.16.0`) @@ -87,3 +87,24 @@ Ensure all items in this list are ticked: - Deploy release to the `wakuv2.prod` fleet from [Jenkins](https://ci.infra.status.im/job/nim-waku/job/deploy-wakuv2-prod/). - Ensure that nodes successfully start up and monitor health using [Grafana](https://grafana.infra.status.im/d/qrp_ZCTGz/nim-waku-v2?orgId=1) and [Kibana](https://kibana.infra.status.im/goto/a7728e70-eb26-11ec-81d1-210eb3022c76). - If necessary, revert by deploying the previous release. Download logs and open a bug report issue. + +### Performing a patch release + +1. Cherry-pick the relevant commits from master to the release branch + + ``` + git cherry-pick + ``` + +2. Create a release-candidate tag with the same name as release and `-rc.N` suffix + +3. Update `CHANGELOG.md`. From the release branch, use the helper Make target after having cherry-picked the commits. + + ``` + make release-notes + ``` + Create a new branch and raise a PR with the changelog updates to master. + +4. Once the release-candidate has been validated and changelog PR got merged, cherry-pick the changelog update from master to the release branch. Create a final release tag and push it. + +5. Create a [Github release](https://github.com/waku-org/nwaku/releases) from the release tag and follow the same post-release process as usual. diff --git a/tests/test_waku_netconfig.nim b/tests/test_waku_netconfig.nim index 0c02afab1..713bdcefb 100644 --- a/tests/test_waku_netconfig.nim +++ b/tests/test_waku_netconfig.nim @@ -338,19 +338,63 @@ suite "Waku NetConfig": check: netConfig.enrMultiaddrs.contains(dns4TcpEndPoint(dns4DomainName, extPort)) - asyncTest "wsHostAddress is not announced if a WS address is provided in extMultiAddrs": + asyncTest "wsHostAddress is not announced if a WS/WSS address is provided in extMultiAddrs": + + var + conf = defaultTestWakuNodeConf() + extAddIp = ValidIpAddress.init("1.2.3.4") + extAddPort = Port(1234) + wsEnabled = true + wssEnabled = false + extMultiAddrs = @[(ip4TcpEndPoint(extAddIp, extAddPort) & wsFlag(wssEnabled))] + + var netConfigRes = NetConfig.init( + bindIp = conf.listenAddress, + bindPort = conf.tcpPort, + extMultiAddrs = extMultiAddrs, + wsEnabled = wsEnabled + ) + + assert netConfigRes.isOk(), $netConfigRes.error + + var netConfig = netConfigRes.get() + + check: + netConfig.announcedAddresses.len == 2 # Bind address + extAddress + netConfig.announcedAddresses[1] == extMultiAddrs[0] + + # Now same test for WSS external address + wssEnabled = true + extMultiAddrs = @[(ip4TcpEndPoint(extAddIp, extAddPort) & wsFlag(wssEnabled))] + + netConfigRes = NetConfig.init( + bindIp = conf.listenAddress, + bindPort = conf.tcpPort, + extMultiAddrs = extMultiAddrs, + wssEnabled = wssEnabled + ) + + assert netConfigRes.isOk(), $netConfigRes.error + + netConfig = netConfigRes.get() + + check: + netConfig.announcedAddresses.len == 2 # Bind address + extAddress + netConfig.announcedAddresses[1] == extMultiAddrs[0] + + asyncTest "Only extMultiAddrs are published when enabling extMultiAddrsOnly flag": let conf = defaultTestWakuNodeConf() extAddIp = ValidIpAddress.init("1.2.3.4") extAddPort = Port(1234) - wssEnabled = false - extMultiAddrs = @[(ip4TcpEndPoint(extAddIp, extAddPort) & wsFlag(wssEnabled))] + extMultiAddrs = @[ip4TcpEndPoint(extAddIp, extAddPort)] let netConfigRes = NetConfig.init( bindIp = conf.listenAddress, bindPort = conf.tcpPort, - extMultiAddrs = extMultiAddrs + extMultiAddrs = extMultiAddrs, + extMultiAddrsOnly = true ) assert netConfigRes.isOk(), $netConfigRes.error @@ -358,6 +402,6 @@ suite "Waku NetConfig": let netConfig = netConfigRes.get() check: - netConfig.announcedAddresses.len == 2 # Bind address + extAddress - netConfig.announcedAddresses[1] == extMultiAddrs[0] + netConfig.announcedAddresses.len == 1 # ExtAddress + netConfig.announcedAddresses[0] == extMultiAddrs[0] diff --git a/tests/wakunode2/test_app.nim b/tests/wakunode2/test_app.nim index fe2c735c1..f87d264f8 100644 --- a/tests/wakunode2/test_app.nim +++ b/tests/wakunode2/test_app.nim @@ -12,7 +12,9 @@ import import ../testlib/common, ../testlib/wakucore, - ../testlib/wakunode, + ../testlib/wakunode + +include ../../apps/wakunode2/app suite "Wakunode2 - App": @@ -27,7 +29,7 @@ suite "Wakunode2 - App": ## Then check: - version == app.git_version + version == git_version suite "Wakunode2 - App initialization": test "peer persistence setup should be successfully mounted": @@ -53,7 +55,7 @@ suite "Wakunode2 - App initialization": require wakunode2.setupDyamicBootstrapNodes().isOk() require wakunode2.setupWakuApp().isOk() require isOk(waitFor wakunode2.setupAndMountProtocols()) - require isOk(waitFor wakunode2.startApp()) + require isOk(wakunode2.startApp()) require wakunode2.setupMonitoringAndExternalInterfaces().isOk() ## Then @@ -67,3 +69,43 @@ suite "Wakunode2 - App initialization": ## Cleanup waitFor wakunode2.stop() + + test "app properly handles dynamic port configuration": + ## Given + var conf = defaultTestWakuNodeConf() + conf.tcpPort = Port(0) + + ## When + var wakunode2 = App.init(rng(), conf) + require wakunode2.setupPeerPersistence().isOk() + require wakunode2.setupDyamicBootstrapNodes().isOk() + require wakunode2.setupWakuApp().isOk() + require isOk(waitFor wakunode2.setupAndMountProtocols()) + require isOk(wakunode2.startApp()) + require wakunode2.setupMonitoringAndExternalInterfaces().isOk() + + ## Then + let + node = wakunode2.node + typedNodeEnr = node.enr.toTypedRecord() + typedAppEnr = wakunode2.record.toTypedRecord() + + assert typedNodeEnr.isOk(), $typedNodeEnr.error + assert typedAppEnr.isOk(), $typedAppEnr.error + + check: + # App started properly + not node.isNil() + node.wakuArchive.isNil() + node.wakuStore.isNil() + not node.wakuStoreClient.isNil() + not node.rendezvous.isNil() + + # DS structures are updated with dynamic ports + wakunode2.netConf.bindPort != Port(0) + wakunode2.netConf.enrPort.get() != Port(0) + typedNodeEnr.get().tcp.get() != 0 + typedAppEnr.get().tcp.get() != 0 + + ## Cleanup + waitFor wakunode2.stop() diff --git a/tests/wakunode_rest/test_rest_store.nim b/tests/wakunode_rest/test_rest_store.nim index 39f54ecae..4cc806c33 100644 --- a/tests/wakunode_rest/test_rest_store.nim +++ b/tests/wakunode_rest/test_rest_store.nim @@ -513,7 +513,7 @@ procSuite "Waku v2 Rest API - Store": response.status == 412 $response.contentType == $MIMETYPE_TEXT response.data.messages.len == 0 - response.data.error_message.get == "Missing known store-peer node" + response.data.error_message.get == NoPeerNoDiscError.errobj.message # Now add the storenode from "config" node.peerManager.addServicePeer(remotePeerInfo, diff --git a/vendor/nim-presto b/vendor/nim-presto index 2ae448ff5..81250a419 160000 --- a/vendor/nim-presto +++ b/vendor/nim-presto @@ -1 +1 @@ -Subproject commit 2ae448ff5b0808c8f562c6f0a70bbd7a05407a37 +Subproject commit 81250a419bc097a9e93f2ab69de60543eee07138 diff --git a/waku/node/config.nim b/waku/node/config.nim index f90aaff1d..f53d67134 100644 --- a/waku/node/config.nim +++ b/waku/node/config.nim @@ -59,7 +59,7 @@ proc formatListenAddress(inputMultiAdd: MultiAddress): MultiAddress = # If MultiAddress contains "0.0.0.0", replace it for "127.0.0.1" return MultiAddress.init(inputStr.replace("0.0.0.0", "127.0.0.1")).get() -proc isWsAddress(ma: MultiAddress): bool = +proc isWsAddress*(ma: MultiAddress): bool = let isWs = ma.contains(multiCodec("ws")).get() isWss = ma.contains(multiCodec("wss")).get() @@ -75,6 +75,7 @@ proc init*(T: type NetConfig, extIp = none(ValidIpAddress), extPort = none(Port), extMultiAddrs = newSeq[MultiAddress](), + extMultiAddrsOnly: bool = false, wsBindPort: Port = Port(8000), wsEnabled: bool = false, wssEnabled: bool = false, @@ -125,21 +126,22 @@ proc init*(T: type NetConfig, var announcedAddresses = newSeq[MultiAddress]() - if hostExtAddress.isSome(): - announcedAddresses.add(hostExtAddress.get()) - else: - announcedAddresses.add(formatListenAddress(hostAddress)) # We always have at least a bind address for the host + if not extMultiAddrsOnly: + if hostExtAddress.isSome(): + announcedAddresses.add(hostExtAddress.get()) + else: + announcedAddresses.add(formatListenAddress(hostAddress)) # We always have at least a bind address for the host + if wsExtAddress.isSome(): + announcedAddresses.add(wsExtAddress.get()) + elif wsHostAddress.isSome() and not containsWsAddress(extMultiAddrs): + # Only publish wsHostAddress if a WS address is not set in extMultiAddrs + announcedAddresses.add(wsHostAddress.get()) + # External multiaddrs that the operator may have configured if extMultiAddrs.len > 0: announcedAddresses.add(extMultiAddrs) - if wsExtAddress.isSome(): - announcedAddresses.add(wsExtAddress.get()) - elif wsHostAddress.isSome() and not containsWsAddress(extMultiAddrs): - # Only publish wsHostAddress if a WS address is not set in extMultiAddrs - announcedAddresses.add(wsHostAddress.get()) - let # enrMultiaddrs are just addresses which cannot be represented in ENR, as described in # https://rfc.vac.dev/spec/31/#many-connection-types diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 71f864cbe..f117c1b73 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -1073,6 +1073,26 @@ proc mountRendezvous*(node: WakuNode) {.async, raises: [Defect, LPError].} = node.switch.mount(node.rendezvous) +proc isBindIpWithZeroPort(inputMultiAdd: MultiAddress): bool = + let inputStr = $inputMultiAdd + if inputStr.contains("0.0.0.0/tcp/0") or inputStr.contains("127.0.0.1/tcp/0"): + return true + + return false + +proc printNodeNetworkInfo*(node: WakuNode): void = + let peerInfo = node.switch.peerInfo + var listenStr = "" + + info "PeerInfo", peerId = peerInfo.peerId, addrs = peerInfo.addrs + + for address in node.announcedAddresses: + var fulladdr = "[" & $address & "/p2p/" & $peerInfo.peerId & "]" + listenStr &= fulladdr + + ## XXX: this should be /ip4..., / stripped? + info "Listening on", full = listenStr + info "DNS: discoverable ENR ", enr = node.enr.toUri() proc start*(node: WakuNode) {.async.} = ## Starts a created Waku Node and @@ -1081,16 +1101,15 @@ proc start*(node: WakuNode) {.async.} = waku_version.set(1, labelValues=[git_version]) info "Starting Waku node", version=git_version - let peerInfo = node.switch.peerInfo - info "PeerInfo", peerId = peerInfo.peerId, addrs = peerInfo.addrs - var listenStr = "" + var zeroPortPresent = false for address in node.announcedAddresses: - var fulladdr = "[" & $address & "/p2p/" & $peerInfo.peerId & "]" - listenStr &= fulladdr + if isBindIpWithZeroPort(address): + zeroPortPresent = true - ## XXX: this should be /ip4..., / stripped? - info "Listening on", full = listenStr - info "DNS: discoverable ENR ", enr = node.enr.toUri() + if not zeroPortPresent: + printNodeNetworkInfo(node) + else: + info "Listening port is dynamically allocated, address and ENR generation postponed" # Perform relay-specific startup tasks TODO: this should be rethought if not node.wakuRelay.isNil(): diff --git a/waku/waku_api/cache_handlers.nim b/waku/waku_api/cache_handlers.nim deleted file mode 100644 index c37850576..000000000 --- a/waku/waku_api/cache_handlers.nim +++ /dev/null @@ -1,23 +0,0 @@ -when (NimMajor, NimMinor) < (1, 4): - {.push raises: [Defect].} -else: - {.push raises: [].} - -import - chronos, - chronicles -import - ../waku_relay, - ../waku_core, - ./message_cache - -##### Message handler - -proc messageCacheHandler*(cache: MessageCache[string]): WakuRelayHandler = - return proc(pubsubTopic: string, msg: WakuMessage): Future[void] {.async, closure.} = - cache.addMessage(PubSubTopic(pubsubTopic), msg) - -proc autoMessageCacheHandler*(cache: MessageCache[string]): WakuRelayHandler = - return proc(pubsubTopic: string, msg: WakuMessage): Future[void] {.async, closure.} = - if cache.isSubscribed(msg.contentTopic): - cache.addMessage(msg.contentTopic, msg) \ No newline at end of file diff --git a/waku/waku_api/handlers.nim b/waku/waku_api/handlers.nim new file mode 100644 index 000000000..075c5959b --- /dev/null +++ b/waku/waku_api/handlers.nim @@ -0,0 +1,50 @@ +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + +import + chronos, + chronicles, + std/[options, sequtils], + stew/results +import + ../waku_discv5, + ../waku_relay, + ../waku_core, + ./message_cache + +### Discovery + +type DiscoveryHandler* = proc(): Future[Result[Option[RemotePeerInfo], string]] {.async, closure.} + +proc defaultDiscoveryHandler*(discv5: WakuDiscoveryV5, cap: Capabilities): DiscoveryHandler = + proc(): Future[Result[Option[RemotePeerInfo], string]] {.async, closure.} = + #Discv5 is already filtering peers by shards no need to pass a predicate. + let findPeers = discv5.findRandomPeers() + + if not await findPeers.withTimeout(60.seconds): + return err("discovery process timed out!") + + var peers = findPeers.read() + + peers.keepItIf(it.supportsCapability(cap)) + + if peers.len == 0: + return ok(none(RemotePeerInfo)) + + let remotePeerInfo = peers[0].toRemotePeerInfo().valueOr: + return err($error) + + return ok(some(remotePeerInfo)) + +### Message Cache + +proc messageCacheHandler*(cache: MessageCache[string]): WakuRelayHandler = + return proc(pubsubTopic: string, msg: WakuMessage): Future[void] {.async, closure.} = + cache.addMessage(PubSubTopic(pubsubTopic), msg) + +proc autoMessageCacheHandler*(cache: MessageCache[string]): WakuRelayHandler = + return proc(pubsubTopic: string, msg: WakuMessage): Future[void] {.async, closure.} = + if cache.isSubscribed(msg.contentTopic): + cache.addMessage(msg.contentTopic, msg) \ No newline at end of file diff --git a/waku/waku_api/jsonrpc/relay/handlers.nim b/waku/waku_api/jsonrpc/relay/handlers.nim index c52369abf..6801a69de 100644 --- a/waku/waku_api/jsonrpc/relay/handlers.nim +++ b/waku/waku_api/jsonrpc/relay/handlers.nim @@ -17,7 +17,7 @@ import ../../../waku_rln_relay/rln/wrappers, ../../../waku_node, ../../message_cache, - ../../cache_handlers, + ../../handlers, ../message from std/times import getTime diff --git a/waku/waku_api/rest/filter/handlers.nim b/waku/waku_api/rest/filter/handlers.nim index 32527f386..a3dda79c1 100644 --- a/waku/waku_api/rest/filter/handlers.nim +++ b/waku/waku_api/rest/filter/handlers.nim @@ -21,6 +21,7 @@ import ../../../waku_filter_v2/client as filter_protocol_client, ../../../waku_filter_v2/common as filter_protocol_type, ../../message_cache, + ../../handlers, ../serdes, ../responses, ./types @@ -145,11 +146,18 @@ proc makeRestResponse(requestId: string, protocolClientRes: filter_protocol_type return resp.get() -proc filterPostPutSubscriptionRequestHandler(node: WakuNode, - contentBody: Option[ContentBody], - cache: MessageCache): - Future[RestApiResponse] - {.async.} = +const NoPeerNoDiscoError = FilterSubscribeError.serviceUnavailable( + "No suitable service peer & no discovery method") + +const NoPeerNoneFoundError = FilterSubscribeError.serviceUnavailable( + "No suitable service peer & none discovered") + +proc filterPostPutSubscriptionRequestHandler( + node: WakuNode, + contentBody: Option[ContentBody], + cache: MessageCache, + discHandler: Option[DiscoveryHandler] = none(DiscoveryHandler), + ): Future[RestApiResponse] {.async.} = ## handles any filter subscription requests, adds or modifies. let decodedBody = decodeRequestBody[FilterSubscribeRequest](contentBody) @@ -159,14 +167,17 @@ proc filterPostPutSubscriptionRequestHandler(node: WakuNode, let req: FilterSubscribeRequest = decodedBody.value() - let peerOpt = node.peerManager.selectPeer(WakuFilterSubscribeCodec) + let peer = node.peerManager.selectPeer(WakuFilterSubscribeCodec).valueOr: + let handler = discHandler.valueOr: + return makeRestResponse(req.requestId, NoPeerNoDiscoError) - if peerOpt.isNone(): - return makeRestResponse(req.requestId, FilterSubscribeError.serviceUnavailable("No suitable peers")) + let peerOp = (await handler()).valueOr: + return RestApiResponse.internalServerError($error) - let subFut = node.filterSubscribe(req.pubsubTopic, - req.contentFilters, - peerOpt.get()) + peerOp.valueOr: + return makeRestResponse(req.requestId, NoPeerNoneFoundError) + + let subFut = node.filterSubscribe(req.pubsubTopic, req.contentFilters, peer) if not await subFut.withTimeout(futTimeoutForSubscriptionProcessing): error "Failed to subscribe to contentFilters do to timeout!" @@ -178,29 +189,36 @@ proc filterPostPutSubscriptionRequestHandler(node: WakuNode, return makeRestResponse(req.requestId, subFut.read()) -proc installFilterPostSubscriptionsHandler(router: var RestRouter, - node: WakuNode, - cache: MessageCache) = +proc installFilterPostSubscriptionsHandler( + router: var RestRouter, + node: WakuNode, + cache: MessageCache, + discHandler: Option[DiscoveryHandler] = none(DiscoveryHandler), + ) = router.api(MethodPost, ROUTE_FILTER_SUBSCRIPTIONS) do (contentBody: Option[ContentBody]) -> RestApiResponse: ## Subscribes a node to a list of contentTopics of a pubsubTopic debug "post", ROUTE_FILTER_SUBSCRIPTIONS, contentBody - let response = await filterPostPutSubscriptionRequestHandler(node, contentBody, cache) - return response + return await filterPostPutSubscriptionRequestHandler(node, contentBody, cache, discHandler) -proc installFilterPutSubscriptionsHandler(router: var RestRouter, - node: WakuNode, - cache: MessageCache) = +proc installFilterPutSubscriptionsHandler( + router: var RestRouter, + node: WakuNode, + cache: MessageCache, + discHandler: Option[DiscoveryHandler] = none(DiscoveryHandler), + ) = router.api(MethodPut, ROUTE_FILTER_SUBSCRIPTIONS) do (contentBody: Option[ContentBody]) -> RestApiResponse: ## Modifies a subscribtion of a node to a list of contentTopics of a pubsubTopic debug "put", ROUTE_FILTER_SUBSCRIPTIONS, contentBody - let response = await filterPostPutSubscriptionRequestHandler(node, contentBody, cache) - return response + return await filterPostPutSubscriptionRequestHandler(node, contentBody, cache, discHandler) -proc installFilterDeleteSubscriptionsHandler(router: var RestRouter, - node: WakuNode, - cache: MessageCache) = +proc installFilterDeleteSubscriptionsHandler( + router: var RestRouter, + node: WakuNode, + cache: MessageCache, + discHandler: Option[DiscoveryHandler] = none(DiscoveryHandler), + ) = router.api(MethodDelete, ROUTE_FILTER_SUBSCRIPTIONS) do (contentBody: Option[ContentBody]) -> RestApiResponse: ## Subscribes a node to a list of contentTopics of a PubSub topic debug "delete", ROUTE_FILTER_SUBSCRIPTIONS, contentBody @@ -213,13 +231,18 @@ proc installFilterDeleteSubscriptionsHandler(router: var RestRouter, let req: FilterUnsubscribeRequest = decodedBody.value() - let peerOpt = node.peerManager.selectPeer(WakuFilterSubscribeCodec) + let peer = node.peerManager.selectPeer(WakuFilterSubscribeCodec).valueOr: + let handler = discHandler.valueOr: + return makeRestResponse(req.requestId, NoPeerNoDiscoError) - if peerOpt.isNone(): - return makeRestResponse(req.requestId, - FilterSubscribeError.serviceUnavailable("No suitable peers")) + let peerOp = (await handler()).valueOr: + return RestApiResponse.internalServerError($error) + + peerOp.valueOr: + return makeRestResponse(req.requestId, NoPeerNoneFoundError) + + let unsubFut = node.filterUnsubscribe(req.pubsubTopic, req.contentFilters, peer) - let unsubFut = node.filterUnsubscribe(req.pubsubTopic, req.contentFilters, peerOpt.get()) if not await unsubFut.withTimeout(futTimeoutForSubscriptionProcessing): error "Failed to unsubscribe from contentFilters due to timeout!" return makeRestResponse(req.requestId, @@ -233,9 +256,12 @@ proc installFilterDeleteSubscriptionsHandler(router: var RestRouter, # Successfully unsubscribed from all requested contentTopics return makeRestResponse(req.requestId, unsubFut.read()) -proc installFilterDeleteAllSubscriptionsHandler(router: var RestRouter, - node: WakuNode, - cache: MessageCache) = +proc installFilterDeleteAllSubscriptionsHandler( + router: var RestRouter, + node: WakuNode, + cache: MessageCache, + discHandler: Option[DiscoveryHandler] = none(DiscoveryHandler), + ) = router.api(MethodDelete, ROUTE_FILTER_ALL_SUBSCRIPTIONS) do (contentBody: Option[ContentBody]) -> RestApiResponse: ## Subscribes a node to a list of contentTopics of a PubSub topic debug "delete", ROUTE_FILTER_ALL_SUBSCRIPTIONS, contentBody @@ -248,13 +274,18 @@ proc installFilterDeleteAllSubscriptionsHandler(router: var RestRouter, let req: FilterUnsubscribeAllRequest = decodedBody.value() - let peerOpt = node.peerManager.selectPeer(WakuFilterSubscribeCodec) + let peer = node.peerManager.selectPeer(WakuFilterSubscribeCodec).valueOr: + let handler = discHandler.valueOr: + return makeRestResponse(req.requestId, NoPeerNoDiscoError) - if peerOpt.isNone(): - return makeRestResponse(req.requestId, - FilterSubscribeError.serviceUnavailable("No suitable peers")) + let peerOp = (await handler()).valueOr: + return RestApiResponse.internalServerError($error) - let unsubFut = node.filterUnsubscribeAll(peerOpt.get()) + peerOp.valueOr: + return makeRestResponse(req.requestId, NoPeerNoneFoundError) + + let unsubFut = node.filterUnsubscribeAll(peer) + if not await unsubFut.withTimeout(futTimeoutForSubscriptionProcessing): error "Failed to unsubscribe from contentFilters due to timeout!" return makeRestResponse(req.requestId, @@ -268,18 +299,26 @@ proc installFilterDeleteAllSubscriptionsHandler(router: var RestRouter, const ROUTE_FILTER_SUBSCRIBER_PING* = "/filter/v2/subscriptions/{requestId}" -proc installFilterPingSubscriberHandler(router: var RestRouter, - node: WakuNode) = +proc installFilterPingSubscriberHandler( + router: var RestRouter, + node: WakuNode, + discHandler: Option[DiscoveryHandler] = none(DiscoveryHandler), + ) = router.api(MethodGet, ROUTE_FILTER_SUBSCRIBER_PING) do (requestId: string) -> RestApiResponse: ## Checks if a node has valid subscription or not. debug "get", ROUTE_FILTER_SUBSCRIBER_PING, requestId - let peerOpt = node.peerManager.selectPeer(WakuFilterSubscribeCodec) - if peerOpt.isNone(): - return makeRestResponse(requestId.get(), - FilterSubscribeError.serviceUnavailable("No suitable remote filter peers")) + let peer = node.peerManager.selectPeer(WakuFilterSubscribeCodec).valueOr: + let handler = discHandler.valueOr: + return makeRestResponse(requestId.get(), NoPeerNoDiscoError) - let pingFutRes = node.wakuFilterClient.ping(peerOpt.get()) + let peerOp = (await handler()).valueOr: + return RestApiResponse.internalServerError($error) + + peerOp.valueOr: + return makeRestResponse(requestId.get(), NoPeerNoneFoundError) + + let pingFutRes = node.wakuFilterClient.ping(peer) if not await pingFutRes.withTimeout(futTimeoutForSubscriptionProcessing): error "Failed to ping filter service peer due to timeout!" @@ -325,12 +364,15 @@ proc installFilterGetMessagesHandler(router: var RestRouter, return resp.get() -proc installFilterRestApiHandlers*(router: var RestRouter, - node: WakuNode, - cache: MessageCache) = - installFilterPingSubscriberHandler(router, node) - installFilterPostSubscriptionsHandler(router, node, cache) - installFilterPutSubscriptionsHandler(router, node, cache) - installFilterDeleteSubscriptionsHandler(router, node, cache) - installFilterDeleteAllSubscriptionsHandler(router, node, cache) +proc installFilterRestApiHandlers*( + router: var RestRouter, + node: WakuNode, + cache: MessageCache, + discHandler: Option[DiscoveryHandler] = none(DiscoveryHandler), + ) = + installFilterPingSubscriberHandler(router, node, discHandler) + installFilterPostSubscriptionsHandler(router, node, cache, discHandler) + installFilterPutSubscriptionsHandler(router, node, cache, discHandler) + installFilterDeleteSubscriptionsHandler(router, node, cache, discHandler) + installFilterDeleteAllSubscriptionsHandler(router, node, cache, discHandler) installFilterGetMessagesHandler(router, node, cache) diff --git a/waku/waku_api/rest/lightpush/handlers.nim b/waku/waku_api/rest/lightpush/handlers.nim index 7bbf22f6d..c7da0ebe3 100644 --- a/waku/waku_api/rest/lightpush/handlers.nim +++ b/waku/waku_api/rest/lightpush/handlers.nim @@ -18,6 +18,7 @@ import ../../waku/node/peer_manager, ../../../waku_node, ../../waku/waku_lightpush, + ../../handlers, ../serdes, ../responses, ./types @@ -29,6 +30,12 @@ logScope: const futTimeoutForPushRequestProcessing* = 5.seconds +const NoPeerNoDiscoError = RestApiResponse.serviceUnavailable( + "No suitable service peer & no discovery method") + +const NoPeerNoneFoundError = RestApiResponse.serviceUnavailable( + "No suitable service peer & none discovered") + #### Request handlers const ROUTE_LIGHTPUSH* = "/lightpush/v1/message" @@ -50,8 +57,11 @@ func decodeRequestBody[T](contentBody: Option[ContentBody]) : Result[T, RestApiR return ok(requestResult.get()) -proc installLightPushRequestHandler*(router: var RestRouter, - node: WakuNode) = +proc installLightPushRequestHandler*( + router: var RestRouter, + node: WakuNode, + discHandler: Option[DiscoveryHandler] = none(DiscoveryHandler), + ) = router.api(MethodPost, ROUTE_LIGHTPUSH) do (contentBody: Option[ContentBody]) -> RestApiResponse: ## Send a request to push a waku message @@ -63,24 +73,29 @@ proc installLightPushRequestHandler*(router: var RestRouter, return decodedBody.error() let req: PushRequest = decodedBody.value() - let msg = req.message.toWakuMessage() - - if msg.isErr(): + + let msg = req.message.toWakuMessage().valueOr: return RestApiResponse.badRequest("Invalid message: {msg.error}") - let peerOpt = node.peerManager.selectPeer(WakuLightPushCodec) - if peerOpt.isNone(): - return RestApiResponse.serviceUnavailable("No suitable remote lightpush peers") + let peer = node.peerManager.selectPeer(WakuLightPushCodec).valueOr: + let handler = discHandler.valueOr: + return NoPeerNoDiscoError - let subFut = node.lightpushPublish(req.pubsubTopic, - msg.value(), - peerOpt.get()) + let peerOp = (await handler()).valueOr: + return RestApiResponse.internalServerError($error) + + peerOp.valueOr: + return NoPeerNoneFoundError + + let subFut = node.lightpushPublish(req.pubsubTopic, msg, peer) if not await subFut.withTimeout(futTimeoutForPushRequestProcessing): error "Failed to request a message push due to timeout!" return RestApiResponse.serviceUnavailable("Push request timed out") if subFut.value().isErr(): - return RestApiResponse.serviceUnavailable(fmt("Failed to request a message push: {subFut.value().error}")) + return RestApiResponse.serviceUnavailable( + fmt("Failed to request a message push: {subFut.value().error}") + ) return RestApiResponse.ok() diff --git a/waku/waku_api/rest/relay/handlers.nim b/waku/waku_api/rest/relay/handlers.nim index 72631a227..9713b15e7 100644 --- a/waku/waku_api/rest/relay/handlers.nim +++ b/waku/waku_api/rest/relay/handlers.nim @@ -17,7 +17,7 @@ import ../../../waku_rln_relay, ../../../node/waku_node, ../../message_cache, - ../../cache_handlers, + ../../handlers, ../serdes, ../responses, ./types diff --git a/waku/waku_api/rest/server.nim b/waku/waku_api/rest/server.nim index 430472b68..f25f29ce9 100644 --- a/waku/waku_api/rest/server.nim +++ b/waku/waku_api/rest/server.nim @@ -59,7 +59,8 @@ proc getRouter(allowedOrigin: Option[string]): RestRouter = proc init*(T: type RestServerRef, ip: ValidIpAddress, port: Port, allowedOrigin=none(string), - conf=RestServerConf.default()): RestServerResult[T] = + conf=RestServerConf.default(), + requestErrorHandler: RestRequestErrorHandler = nil): RestServerResult[T] = let address = initTAddress(ip, port) let serverFlags = { HttpServerFlags.QueryCommaSeparatedArray, @@ -82,7 +83,8 @@ proc init*(T: type RestServerRef, serverFlags = serverFlags, httpHeadersTimeout = headersTimeout, maxHeadersSize = maxHeadersSize, - maxRequestBodySize = maxRequestBodySize + maxRequestBodySize = maxRequestBodySize, + requestErrorHandler = requestErrorHandler ) except CatchableError: return err(getCurrentExceptionMsg()) @@ -92,5 +94,7 @@ proc init*(T: type RestServerRef, proc newRestHttpServer*(ip: ValidIpAddress, port: Port, allowedOrigin=none(string), - conf=RestServerConf.default()): RestServerResult[RestServerRef] = - RestServerRef.init(ip, port, allowedOrigin, conf) + conf=RestServerConf.default(), + requestErrorHandler: RestRequestErrorHandler = nil): + RestServerResult[RestServerRef] = + RestServerRef.init(ip, port, allowedOrigin, conf, requestErrorHandler) diff --git a/waku/waku_api/rest/store/handlers.nim b/waku/waku_api/rest/store/handlers.nim index 220f26c53..66c053233 100644 --- a/waku/waku_api/rest/store/handlers.nim +++ b/waku/waku_api/rest/store/handlers.nim @@ -15,6 +15,7 @@ import ../../../waku_store/common, ../../../waku_node, ../../../node/peer_manager, + ../../handlers, ../responses, ../serdes, ./types @@ -26,6 +27,9 @@ logScope: const futTimeout* = 5.seconds # Max time to wait for futures +const NoPeerNoDiscError* = RestApiResponse.preconditionFailed( + "No suitable service peer & no discovery method") + # Queries the store-node with the query parameters and # returns a RestApiResponse that is sent back to the api client. proc performHistoryQuery(selfNode: WakuNode, @@ -182,10 +186,12 @@ proc toOpt(self: Option[Result[string, cstring]]): Option[string] = if self.isSome() and self.get().value != "": return some(self.get().value) - # Subscribes the rest handler to attend "/store/v1/messages" requests -proc installStoreV1Handler(router: var RestRouter, - node: WakuNode) = +proc installStoreApiHandlers*( + router: var RestRouter, + node: WakuNode, + discHandler: Option[DiscoveryHandler] = none(DiscoveryHandler), + ) = # Handles the store-query request according to the passed parameters router.api(MethodGet, @@ -209,18 +215,20 @@ proc installStoreV1Handler(router: var RestRouter, # /store/v1/messages?peerAddr=%2Fip4%2F127.0.0.1%2Ftcp%2F60001%2Fp2p%2F16Uiu2HAmVFXtAfSj4EiR7mL2KvL4EE2wztuQgUSBoj2Jx2KeXFLN\&pubsubTopic=my-waku-topic # Parse the peer address parameter - var parsedPeerAddr = parseUrlPeerAddr(peerAddr.toOpt()) - if not parsedPeerAddr.isOk(): - return RestApiResponse.badRequest(parsedPeerAddr.error) + let parsedPeerAddr = parseUrlPeerAddr(peerAddr.toOpt()).valueOr: + return RestApiResponse.badRequest(error) - var peerOpt = none(RemotePeerInfo) - if parsedPeerAddr.value.isSome(): - peerOpt = parsedPeerAddr.value - else: - # The user didn't specify any store peer address. - peerOpt = node.peerManager.selectPeer(WakuStoreCodec) - if peerOpt.isNone(): - return RestApiResponse.preconditionFailed("Missing known store-peer node") + let peerAddr = parsedPeerAddr.valueOr: + node.peerManager.selectPeer(WakuStoreCodec).valueOr: + let handler = discHandler.valueOr: + return NoPeerNoDiscError + + let peerOp = (await handler()).valueOr: + return RestApiResponse.internalServerError($error) + + peerOp.valueOr: + return RestApiResponse.preconditionFailed( + "No suitable service peer & none discovered") # Parse the rest of the parameters and create a HistoryQuery let histQuery = createHistoryQuery( @@ -238,10 +246,4 @@ proc installStoreV1Handler(router: var RestRouter, if not histQuery.isOk(): return RestApiResponse.badRequest(histQuery.error) - return await node.performHistoryQuery(histQuery.value, - peerOpt.get()) - -# Registers the Api Handlers -proc installStoreApiHandlers*(router: var RestRouter, - node: WakuNode) = - installStoreV1Handler(router, node) + return await node.performHistoryQuery(histQuery.value, peerAddr) \ No newline at end of file