From 70c3afb4a7494137edd8afb1dfff5deb6bb67895 Mon Sep 17 00:00:00 2001 From: NagyZoltanPeter <113987313+NagyZoltanPeter@users.noreply.github.com> Date: Fri, 21 Nov 2025 16:11:58 +0100 Subject: [PATCH 1/8] Introduce api/send Added events and requests for support. Reworked delivery_monitor into a featured devlivery_service, that - supports relay publish and lightpush depending on configuration but with fallback options - if available and configured it utilizes store api to confirm message delivery - emits message delivery events accordingly Notice: There are parts still in WIP and needs review and follow ups. prepare for use in api_example --- Makefile | 6 +- .../api_example.nim} | 20 +- library/waku_context.nim | 3 +- vendor/nim-libp2p | 2 +- waku.nim | 6 +- waku.nimble | 4 + waku/api/api.nim | 48 +++- waku/api/api_conf.nim | 5 +- waku/api/request_id.nim | 13 + waku/api/send_api.md | 46 ++++ waku/api/subscribe/subscribe.nim | 12 + waku/api/types.nim | 64 +++++ waku/events/delivery_events.nim | 27 ++ waku/events/events.nim | 3 + waku/events/message_events.nim | 23 ++ waku/factory/waku.nim | 37 +-- .../delivery_monitor/delivery_callback.nim | 17 -- .../delivery_monitor/delivery_monitor.nim | 43 ---- .../delivery_monitor/publish_observer.nim | 9 - waku/node/delivery_monitor/send_monitor.nim | 212 ---------------- .../subscriptions_observer.nim | 13 - .../delivery_service/delivery_service.nim | 36 +++ .../not_delivered_storage/migrations.nim | 2 +- .../not_delivered_storage.nim | 8 +- waku/node/delivery_service/recv_service.nim | 3 + .../recv_service/recv_service.nim} | 96 +++---- waku/node/delivery_service/send_service.nim | 6 + .../send_service/delivery_task.nim | 66 +++++ .../send_service/lightpush_processor.nim | 74 ++++++ .../send_service/relay_processor.nim | 67 +++++ .../send_service/send_processor.nim | 32 +++ .../send_service/send_service.nim | 238 ++++++++++++++++++ .../health_monitor}/topic_health.nim | 4 +- waku/node/kernel_api/lightpush.nim | 33 +-- waku/node/kernel_api/relay.nim | 14 +- waku/node/waku_node.nim | 97 ++++--- waku/requests/health_request.nim | 23 ++ waku/requests/node_requests.nim | 11 + waku/requests/requests.nim | 3 + waku/requests/rln_requests.nim | 9 + waku/waku_core/event/event_emitter.nim | 20 ++ waku/waku_core/message/digest.nim | 5 + waku/waku_filter_v2/client.nim | 14 +- waku/waku_lightpush/callbacks.nim | 4 +- waku/waku_lightpush/client.nim | 86 ++++--- waku/waku_lightpush/common.nim | 4 +- waku/waku_lightpush/protocol.nim | 2 +- waku/waku_lightpush_legacy/callbacks.nim | 4 +- waku/waku_lightpush_legacy/client.nim | 11 - waku/waku_lightpush_legacy/common.nim | 2 +- waku/waku_lightpush_legacy/protocol.nim | 2 +- waku/waku_relay.nim | 3 +- waku/waku_relay/protocol.nim | 32 ++- waku/waku_rln_relay/rln_relay.nim | 43 +++- 54 files changed, 1152 insertions(+), 515 deletions(-) rename examples/{waku_example.nim => api_example/api_example.nim} (63%) create mode 100644 waku/api/request_id.nim create mode 100644 waku/api/send_api.md create mode 100644 waku/api/subscribe/subscribe.nim create mode 100644 waku/api/types.nim create mode 100644 waku/events/delivery_events.nim create mode 100644 waku/events/events.nim create mode 100644 waku/events/message_events.nim delete mode 100644 waku/node/delivery_monitor/delivery_callback.nim delete mode 100644 waku/node/delivery_monitor/delivery_monitor.nim delete mode 100644 waku/node/delivery_monitor/publish_observer.nim delete mode 100644 waku/node/delivery_monitor/send_monitor.nim delete mode 100644 waku/node/delivery_monitor/subscriptions_observer.nim create mode 100644 waku/node/delivery_service/delivery_service.nim rename waku/node/{delivery_monitor => delivery_service}/not_delivered_storage/migrations.nim (95%) rename waku/node/{delivery_monitor => delivery_service}/not_delivered_storage/not_delivered_storage.nim (93%) create mode 100644 waku/node/delivery_service/recv_service.nim rename waku/node/{delivery_monitor/recv_monitor.nim => delivery_service/recv_service/recv_service.nim} (72%) create mode 100644 waku/node/delivery_service/send_service.nim create mode 100644 waku/node/delivery_service/send_service/delivery_task.nim create mode 100644 waku/node/delivery_service/send_service/lightpush_processor.nim create mode 100644 waku/node/delivery_service/send_service/relay_processor.nim create mode 100644 waku/node/delivery_service/send_service/send_processor.nim create mode 100644 waku/node/delivery_service/send_service/send_service.nim rename waku/{waku_relay => node/health_monitor}/topic_health.nim (84%) create mode 100644 waku/requests/health_request.nim create mode 100644 waku/requests/node_requests.nim create mode 100644 waku/requests/requests.nim create mode 100644 waku/requests/rln_requests.nim create mode 100644 waku/waku_core/event/event_emitter.nim diff --git a/Makefile b/Makefile index 2f15ccd71..dd970f278 100644 --- a/Makefile +++ b/Makefile @@ -147,7 +147,7 @@ NIM_PARAMS := $(NIM_PARAMS) -d:disable_libbacktrace endif # enable experimental exit is dest feature in libp2p mix -NIM_PARAMS := $(NIM_PARAMS) -d:libp2p_mix_experimental_exit_is_dest +NIM_PARAMS := $(NIM_PARAMS) -d:libp2p_mix_experimental_exit_is_dest libbacktrace: + $(MAKE) -C vendor/nim-libbacktrace --no-print-directory BUILD_CXX_LIB=0 @@ -266,6 +266,10 @@ lightpushwithmix: | build deps librln echo -e $(BUILD_MSG) "build/$@" && \ $(ENV_SCRIPT) nim lightpushwithmix $(NIM_PARAMS) waku.nims +api_example: | build deps librln + echo -e $(BUILD_MSG) "build/$@" && \ + $(ENV_SCRIPT) nim api_example $(NIM_PARAMS) waku.nims + build/%: | build deps librln echo -e $(BUILD_MSG) "build/$*" && \ $(ENV_SCRIPT) nim buildone $(NIM_PARAMS) waku.nims $* diff --git a/examples/waku_example.nim b/examples/api_example/api_example.nim similarity index 63% rename from examples/waku_example.nim rename to examples/api_example/api_example.nim index ebac0b466..f2524f319 100644 --- a/examples/waku_example.nim +++ b/examples/api_example/api_example.nim @@ -7,6 +7,22 @@ type CliArgs = object defaultValue: "", desc: "ETH RPC Endpoint, if passed, RLN is enabled" .}: string +proc periodicSender(w: Waku): Future[void] {.async.} = + ## Periodically sends a Waku message every 30 seconds + var counter = 0 + while true: + let envelope = MessageEnvelope.init( + contentTopic = "example/content/topic", + payload = "Hello Waku! Message number: " & $counter, + ) + + let sendRequestId = (await w.send(envelope)).valueOr: + echo "Failed to send message: ", error + quit(QuitFailure) + + counter += 1 + await sleepAsync(30.seconds) + when isMainModule: let args = CliArgs.load() @@ -21,7 +37,7 @@ when isMainModule: ) else: # Connect to TWN, use ETH RPC Endpoint for RLN - NodeConfig.init(ethRpcEndpoints = @[args.ethRpcEndpoint]) + NodeConfig.init(mode = WakuMode.Core, ethRpcEndpoints = @[args.ethRpcEndpoint]) # Create the node using the library API's createNode function let node = (waitFor createNode(config)).valueOr: @@ -37,4 +53,6 @@ when isMainModule: echo "Node started successfully!" + asyncSpawn periodicSender(node) + runForever() diff --git a/library/waku_context.nim b/library/waku_context.nim index ab4b996af..15280b4cf 100644 --- a/library/waku_context.nim +++ b/library/waku_context.nim @@ -8,7 +8,8 @@ import waku/common/logging, waku/factory/waku, waku/node/peer_manager, - waku/waku_relay/[protocol, topic_health], + waku/waku_relay/protocol, + waku/node/health_monitor/topic_health, waku/waku_core/[topics/pubsub_topic, message], ./waku_thread_requests/[waku_thread_request, requests/debug_node_request], ./ffi_types, diff --git a/vendor/nim-libp2p b/vendor/nim-libp2p index e82080f7b..0309685cd 160000 --- a/vendor/nim-libp2p +++ b/vendor/nim-libp2p @@ -1 +1 @@ -Subproject commit e82080f7b1aa61c6d35fa5311b873f41eff4bb52 +Subproject commit 0309685cd27d4bf763c8b3be86a76c33bcfe67ea diff --git a/waku.nim b/waku.nim index 18d52741e..12e69fdaf 100644 --- a/waku.nim +++ b/waku.nim @@ -1,10 +1,10 @@ ## Main module for using nwaku as a Nimble library -## +## ## This module re-exports the public API for creating and managing Waku nodes ## when using nwaku as a library dependency. -import waku/api/[api, api_conf] -export api, api_conf +import waku/api/[api, api_conf, types] +export api, api_conf, types import waku/factory/waku export waku diff --git a/waku.nimble b/waku.nimble index 79fdd9fd6..2ec38370b 100644 --- a/waku.nimble +++ b/waku.nimble @@ -177,6 +177,10 @@ task lightpushwithmix, "Build lightpushwithmix": let name = "lightpush_publisher_mix" buildBinary name, "examples/lightpush_mix/" +task api_example, "Build api_example": + let name = "api_example" + buildBinary name, "examples/api_example/" + task buildone, "Build custom target": let filepath = paramStr(paramCount()) discard buildModule filepath diff --git a/waku/api/api.nim b/waku/api/api.nim index 5bab06188..020cd6c6d 100644 --- a/waku/api/api.nim +++ b/waku/api/api.nim @@ -1,8 +1,12 @@ import chronicles, chronos, results import waku/factory/waku +import waku/[requests/health_request, waku_core, waku_node] +import waku/node/delivery_service/send_service +import ./[api_conf, types], ./subscribe/subscribe -import ./api_conf +logScope: + topics = "api" # TODO: Specs says it should return a `WakuNode`. As `send` and other APIs are defined, we can align. proc createNode*(config: NodeConfig): Future[Result[Waku, string]] {.async.} = @@ -15,3 +19,45 @@ proc createNode*(config: NodeConfig): Future[Result[Waku, string]] {.async.} = return err("Failed setting up Waku: " & $error) return ok(wakuRes) + +proc checkApiAvailability(w: Waku): Result[void, string] = + if w.isNil(): + return err("Waku node is not initialized") + + # check if health is satisfactory + # If Node is not healthy, return err("Waku node is not healthy") + let healthStatus = waitFor RequestNodeHealth.request() + + if healthStatus.isErr(): + warn "Failed to get Waku node health status: ", error = healthStatus.error + # Let's suppose the node is hesalthy enough, go ahead + else: + if healthStatus.get().healthStatus != NodeHealth.Unhealthy: + return err("Waku node is not healthy, has got no connections.") + + return ok() + +proc subscribe*( + w: Waku, contentTopic: ContentTopic +): Future[Result[RequestId, string]] {.async.} = + ?checkApiAvailability(w) + + let requestId = newRequestId(w.rng) + + asyncSpawn w.subscribeImpl(requestId, contentTopic) + + return ok(requestId) + +proc send*( + w: Waku, envelope: MessageEnvelope +): Future[Result[RequestId, string]] {.async.} = + ?checkApiAvailability(w) + + let requestId = newRequestId(w.rng) + + let deliveryTask = DeliveryTask.create(requestId, envelope).valueOr: + return err("Failed to create delivery task: " & error) + + asyncSpawn w.deliveryService.sendService.send(deliveryTask) + + return ok(requestId) diff --git a/waku/api/api_conf.nim b/waku/api/api_conf.nim index 155554dfd..692ada821 100644 --- a/waku/api/api_conf.nim +++ b/waku/api/api_conf.nim @@ -131,8 +131,9 @@ proc toWakuConf*(nodeConfig: NodeConfig): Result[WakuConf, string] = b.rateLimitConf.withRateLimits(@["filter:100/1s", "lightpush:5/1s", "px:5/1s"]) of Edge: - return err("Edge mode is not implemented") - + # All client side protocols are mounted by default + # Peer exchange client is always enabled and start_node will start the px loop + discard ## Network Conf let protocolsConfig = nodeConfig.protocolsConfig diff --git a/waku/api/request_id.nim b/waku/api/request_id.nim new file mode 100644 index 000000000..fab4ccfbf --- /dev/null +++ b/waku/api/request_id.nim @@ -0,0 +1,13 @@ +{.push raises: [].} + +import bearssl/rand + +import waku/utils/requests as request_utils + +import ./types + +proc newRequestId*(rng: ref HmacDrbgContext): RequestId = + ## Generate a new RequestId using the provided RNG. + RequestId(request_utils.generateRequestId(rng)) + +{.pop.} diff --git a/waku/api/send_api.md b/waku/api/send_api.md new file mode 100644 index 000000000..69f55dc9e --- /dev/null +++ b/waku/api/send_api.md @@ -0,0 +1,46 @@ +# SEND API + +**THIS IS TO BE REMOVED BEFORE PR MERGE** + +This document collects logic and todo's around the Send API. + +## Overview + +Send api hides the complex logic of using raw protocols for reliable message delivery. +The delivery method is chosen based on the node configuration and actual availabilites of peers. + +## Delivery task + +Each message send request is boundled into a task that not just holds the composed message but also the state of the delivery. + +## Delivery methods + +Depending on the configuration and the availability of store client protocol + actual configured and/or discovered store nodes: +- P2PReliability validation - checking network store node wheather the message is reached at least a store node. +- Simple retry until message is propagated to the network + - Relay says >0 peers as publish result + - LightushClient returns with success + +Depending on node config: +- Relay +- Lightpush + +These methods are used in combination to achieve the best reliability. +Fallback mechanism is used to switch between methods if the current one fails. + +Relay+StoreCheck -> Relay+simeple retry -> Lightpush+StoreCheck -> Lightpush simple retry -> Error + +Combination is dynamicly chosen on node configuration. Levels can be skipped depending on actual connectivity. +Actual connectivity is checked: +- Relay's topic health check - at least dLow peers in the mesh for the topic +- Store nodes availability - at least one store service node is available in peer manager +- Lightpush client availability - at least one lightpush service node is available in peer manager + +## Delivery processing + +At every send request, each task is tried to be delivered right away. +Any further retries and store check is done as a background task in a loop with predefined intervals. +Each task is set for a maximum number of retries and/or maximum time to live. + +In each round of store check and retry send tasks are selected based on their state. +The state is updated based on the result of the delivery method. diff --git a/waku/api/subscribe/subscribe.nim b/waku/api/subscribe/subscribe.nim new file mode 100644 index 000000000..9283936cf --- /dev/null +++ b/waku/api/subscribe/subscribe.nim @@ -0,0 +1,12 @@ +# import chronicles, chronos, results +import chronos +import waku/waku_core +import waku/api/types +import waku/factory/waku + +proc subscribeImpl*( + w: Waku, requestId: RequestId, contentTopic: ContentTopic +): Future[void] {.async.} = + ## Implementation of the subscribe API + ## This is a placeholder implementation + await sleepAsync(1000) # Simulate async work diff --git a/waku/api/types.nim b/waku/api/types.nim new file mode 100644 index 000000000..c42718d1b --- /dev/null +++ b/waku/api/types.nim @@ -0,0 +1,64 @@ +{.push raises: [].} + +import bearssl/rand, std/times, chronos, chronicles +import stew/byteutils +import waku/utils/requests as request_utils +import waku/waku_core/[topics/content_topic, message/message, time] +import waku/requests/requests + +logScope: + topics = "message envelope" + +type + MessageEnvelope* = object + contentTopic*: ContentTopic + payload*: seq[byte] + ephemeral*: bool + + RequestId* = distinct string + + NodeHealth* {.pure.} = enum + Healthy + MinimallyHealthy + Unhealthy + +proc newRequestId*(rng: ref HmacDrbgContext): RequestId = + ## Generate a new RequestId using the provided RNG. + RequestId(request_utils.generateRequestId(rng)) + +proc `$`*(r: RequestId): string {.inline.} = + string(r) + +proc init*( + T: type MessageEnvelope, + contentTopic: ContentTopic, + payload: seq[byte] | string, + ephemeral: bool = false, +): MessageEnvelope = + when payload is seq[byte]: + MessageEnvelope(contentTopic: contentTopic, payload: payload, ephemeral: ephemeral) + else: + MessageEnvelope( + contentTopic: contentTopic, payload: payload.toBytes(), ephemeral: ephemeral + ) + +proc toWakuMessage*(envelope: MessageEnvelope): WakuMessage = + ## Convert a MessageEnvelope to a WakuMessage. + var wm = WakuMessage( + contentTopic: envelope.contentTopic, + payload: envelope.payload, + ephemeral: envelope.ephemeral, + timestamp: getNanosecondTime(getTime().toUnixFloat()), + ) + + # TODO: First find out if proof is needed at all + let requestedProof = ( + waitFor RequestGenerateRlnProof.request(wm, getTime().toUnixFloat()) + ).valueOr: + warn "Failed to add RLN proof to WakuMessage: ", error = error + return wm + + wm.proof = requestedProof.proof + return wm + +{.pop.} diff --git a/waku/events/delivery_events.nim b/waku/events/delivery_events.nim new file mode 100644 index 000000000..f8eb0f48d --- /dev/null +++ b/waku/events/delivery_events.nim @@ -0,0 +1,27 @@ +import waku/waku_core/[message/message, message/digest], waku/common/broker/event_broker + +type DeliveryDirection* {.pure.} = enum + PUBLISHING + RECEIVING + +type DeliverySuccess* {.pure.} = enum + SUCCESSFUL + UNSUCCESSFUL + +EventBroker: + type DeliveryFeedbackEvent* = ref object + success*: DeliverySuccess + dir*: DeliveryDirection + comment*: string + msgHash*: WakuMessageHash + msg*: WakuMessage + +EventBroker: + type OnFilterSubscribeEvent* = object + pubsubTopic*: string + contentTopics*: seq[string] + +EventBroker: + type OnFilterUnSubscribeEvent* = object + pubsubTopic*: string + contentTopics*: seq[string] diff --git a/waku/events/events.nim b/waku/events/events.nim new file mode 100644 index 000000000..2a0af8828 --- /dev/null +++ b/waku/events/events.nim @@ -0,0 +1,3 @@ +import ./[message_events, delivery_events] + +export message_events, delivery_events diff --git a/waku/events/message_events.nim b/waku/events/message_events.nim new file mode 100644 index 000000000..9c8a3b321 --- /dev/null +++ b/waku/events/message_events.nim @@ -0,0 +1,23 @@ +import waku/common/broker/event_broker +import waku/api/types + +export types + +EventBroker: + # Event emitted when a message is sent to the network + type MessageSentEvent* = object + requestId*: RequestId + messageHash*: string + +EventBroker: + # Event emitted when a message send operation fails + type MessageErrorEvent* = object + requestId*: RequestId + messageHash*: string + error*: string + +EventBroker: + # Confirmation that a message has been correctly delivered to some neighbouring nodes. + type MessagePropagatedEvent* = object + requestId*: RequestId + messageHash*: string diff --git a/waku/factory/waku.nim b/waku/factory/waku.nim index bed8a9137..057c78810 100644 --- a/waku/factory/waku.nim +++ b/waku/factory/waku.nim @@ -27,7 +27,7 @@ import ../node/peer_manager, ../node/health_monitor, ../node/waku_metrics, - ../node/delivery_monitor/delivery_monitor, + ../node/delivery_service/delivery_service, ../rest_api/message_cache, ../rest_api/endpoint/server, ../rest_api/endpoint/builder as rest_server_builder, @@ -69,7 +69,7 @@ type Waku* = ref object healthMonitor*: NodeHealthMonitor - deliveryMonitor: DeliveryMonitor + deliveryService*: DeliveryService restServer*: WakuRestServerRef metricsServer*: MetricsHttpServerRef @@ -200,16 +200,10 @@ proc new*( return err("Failed setting up app callbacks: " & $error) ## Delivery Monitor - var deliveryMonitor: DeliveryMonitor - if wakuConf.p2pReliability: - if wakuConf.remoteStoreNode.isNone(): - return err("A storenode should be set when reliability mode is on") - - let deliveryMonitor = DeliveryMonitor.new( - node.wakuStoreClient, node.wakuRelay, node.wakuLightpushClient, - node.wakuFilterClient, - ).valueOr: - return err("could not create delivery monitor: " & $error) + let deliveryService = DeliveryService.new( + wakuConf.p2pReliability, node, + ).valueOr: + return err("could not create delivery service: " & $error) var waku = Waku( version: git_version, @@ -218,7 +212,7 @@ proc new*( key: wakuConf.nodeKey, node: node, healthMonitor: healthMonitor, - deliveryMonitor: deliveryMonitor, + deliveryService: deliveryService, appCallbacks: appCallbacks, restServer: restServer, ) @@ -403,8 +397,8 @@ proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async.} = return err("failed to start waku discovery v5: " & $error) ## Reliability - if not waku[].deliveryMonitor.isNil(): - waku[].deliveryMonitor.startDeliveryMonitor() + if not waku[].deliveryService.isNil(): + waku[].deliveryService.startDeliveryService() ## Health Monitor waku[].healthMonitor.startHealthMonitor().isOkOr: @@ -463,3 +457,16 @@ proc stop*(waku: Waku): Future[void] {.async: (raises: [Exception]).} = if not waku.restServer.isNil(): await waku.restServer.stop() + +proc isModeCoreAvailable*(waku: Waku): bool = + return not waku.node.wakuRelay.isNil() + +proc isModeEdgeAvailable*(waku: Waku): bool = + return + waku.node.wakuRelay.isNil() and not waku.node.wakuStoreClient.isNil() and + not waku.node.wakuFilterClient.isNil() and not waku.node.wakuLightPushClient.isNil() + +proc isP2PReliabilityEnabled*(waku: Waku): bool = + return not waku.deliveryService.isNil() + +{.pop.} diff --git a/waku/node/delivery_monitor/delivery_callback.nim b/waku/node/delivery_monitor/delivery_callback.nim deleted file mode 100644 index c996bc7b0..000000000 --- a/waku/node/delivery_monitor/delivery_callback.nim +++ /dev/null @@ -1,17 +0,0 @@ -import ../../waku_core - -type DeliveryDirection* {.pure.} = enum - PUBLISHING - RECEIVING - -type DeliverySuccess* {.pure.} = enum - SUCCESSFUL - UNSUCCESSFUL - -type DeliveryFeedbackCallback* = proc( - success: DeliverySuccess, - dir: DeliveryDirection, - comment: string, - msgHash: WakuMessageHash, - msg: WakuMessage, -) {.gcsafe, raises: [].} diff --git a/waku/node/delivery_monitor/delivery_monitor.nim b/waku/node/delivery_monitor/delivery_monitor.nim deleted file mode 100644 index 4dda542cc..000000000 --- a/waku/node/delivery_monitor/delivery_monitor.nim +++ /dev/null @@ -1,43 +0,0 @@ -## This module helps to ensure the correct transmission and reception of messages - -import results -import chronos -import - ./recv_monitor, - ./send_monitor, - ./delivery_callback, - ../../waku_core, - ../../waku_store/client, - ../../waku_relay/protocol, - ../../waku_lightpush/client, - ../../waku_filter_v2/client - -type DeliveryMonitor* = ref object - sendMonitor: SendMonitor - recvMonitor: RecvMonitor - -proc new*( - T: type DeliveryMonitor, - storeClient: WakuStoreClient, - wakuRelay: protocol.WakuRelay, - wakuLightpushClient: WakuLightpushClient, - wakuFilterClient: WakuFilterClient, -): Result[T, string] = - ## storeClient is needed to give store visitility to DeliveryMonitor - ## wakuRelay and wakuLightpushClient are needed to give a mechanism to SendMonitor to re-publish - let sendMonitor = ?SendMonitor.new(storeClient, wakuRelay, wakuLightpushClient) - let recvMonitor = RecvMonitor.new(storeClient, wakuFilterClient) - return ok(DeliveryMonitor(sendMonitor: sendMonitor, recvMonitor: recvMonitor)) - -proc startDeliveryMonitor*(self: DeliveryMonitor) = - self.sendMonitor.startSendMonitor() - self.recvMonitor.startRecvMonitor() - -proc stopDeliveryMonitor*(self: DeliveryMonitor) {.async.} = - self.sendMonitor.stopSendMonitor() - await self.recvMonitor.stopRecvMonitor() - -proc setDeliveryCallback*(self: DeliveryMonitor, deliveryCb: DeliveryFeedbackCallback) = - ## The deliveryCb is a proc defined by the api client so that it can get delivery feedback - self.sendMonitor.setDeliveryCallback(deliveryCb) - self.recvMonitor.setDeliveryCallback(deliveryCb) diff --git a/waku/node/delivery_monitor/publish_observer.nim b/waku/node/delivery_monitor/publish_observer.nim deleted file mode 100644 index 1f517f8bd..000000000 --- a/waku/node/delivery_monitor/publish_observer.nim +++ /dev/null @@ -1,9 +0,0 @@ -import chronicles -import ../../waku_core/message/message - -type PublishObserver* = ref object of RootObj - -method onMessagePublished*( - self: PublishObserver, pubsubTopic: string, message: WakuMessage -) {.base, gcsafe, raises: [].} = - error "onMessagePublished not implemented" diff --git a/waku/node/delivery_monitor/send_monitor.nim b/waku/node/delivery_monitor/send_monitor.nim deleted file mode 100644 index 15b16065f..000000000 --- a/waku/node/delivery_monitor/send_monitor.nim +++ /dev/null @@ -1,212 +0,0 @@ -## This module reinforces the publish operation with regular store-v3 requests. -## - -import std/[sequtils, tables] -import chronos, chronicles, libp2p/utility -import - ./delivery_callback, - ./publish_observer, - ../../waku_core, - ./not_delivered_storage/not_delivered_storage, - ../../waku_store/[client, common], - ../../waku_archive/archive, - ../../waku_relay/protocol, - ../../waku_lightpush/client - -const MaxTimeInCache* = chronos.minutes(1) - ## Messages older than this time will get completely forgotten on publication and a - ## feedback will be given when that happens - -const SendCheckInterval* = chronos.seconds(3) - ## Interval at which we check that messages have been properly received by a store node - -const MaxMessagesToCheckAtOnce = 100 - ## Max number of messages to check if they were properly archived by a store node - -const ArchiveTime = chronos.seconds(3) - ## Estimation of the time we wait until we start confirming that a message has been properly - ## received and archived by a store node - -type DeliveryInfo = object - pubsubTopic: string - msg: WakuMessage - -type SendMonitor* = ref object of PublishObserver - publishedMessages: Table[WakuMessageHash, DeliveryInfo] - ## Cache that contains the delivery info per message hash. - ## This is needed to make sure the published messages are properly published - - msgStoredCheckerHandle: Future[void] ## handle that allows to stop the async task - - notDeliveredStorage: NotDeliveredStorage - ## NOTE: this is not fully used because that might be tackled by higher abstraction layers - - storeClient: WakuStoreClient - deliveryCb: DeliveryFeedbackCallback - - wakuRelay: protocol.WakuRelay - wakuLightpushClient: WakuLightPushClient - -proc new*( - T: type SendMonitor, - storeClient: WakuStoreClient, - wakuRelay: protocol.WakuRelay, - wakuLightpushClient: WakuLightPushClient, -): Result[T, string] = - if wakuRelay.isNil() and wakuLightpushClient.isNil(): - return err( - "Could not create SendMonitor. wakuRelay or wakuLightpushClient should be set" - ) - - let notDeliveredStorage = ?NotDeliveredStorage.new() - - let sendMonitor = SendMonitor( - notDeliveredStorage: notDeliveredStorage, - storeClient: storeClient, - wakuRelay: wakuRelay, - wakuLightpushClient: wakuLightPushClient, - ) - - if not wakuRelay.isNil(): - wakuRelay.addPublishObserver(sendMonitor) - - if not wakuLightpushClient.isNil(): - wakuLightpushClient.addPublishObserver(sendMonitor) - - return ok(sendMonitor) - -proc performFeedbackAndCleanup( - self: SendMonitor, - msgsToDiscard: Table[WakuMessageHash, DeliveryInfo], - success: DeliverySuccess, - dir: DeliveryDirection, - comment: string, -) = - ## This procs allows to bring delivery feedback to the API client - ## It requires a 'deliveryCb' to be registered beforehand. - if self.deliveryCb.isNil(): - error "deliveryCb is nil in performFeedbackAndCleanup", - success, dir, comment, hashes = toSeq(msgsToDiscard.keys).mapIt(shortLog(it)) - return - - for hash, deliveryInfo in msgsToDiscard: - info "send monitor performFeedbackAndCleanup", - success, dir, comment, msg_hash = shortLog(hash) - - self.deliveryCb(success, dir, comment, hash, deliveryInfo.msg) - self.publishedMessages.del(hash) - -proc checkMsgsInStore( - self: SendMonitor, msgsToValidate: Table[WakuMessageHash, DeliveryInfo] -): Future[ - Result[ - tuple[ - publishedCorrectly: Table[WakuMessageHash, DeliveryInfo], - notYetPublished: Table[WakuMessageHash, DeliveryInfo], - ], - void, - ] -] {.async.} = - let hashesToValidate = toSeq(msgsToValidate.keys) - - let storeResp: StoreQueryResponse = ( - await self.storeClient.queryToAny( - StoreQueryRequest(includeData: false, messageHashes: hashesToValidate) - ) - ).valueOr: - error "checkMsgsInStore failed to get remote msgHashes", - hashes = hashesToValidate.mapIt(shortLog(it)), error = $error - return err() - - let publishedHashes = storeResp.messages.mapIt(it.messageHash) - - var notYetPublished: Table[WakuMessageHash, DeliveryInfo] - var publishedCorrectly: Table[WakuMessageHash, DeliveryInfo] - - for msgHash, deliveryInfo in msgsToValidate.pairs: - if publishedHashes.contains(msgHash): - publishedCorrectly[msgHash] = deliveryInfo - self.publishedMessages.del(msgHash) ## we will no longer track that message - else: - notYetPublished[msgHash] = deliveryInfo - - return ok((publishedCorrectly: publishedCorrectly, notYetPublished: notYetPublished)) - -proc processMessages(self: SendMonitor) {.async.} = - var msgsToValidate: Table[WakuMessageHash, DeliveryInfo] - var msgsToDiscard: Table[WakuMessageHash, DeliveryInfo] - - let now = getNowInNanosecondTime() - let timeToCheckThreshold = now - ArchiveTime.nanos - let maxLifeTime = now - MaxTimeInCache.nanos - - for hash, deliveryInfo in self.publishedMessages.pairs: - if deliveryInfo.msg.timestamp < maxLifeTime: - ## message is too old - msgsToDiscard[hash] = deliveryInfo - - if deliveryInfo.msg.timestamp < timeToCheckThreshold: - msgsToValidate[hash] = deliveryInfo - - ## Discard the messages that are too old - self.performFeedbackAndCleanup( - msgsToDiscard, DeliverySuccess.UNSUCCESSFUL, DeliveryDirection.PUBLISHING, - "Could not publish messages. Please try again.", - ) - - let (publishedCorrectly, notYetPublished) = ( - await self.checkMsgsInStore(msgsToValidate) - ).valueOr: - return ## the error log is printed in checkMsgsInStore - - ## Give positive feedback for the correctly published messages - self.performFeedbackAndCleanup( - publishedCorrectly, DeliverySuccess.SUCCESSFUL, DeliveryDirection.PUBLISHING, - "messages published correctly", - ) - - ## Try to publish again - for msgHash, deliveryInfo in notYetPublished.pairs: - let pubsubTopic = deliveryInfo.pubsubTopic - let msg = deliveryInfo.msg - if not self.wakuRelay.isNil(): - info "trying to publish again with wakuRelay", msgHash, pubsubTopic - (await self.wakuRelay.publish(pubsubTopic, msg)).isOkOr: - error "could not publish with wakuRelay.publish", - msgHash, pubsubTopic, error = $error - continue - - if not self.wakuLightpushClient.isNil(): - info "trying to publish again with wakuLightpushClient", msgHash, pubsubTopic - (await self.wakuLightpushClient.publishToAny(pubsubTopic, msg)).isOkOr: - error "could not publish with publishToAny", error = $error - continue - -proc checkIfMessagesStored(self: SendMonitor) {.async.} = - ## Continuously monitors that the sent messages have been received by a store node - while true: - await self.processMessages() - await sleepAsync(SendCheckInterval) - -method onMessagePublished( - self: SendMonitor, pubsubTopic: string, msg: WakuMessage -) {.gcsafe, raises: [].} = - ## Implementation of the PublishObserver interface. - ## - ## When publishing a message either through relay or lightpush, we want to add some extra effort - ## to make sure it is received to one store node. Hence, keep track of those published messages. - - info "onMessagePublished" - let msgHash = computeMessageHash(pubSubTopic, msg) - - if not self.publishedMessages.hasKey(msgHash): - self.publishedMessages[msgHash] = DeliveryInfo(pubsubTopic: pubsubTopic, msg: msg) - -proc startSendMonitor*(self: SendMonitor) = - self.msgStoredCheckerHandle = self.checkIfMessagesStored() - -proc stopSendMonitor*(self: SendMonitor) = - discard self.msgStoredCheckerHandle.cancelAndWait() - -proc setDeliveryCallback*(self: SendMonitor, deliveryCb: DeliveryFeedbackCallback) = - self.deliveryCb = deliveryCb diff --git a/waku/node/delivery_monitor/subscriptions_observer.nim b/waku/node/delivery_monitor/subscriptions_observer.nim deleted file mode 100644 index 800117ae9..000000000 --- a/waku/node/delivery_monitor/subscriptions_observer.nim +++ /dev/null @@ -1,13 +0,0 @@ -import chronicles - -type SubscriptionObserver* = ref object of RootObj - -method onSubscribe*( - self: SubscriptionObserver, pubsubTopic: string, contentTopics: seq[string] -) {.base, gcsafe, raises: [].} = - error "onSubscribe not implemented" - -method onUnsubscribe*( - self: SubscriptionObserver, pubsubTopic: string, contentTopics: seq[string] -) {.base, gcsafe, raises: [].} = - error "onUnsubscribe not implemented" diff --git a/waku/node/delivery_service/delivery_service.nim b/waku/node/delivery_service/delivery_service.nim new file mode 100644 index 000000000..3019c0dfb --- /dev/null +++ b/waku/node/delivery_service/delivery_service.nim @@ -0,0 +1,36 @@ +## This module helps to ensure the correct transmission and reception of messages + +import results +import chronos +import + ./recv_service, + ./send_service, + waku/[ + waku_core, + waku_node, + waku_store/client, + waku_relay/protocol, + waku_lightpush/client, + waku_filter_v2/client, + ] + +type DeliveryService* = ref object + sendService*: SendService + recvService: RecvService + +proc new*( + T: type DeliveryService, useP2PReliability: bool, w: WakuNode +): Result[T, string] = + ## storeClient is needed to give store visitility to DeliveryService + ## wakuRelay and wakuLightpushClient are needed to give a mechanism to SendService to re-publish + let sendService = ?SendService.new(useP2PReliability, w) + let recvService = RecvService.new(w) + return ok(DeliveryService(sendService: sendService, recvService: recvService)) + +proc startDeliveryService*(self: DeliveryService) = + self.sendService.startSendService() + self.recvService.startRecvService() + +proc stopDeliveryService*(self: DeliveryService) {.async.} = + self.sendService.stopSendService() + await self.recvService.stopRecvService() diff --git a/waku/node/delivery_monitor/not_delivered_storage/migrations.nim b/waku/node/delivery_service/not_delivered_storage/migrations.nim similarity index 95% rename from waku/node/delivery_monitor/not_delivered_storage/migrations.nim rename to waku/node/delivery_service/not_delivered_storage/migrations.nim index 8175aea62..807074d64 100644 --- a/waku/node/delivery_monitor/not_delivered_storage/migrations.nim +++ b/waku/node/delivery_service/not_delivered_storage/migrations.nim @@ -4,7 +4,7 @@ import std/[tables, strutils, os], results, chronicles import ../../../common/databases/db_sqlite, ../../../common/databases/common logScope: - topics = "waku node delivery_monitor" + topics = "waku node delivery_service" const TargetSchemaVersion* = 1 # increase this when there is an update in the database schema diff --git a/waku/node/delivery_monitor/not_delivered_storage/not_delivered_storage.nim b/waku/node/delivery_service/not_delivered_storage/not_delivered_storage.nim similarity index 93% rename from waku/node/delivery_monitor/not_delivered_storage/not_delivered_storage.nim rename to waku/node/delivery_service/not_delivered_storage/not_delivered_storage.nim index 85611310b..b0f5f5828 100644 --- a/waku/node/delivery_monitor/not_delivered_storage/not_delivered_storage.nim +++ b/waku/node/delivery_service/not_delivered_storage/not_delivered_storage.nim @@ -1,17 +1,17 @@ ## This module is aimed to keep track of the sent/published messages that are considered ## not being properly delivered. -## +## ## The archiving of such messages will happen in a local sqlite database. -## +## ## In the very first approach, we consider that a message is sent properly is it has been ## received by any store node. -## +## import results import ../../../common/databases/db_sqlite, ../../../waku_core/message/message, - ../../../node/delivery_monitor/not_delivered_storage/migrations + ../../../node/delivery_service/not_delivered_storage/migrations const NotDeliveredMessagesDbUrl = "not-delivered-messages.db" diff --git a/waku/node/delivery_service/recv_service.nim b/waku/node/delivery_service/recv_service.nim new file mode 100644 index 000000000..c4dcf4fef --- /dev/null +++ b/waku/node/delivery_service/recv_service.nim @@ -0,0 +1,3 @@ +import ./recv_service/recv_service + +export recv_service diff --git a/waku/node/delivery_monitor/recv_monitor.nim b/waku/node/delivery_service/recv_service/recv_service.nim similarity index 72% rename from waku/node/delivery_monitor/recv_monitor.nim rename to waku/node/delivery_service/recv_service/recv_service.nim index 6ea35d301..9fe98b4bd 100644 --- a/waku/node/delivery_monitor/recv_monitor.nim +++ b/waku/node/delivery_service/recv_service/recv_service.nim @@ -5,12 +5,15 @@ import std/[tables, sequtils, options] import chronos, chronicles, libp2p/utility import - ../../waku_core, - ./delivery_callback, - ./subscriptions_observer, - ../../waku_store/[client, common], - ../../waku_filter_v2/client, - ../../waku_core/topics + waku/[ + waku_core, + waku_store/client, + waku_store/common, + waku_filter_v2/client, + waku_core/topics, + events/delivery_events, + waku_node, + ] const StoreCheckPeriod = chronos.minutes(5) ## How often to perform store queries @@ -28,14 +31,14 @@ type RecvMessage = object rxTime: Timestamp ## timestamp of the rx message. We will not keep the rx messages forever -type RecvMonitor* = ref object of SubscriptionObserver +type RecvService* = ref object of RootObj topicsInterest: Table[PubsubTopic, seq[ContentTopic]] ## Tracks message verification requests and when was the last time a ## pubsub topic was verified for missing messages ## The key contains pubsub-topics - - storeClient: WakuStoreClient - deliveryCb: DeliveryFeedbackCallback + node: WakuNode + onSubscribeListener: OnFilterSubscribeEventListener + onUnsubscribeListener: OnFilterUnsubscribeEventListener recentReceivedMsgs: seq[RecvMessage] @@ -46,10 +49,10 @@ type RecvMonitor* = ref object of SubscriptionObserver endTimeToCheck: Timestamp proc getMissingMsgsFromStore( - self: RecvMonitor, msgHashes: seq[WakuMessageHash] + self: RecvService, msgHashes: seq[WakuMessageHash] ): Future[Result[seq[TupleHashAndMsg], string]] {.async.} = let storeResp: StoreQueryResponse = ( - await self.storeClient.queryToAny( + await self.node.wakuStoreClient.queryToAny( StoreQueryRequest(includeData: true, messageHashes: msgHashes) ) ).valueOr: @@ -62,25 +65,21 @@ proc getMissingMsgsFromStore( ) proc performDeliveryFeedback( - self: RecvMonitor, + self: RecvService, success: DeliverySuccess, dir: DeliveryDirection, comment: string, msgHash: WakuMessageHash, msg: WakuMessage, ) {.gcsafe, raises: [].} = - ## This procs allows to bring delivery feedback to the API client - ## It requires a 'deliveryCb' to be registered beforehand. - if self.deliveryCb.isNil(): - error "deliveryCb is nil in performDeliveryFeedback", - success, dir, comment, msg_hash - return - info "recv monitor performDeliveryFeedback", success, dir, comment, msg_hash = shortLog(msgHash) - self.deliveryCb(success, dir, comment, msgHash, msg) -proc msgChecker(self: RecvMonitor) {.async.} = + DeliveryFeedbackEvent.emit( + success = success, dir = dir, comment = comment, msgHash = msgHash, msg = msg + ) + +proc msgChecker(self: RecvService) {.async.} = ## Continuously checks if a message has been received while true: await sleepAsync(StoreCheckPeriod) @@ -90,7 +89,7 @@ proc msgChecker(self: RecvMonitor) {.async.} = var msgHashesInStore = newSeq[WakuMessageHash](0) for pubsubTopic, cTopics in self.topicsInterest.pairs: let storeResp: StoreQueryResponse = ( - await self.storeClient.queryToAny( + await self.node.wakuStoreClient.queryToAny( StoreQueryRequest( includeData: false, pubsubTopic: some(PubsubTopic(pubsubTopic)), @@ -126,8 +125,8 @@ proc msgChecker(self: RecvMonitor) {.async.} = ## update next check times self.startTimeToCheck = self.endTimeToCheck -method onSubscribe( - self: RecvMonitor, pubsubTopic: string, contentTopics: seq[string] +proc onSubscribe( + self: RecvService, pubsubTopic: string, contentTopics: seq[string] ) {.gcsafe, raises: [].} = info "onSubscribe", pubsubTopic, contentTopics self.topicsInterest.withValue(pubsubTopic, contentTopicsOfInterest): @@ -135,8 +134,8 @@ method onSubscribe( do: self.topicsInterest[pubsubTopic] = contentTopics -method onUnsubscribe( - self: RecvMonitor, pubsubTopic: string, contentTopics: seq[string] +proc onUnsubscribe( + self: RecvService, pubsubTopic: string, contentTopics: seq[string] ) {.gcsafe, raises: [].} = info "onUnsubscribe", pubsubTopic, contentTopics @@ -150,19 +149,13 @@ method onUnsubscribe( do: error "onUnsubscribe unsubscribing from wrong topic", pubsubTopic, contentTopics -proc new*( - T: type RecvMonitor, - storeClient: WakuStoreClient, - wakuFilterClient: WakuFilterClient, -): T = +proc new*(T: type RecvService, node: WakuNode): T = ## The storeClient will help to acquire any possible missed messages let now = getNowInNanosecondTime() - var recvMonitor = RecvMonitor(storeClient: storeClient, startTimeToCheck: now) - - if not wakuFilterClient.isNil(): - wakuFilterClient.addSubscrObserver(recvMonitor) + var recvService = RecvService(node: node, startTimeToCheck: now) + if not node.wakuFilterClient.isNil(): let filterPushHandler = proc( pubsubTopic: PubsubTopic, message: WakuMessage ) {.async, closure.} = @@ -170,27 +163,40 @@ proc new*( let msgHash = computeMessageHash(pubSubTopic, message) let rxMsg = RecvMessage(msgHash: msgHash, rxTime: message.timestamp) - recvMonitor.recentReceivedMsgs.add(rxMsg) + recvService.recentReceivedMsgs.add(rxMsg) - wakuFilterClient.registerPushHandler(filterPushHandler) + node.wakuFilterClient.registerPushHandler(filterPushHandler) - return recvMonitor + return recvService -proc loopPruneOldMessages(self: RecvMonitor) {.async.} = +proc loopPruneOldMessages(self: RecvService) {.async.} = while true: let oldestAllowedTime = getNowInNanosecondTime() - MaxMessageLife.nanos self.recentReceivedMsgs.keepItIf(it.rxTime > oldestAllowedTime) await sleepAsync(PruneOldMsgsPeriod) -proc startRecvMonitor*(self: RecvMonitor) = +proc startRecvService*(self: RecvService) = self.msgCheckerHandler = self.msgChecker() self.msgPrunerHandler = self.loopPruneOldMessages() -proc stopRecvMonitor*(self: RecvMonitor) {.async.} = + self.onSubscribeListener = OnFilterSubscribeEvent.listen( + proc(subsEv: OnFilterSubscribeEvent): Future[void] {.async: (raises: []).} = + self.onSubscribe(subsEv.pubsubTopic, subsEv.contentTopics) + ).valueOr: + error "Failed to set OnFilterSubscribeEvent listener", error = error + quit(QuitFailure) + + self.onUnsubscribeListener = OnFilterUnsubscribeEvent.listen( + proc(subsEv: OnFilterUnsubscribeEvent): Future[void] {.async: (raises: []).} = + self.onUnsubscribe(subsEv.pubsubTopic, subsEv.contentTopics) + ).valueOr: + error "Failed to set OnFilterUnsubscribeEvent listener", error = error + quit(QuitFailure) + +proc stopRecvService*(self: RecvService) {.async.} = + OnFilterSubscribeEvent.dropListener(self.onSubscribeListener) + OnFilterUnSubscribeEvent.dropListener(self.onUnsubscribeListener) if not self.msgCheckerHandler.isNil(): await self.msgCheckerHandler.cancelAndWait() if not self.msgPrunerHandler.isNil(): await self.msgPrunerHandler.cancelAndWait() - -proc setDeliveryCallback*(self: RecvMonitor, deliveryCb: DeliveryFeedbackCallback) = - self.deliveryCb = deliveryCb diff --git a/waku/node/delivery_service/send_service.nim b/waku/node/delivery_service/send_service.nim new file mode 100644 index 000000000..de0dbf6a3 --- /dev/null +++ b/waku/node/delivery_service/send_service.nim @@ -0,0 +1,6 @@ +## This module reinforces the publish operation with regular store-v3 requests. +## + +import ./send_service/[send_service, delivery_task] + +export send_service, delivery_task diff --git a/waku/node/delivery_service/send_service/delivery_task.nim b/waku/node/delivery_service/send_service/delivery_task.nim new file mode 100644 index 000000000..efd272ad7 --- /dev/null +++ b/waku/node/delivery_service/send_service/delivery_task.nim @@ -0,0 +1,66 @@ +import std/[options, times], chronos +import waku/waku_core, waku/api/types, waku/requests/node_requests + +type DeliveryState* {.pure.} = enum + Entry + SuccessfullyPropagated + SuccessfullyValidated + FallbackRetry + NextRoundRetry + FailedToDeliver + +type DeliveryTask* = ref object + requestId*: RequestId + pubsubTopic*: PubsubTopic + msg*: WakuMessage + msgHash*: WakuMessageHash + tryCount*: int + state*: DeliveryState + deliveryTime*: Moment + errorDesc*: string + +proc create*( + T: type DeliveryTask, requestId: RequestId, envelop: MessageEnvelope +): Result[T, string] = + let msg = envelop.toWakuMessage() + # TODO: use sync request for such as soon as available + let relayShardRes = ( + waitFor RequestRelayShard.request(none[PubsubTopic](), envelop.contentTopic) + ).valueOr: + return err($error) + + let pubsubTopic = relayShardRes.relayShard.toPubsubTopic() + let msgHash = computeMessageHash(pubsubTopic, msg) + + return ok( + T( + requestId: requestId, + pubsubTopic: pubsubTopic, + msg: msg, + msgHash: msgHash, + tryCount: 0, + state: DeliveryState.Entry, + ) + ) + +func `==`*(r, l: DeliveryTask): bool = + if r.isNil() == l.isNil(): + r.isNil() or r.msgHash == l.msgHash + else: + false + +proc messageAge*(self: DeliveryTask): timer.Duration = + let actual = getNanosecondTime(getTime().toUnixFloat()) + if self.msg.timestamp >= 0 and self.msg.timestamp < actual: + nanoseconds(actual - self.msg.timestamp) + else: + ZeroDuration + +proc deliveryAge*(self: DeliveryTask): timer.Duration = + if self.state == DeliveryState.SuccessfullyPropagated: + timer.Moment.now() - self.deliveryTime + else: + ZeroDuration + +proc isEphemeral*(self: DeliveryTask): bool = + return self.msg.ephemeral diff --git a/waku/node/delivery_service/send_service/lightpush_processor.nim b/waku/node/delivery_service/send_service/lightpush_processor.nim new file mode 100644 index 000000000..8d7ab5a5e --- /dev/null +++ b/waku/node/delivery_service/send_service/lightpush_processor.nim @@ -0,0 +1,74 @@ +import chronicles, chronos, results +import std/options + +import + waku/waku_node, + waku/waku_core, + waku/node/peer_manager, + waku/waku_lightpush/[callbacks, common, client, rpc] + +import ./[delivery_task, send_processor] + +logScope: + topics = "send service lightpush processor" + +type LightpushSendProcessor* = ref object of BaseSendProcessor + peerManager: PeerManager + lightpushClient: WakuLightPushClient + +proc new*( + T: type LightpushSendProcessor, + peerManager: PeerManager, + lightpushClient: WakuLightPushClient, +): T = + return T(peerManager: peerManager, lightpushClient: lightpushClient) + +proc isLightpushPeerAvailable( + self: LightpushSendProcessor, pubsubTopic: PubsubTopic +): bool = + return self.peerManager.selectPeer(WakuLightPushCodec, some(pubsubTopic)).isSome() + +method isValidProcessor*( + self: LightpushSendProcessor, task: DeliveryTask +): Future[bool] {.async.} = + return self.isLightpushPeerAvailable(task.pubsubTopic) + +method sendImpl*( + self: LightpushSendProcessor, task: DeliveryTask +): Future[void] {.async.} = + task.tryCount.inc() + info "Trying message delivery via Lightpush", + requestId = task.requestId, msgHash = task.msgHash, tryCount = task.tryCount + + let peer = self.peerManager.selectPeer(WakuLightPushCodec, some(task.pubsubTopic)).valueOr: + task.state = DeliveryState.NextRoundRetry + return + + let pushResult = + await self.lightpushClient.publish(some(task.pubsubTopic), task.msg, peer) + if pushResult.isErr: + error "LightpushSendProcessor sendImpl failed", + error = pushResult.error.desc.get($pushResult.error.code) + case pushResult.error.code + of LightPushErrorCode.NO_PEERS_TO_RELAY, LightPushErrorCode.TOO_MANY_REQUESTS, + LightPushErrorCode.OUT_OF_RLN_PROOF, LightPushErrorCode.SERVICE_NOT_AVAILABLE, + LightPushErrorCode.INTERNAL_SERVER_ERROR: + task.state = DeliveryState.NextRoundRetry + else: + # the message is malformed, send error + task.state = DeliveryState.FailedToDeliver + task.errorDesc = pushResult.error.desc.get($pushResult.error.code) + task.deliveryTime = Moment.now() + return + + if pushResult.isOk and pushResult.get() > 0: + info "Message propagated via Relay", + requestId = task.requestId, msgHash = task.msgHash + task.state = DeliveryState.SuccessfullyPropagated + task.deliveryTime = Moment.now() + # TODO: with a simple retry processor it might be more accurate to say `Sent` + else: + # Controversial state, publish says ok but no peer. It should not happen. + task.state = DeliveryState.NextRoundRetry + + return diff --git a/waku/node/delivery_service/send_service/relay_processor.nim b/waku/node/delivery_service/send_service/relay_processor.nim new file mode 100644 index 000000000..7f7fdc8dc --- /dev/null +++ b/waku/node/delivery_service/send_service/relay_processor.nim @@ -0,0 +1,67 @@ +import chronos, chronicles +import std/options +import waku/[waku_node, waku_core], waku/waku_lightpush/[common, callbacks, rpc] +import waku/requests/health_request +import waku/api/types +import ./[delivery_task, send_processor] + +logScope: + topics = "send service relay processor" + +type RelaySendProcessor* = ref object of BaseSendProcessor + publishProc: PushMessageHandler + fallbackStateToSet: DeliveryState + +proc new*( + T: type RelaySendProcessor, + lightpushAvailable: bool, + publishProc: PushMessageHandler, +): RelaySendProcessor = + let fallbackStateToSet = + if lightpushAvailable: + DeliveryState.FallbackRetry + else: + DeliveryState.FailedToDeliver + + return + RelaySendProcessor(publishProc: publishProc, fallbackStateToSet: fallbackStateToSet) + +proc isTopicHealthy(topic: PubsubTopic): Future[bool] {.async.} = + let healthReport = (await RequestRelayTopicsHealth.request(@[topic])).valueOr: + return false + + if healthReport.topicHealth.len() < 1: + return false + let health = healthReport.topicHealth[0].health + return health == MINIMALLY_HEALTHY or health == SUFFICIENTLY_HEALTHY + +method isValidProcessor*( + self: RelaySendProcessor, task: DeliveryTask +): Future[bool] {.async.} = + return await isTopicHealthy(task.pubsubTopic) + +method sendImpl*(self: RelaySendProcessor, task: DeliveryTask): Future[void] {.async.} = + task.tryCount.inc() + info "Trying message delivery via Relay", + requestId = task.requestId, msgHash = task.msgHash, tryCount = task.tryCount + + let pushResult = await self.publishProc(task.pubsubTopic, task.msg) + if pushResult.isErr(): + let errorMessage = pushResult.error.desc.get($pushResult.error.code) + error "Failed to publish message with relay", + request = task.requestId, msgHash = task.msgHash, error = errorMessage + if pushResult.error.code != LightPushErrorCode.NO_PEERS_TO_RELAY: + task.state = DeliveryState.FailedToDeliver + task.errorDesc = errorMessage + else: + task.state = self.fallbackStateToSet + return + + if pushResult.isOk and pushResult.get() > 0: + info "Message propagated via Relay", + requestId = task.requestId, msgHash = task.msgHash + task.state = DeliveryState.SuccessfullyPropagated + task.deliveryTime = Moment.now() + else: + # It shall not happen, but still covering it + task.state = self.fallbackStateToSet diff --git a/waku/node/delivery_service/send_service/send_processor.nim b/waku/node/delivery_service/send_service/send_processor.nim new file mode 100644 index 000000000..3969560c5 --- /dev/null +++ b/waku/node/delivery_service/send_service/send_processor.nim @@ -0,0 +1,32 @@ +import chronos +import ./delivery_task + +type BaseSendProcessor* = ref object of RootObj + fallbackProcessor*: BaseSendProcessor + +proc chain*(self: BaseSendProcessor, next: BaseSendProcessor) = + self.fallbackProcessor = next + +method isValidProcessor*( + self: BaseSendProcessor, task: DeliveryTask +): Future[bool] {.async, base.} = + return false + +method sendImpl*( + self: BaseSendProcessor, task: DeliveryTask +): Future[void] {.async, base.} = + assert false, "Not implemented" + +method process*( + self: BaseSendProcessor, task: DeliveryTask +): Future[void] {.async, base.} = + var currentProcessor: BaseSendProcessor = self + var keepTrying = true + while not currentProcessor.isNil() and keepTrying: + if await currentProcessor.isValidProcessor(task): + await currentProcessor.sendImpl(task) + currentProcessor = currentProcessor.fallbackProcessor + keepTrying = task.state == DeliveryState.FallbackRetry + + if task.state == DeliveryState.FallbackRetry: + task.state = DeliveryState.NextRoundRetry diff --git a/waku/node/delivery_service/send_service/send_service.nim b/waku/node/delivery_service/send_service/send_service.nim new file mode 100644 index 000000000..82e27d637 --- /dev/null +++ b/waku/node/delivery_service/send_service/send_service.nim @@ -0,0 +1,238 @@ +## This module reinforces the publish operation with regular store-v3 requests. +## + +import std/[sequtils, tables, options] +import chronos, chronicles, libp2p/utility +import + ./[send_processor, relay_processor, lightpush_processor, delivery_task], + waku/[ + waku_core, + node/waku_node, + node/peer_manager, + waku_store/client, + waku_store/common, + waku_archive/archive, + waku_relay/protocol, + waku_rln_relay/rln_relay, + waku_lightpush/client, + waku_lightpush/callbacks, + events/delivery_events, + events/message_events, + ] + +logScope: + topics = "send service" + +# This useful util is missing from sequtils, this extends applyIt with predicate... +template applyItIf*(varSeq, pred, op: untyped) = + for i in low(varSeq) .. high(varSeq): + let it {.inject.} = varSeq[i] + if pred: + op + varSeq[i] = it + +template forEach*(varSeq, op: untyped) = + for i in low(varSeq) .. high(varSeq): + let it {.inject.} = varSeq[i] + op + +const MaxTimeInCache* = chronos.minutes(1) + ## Messages older than this time will get completely forgotten on publication and a + ## feedback will be given when that happens + +const ServiceLoopInterval* = chronos.seconds(1) + ## Interval at which we check that messages have been properly received by a store node + +const ArchiveTime = chronos.seconds(3) + ## Estimation of the time we wait until we start confirming that a message has been properly + ## received and archived by a store node + +type SendService* = ref object of RootObj + taskCache: seq[DeliveryTask] + ## Cache that contains the delivery task per message hash. + ## This is needed to make sure the published messages are properly published + + serviceLoopHandle: Future[void] ## handle that allows to stop the async task + sendProcessor: BaseSendProcessor + + node: WakuNode + checkStoreForMessages: bool + +proc setupSendProcessorChain( + peerManager: PeerManager, + lightpushClient: WakuLightPushClient, + relay: WakuRelay, + rlnRelay: WakuRLNRelay, +): Result[BaseSendProcessor, string] = + let isRelayAvail = not relay.isNil() + let isLightPushAvail = not lightpushClient.isNil() + + if not isRelayAvail and not isLightPushAvail: + return err("No valid send processor found for the delivery task") + + var processors = newSeq[BaseSendProcessor]() + + if isRelayAvail: + let rln: Option[WakuRLNRelay] = + if rlnRelay.isNil(): + none[WakuRLNRelay]() + else: + some(rlnRelay) + let publishProc = getRelayPushHandler(relay, rln) + + processors.add(RelaySendProcessor.new(isLightPushAvail, publishProc)) + if isLightPushAvail: + processors.add(LightpushSendProcessor.new(peerManager, lightpushClient)) + + var currentProcessor: BaseSendProcessor = processors[0] + for i in 1 ..< processors.len(): + currentProcessor.chain(processors[i]) + currentProcessor = processors[i] + + return ok(processors[0]) + +proc new*( + T: type SendService, preferP2PReliability: bool, w: WakuNode +): Result[T, string] = + if w.wakuRelay.isNil() and w.wakuLightpushClient.isNil(): + return err( + "Could not create SendService. wakuRelay or wakuLightpushClient should be set" + ) + + let checkStoreForMessages = preferP2PReliability and not w.wakuStoreClient.isNil() + + let sendProcessorChain = setupSendProcessorChain( + w.peerManager, w.wakuLightPushClient, w.wakuRelay, w.wakuRlnRelay + ).valueOr: + return err(error) + + let sendService = SendService( + taskCache: newSeq[DeliveryTask](), + serviceLoopHandle: nil, + sendProcessor: sendProcessorChain, + node: w, + checkStoreForMessages: checkStoreForMessages, + ) + + return ok(sendService) + +proc addTask(self: SendService, task: DeliveryTask) = + self.taskCache.addUnique(task) + +proc isStorePeerAvailable*(sendService: SendService): bool = + return sendService.node.peerManager.selectPeer(WakuStoreCodec).isSome() + +proc checkMsgsInStore(self: SendService, tasksToValidate: seq[DeliveryTask]) {.async.} = + if tasksToValidate.len() == 0: + return + + if not isStorePeerAvailable(self): + warn "Skipping store validation for ", + messageCount = tasksToValidate.len(), error = "no store peer available" + return + + var hashesToValidate = tasksToValidate.mapIt(it.msgHash) + + let storeResp: StoreQueryResponse = ( + await self.node.wakuStoreClient.queryToAny( + StoreQueryRequest(includeData: false, messageHashes: hashesToValidate) + ) + ).valueOr: + error "Failed to get store validation for messages", + hashes = hashesToValidate.mapIt(shortLog(it)), error = $error + return + + let storedItems = storeResp.messages.mapIt(it.messageHash) + + # Set success state for messages found in store + self.taskCache.applyItIf(storedItems.contains(it.msgHash)): + it.state = DeliveryState.SuccessfullyValidated + + # set retry state for messages not found in store + hashesToValidate.keepItIf(not storedItems.contains(it)) + self.taskCache.applyItIf(hashesToValidate.contains(it.msgHash)): + it.state = DeliveryState.NextRoundRetry + +proc checkStoredMessages(self: SendService) {.async.} = + if not self.checkStoreForMessages: + return + + let tasksToValidate = self.taskCache.filterIt( + it.state == DeliveryState.SuccessfullyPropagated and it.deliveryAge() > ArchiveTime and + not it.isEphemeral() + ) + + await self.checkMsgsInStore(tasksToValidate) + +proc reportTaskResult(self: SendService, task: DeliveryTask) = + case task.state + of DeliveryState.SuccessfullyPropagated: + # TODO: in case of of unable to strore check messages shall we report success instead? + info "Message successfully propagated", + requestId = task.requestId, msgHash = task.msgHash + MessagePropagatedEvent.emit(task.requestId, task.msgHash.toString()) + return + of DeliveryState.SuccessfullyValidated: + info "Message successfully sent", requestId = task.requestId, msgHash = task.msgHash + MessageSentEvent.emit(task.requestId, task.msgHash.toString()) + return + of DeliveryState.FailedToDeliver: + error "Failed to send message", + requestId = task.requestId, msgHash = task.msgHash, error = task.errorDesc + MessageErrorEvent.emit(task.requestId, task.msgHash.toString(), task.errorDesc) + return + else: + # rest of the states are intermediate and does not translate to event + discard + + if task.messageAge() > MaxTimeInCache: + error "Failed to send message", + requestId = task.requestId, msgHash = task.msgHash, error = "Message too old" + task.state = DeliveryState.FailedToDeliver + MessageErrorEvent.emit( + task.requestId, task.msgHash.toString(), "Unable to send within retry time window" + ) + +proc evaluateAndCleanUp(self: SendService) = + self.taskCache.forEach(self.reportTaskResult(it)) + self.taskCache.keepItIf( + it.state != DeliveryState.SuccessfullyValidated or + it.state != DeliveryState.FailedToDeliver + ) + + # remove propagated ephemeral messages as no store check is possible + self.taskCache.keepItIf( + not (it.isEphemeral() and it.state == DeliveryState.SuccessfullyPropagated) + ) + +proc trySendMessages(self: SendService) {.async.} = + let tasksToSend = self.taskCache.filterIt(it.state == DeliveryState.NextRoundRetry) + + for task in tasksToSend: + # Todo, check if it has any perf gain to run them concurrent... + await self.sendProcessor.process(task) + +proc serviceLoop(self: SendService) {.async.} = + ## Continuously monitors that the sent messages have been received by a store node + while true: + await self.trySendMessages() + await self.checkStoredMessages() + self.evaluateAndCleanUp() + ## TODO: add circuit breaker to avoid infinite looping in case of persistent failures + ## Use OnlienStateChange observers to pause/resume the loop + await sleepAsync(ServiceLoopInterval) + +proc startSendService*(self: SendService) = + self.serviceLoopHandle = self.serviceLoop() + +proc stopSendService*(self: SendService) = + if not self.serviceLoopHandle.isNil(): + discard self.serviceLoopHandle.cancelAndWait() + +proc send*(self: SendService, task: DeliveryTask): Future[void] {.async.} = + assert(not task.isNil(), "task for send must not be nil") + + await self.sendProcessor.process(task) + reportTaskResult(self, task) + if task.state != DeliveryState.FailedToDeliver: + self.addTask(task) diff --git a/waku/waku_relay/topic_health.nim b/waku/node/health_monitor/topic_health.nim similarity index 84% rename from waku/waku_relay/topic_health.nim rename to waku/node/health_monitor/topic_health.nim index 774abc584..5a1ea0a16 100644 --- a/waku/waku_relay/topic_health.nim +++ b/waku/node/health_monitor/topic_health.nim @@ -1,11 +1,12 @@ import chronos -import ../waku_core +import waku/waku_core type TopicHealth* = enum UNHEALTHY MINIMALLY_HEALTHY SUFFICIENTLY_HEALTHY + NOT_SUBSCRIBED proc `$`*(t: TopicHealth): string = result = @@ -13,6 +14,7 @@ proc `$`*(t: TopicHealth): string = of UNHEALTHY: "UnHealthy" of MINIMALLY_HEALTHY: "MinimallyHealthy" of SUFFICIENTLY_HEALTHY: "SufficientlyHealthy" + of NOT_SUBSCRIBED: "NotSubscribed" type TopicHealthChangeHandler* = proc( pubsubTopic: PubsubTopic, topicHealth: TopicHealth diff --git a/waku/node/kernel_api/lightpush.nim b/waku/node/kernel_api/lightpush.nim index 9451767ac..004b52766 100644 --- a/waku/node/kernel_api/lightpush.nim +++ b/waku/node/kernel_api/lightpush.nim @@ -188,7 +188,6 @@ proc lightpushPublishHandler( mixify: bool = false, ): Future[lightpush_protocol.WakuLightPushResult] {.async.} = let msgHash = pubsubTopic.computeMessageHash(message).to0xHex() - if not node.wakuLightpushClient.isNil(): notice "publishing message with lightpush", pubsubTopic = pubsubTopic, @@ -196,21 +195,23 @@ proc lightpushPublishHandler( target_peer_id = peer.peerId, msg_hash = msgHash, mixify = mixify - if mixify: #indicates we want to use mix to send the message - #TODO: How to handle multiple addresses? - let conn = node.wakuMix.toConnection( - MixDestination.exitNode(peer.peerId), - WakuLightPushCodec, - MixParameters(expectReply: Opt.some(true), numSurbs: Opt.some(byte(1))), - # indicating we only want a single path to be used for reply hence numSurbs = 1 - ).valueOr: - error "could not create mix connection" - return lighpushErrorResult( - LightPushErrorCode.SERVICE_NOT_AVAILABLE, - "Waku lightpush with mix not available", - ) + if defined(libp2p_mix_experimental_exit_is_dest) and mixify: + #indicates we want to use mix to send the message + when defined(libp2p_mix_experimental_exit_is_dest): + #TODO: How to handle multiple addresses? + let conn = node.wakuMix.toConnection( + MixDestination.exitNode(peer.peerId), + WakuLightPushCodec, + MixParameters(expectReply: Opt.some(true), numSurbs: Opt.some(byte(1))), + # indicating we only want a single path to be used for reply hence numSurbs = 1 + ).valueOr: + error "could not create mix connection" + return lighpushErrorResult( + LightPushErrorCode.SERVICE_NOT_AVAILABLE, + "Waku lightpush with mix not available", + ) - return await node.wakuLightpushClient.publish(some(pubsubTopic), message, conn) + return await node.wakuLightpushClient.publish(some(pubsubTopic), message, conn) else: return await node.wakuLightpushClient.publish(some(pubsubTopic), message, peer) @@ -259,7 +260,7 @@ proc lightpushPublish*( LightPushErrorCode.NO_PEERS_TO_RELAY, "no suitable remote peers" ) - let pubsubForPublish = pubSubTopic.valueOr: + let pubsubForPublish = pubsubTopic.valueOr: if node.wakuAutoSharding.isNone(): let msg = "Pubsub topic must be specified when static sharding is enabled" error "lightpush publish error", error = msg diff --git a/waku/node/kernel_api/relay.nim b/waku/node/kernel_api/relay.nim index 827cc1e5f..819bbb7b9 100644 --- a/waku/node/kernel_api/relay.nim +++ b/waku/node/kernel_api/relay.nim @@ -165,7 +165,7 @@ proc unsubscribe*( proc publish*( node: WakuNode, pubsubTopicOp: Option[PubsubTopic], message: WakuMessage -): Future[Result[void, string]] {.async, gcsafe.} = +): Future[Result[int, string]] {.async, gcsafe.} = ## Publish a `WakuMessage`. Pubsub topic contains; none, a named or static shard. ## `WakuMessage` should contain a `contentTopic` field for light node functionality. ## It is also used to determine the shard. @@ -184,16 +184,20 @@ proc publish*( let msg = "Autosharding error: " & error return err(msg) - #TODO instead of discard return error when 0 peers received the message - discard await node.wakuRelay.publish(pubsubTopic, message) + let numPeers = (await node.wakuRelay.publish(pubsubTopic, message)).valueOr: + warn "waku.relay did not publish", error = error + # Todo: If NoPeersToPublish, we might want to return ok(0) instead!!! + return err($error) notice "waku.relay published", peerId = node.peerId, pubsubTopic = pubsubTopic, msg_hash = pubsubTopic.computeMessageHash(message).to0xHex(), - publishTime = getNowInNanosecondTime() + publishTime = getNowInNanosecondTime(), + numPeers = numPeers - return ok() + # TODO: investigate if we can return error in case numPeers is 0 + ok(numPeers) proc mountRelay*( node: WakuNode, diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 65b2093bb..a31527035 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -27,38 +27,41 @@ import libp2p/protocols/mix/mix_protocol import - ../waku_core, - ../waku_core/topics/sharding, - ../waku_relay, - ../waku_archive, - ../waku_archive_legacy, - ../waku_store_legacy/protocol as legacy_store, - ../waku_store_legacy/client as legacy_store_client, - ../waku_store_legacy/common as legacy_store_common, - ../waku_store/protocol as store, - ../waku_store/client as store_client, - ../waku_store/common as store_common, - ../waku_store/resume, - ../waku_store_sync, - ../waku_filter_v2, - ../waku_filter_v2/client as filter_client, - ../waku_metadata, - ../waku_rendezvous/protocol, - ../waku_rendezvous/client as rendezvous_client, - ../waku_rendezvous/waku_peer_record, - ../waku_lightpush_legacy/client as legacy_ligntpuhs_client, - ../waku_lightpush_legacy as legacy_lightpush_protocol, - ../waku_lightpush/client as ligntpuhs_client, - ../waku_lightpush as lightpush_protocol, - ../waku_enr, - ../waku_peer_exchange, - ../waku_rln_relay, + waku/[ + waku_core, + waku_core/topics/sharding, + waku_relay, + waku_archive, + waku_archive_legacy, + waku_store_legacy/protocol as legacy_store, + waku_store_legacy/client as legacy_store_client, + waku_store_legacy/common as legacy_store_common, + waku_store/protocol as store, + waku_store/client as store_client, + waku_store/common as store_common, + waku_store/resume, + waku_store_sync, + waku_filter_v2, + waku_filter_v2/client as filter_client, + waku_metadata, + waku_rendezvous/protocol, + waku_rendezvous/client as rendezvous_client, + waku_rendezvous/waku_peer_record, + waku_lightpush_legacy/client as legacy_ligntpuhs_client, + waku_lightpush_legacy as legacy_lightpush_protocol, + waku_lightpush/client as ligntpuhs_client, + waku_lightpush as lightpush_protocol, + waku_enr, + waku_peer_exchange, + waku_rln_relay, + common/rate_limit/setting, + common/callbacks, + common/nimchronos, + waku_mix, + requests/node_requests, + ], ./net_config, - ./peer_manager, - ../common/rate_limit/setting, - ../common/callbacks, - ../common/nimchronos, - ../waku_mix + ./peer_manager declarePublicCounter waku_node_messages, "number of messages received", ["type"] @@ -131,6 +134,18 @@ type rateLimitSettings*: ProtocolRateLimitSettings wakuMix*: WakuMix +proc deduceRelayShard( + node: WakuNode, + contentTopic: ContentTopic, + pubsubTopicOp: Option[PubsubTopic] = none[PubsubTopic](), +): Result[RelayShard, string] = + let pubsubTopic = pubsubTopicOp.valueOr: + if node.wakuAutoSharding.isNone(): + return err("Pubsub topic must be specified when static sharding is enabled.") + node.wakuAutoSharding.get().getShard(contentTopic).valueOr: + let msg = "Deducing shard failed: " & error + return err(msg) + proc getShardsGetter(node: WakuNode): GetShards = return proc(): seq[uint16] {.closure, gcsafe, raises: [].} = # fetch pubsubTopics subscribed to relay and convert them to shards @@ -252,6 +267,7 @@ proc mountAutoSharding*( info "mounting auto sharding", clusterId = clusterId, shardCount = shardCount node.wakuAutoSharding = some(Sharding(clusterId: clusterId, shardCountGenZero: shardCount)) + return ok() proc getMixNodePoolSize*(node: WakuNode): int = @@ -443,6 +459,20 @@ proc updateAnnouncedAddrWithPrimaryIpAddr*(node: WakuNode): Result[void, string] return ok() +proc startProvidersAndListeners*(node: WakuNode) = + RequestRelayShard.setProvider( + proc( + pubsubTopic: Option[PubsubTopic], contentTopic: ContentTopic + ): Future[Result[RequestRelayShard, string]] {.async.} = + let shard = node.deduceRelayShard(contentTopic, pubsubTopic).valueOr: + return err($error) + return ok(RequestRelayShard(relayShard: shard)) + ).isOkOr: + error "Can't set proveder for RequestRelayShard", error = error + +proc stopProvidersAndListeners*(node: WakuNode) = + RequestRelayShard.clearProvider() + proc start*(node: WakuNode) {.async.} = ## Starts a created Waku Node and ## all its mounted protocols. @@ -491,6 +521,8 @@ proc start*(node: WakuNode) {.async.} = ## The switch will update addresses after start using the addressMapper await node.switch.start() + node.startProvidersAndListeners() + node.started = true if not zeroPortPresent: @@ -503,6 +535,9 @@ proc start*(node: WakuNode) {.async.} = proc stop*(node: WakuNode) {.async.} = ## By stopping the switch we are stopping all the underlying mounted protocols + + node.stopProvidersAndListeners() + await node.switch.stop() node.peerManager.stop() diff --git a/waku/requests/health_request.nim b/waku/requests/health_request.nim new file mode 100644 index 000000000..6b3bc786c --- /dev/null +++ b/waku/requests/health_request.nim @@ -0,0 +1,23 @@ +import waku/common/broker/[request_broker, multi_request_broker] + +import waku/api/types +import waku/node/health_monitor/[protocol_health, topic_health] +import waku/waku_core/topics + +export protocol_health, topic_health + +RequestBroker: + type RequestNodeHealth* = object + healthStatus*: NodeHealth + +RequestBroker: + type RequestRelayTopicsHealth* = object + topicHealth*: seq[tuple[topic: PubsubTopic, health: TopicHealth]] + + proc signature( + topics: seq[PubsubTopic] + ): Future[Result[RequestRelayTopicsHealth, string]] {.async.} + +MultiRequestBroker: + type RequestProtocolHealth* = object + healthStatus*: ProtocolHealth diff --git a/waku/requests/node_requests.nim b/waku/requests/node_requests.nim new file mode 100644 index 000000000..48b5617fb --- /dev/null +++ b/waku/requests/node_requests.nim @@ -0,0 +1,11 @@ +import std/options +import waku/common/broker/[request_broker, multi_request_broker] +import waku/waku_core/[topics] + +RequestBroker: + type RequestRelayShard* = object + relayShard*: RelayShard + + proc signature( + pubsubTopic: Option[PubsubTopic], contentTopic: ContentTopic + ): Future[Result[RequestRelayShard, string]] {.async.} diff --git a/waku/requests/requests.nim b/waku/requests/requests.nim new file mode 100644 index 000000000..03e10f882 --- /dev/null +++ b/waku/requests/requests.nim @@ -0,0 +1,3 @@ +import ./[health_request, rln_requests, node_requests] + +export health_request, rln_requests, node_requests diff --git a/waku/requests/rln_requests.nim b/waku/requests/rln_requests.nim new file mode 100644 index 000000000..8b61f9fcd --- /dev/null +++ b/waku/requests/rln_requests.nim @@ -0,0 +1,9 @@ +import waku/common/broker/request_broker, waku/waku_core/message/message + +RequestBroker: + type RequestGenerateRlnProof* = object + proof*: seq[byte] + + proc signature( + message: WakuMessage, senderEpoch: float64 + ): Future[Result[RequestGenerateRlnProof, string]] {.async.} diff --git a/waku/waku_core/event/event_emitter.nim b/waku/waku_core/event/event_emitter.nim new file mode 100644 index 000000000..ba6fd481c --- /dev/null +++ b/waku/waku_core/event/event_emitter.nim @@ -0,0 +1,20 @@ + + +type + EventEmitter* = object + # Placeholder for future event emitter implementation + observers*: seq[proc (data: EventData): void] + + +proc initEventEmitter*(): EventEmitter = + EventEmitter(observers: @[]) + +proc emitEvent*(emitter: var EventEmitter, data: EventData) = + for observer in emitter.observers: + asyncSpawn observer(data) + +proc subscribeToEvent*(emitter: var EventEmitter, observer: proc (data: EventData): void) = + emitter.observers.add(observer) + +proc unsubscribeFromEvent*(emitter: var EventEmitter, observer: proc (data: EventData): void) = + emitter.observers = emitter.observers.filterIt(it != observer) diff --git a/waku/waku_core/message/digest.nim b/waku/waku_core/message/digest.nim index 8b99abd7e..dd8d9433a 100644 --- a/waku/waku_core/message/digest.nim +++ b/waku/waku_core/message/digest.nim @@ -19,6 +19,11 @@ func shortLog*(hash: WakuMessageHash): string = func `$`*(hash: WakuMessageHash): string = shortLog(hash) +func toString*(hash: WakuMessageHash): string = + var hexhash = newStringOfCap(64) + hexhash &= hash.toOpenArray(hash.low, hash.high).to0xHex() + hexhash + const EmptyWakuMessageHash*: WakuMessageHash = [ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, diff --git a/waku/waku_filter_v2/client.nim b/waku/waku_filter_v2/client.nim index c42bca3db..65aa0c8fa 100644 --- a/waku/waku_filter_v2/client.nim +++ b/waku/waku_filter_v2/client.nim @@ -10,9 +10,7 @@ import bearssl/rand, stew/byteutils import - ../node/peer_manager, - ../node/delivery_monitor/subscriptions_observer, - ../waku_core, + waku/[node/peer_manager, waku_core, events/delivery_events], ./common, ./protocol_metrics, ./rpc_codec, @@ -25,16 +23,12 @@ type WakuFilterClient* = ref object of LPProtocol rng: ref HmacDrbgContext peerManager: PeerManager pushHandlers: seq[FilterPushHandler] - subscrObservers: seq[SubscriptionObserver] func generateRequestId(rng: ref HmacDrbgContext): string = var bytes: array[10, byte] hmacDrbgGenerate(rng[], bytes) return toHex(bytes) -proc addSubscrObserver*(wfc: WakuFilterClient, obs: SubscriptionObserver) = - wfc.subscrObservers.add(obs) - proc sendSubscribeRequest( wfc: WakuFilterClient, servicePeer: RemotePeerInfo, @@ -132,8 +126,7 @@ proc subscribe*( ?await wfc.sendSubscribeRequest(servicePeer, filterSubscribeRequest) - for obs in wfc.subscrObservers: - obs.onSubscribe(pubSubTopic, contentTopicSeq) + OnFilterSubscribeEvent.emit(pubSubTopic, contentTopicSeq) return ok() @@ -156,8 +149,7 @@ proc unsubscribe*( ?await wfc.sendSubscribeRequest(servicePeer, filterSubscribeRequest) - for obs in wfc.subscrObservers: - obs.onUnsubscribe(pubSubTopic, contentTopicSeq) + OnFilterUnSubscribeEvent.emit(pubSubTopic, contentTopicSeq) return ok() diff --git a/waku/waku_lightpush/callbacks.nim b/waku/waku_lightpush/callbacks.nim index bde4e3e26..ac2e562b6 100644 --- a/waku/waku_lightpush/callbacks.nim +++ b/waku/waku_lightpush/callbacks.nim @@ -31,7 +31,7 @@ proc checkAndGenerateRLNProof*( proc getNilPushHandler*(): PushMessageHandler = return proc( - peer: PeerId, pubsubTopic: string, message: WakuMessage + pubsubTopic: string, message: WakuMessage ): Future[WakuLightPushResult] {.async.} = return lightpushResultInternalError("no waku relay found") @@ -39,7 +39,7 @@ proc getRelayPushHandler*( wakuRelay: WakuRelay, rlnPeer: Option[WakuRLNRelay] = none[WakuRLNRelay]() ): PushMessageHandler = return proc( - peer: PeerId, pubsubTopic: string, message: WakuMessage + pubsubTopic: string, message: WakuMessage ): Future[WakuLightPushResult] {.async.} = # append RLN proof let msgWithProof = checkAndGenerateRLNProof(rlnPeer, message).valueOr: diff --git a/waku/waku_lightpush/client.nim b/waku/waku_lightpush/client.nim index b528b4c76..f0e20d51f 100644 --- a/waku/waku_lightpush/client.nim +++ b/waku/waku_lightpush/client.nim @@ -5,7 +5,6 @@ import libp2p/peerid, libp2p/stream/connection import ../waku_core/peers, ../node/peer_manager, - ../node/delivery_monitor/publish_observer, ../utils/requests, ../waku_core, ./common, @@ -19,16 +18,12 @@ logScope: type WakuLightPushClient* = ref object rng*: ref rand.HmacDrbgContext peerManager*: PeerManager - publishObservers: seq[PublishObserver] proc new*( T: type WakuLightPushClient, peerManager: PeerManager, rng: ref rand.HmacDrbgContext ): T = WakuLightPushClient(peerManager: peerManager, rng: rng) -proc addPublishObserver*(wl: WakuLightPushClient, obs: PublishObserver) = - wl.publishObservers.add(obs) - proc ensureTimestampSet(message: var WakuMessage) = if message.timestamp == 0: message.timestamp = getNowInNanosecondTime() @@ -40,36 +35,43 @@ func shortPeerId(peer: PeerId): string = func shortPeerId(peer: RemotePeerInfo): string = shortLog(peer.peerId) -proc sendPushRequestToConn( - wl: WakuLightPushClient, request: LightPushRequest, conn: Connection +proc sendPushRequest( + wl: WakuLightPushClient, + req: LightPushRequest, + peer: PeerId | RemotePeerInfo, + conn: Option[Connection] = none(Connection), ): Future[WakuLightPushResult] {.async.} = - try: - await conn.writeLp(request.encode().buffer) - except LPStreamRemoteClosedError: - error "Failed to write request to peer", error = getCurrentExceptionMsg() - return lightpushResultInternalError( - "Failed to write request to peer: " & getCurrentExceptionMsg() - ) + let connection = conn.valueOr: + (await wl.peerManager.dialPeer(peer, WakuLightPushCodec)).valueOr: + waku_lightpush_v3_errors.inc(labelValues = [dialFailure]) + return lighpushErrorResult( + LightPushErrorCode.NO_PEERS_TO_RELAY, + dialFailure & ": " & $peer & " is not accessible", + ) + + defer: + await connection.closeWithEOF() + + await connection.writeLP(req.encode().buffer) var buffer: seq[byte] try: - buffer = await conn.readLp(DefaultMaxRpcSize.int) + buffer = await connection.readLp(DefaultMaxRpcSize.int) except LPStreamRemoteClosedError: error "Failed to read response from peer", error = getCurrentExceptionMsg() return lightpushResultInternalError( "Failed to read response from peer: " & getCurrentExceptionMsg() ) + let response = LightpushResponse.decode(buffer).valueOr: - error "failed to decode response", error = $error + error "failed to decode response" waku_lightpush_v3_errors.inc(labelValues = [decodeRpcFailure]) return lightpushResultInternalError(decodeRpcFailure) - let requestIdMismatch = response.requestId != request.requestId - let tooManyRequests = response.statusCode == LightPushErrorCode.TOO_MANY_REQUESTS - if requestIdMismatch and (not tooManyRequests): - # response with TOO_MANY_REQUESTS error code has no requestId by design + if response.requestId != req.requestId and + response.statusCode != LightPushErrorCode.TOO_MANY_REQUESTS: error "response failure, requestId mismatch", - requestId = request.requestId, responseRequestId = response.requestId + requestId = req.requestId, responseRequestId = response.requestId return lightpushResultInternalError("response failure, requestId mismatch") return toPushResult(response) @@ -80,37 +82,34 @@ proc publish*( wakuMessage: WakuMessage, dest: Connection | PeerId | RemotePeerInfo, ): Future[WakuLightPushResult] {.async, gcsafe.} = - let conn = - when dest is Connection: - dest - else: - (await wl.peerManager.dialPeer(dest, WakuLightPushCodec)).valueOr: - waku_lightpush_v3_errors.inc(labelValues = [dialFailure]) - return lighpushErrorResult( - LightPushErrorCode.NO_PEERS_TO_RELAY, - "Peer is not accessible: " & dialFailure & " - " & $dest, - ) - - defer: - await conn.closeWithEOF() - var message = wakuMessage ensureTimestampSet(message) let msgHash = computeMessageHash(pubSubTopic.get(""), message).to0xHex() + + let peerIdStr = + when dest is Connection: + shortPeerId(dest.peerId) + else: + shortPeerId(dest) + info "publish", myPeerId = wl.peerManager.switch.peerInfo.peerId, - peerId = shortPeerId(conn.peerId), + peerId = peerIdStr, msgHash = msgHash, sentTime = getNowInNanosecondTime() let request = LightpushRequest( requestId: generateRequestId(wl.rng), pubsubTopic: pubSubTopic, message: message ) - let relayPeerCount = ?await wl.sendPushRequestToConn(request, conn) - for obs in wl.publishObservers: - obs.onMessagePublished(pubSubTopic.get(""), message) + let relayPeerCount = + when dest is Connection: + ?await wl.sendPushRequest(request, dest.peerId, some(dest)) + elif dest is RemotePeerInfo: + ?await wl.sendPushRequest(request, dest) + else: + ?await wl.sendPushRequest(request, dest) return lightpushSuccessResult(relayPeerCount) @@ -124,3 +123,12 @@ proc publishToAny*( LightPushErrorCode.NO_PEERS_TO_RELAY, "no suitable remote peers" ) return await wl.publish(some(pubsubTopic), wakuMessage, peer) + +proc publishWithConn*( + wl: WakuLightPushClient, + pubSubTopic: PubsubTopic, + message: WakuMessage, + conn: Connection, + destPeer: PeerId, +): Future[WakuLightPushResult] {.async, gcsafe.} = + return await wl.publish(some(pubSubTopic), message, conn) diff --git a/waku/waku_lightpush/common.nim b/waku/waku_lightpush/common.nim index 9c2ea7ced..f0762e2d2 100644 --- a/waku/waku_lightpush/common.nim +++ b/waku/waku_lightpush/common.nim @@ -25,7 +25,7 @@ type ErrorStatus* = tuple[code: LightpushStatusCode, desc: Option[string]] type WakuLightPushResult* = Result[uint32, ErrorStatus] type PushMessageHandler* = proc( - peer: PeerId, pubsubTopic: PubsubTopic, message: WakuMessage + pubsubTopic: PubsubTopic, message: WakuMessage ): Future[WakuLightPushResult] {.async.} const TooManyRequestsMessage* = "Request rejected due to too many requests" @@ -39,7 +39,7 @@ func toPushResult*(response: LightPushResponse): WakuLightPushResult = return ( if (relayPeerCount == 0): # Consider publishing to zero peers an error even if the service node - # sent us a "successful" response with zero peers + # sent us a "successful" response with zero peers err((LightPushErrorCode.NO_PEERS_TO_RELAY, response.statusDesc)) else: ok(relayPeerCount) diff --git a/waku/waku_lightpush/protocol.nim b/waku/waku_lightpush/protocol.nim index 95bfc003e..ecbff8461 100644 --- a/waku/waku_lightpush/protocol.nim +++ b/waku/waku_lightpush/protocol.nim @@ -71,7 +71,7 @@ proc handleRequest( msg_hash = msg_hash, receivedTime = getNowInNanosecondTime() - let res = (await wl.pushHandler(peerId, pubsubTopic, pushRequest.message)).valueOr: + let res = (await wl.pushHandler(pubsubTopic, pushRequest.message)).valueOr: return err((code: error.code, desc: error.desc)) return ok(res) diff --git a/waku/waku_lightpush_legacy/callbacks.nim b/waku/waku_lightpush_legacy/callbacks.nim index 1fe4cf302..a5b88b5b8 100644 --- a/waku/waku_lightpush_legacy/callbacks.nim +++ b/waku/waku_lightpush_legacy/callbacks.nim @@ -30,7 +30,7 @@ proc checkAndGenerateRLNProof*( proc getNilPushHandler*(): PushMessageHandler = return proc( - peer: PeerId, pubsubTopic: string, message: WakuMessage + pubsubTopic: string, message: WakuMessage ): Future[WakuLightPushResult[void]] {.async.} = return err("no waku relay found") @@ -38,7 +38,7 @@ proc getRelayPushHandler*( wakuRelay: WakuRelay, rlnPeer: Option[WakuRLNRelay] = none[WakuRLNRelay]() ): PushMessageHandler = return proc( - peer: PeerId, pubsubTopic: string, message: WakuMessage + pubsubTopic: string, message: WakuMessage ): Future[WakuLightPushResult[void]] {.async.} = # append RLN proof let msgWithProof = ?checkAndGenerateRLNProof(rlnPeer, message) diff --git a/waku/waku_lightpush_legacy/client.nim b/waku/waku_lightpush_legacy/client.nim index 0e3c9bd6f..ab489bec9 100644 --- a/waku/waku_lightpush_legacy/client.nim +++ b/waku/waku_lightpush_legacy/client.nim @@ -5,7 +5,6 @@ import libp2p/peerid import ../waku_core/peers, ../node/peer_manager, - ../node/delivery_monitor/publish_observer, ../utils/requests, ../waku_core, ./common, @@ -19,7 +18,6 @@ logScope: type WakuLegacyLightPushClient* = ref object peerManager*: PeerManager rng*: ref rand.HmacDrbgContext - publishObservers: seq[PublishObserver] proc new*( T: type WakuLegacyLightPushClient, @@ -28,9 +26,6 @@ proc new*( ): T = WakuLegacyLightPushClient(peerManager: peerManager, rng: rng) -proc addPublishObserver*(wl: WakuLegacyLightPushClient, obs: PublishObserver) = - wl.publishObservers.add(obs) - proc sendPushRequest( wl: WakuLegacyLightPushClient, req: PushRequest, peer: PeerId | RemotePeerInfo ): Future[WakuLightPushResult[void]] {.async, gcsafe.} = @@ -86,9 +81,6 @@ proc publish*( let pushRequest = PushRequest(pubSubTopic: pubSubTopic, message: message) ?await wl.sendPushRequest(pushRequest, peer) - for obs in wl.publishObservers: - obs.onMessagePublished(pubSubTopic, message) - notice "publishing message with lightpush", pubsubTopic = pubsubTopic, contentTopic = message.contentTopic, @@ -111,7 +103,4 @@ proc publishToAny*( let pushRequest = PushRequest(pubSubTopic: pubSubTopic, message: message) ?await wl.sendPushRequest(pushRequest, peer) - for obs in wl.publishObservers: - obs.onMessagePublished(pubSubTopic, message) - return ok() diff --git a/waku/waku_lightpush_legacy/common.nim b/waku/waku_lightpush_legacy/common.nim index fcdf1814c..1b40ba72b 100644 --- a/waku/waku_lightpush_legacy/common.nim +++ b/waku/waku_lightpush_legacy/common.nim @@ -9,7 +9,7 @@ export WakuLegacyLightPushCodec type WakuLightPushResult*[T] = Result[T, string] type PushMessageHandler* = proc( - peer: PeerId, pubsubTopic: PubsubTopic, message: WakuMessage + pubsubTopic: PubsubTopic, message: WakuMessage ): Future[WakuLightPushResult[void]] {.async.} const TooManyRequestsMessage* = "TOO_MANY_REQUESTS" diff --git a/waku/waku_lightpush_legacy/protocol.nim b/waku/waku_lightpush_legacy/protocol.nim index d51943cff..72fc963ee 100644 --- a/waku/waku_lightpush_legacy/protocol.nim +++ b/waku/waku_lightpush_legacy/protocol.nim @@ -53,7 +53,7 @@ proc handleRequest*( msg_hash = msg_hash, receivedTime = getNowInNanosecondTime() - let handleRes = await wl.pushHandler(peerId, pubsubTopic, message) + let handleRes = await wl.pushHandler(pubsubTopic, message) isSuccess = handleRes.isOk() pushResponseInfo = (if isSuccess: "OK" else: handleRes.error) diff --git a/waku/waku_relay.nim b/waku/waku_relay.nim index 96328d984..a91033cf1 100644 --- a/waku/waku_relay.nim +++ b/waku/waku_relay.nim @@ -1,3 +1,4 @@ -import ./waku_relay/[protocol, topic_health] +import ./waku_relay/protocol +import waku/node/health_monitor/topic_health export protocol, topic_health diff --git a/waku/waku_relay/protocol.nim b/waku/waku_relay/protocol.nim index cbf9123dd..47a898b6d 100644 --- a/waku/waku_relay/protocol.nim +++ b/waku/waku_relay/protocol.nim @@ -17,8 +17,12 @@ import libp2p/protocols/pubsub/rpc/messages, libp2p/stream/connection, libp2p/switch + import - ../waku_core, ./message_id, ./topic_health, ../node/delivery_monitor/publish_observer + waku/waku_core, + waku/node/health_monitor/topic_health, + waku/requests/health_request, + ./message_id from ../waku_core/codecs import WakuRelayCodec export WakuRelayCodec @@ -157,7 +161,6 @@ type # map topic with its assigned validator within pubsub topicHandlers: Table[PubsubTopic, TopicHandler] # map topic with the TopicHandler proc in charge of attending topic's incoming message events - publishObservers: seq[PublishObserver] topicsHealth*: Table[string, TopicHealth] onTopicHealthChange*: TopicHealthChangeHandler topicHealthLoopHandle*: Future[void] @@ -321,6 +324,19 @@ proc initRelayObservers(w: WakuRelay) = w.addObserver(administrativeObserver) +proc initRequestProviders(w: WakuRelay) = + RequestRelayTopicsHealth.setProvider( + proc( + topics: seq[PubsubTopic] + ): Future[Result[RequestRelayTopicsHealth, string]] {.async.} = + var collectedRes: RequestRelayTopicsHealth + for topic in topics: + let health = w.topicsHealth.getOrDefault(topic, TopicHealth.NOT_SUBSCRIBED) + collectedRes.topicHealth.add((topic, health)) + return ok(collectedRes) + ).isOkOr: + error "Cannot set Relay Topics Health request provider", error = error + proc new*( T: type WakuRelay, switch: Switch, maxMessageSize = int(DefaultMaxWakuMessageSize) ): WakuRelayResult[T] = @@ -340,9 +356,10 @@ proc new*( ) procCall GossipSub(w).initPubSub() + w.topicsHealth = initTable[string, TopicHealth]() w.initProtocolHandler() w.initRelayObservers() - w.topicsHealth = initTable[string, TopicHealth]() + w.initRequestProviders() except InitializationError: return err("initialization error: " & getCurrentExceptionMsg()) @@ -353,12 +370,6 @@ proc addValidator*( ) {.gcsafe.} = w.wakuValidators.add((handler, errorMessage)) -proc addPublishObserver*(w: WakuRelay, obs: PublishObserver) = - ## Observer when the api client performed a publish operation. This - ## is initially aimed for bringing an additional layer of delivery reliability thanks - ## to store - w.publishObservers.add(obs) - proc addObserver*(w: WakuRelay, observer: PubSubObserver) {.gcsafe.} = ## Observes when a message is sent/received from the GossipSub PoV procCall GossipSub(w).addObserver(observer) @@ -628,9 +639,6 @@ proc publish*( if relayedPeerCount <= 0: return err(NoPeersToPublish) - for obs in w.publishObservers: - obs.onMessagePublished(pubSubTopic, message) - return ok(relayedPeerCount) proc getConnectedPubSubPeers*( diff --git a/waku/waku_rln_relay/rln_relay.nim b/waku/waku_rln_relay/rln_relay.nim index 6a8fea2b5..a5f0850b4 100644 --- a/waku/waku_rln_relay/rln_relay.nim +++ b/waku/waku_rln_relay/rln_relay.nim @@ -24,10 +24,13 @@ import ./nonce_manager import - ../common/error_handling, - ../waku_relay, # for WakuRelayHandler - ../waku_core, - ../waku_keystore + waku/[ + common/error_handling, + waku_relay, # for WakuRelayHandler + waku_core, + requests/rln_requests, + waku_keystore, + ] logScope: topics = "waku rln_relay" @@ -91,6 +94,7 @@ proc stop*(rlnPeer: WakuRLNRelay) {.async: (raises: [Exception]).} = # stop the group sync, and flush data to tree db info "stopping rln-relay" + RequestGenerateRlnProof.clearProvider() await rlnPeer.groupManager.stop() proc hasDuplicate*( @@ -275,11 +279,11 @@ proc validateMessageAndUpdateLog*( return isValidMessage -proc appendRLNProof*( - rlnPeer: WakuRLNRelay, msg: var WakuMessage, senderEpochTime: float64 -): RlnRelayResult[void] = - ## returns true if it can create and append a `RateLimitProof` to the supplied `msg` - ## returns false otherwise +proc createRlnProof( + rlnPeer: WakuRLNRelay, msg: WakuMessage, senderEpochTime: float64 +): RlnRelayResult[seq[byte]] = + ## returns a new `RateLimitProof` for the supplied `msg` + ## returns an error if it cannot create the proof ## `senderEpochTime` indicates the number of seconds passed since Unix epoch. The fractional part holds sub-seconds. ## The `epoch` field of `RateLimitProof` is derived from the provided `senderEpochTime` (using `calcEpoch()`) @@ -291,7 +295,14 @@ proc appendRLNProof*( let proof = rlnPeer.groupManager.generateProof(input, epoch, nonce).valueOr: return err("could not generate rln-v2 proof: " & $error) - msg.proof = proof.encode().buffer + return ok(proof.encode().buffer) + +proc appendRLNProof*( + rlnPeer: WakuRLNRelay, msg: var WakuMessage, senderEpochTime: float64 +): RlnRelayResult[void] = + msg.proof = rlnPeer.createRlnProof(msg, senderEpochTime).valueOr: + return err($error) + return ok() proc clearNullifierLog*(rlnPeer: WakuRlnRelay) = @@ -438,6 +449,18 @@ proc mount( # Start epoch monitoring in the background wakuRlnRelay.epochMonitorFuture = monitorEpochs(wakuRlnRelay) + + RequestGenerateRlnProof.setProvider( + proc( + msg: WakuMessage, senderEpochTime: float64 + ): Future[Result[RequestGenerateRlnProof, string]] {.async.} = + let proof = createRlnProof(wakuRlnRelay, msg, senderEpochTime).valueOr: + return err("Could not create RLN proof: " & $error) + + return ok(RequestGenerateRlnProof(proof: proof)) + ).isOkOr: + return err("Proof generator provider cannot be set") + return ok(wakuRlnRelay) proc isReady*(rlnPeer: WakuRLNRelay): Future[bool] {.async: (raises: [Exception]).} = From a9bd1f2f8cbb29965096324b5d70d5113c79126f Mon Sep 17 00:00:00 2001 From: NagyZoltanPeter <113987313+NagyZoltanPeter@users.noreply.github.com> Date: Sun, 14 Dec 2025 10:45:54 +0100 Subject: [PATCH 2/8] Fix edge mode config and test added --- examples/api_example/api_example.nim | 25 +++++++++++++++++++++++++ tests/api/test_node_conf.nim | 21 +++++++++++++++++++++ waku/api/api_conf.nim | 10 +++++++++- 3 files changed, 55 insertions(+), 1 deletion(-) diff --git a/examples/api_example/api_example.nim b/examples/api_example/api_example.nim index f2524f319..adfb5bbd7 100644 --- a/examples/api_example/api_example.nim +++ b/examples/api_example/api_example.nim @@ -8,6 +8,29 @@ type CliArgs = object .}: string proc periodicSender(w: Waku): Future[void] {.async.} = + let sentListener = MessageSentEvent.listen( + proc(event: MessageSentEvent) {.async: (raises: []).} = + echo "Message sent with request ID: ", + event.requestId, " hash: ", event.messageHash + ) + + let errorListener = MessageErrorEvent.listen( + proc(event: MessageErrorEvent) {.async: (raises: []).} = + echo "Message failed to send with request ID: ", + event.requestId, " error: ", event.error + ) + + let propagatedListener = MessagePropagatedEvent.listen( + proc(event: MessagePropagatedEvent) {.async: (raises: []).} = + echo "Message propagated with request ID: ", + event.requestId, " hash: ", event.messageHash + ) + + defer: + MessageSentEvent.dropListener(sentListener) + MessageErrorEvent.dropListener(errorListener) + MessagePropagatedEvent.dropListener(propagatedListener) + ## Periodically sends a Waku message every 30 seconds var counter = 0 while true: @@ -20,6 +43,8 @@ proc periodicSender(w: Waku): Future[void] {.async.} = echo "Failed to send message: ", error quit(QuitFailure) + echo "Sending message with request ID: ", sendRequestId, " counter: ", counter + counter += 1 await sleepAsync(30.seconds) diff --git a/tests/api/test_node_conf.nim b/tests/api/test_node_conf.nim index 232ffc7d2..4dfbd4b51 100644 --- a/tests/api/test_node_conf.nim +++ b/tests/api/test_node_conf.nim @@ -21,6 +21,27 @@ suite "LibWaku Conf - toWakuConf": wakuConf.shardingConf.numShardsInCluster == 8 wakuConf.staticNodes.len == 0 + test "Edge mode configuration": + ## Given + let protocolsConfig = ProtocolsConfig.init(entryNodes = @[], clusterId = 1) + + let nodeConfig = NodeConfig.init(mode = Edge, protocolsConfig = protocolsConfig) + + ## When + let wakuConfRes = toWakuConf(nodeConfig) + + ## Then + require wakuConfRes.isOk() + let wakuConf = wakuConfRes.get() + require wakuConf.validate().isOk() + check: + wakuConf.relay == false + wakuConf.lightPush == false + wakuConf.filterServiceConf.isSome() == false + wakuConf.storeServiceConf.isSome() == false + wakuConf.peerExchangeService == true + wakuConf.clusterId == 1 + test "Core mode configuration": ## Given let protocolsConfig = ProtocolsConfig.init(entryNodes = @[], clusterId = 1) diff --git a/waku/api/api_conf.nim b/waku/api/api_conf.nim index 692ada821..5dd0dcc30 100644 --- a/waku/api/api_conf.nim +++ b/waku/api/api_conf.nim @@ -133,7 +133,15 @@ proc toWakuConf*(nodeConfig: NodeConfig): Result[WakuConf, string] = of Edge: # All client side protocols are mounted by default # Peer exchange client is always enabled and start_node will start the px loop - discard + # Metadata is always mounted + b.withPeerExchange(true) + # switch off all service side protocols and relay + b.withRelay(false) + b.filterServiceConf.withEnabled(false) + b.withLightPush(false) + b.storeServiceConf.withEnabled(false) + # Leave discv5 and rendezvous for user choice + ## Network Conf let protocolsConfig = nodeConfig.protocolsConfig From 38ba74135d4e0b46b8728621f6a9c9b9acbe3d1c Mon Sep 17 00:00:00 2001 From: NagyZoltanPeter <113987313+NagyZoltanPeter@users.noreply.github.com> Date: Mon, 15 Dec 2025 13:05:07 +0100 Subject: [PATCH 3/8] Fix some import issues, start and stop waku shall not throw exception but return with result properly --- tests/api/test_api_send.nim | 153 ++++++++++++++++++++++++++++++++++++ waku.nim | 4 +- waku/api.nim | 3 +- waku/factory/waku.nim | 77 +++++++++++------- 4 files changed, 204 insertions(+), 33 deletions(-) create mode 100644 tests/api/test_api_send.nim diff --git a/tests/api/test_api_send.nim b/tests/api/test_api_send.nim new file mode 100644 index 000000000..f14d57d44 --- /dev/null +++ b/tests/api/test_api_send.nim @@ -0,0 +1,153 @@ +{.used.} + +import + std/[options, sequtils, strutils], + chronos, + testutils/unittests, + stew/byteutils, + libp2p/[switch, peerinfo] +import ../testlib/[common, wakucore, wakunode, testasync, futures, testutils] +import + waku, + waku/ + [ + waku_node, + waku_core, + waku_relay/protocol, + waku_filter_v2/common, + waku_store/common, + ] +import waku/api/api_conf, waku/factory/waku_conf, waku/factory/networks_config + +suite "Waku API - Send": + var + relayNode1 {.threadvar.}: WakuNode + relayNode1PeerInfo {.threadvar.}: RemotePeerInfo + relayNode1PeerId {.threadvar.}: PeerId + + relayNode2 {.threadvar.}: WakuNode + relayNode2PeerInfo {.threadvar.}: RemotePeerInfo + relayNode2PeerId {.threadvar.}: PeerId + + lightpushNode {.threadvar.}: WakuNode + lightpushNodePeerInfo {.threadvar.}: RemotePeerInfo + lightpushNodePeerId {.threadvar.}: PeerId + + storeNode {.threadvar.}: WakuNode + storeNodePeerInfo {.threadvar.}: RemotePeerInfo + storeNodePeerId {.threadvar.}: PeerId + + asyncSetup: + # handlerFuture = newPushHandlerFuture() + # handler = proc( + # peer: PeerId, pubsubTopic: PubsubTopic, message: WakuMessage + # ): Future[WakuLightPushResult[void]] {.async.} = + # handlerFuture.complete((pubsubTopic, message)) + # return ok() + + relayNode1 = + newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) + relayNode2 = + newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) + + lightpushNode = + newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) + storeNode = + newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) + + await allFutures( + relayNode1.start(), relayNode2.start(), lightpushNode.start(), storeNode.start() + ) + + (await relayNode1.mountRelay()).isOkOr: + raiseAssert "Failed to mount relay" + + (await relayNode2.mountRelay()).isOkOr: + raiseAssert "Failed to mount relay" + + (await lightpushNode.mountRelay()).isOkOr: + raiseAssert "Failed to mount relay" + await lightpushNode.mountLightPush() + + (await storeNode.mountRelay()).isOkOr: + raiseAssert "Failed to mount relay" + await storeNode.mountStore() + + relayNode1PeerInfo = relayNode1.peerInfo.toRemotePeerInfo() + relayNode1PeerId = relayNode1.peerInfo.peerId + + relayNode2PeerInfo = relayNode2.peerInfo.toRemotePeerInfo() + relayNode2PeerId = relayNode2.peerInfo.peerId + + lightpushNodePeerInfo = lightpushNode.peerInfo.toRemotePeerInfo() + lightpushNodePeerId = lightpushNode.peerInfo.peerId + + storeNodePeerInfo = storeNode.peerInfo.toRemotePeerInfo() + storeNodePeerId = storeNode.peerInfo.peerId + asyncTeardown: + await allFutures( + relayNode1.stop(), relayNode2.stop(), lightpushNode.stop(), storeNode.stop() + ) + + asyncTest "Check API availability (unhealthy node)": + # Create a node config that doesn't start or has no peers + let nodeConfig = NodeConfig.init( + mode = WakuMode.Core, + protocolsConfig = ProtocolsConfig.init( + entryNodes = @[], + clusterId = 1, + autoShardingConfig = AutoShardingConfig(numShardsInCluster: 1), + ), + ) + + let wakuConfRes = toWakuConf(nodeConfig) + + ## Then + require wakuConfRes.isOk() + let wakuConf = wakuConfRes.get() + require wakuConf.validate().isOk() + check: + wakuConf.clusterId == 1 + wakuConf.shardingConf.numShardsInCluster == 1 + + var node = (await createNode(nodeConfig)).valueOr: + raiseAssert error + + let sentListener = MessageSentEvent.listen( + proc(event: MessageSentEvent) {.async: (raises: []).} = + raiseAssert "Should not be called" + ).valueOr: + raiseAssert error + + let errorListener = MessageErrorEvent.listen( + proc(event: MessageErrorEvent) {.async: (raises: []).} = + check true + ).valueOr: + raiseAssert error + + let propagatedListener = MessagePropagatedEvent.listen( + proc(event: MessagePropagatedEvent) {.async: (raises: []).} = + raiseAssert "Should not be called" + ).valueOr: + raiseAssert error + defer: + MessageSentEvent.dropListener(sentListener) + MessageErrorEvent.dropListener(errorListener) + MessagePropagatedEvent.dropListener(propagatedListener) + + let envelope = MessageEnvelope.init( + ContentTopic("/waku/2/default-content/proto"), "test payload" + ) + + let sendResult = await node.send(envelope) + + if sendResult.isErr(): + echo "Send error: ", sendResult.error + + check: + sendResult.isErr() + # Depending on implementation, it might say "not healthy" + sendResult.error.contains("not healthy") + + (await node.stop()).isOkOr: + raiseAssert "Failed to stop node: " & error diff --git a/waku.nim b/waku.nim index 12e69fdaf..65a017c5a 100644 --- a/waku.nim +++ b/waku.nim @@ -3,8 +3,8 @@ ## This module re-exports the public API for creating and managing Waku nodes ## when using nwaku as a library dependency. -import waku/api/[api, api_conf, types] -export api, api_conf, types +import waku/api +export api import waku/factory/waku export waku diff --git a/waku/api.nim b/waku/api.nim index c3211867d..110a8f431 100644 --- a/waku/api.nim +++ b/waku/api.nim @@ -1,3 +1,4 @@ import ./api/[api, api_conf, entry_nodes] +import ./events/message_events -export api, api_conf, entry_nodes +export api, api_conf, entry_nodes, message_events diff --git a/waku/factory/waku.nim b/waku/factory/waku.nim index 057c78810..40b9110dd 100644 --- a/waku/factory/waku.nim +++ b/waku/factory/waku.nim @@ -200,9 +200,7 @@ proc new*( return err("Failed setting up app callbacks: " & $error) ## Delivery Monitor - let deliveryService = DeliveryService.new( - wakuConf.p2pReliability, node, - ).valueOr: + let deliveryService = DeliveryService.new(wakuConf.p2pReliability, node).valueOr: return err("could not create delivery service: " & $error) var waku = Waku( @@ -350,7 +348,7 @@ proc startDnsDiscoveryRetryLoop(waku: ptr Waku): Future[void] {.async.} = error "failed to connect to dynamic bootstrap nodes: " & getCurrentExceptionMsg() return -proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async.} = +proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async: (raises: []).} = if waku[].node.started: warn "startWaku: waku node already started" return ok() @@ -360,9 +358,15 @@ proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async.} = if conf.dnsDiscoveryConf.isSome(): let dnsDiscoveryConf = waku.conf.dnsDiscoveryConf.get() - let dynamicBootstrapNodesRes = await waku_dnsdisc.retrieveDynamicBootstrapNodes( - dnsDiscoveryConf.enrTreeUrl, dnsDiscoveryConf.nameServers - ) + let dynamicBootstrapNodesRes = + try: + await waku_dnsdisc.retrieveDynamicBootstrapNodes( + dnsDiscoveryConf.enrTreeUrl, dnsDiscoveryConf.nameServers + ) + except CatchableError: + Result[seq[RemotePeerInfo], string].err( + "Retrieving dynamic bootstrap nodes failed: " & getCurrentExceptionMsg() + ) if dynamicBootstrapNodesRes.isErr(): error "Retrieving dynamic bootstrap nodes failed", @@ -376,8 +380,11 @@ proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async.} = return err("error while calling startNode: " & $error) ## Update waku data that is set dynamically on node start - (await updateWaku(waku)).isOkOr: - return err("Error in updateApp: " & $error) + try: + (await updateWaku(waku)).isOkOr: + return err("Error in updateApp: " & $error) + except CatchableError: + return err("Error in updateApp: " & getCurrentExceptionMsg()) ## Discv5 if conf.discv5Conf.isSome(): @@ -419,44 +426,54 @@ proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async.} = return err ("Starting protocols support REST server failed: " & $error) if conf.metricsServerConf.isSome(): - waku[].metricsServer = ( - await ( - waku_metrics.startMetricsServerAndLogging( - conf.metricsServerConf.get(), conf.portsShift + try: + waku[].metricsServer = ( + await ( + waku_metrics.startMetricsServerAndLogging( + conf.metricsServerConf.get(), conf.portsShift + ) ) + ).valueOr: + return err("Starting monitoring and external interfaces failed: " & error) + except CatchableError: + return err( + "Starting monitoring and external interfaces failed: " & getCurrentExceptionMsg() ) - ).valueOr: - return err("Starting monitoring and external interfaces failed: " & error) - waku[].healthMonitor.setOverallHealth(HealthStatus.READY) return ok() -proc stop*(waku: Waku): Future[void] {.async: (raises: [Exception]).} = +proc stop*(waku: Waku): Future[Result[void, string]] {.async: (raises: []).} = ## Waku shutdown if not waku.node.started: warn "stop: attempting to stop node that isn't running" - waku.healthMonitor.setOverallHealth(HealthStatus.SHUTTING_DOWN) + try: + waku.healthMonitor.setOverallHealth(HealthStatus.SHUTTING_DOWN) - if not waku.metricsServer.isNil(): - await waku.metricsServer.stop() + if not waku.metricsServer.isNil(): + await waku.metricsServer.stop() - if not waku.wakuDiscv5.isNil(): - await waku.wakuDiscv5.stop() + if not waku.wakuDiscv5.isNil(): + await waku.wakuDiscv5.stop() - if not waku.node.isNil(): - await waku.node.stop() + if not waku.node.isNil(): + await waku.node.stop() - if not waku.dnsRetryLoopHandle.isNil(): - await waku.dnsRetryLoopHandle.cancelAndWait() + if not waku.dnsRetryLoopHandle.isNil(): + await waku.dnsRetryLoopHandle.cancelAndWait() - if not waku.healthMonitor.isNil(): - await waku.healthMonitor.stopHealthMonitor() + if not waku.healthMonitor.isNil(): + await waku.healthMonitor.stopHealthMonitor() - if not waku.restServer.isNil(): - await waku.restServer.stop() + if not waku.restServer.isNil(): + await waku.restServer.stop() + except Exception: + error "waku stop failed: " & getCurrentExceptionMsg() + return err("waku stop failed: " & getCurrentExceptionMsg()) + + return ok() proc isModeCoreAvailable*(waku: Waku): bool = return not waku.node.wakuRelay.isNil() From a308e839594e844ef3c49752cc4670d26f4e6d03 Mon Sep 17 00:00:00 2001 From: NagyZoltanPeter <113987313+NagyZoltanPeter@users.noreply.github.com> Date: Mon, 15 Dec 2025 13:10:02 +0100 Subject: [PATCH 4/8] cherry pick from chore-sync-mode-request-broker to get latest sync mode RequestBroker --- tests/common/test_request_broker.nim | 310 ++++++++++++++++++- waku/common/broker/request_broker.nim | 427 ++++++++++++++++++++------ 2 files changed, 633 insertions(+), 104 deletions(-) diff --git a/tests/common/test_request_broker.nim b/tests/common/test_request_broker.nim index 2ffd9cbf8..c7a8d8453 100644 --- a/tests/common/test_request_broker.nim +++ b/tests/common/test_request_broker.nim @@ -6,6 +6,10 @@ import std/strutils import waku/common/broker/request_broker +## --------------------------------------------------------------------------- +## Async-mode brokers + tests +## --------------------------------------------------------------------------- + RequestBroker: type SimpleResponse = object value*: string @@ -31,11 +35,14 @@ RequestBroker: suffix: string ): Future[Result[DualResponse, string]] {.async.} -RequestBroker: +RequestBroker(async): type ImplicitResponse = ref object note*: string -suite "RequestBroker macro": +static: + doAssert typeof(SimpleResponse.request()) is Future[Result[SimpleResponse, string]] + +suite "RequestBroker macro (async mode)": test "serves zero-argument providers": check SimpleResponse .setProvider( @@ -78,7 +85,6 @@ suite "RequestBroker macro": .setProvider( proc(key: string, subKey: int): Future[Result[KeyedResponse, string]] {.async.} = raise newException(ValueError, "simulated failure") - ok(KeyedResponse(key: key, payload: "")) ) .isOk() @@ -196,3 +202,301 @@ suite "RequestBroker macro": check nowSuccWithOverride.value.count == 1 DualResponse.clearProvider() + +## --------------------------------------------------------------------------- +## Sync-mode brokers + tests +## --------------------------------------------------------------------------- + +RequestBroker(sync): + type SimpleResponseSync = object + value*: string + + proc signatureFetch*(): Result[SimpleResponseSync, string] + +RequestBroker(sync): + type KeyedResponseSync = object + key*: string + payload*: string + + proc signatureFetchWithKey*( + key: string, subKey: int + ): Result[KeyedResponseSync, string] + +RequestBroker(sync): + type DualResponseSync = object + note*: string + count*: int + + proc signatureNoInput*(): Result[DualResponseSync, string] + proc signatureWithInput*(suffix: string): Result[DualResponseSync, string] + +RequestBroker(sync): + type ImplicitResponseSync = ref object + note*: string + +static: + doAssert typeof(SimpleResponseSync.request()) is Result[SimpleResponseSync, string] + doAssert not ( + typeof(SimpleResponseSync.request()) is Future[Result[SimpleResponseSync, string]] + ) + doAssert typeof(KeyedResponseSync.request("topic", 1)) is + Result[KeyedResponseSync, string] + +suite "RequestBroker macro (sync mode)": + test "serves zero-argument providers (sync)": + check SimpleResponseSync + .setProvider( + proc(): Result[SimpleResponseSync, string] = + ok(SimpleResponseSync(value: "hi")) + ) + .isOk() + + let res = SimpleResponseSync.request() + check res.isOk() + check res.value.value == "hi" + + SimpleResponseSync.clearProvider() + + test "zero-argument request errors when unset (sync)": + let res = SimpleResponseSync.request() + check res.isErr + check res.error.contains("no zero-arg provider") + + test "serves input-based providers (sync)": + var seen: seq[string] = @[] + check KeyedResponseSync + .setProvider( + proc(key: string, subKey: int): Result[KeyedResponseSync, string] = + seen.add(key) + ok(KeyedResponseSync(key: key, payload: key & "-payload+" & $subKey)) + ) + .isOk() + + let res = KeyedResponseSync.request("topic", 1) + check res.isOk() + check res.value.key == "topic" + check res.value.payload == "topic-payload+1" + check seen == @["topic"] + + KeyedResponseSync.clearProvider() + + test "catches provider exception (sync)": + check KeyedResponseSync + .setProvider( + proc(key: string, subKey: int): Result[KeyedResponseSync, string] = + raise newException(ValueError, "simulated failure") + ) + .isOk() + + let res = KeyedResponseSync.request("neglected", 11) + check res.isErr() + check res.error.contains("simulated failure") + + KeyedResponseSync.clearProvider() + + test "input request errors when unset (sync)": + let res = KeyedResponseSync.request("foo", 2) + check res.isErr + check res.error.contains("input signature") + + test "supports both provider types simultaneously (sync)": + check DualResponseSync + .setProvider( + proc(): Result[DualResponseSync, string] = + ok(DualResponseSync(note: "base", count: 1)) + ) + .isOk() + + check DualResponseSync + .setProvider( + proc(suffix: string): Result[DualResponseSync, string] = + ok(DualResponseSync(note: "base" & suffix, count: suffix.len)) + ) + .isOk() + + let noInput = DualResponseSync.request() + check noInput.isOk + check noInput.value.note == "base" + + let withInput = DualResponseSync.request("-extra") + check withInput.isOk + check withInput.value.note == "base-extra" + check withInput.value.count == 6 + + DualResponseSync.clearProvider() + + test "clearProvider resets both entries (sync)": + check DualResponseSync + .setProvider( + proc(): Result[DualResponseSync, string] = + ok(DualResponseSync(note: "temp", count: 0)) + ) + .isOk() + DualResponseSync.clearProvider() + + let res = DualResponseSync.request() + check res.isErr + + test "implicit zero-argument provider works by default (sync)": + check ImplicitResponseSync + .setProvider( + proc(): Result[ImplicitResponseSync, string] = + ok(ImplicitResponseSync(note: "auto")) + ) + .isOk() + + let res = ImplicitResponseSync.request() + check res.isOk + + ImplicitResponseSync.clearProvider() + check res.value.note == "auto" + + test "implicit zero-argument request errors when unset (sync)": + let res = ImplicitResponseSync.request() + check res.isErr + check res.error.contains("no zero-arg provider") + + test "implicit zero-argument provider raises error (sync)": + check ImplicitResponseSync + .setProvider( + proc(): Result[ImplicitResponseSync, string] = + raise newException(ValueError, "simulated failure") + ) + .isOk() + + let res = ImplicitResponseSync.request() + check res.isErr + check res.error.contains("simulated failure") + + ImplicitResponseSync.clearProvider() + +## --------------------------------------------------------------------------- +## POD / external type brokers + tests (distinct/alias behavior) +## --------------------------------------------------------------------------- + +type ExternalDefinedTypeAsync = object + label*: string + +type ExternalDefinedTypeSync = object + label*: string + +type ExternalDefinedTypeShared = object + label*: string + +RequestBroker: + type PodResponse = int + + proc signatureFetch*(): Future[Result[PodResponse, string]] {.async.} + +RequestBroker: + type ExternalAliasedResponse = ExternalDefinedTypeAsync + + proc signatureFetch*(): Future[Result[ExternalAliasedResponse, string]] {.async.} + +RequestBroker(sync): + type ExternalAliasedResponseSync = ExternalDefinedTypeSync + + proc signatureFetch*(): Result[ExternalAliasedResponseSync, string] + +RequestBroker(sync): + type DistinctStringResponseA = distinct string + +RequestBroker(sync): + type DistinctStringResponseB = distinct string + +RequestBroker(sync): + type ExternalDistinctResponseA = distinct ExternalDefinedTypeShared + +RequestBroker(sync): + type ExternalDistinctResponseB = distinct ExternalDefinedTypeShared + +suite "RequestBroker macro (POD/external types)": + test "supports non-object response types (async)": + check PodResponse + .setProvider( + proc(): Future[Result[PodResponse, string]] {.async.} = + ok(PodResponse(123)) + ) + .isOk() + + let res = waitFor PodResponse.request() + check res.isOk() + check int(res.value) == 123 + + PodResponse.clearProvider() + + test "supports aliased external types (async)": + check ExternalAliasedResponse + .setProvider( + proc(): Future[Result[ExternalAliasedResponse, string]] {.async.} = + ok(ExternalAliasedResponse(ExternalDefinedTypeAsync(label: "ext"))) + ) + .isOk() + + let res = waitFor ExternalAliasedResponse.request() + check res.isOk() + check ExternalDefinedTypeAsync(res.value).label == "ext" + + ExternalAliasedResponse.clearProvider() + + test "supports aliased external types (sync)": + check ExternalAliasedResponseSync + .setProvider( + proc(): Result[ExternalAliasedResponseSync, string] = + ok(ExternalAliasedResponseSync(ExternalDefinedTypeSync(label: "ext"))) + ) + .isOk() + + let res = ExternalAliasedResponseSync.request() + check res.isOk() + check ExternalDefinedTypeSync(res.value).label == "ext" + + ExternalAliasedResponseSync.clearProvider() + + test "distinct response types avoid overload ambiguity (sync)": + check DistinctStringResponseA + .setProvider( + proc(): Result[DistinctStringResponseA, string] = + ok(DistinctStringResponseA("a")) + ) + .isOk() + + check DistinctStringResponseB + .setProvider( + proc(): Result[DistinctStringResponseB, string] = + ok(DistinctStringResponseB("b")) + ) + .isOk() + + check ExternalDistinctResponseA + .setProvider( + proc(): Result[ExternalDistinctResponseA, string] = + ok(ExternalDistinctResponseA(ExternalDefinedTypeShared(label: "ea"))) + ) + .isOk() + + check ExternalDistinctResponseB + .setProvider( + proc(): Result[ExternalDistinctResponseB, string] = + ok(ExternalDistinctResponseB(ExternalDefinedTypeShared(label: "eb"))) + ) + .isOk() + + let resA = DistinctStringResponseA.request() + let resB = DistinctStringResponseB.request() + check resA.isOk + check resB.isOk + check string(resA.value) == "a" + check string(resB.value) == "b" + + let resEA = ExternalDistinctResponseA.request() + let resEB = ExternalDistinctResponseB.request() + check resEA.isOk + check resEB.isOk + check ExternalDefinedTypeShared(resEA.value).label == "ea" + check ExternalDefinedTypeShared(resEB.value).label == "eb" + + DistinctStringResponseA.clearProvider() + DistinctStringResponseB.clearProvider() + ExternalDistinctResponseA.clearProvider() + ExternalDistinctResponseB.clearProvider() diff --git a/waku/common/broker/request_broker.nim b/waku/common/broker/request_broker.nim index a8a6651d7..b4000e1f3 100644 --- a/waku/common/broker/request_broker.nim +++ b/waku/common/broker/request_broker.nim @@ -24,6 +24,56 @@ ## proc signature*(arg1: ArgType, arg2: AnotherArgType): Future[Result[TypeName, string]] ## ## ``` +## +## Sync mode (no `async` / `Future`) can be generated with: +## +## ```nim +## RequestBroker(sync): +## type TypeName = object +## field1*: FieldType +## +## proc signature*(): Result[TypeName, string] +## proc signature*(arg1: ArgType): Result[TypeName, string] +## ``` +## +## Note: When the request type is declared as a POD / alias / externally-defined +## type (i.e. not an inline `object` / `ref object` definition), RequestBroker +## will wrap it in `distinct` automatically unless you already used `distinct`. +## This avoids overload ambiguity when multiple brokers share the same +## underlying base type (Nim overload resolution does not consider return type). +## +## This means that for non-object request types you typically: +## - construct values with an explicit cast/constructor, e.g. `MyType("x")` +## - unwrap with a cast when needed, e.g. `string(myVal)` or `BaseType(myVal)` +## +## Example (POD response type): +## ```nim +## RequestBroker(sync): +## type MyCount = int # exported as: `distinct int` +## +## MyCount.setProvider(proc(): Result[MyCount, string] = ok(MyCount(42))) +## let res = MyCount.request() +## if res.isOk(): +## let raw = int(res.get()) +## ``` +## +## Example (externally-defined type): +## ```nim +## type External = object +## label*: string +## +## RequestBroker: +## type MyExternal = External # exported as: `distinct External` +## +## MyExternal.setProvider( +## proc(): Future[Result[MyExternal, string]] {.async.} = +## ok(MyExternal(External(label: "hi"))) +## ) +## let res = await MyExternal.request() +## if res.isOk(): +## let base = External(res.get()) +## echo base.label +## ``` ## The 'TypeName' object defines the requestable data (but also can be seen as request for action with return value). ## The 'signature' proc defines the provider(s) signature, that is enforced at compile time. ## One signature can be with no arguments, another with any number of arguments - where the input arguments are @@ -31,12 +81,12 @@ ## ## After this, you can register a provider anywhere in your code with ## `TypeName.setProvider(...)`, which returns error if already having a provider. -## Providers are async procs or lambdas that take no arguments and return a Future[Result[TypeName, string]]. +## Providers are async procs/lambdas in default mode and sync procs in sync mode. ## Only one provider can be registered at a time per signature type (zero arg and/or multi arg). ## ## Requests can be made from anywhere with no direct dependency on the provider by ## calling `TypeName.request()` - with arguments respecting the signature(s). -## This will asynchronously call the registered provider and return a Future[Result[TypeName, string]]. +## In async mode, this returns a Future[Result[TypeName, string]]. In sync mode, it returns Result[TypeName, string]. ## ## Whenever you no want to process requests (or your object instance that provides the request goes out of scope), ## you can remove it from the broker with `TypeName.clearProvider()`. @@ -49,10 +99,10 @@ ## text*: string ## ## ## Define the request and provider signature, that is enforced at compile time. -## proc signature*(): Future[Result[Greeting, string]] +## proc signature*(): Future[Result[Greeting, string]] {.async.} ## ## ## Also possible to define signature with arbitrary input arguments. -## proc signature*(lang: string): Future[Result[Greeting, string]] +## proc signature*(lang: string): Future[Result[Greeting, string]] {.async.} ## ## ... ## Greeting.setProvider( @@ -60,6 +110,23 @@ ## ok(Greeting(text: "hello")) ## ) ## let res = await Greeting.request() +## +## +## ... +## # using POD type as response for a synchronous request. +## RequestBroker(sync): +## type NeedThatInfo = string +## +##... +## NeedThatInfo.setProvider( +## proc(): Result[NeedThatInfo, string] = +## ok("this is the info you wanted") +## ) +## let res = NeedThatInfo.request().valueOr: +## echo "not ok due to: " & error +## NeedThatInfo(":-(") +## +## echo string(res) ## ``` ## If no `signature` proc is declared, a zero-argument form is generated ## automatically, so the caller only needs to provide the type definition. @@ -77,7 +144,11 @@ proc errorFuture[T](message: string): Future[Result[T, string]] {.inline.} = fut.complete(err(Result[T, string], message)) fut -proc isReturnTypeValid(returnType, typeIdent: NimNode): bool = +type RequestBrokerMode = enum + rbAsync + rbSync + +proc isAsyncReturnTypeValid(returnType, typeIdent: NimNode): bool = ## Accept Future[Result[TypeIdent, string]] as the contract. if returnType.kind != nnkBracketExpr or returnType.len != 2: return false @@ -92,6 +163,23 @@ proc isReturnTypeValid(returnType, typeIdent: NimNode): bool = return false inner[2].kind == nnkIdent and inner[2].eqIdent("string") +proc isSyncReturnTypeValid(returnType, typeIdent: NimNode): bool = + ## Accept Result[TypeIdent, string] as the contract. + if returnType.kind != nnkBracketExpr or returnType.len != 3: + return false + if returnType[0].kind != nnkIdent or not returnType[0].eqIdent("Result"): + return false + if returnType[1].kind != nnkIdent or not returnType[1].eqIdent($typeIdent): + return false + returnType[2].kind == nnkIdent and returnType[2].eqIdent("string") + +proc isReturnTypeValid(returnType, typeIdent: NimNode, mode: RequestBrokerMode): bool = + case mode + of rbAsync: + isAsyncReturnTypeValid(returnType, typeIdent) + of rbSync: + isSyncReturnTypeValid(returnType, typeIdent) + proc cloneParams(params: seq[NimNode]): seq[NimNode] = ## Deep copy parameter definitions so they can be inserted in multiple places. result = @[] @@ -109,73 +197,122 @@ proc collectParamNames(params: seq[NimNode]): seq[NimNode] = continue result.add(ident($nameNode)) -proc makeProcType(returnType: NimNode, params: seq[NimNode]): NimNode = +proc makeProcType( + returnType: NimNode, params: seq[NimNode], mode: RequestBrokerMode +): NimNode = var formal = newTree(nnkFormalParams) formal.add(returnType) for param in params: formal.add(param) - let pragmas = newTree(nnkPragma, ident("async")) - newTree(nnkProcTy, formal, pragmas) + case mode + of rbAsync: + let pragmas = newTree(nnkPragma, ident("async")) + newTree(nnkProcTy, formal, pragmas) + of rbSync: + let raisesPragma = newTree( + nnkExprColonExpr, ident("raises"), newTree(nnkBracket, ident("CatchableError")) + ) + let pragmas = newTree(nnkPragma, raisesPragma) + newTree(nnkProcTy, formal, pragmas) -macro RequestBroker*(body: untyped): untyped = +proc parseMode(modeNode: NimNode): RequestBrokerMode = + ## Parses the mode selector for the 2-argument macro overload. + ## Supported spellings: `sync` / `async` (case-insensitive). + let raw = ($modeNode).strip().toLowerAscii() + case raw + of "sync", "rbsync": + rbSync + of "async", "rbasync": + rbAsync + else: + error("RequestBroker mode must be `sync` or `async` (default is async)", modeNode) + +proc ensureDistinctType(rhs: NimNode): NimNode = + ## For PODs / aliases / externally-defined types, wrap in `distinct` unless + ## it's already distinct. + if rhs.kind == nnkDistinctTy: + return copyNimTree(rhs) + newTree(nnkDistinctTy, copyNimTree(rhs)) + +proc generateRequestBroker(body: NimNode, mode: RequestBrokerMode): NimNode = when defined(requestBrokerDebug): echo body.treeRepr + echo "RequestBroker mode: ", $mode var typeIdent: NimNode = nil var objectDef: NimNode = nil - var isRefObject = false for stmt in body: if stmt.kind == nnkTypeSection: for def in stmt: if def.kind != nnkTypeDef: continue + if not typeIdent.isNil(): + error("Only one type may be declared inside RequestBroker", def) + + typeIdent = baseTypeIdent(def[0]) let rhs = def[2] - var objectType: NimNode + + ## Support inline object types (fields are auto-exported) + ## AND non-object types / aliases (e.g. `string`, `int`, `OtherType`). case rhs.kind of nnkObjectTy: - objectType = rhs + let recList = rhs[2] + if recList.kind != nnkRecList: + error("RequestBroker object must declare a standard field list", rhs) + var exportedRecList = newTree(nnkRecList) + for field in recList: + case field.kind + of nnkIdentDefs: + ensureFieldDef(field) + var cloned = copyNimTree(field) + for i in 0 ..< cloned.len - 2: + cloned[i] = exportIdentNode(cloned[i]) + exportedRecList.add(cloned) + of nnkEmpty: + discard + else: + error( + "RequestBroker object definition only supports simple field declarations", + field, + ) + objectDef = newTree( + nnkObjectTy, copyNimTree(rhs[0]), copyNimTree(rhs[1]), exportedRecList + ) of nnkRefTy: - isRefObject = true - if rhs.len != 1 or rhs[0].kind != nnkObjectTy: - error( - "RequestBroker ref object must wrap a concrete object definition", rhs + if rhs.len != 1: + error("RequestBroker ref type must have a single base", rhs) + if rhs[0].kind == nnkObjectTy: + let obj = rhs[0] + let recList = obj[2] + if recList.kind != nnkRecList: + error("RequestBroker object must declare a standard field list", obj) + var exportedRecList = newTree(nnkRecList) + for field in recList: + case field.kind + of nnkIdentDefs: + ensureFieldDef(field) + var cloned = copyNimTree(field) + for i in 0 ..< cloned.len - 2: + cloned[i] = exportIdentNode(cloned[i]) + exportedRecList.add(cloned) + of nnkEmpty: + discard + else: + error( + "RequestBroker object definition only supports simple field declarations", + field, + ) + let exportedObjectType = newTree( + nnkObjectTy, copyNimTree(obj[0]), copyNimTree(obj[1]), exportedRecList ) - objectType = rhs[0] - else: - continue - if not typeIdent.isNil(): - error("Only one object type may be declared inside RequestBroker", def) - typeIdent = baseTypeIdent(def[0]) - let recList = objectType[2] - if recList.kind != nnkRecList: - error("RequestBroker object must declare a standard field list", objectType) - var exportedRecList = newTree(nnkRecList) - for field in recList: - case field.kind - of nnkIdentDefs: - ensureFieldDef(field) - var cloned = copyNimTree(field) - for i in 0 ..< cloned.len - 2: - cloned[i] = exportIdentNode(cloned[i]) - exportedRecList.add(cloned) - of nnkEmpty: - discard + objectDef = newTree(nnkRefTy, exportedObjectType) else: - error( - "RequestBroker object definition only supports simple field declarations", - field, - ) - let exportedObjectType = newTree( - nnkObjectTy, - copyNimTree(objectType[0]), - copyNimTree(objectType[1]), - exportedRecList, - ) - if isRefObject: - objectDef = newTree(nnkRefTy, exportedObjectType) + ## `ref SomeType` (SomeType can be defined elsewhere) + objectDef = ensureDistinctType(rhs) else: - objectDef = exportedObjectType + ## Non-object type / alias (e.g. `string`, `int`, `SomeExternalType`). + objectDef = ensureDistinctType(rhs) if typeIdent.isNil(): - error("RequestBroker body must declare exactly one object type", body) + error("RequestBroker body must declare exactly one type", body) when defined(requestBrokerDebug): echo "RequestBroker generating type: ", $typeIdent @@ -183,7 +320,6 @@ macro RequestBroker*(body: untyped): untyped = let exportedTypeIdent = postfix(copyNimTree(typeIdent), "*") let typeDisplayName = sanitizeIdentName(typeIdent) let typeNameLit = newLit(typeDisplayName) - let isRefObjectLit = newLit(isRefObject) var zeroArgSig: NimNode = nil var zeroArgProviderName: NimNode = nil var zeroArgFieldName: NimNode = nil @@ -211,10 +347,14 @@ macro RequestBroker*(body: untyped): untyped = if params.len == 0: error("Signature must declare a return type", stmt) let returnType = params[0] - if not isReturnTypeValid(returnType, typeIdent): - error( - "Signature must return Future[Result[`" & $typeIdent & "`, string]]", stmt - ) + if not isReturnTypeValid(returnType, typeIdent, mode): + case mode + of rbAsync: + error( + "Signature must return Future[Result[`" & $typeIdent & "`, string]]", stmt + ) + of rbSync: + error("Signature must return Result[`" & $typeIdent & "`, string]", stmt) let paramCount = params.len - 1 if paramCount == 0: if zeroArgSig != nil: @@ -258,14 +398,20 @@ macro RequestBroker*(body: untyped): untyped = var typeSection = newTree(nnkTypeSection) typeSection.add(newTree(nnkTypeDef, exportedTypeIdent, newEmptyNode(), objectDef)) - let returnType = quote: - Future[Result[`typeIdent`, string]] + let returnType = + case mode + of rbAsync: + quote: + Future[Result[`typeIdent`, string]] + of rbSync: + quote: + Result[`typeIdent`, string] if not zeroArgSig.isNil(): - let procType = makeProcType(returnType, @[]) + let procType = makeProcType(returnType, @[], mode) typeSection.add(newTree(nnkTypeDef, zeroArgProviderName, newEmptyNode(), procType)) if not argSig.isNil(): - let procType = makeProcType(returnType, cloneParams(argParams)) + let procType = makeProcType(returnType, cloneParams(argParams), mode) typeSection.add(newTree(nnkTypeDef, argProviderName, newEmptyNode(), procType)) var brokerRecList = newTree(nnkRecList) @@ -316,33 +462,69 @@ macro RequestBroker*(body: untyped): untyped = quote do: `accessProcIdent`().`zeroArgFieldName` = nil ) - result.add( - quote do: - proc request*( - _: typedesc[`typeIdent`] - ): Future[Result[`typeIdent`, string]] {.async: (raises: []).} = - let provider = `accessProcIdent`().`zeroArgFieldName` - if provider.isNil(): - return err( - "RequestBroker(" & `typeNameLit` & "): no zero-arg provider registered" - ) - let catchedRes = catch: - await provider() + case mode + of rbAsync: + result.add( + quote do: + proc request*( + _: typedesc[`typeIdent`] + ): Future[Result[`typeIdent`, string]] {.async: (raises: []).} = + let provider = `accessProcIdent`().`zeroArgFieldName` + if provider.isNil(): + return err( + "RequestBroker(" & `typeNameLit` & "): no zero-arg provider registered" + ) + let catchedRes = catch: + await provider() - if catchedRes.isErr(): - return err("Request failed:" & catchedRes.error.msg) + if catchedRes.isErr(): + return err( + "RequestBroker(" & `typeNameLit` & "): provider threw exception: " & + catchedRes.error.msg + ) - let providerRes = catchedRes.get() - when `isRefObjectLit`: + let providerRes = catchedRes.get() if providerRes.isOk(): let resultValue = providerRes.get() - if resultValue.isNil(): - return err( - "RequestBroker(" & `typeNameLit` & "): provider returned nil result" - ) - return providerRes + when compiles(resultValue.isNil()): + if resultValue.isNil(): + return err( + "RequestBroker(" & `typeNameLit` & "): provider returned nil result" + ) + return providerRes - ) + ) + of rbSync: + result.add( + quote do: + proc request*( + _: typedesc[`typeIdent`] + ): Result[`typeIdent`, string] {.raises: [].} = + let provider = `accessProcIdent`().`zeroArgFieldName` + if provider.isNil(): + return err( + "RequestBroker(" & `typeNameLit` & "): no zero-arg provider registered" + ) + + var providerRes: Result[`typeIdent`, string] + try: + providerRes = provider() + except CatchableError as e: + return err( + "RequestBroker(" & `typeNameLit` & "): provider threw exception: " & + e.msg + ) + + if providerRes.isOk(): + let resultValue = providerRes.get() + when compiles(resultValue.isNil()): + if resultValue.isNil(): + return err( + "RequestBroker(" & `typeNameLit` & "): provider returned nil result" + ) + return providerRes + + ) if not argSig.isNil(): result.add( quote do: @@ -363,10 +545,7 @@ macro RequestBroker*(body: untyped): untyped = let argNameIdents = collectParamNames(requestParamDefs) let providerSym = genSym(nskLet, "provider") var formalParams = newTree(nnkFormalParams) - formalParams.add( - quote do: - Future[Result[`typeIdent`, string]] - ) + formalParams.add(copyNimTree(returnType)) formalParams.add( newTree( nnkIdentDefs, @@ -378,8 +557,14 @@ macro RequestBroker*(body: untyped): untyped = for paramDef in requestParamDefs: formalParams.add(paramDef) - let requestPragmas = quote: - {.async: (raises: []), gcsafe.} + let requestPragmas = + case mode + of rbAsync: + quote: + {.async: (raises: []), gcsafe.} + of rbSync: + quote: + {.raises: [].} var providerCall = newCall(providerSym) for argName in argNameIdents: providerCall.add(argName) @@ -396,23 +581,49 @@ macro RequestBroker*(body: untyped): untyped = "): no provider registered for input signature" ) ) - requestBody.add( - quote do: - let catchedRes = catch: - await `providerCall` - if catchedRes.isErr(): - return err("Request failed:" & catchedRes.error.msg) - let providerRes = catchedRes.get() - when `isRefObjectLit`: + case mode + of rbAsync: + requestBody.add( + quote do: + let catchedRes = catch: + await `providerCall` + if catchedRes.isErr(): + return err( + "RequestBroker(" & `typeNameLit` & "): provider threw exception: " & + catchedRes.error.msg + ) + + let providerRes = catchedRes.get() if providerRes.isOk(): let resultValue = providerRes.get() - if resultValue.isNil(): - return err( - "RequestBroker(" & `typeNameLit` & "): provider returned nil result" - ) - return providerRes - ) + when compiles(resultValue.isNil()): + if resultValue.isNil(): + return err( + "RequestBroker(" & `typeNameLit` & "): provider returned nil result" + ) + return providerRes + ) + of rbSync: + requestBody.add( + quote do: + var providerRes: Result[`typeIdent`, string] + try: + providerRes = `providerCall` + except CatchableError as e: + return err( + "RequestBroker(" & `typeNameLit` & "): provider threw exception: " & e.msg + ) + + if providerRes.isOk(): + let resultValue = providerRes.get() + when compiles(resultValue.isNil()): + if resultValue.isNil(): + return err( + "RequestBroker(" & `typeNameLit` & "): provider returned nil result" + ) + return providerRes + ) # requestBody.add(providerCall) result.add( newTree( @@ -436,3 +647,17 @@ macro RequestBroker*(body: untyped): untyped = when defined(requestBrokerDebug): echo result.repr + + return result + +macro RequestBroker*(body: untyped): untyped = + ## Default (async) mode. + generateRequestBroker(body, rbAsync) + +macro RequestBroker*(mode: untyped, body: untyped): untyped = + ## Explicit mode selector. + ## Example: + ## RequestBroker(sync): + ## type Foo = object + ## proc signature*(): Result[Foo, string] + generateRequestBroker(body, parseMode(mode)) From 130ebd75092203ea2074f11b13b5d31cc2cb3575 Mon Sep 17 00:00:00 2001 From: NagyZoltanPeter <113987313+NagyZoltanPeter@users.noreply.github.com> Date: Mon, 15 Dec 2025 14:19:20 +0100 Subject: [PATCH 5/8] Adapt gcsafe pragma for RequestBroker sync requests and provider signatures as requirement --- waku/common/broker/request_broker.nim | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/waku/common/broker/request_broker.nim b/waku/common/broker/request_broker.nim index b4000e1f3..7e2873616 100644 --- a/waku/common/broker/request_broker.nim +++ b/waku/common/broker/request_broker.nim @@ -212,7 +212,7 @@ proc makeProcType( let raisesPragma = newTree( nnkExprColonExpr, ident("raises"), newTree(nnkBracket, ident("CatchableError")) ) - let pragmas = newTree(nnkPragma, raisesPragma) + let pragmas = newTree(nnkPragma, raisesPragma, ident("gcsafe")) newTree(nnkProcTy, formal, pragmas) proc parseMode(modeNode: NimNode): RequestBrokerMode = @@ -499,7 +499,7 @@ proc generateRequestBroker(body: NimNode, mode: RequestBrokerMode): NimNode = quote do: proc request*( _: typedesc[`typeIdent`] - ): Result[`typeIdent`, string] {.raises: [].} = + ): Result[`typeIdent`, string] {.gcsafe, raises: [].} = let provider = `accessProcIdent`().`zeroArgFieldName` if provider.isNil(): return err( @@ -564,7 +564,7 @@ proc generateRequestBroker(body: NimNode, mode: RequestBrokerMode): NimNode = {.async: (raises: []), gcsafe.} of rbSync: quote: - {.raises: [].} + {.gcsafe, raises: [].} var providerCall = newCall(providerSym) for argName in argNameIdents: providerCall.add(argName) From 15f0ab9e5277c91f3ecc9fa70e32d10c93ffab4c Mon Sep 17 00:00:00 2001 From: NagyZoltanPeter <113987313+NagyZoltanPeter@users.noreply.github.com> Date: Mon, 15 Dec 2025 14:20:25 +0100 Subject: [PATCH 6/8] Utlize sync RequestBroker, adapt to non-async broker usage and gcsafe where appropriate, removed leftover --- examples/api_example/api_example.nim | 12 +++++++++--- waku/api/api.nim | 15 ++------------- waku/api/subscribe/subscribe.nim | 12 ------------ .../send_service/lightpush_processor.nim | 2 +- .../send_service/relay_processor.nim | 8 ++++---- .../send_service/send_processor.nim | 6 ++++-- waku/requests/health_request.nim | 8 +++----- waku/waku_relay/protocol.nim | 4 +--- 8 files changed, 24 insertions(+), 43 deletions(-) delete mode 100644 waku/api/subscribe/subscribe.nim diff --git a/examples/api_example/api_example.nim b/examples/api_example/api_example.nim index adfb5bbd7..37dd5d34b 100644 --- a/examples/api_example/api_example.nim +++ b/examples/api_example/api_example.nim @@ -12,19 +12,25 @@ proc periodicSender(w: Waku): Future[void] {.async.} = proc(event: MessageSentEvent) {.async: (raises: []).} = echo "Message sent with request ID: ", event.requestId, " hash: ", event.messageHash - ) + ).valueOr: + echo "Failed to listen to message sent event: ", error + return let errorListener = MessageErrorEvent.listen( proc(event: MessageErrorEvent) {.async: (raises: []).} = echo "Message failed to send with request ID: ", event.requestId, " error: ", event.error - ) + ).valueOr: + echo "Failed to listen to message error event: ", error + return let propagatedListener = MessagePropagatedEvent.listen( proc(event: MessagePropagatedEvent) {.async: (raises: []).} = echo "Message propagated with request ID: ", event.requestId, " hash: ", event.messageHash - ) + ).valueOr: + echo "Failed to listen to message propagated event: ", error + return defer: MessageSentEvent.dropListener(sentListener) diff --git a/waku/api/api.nim b/waku/api/api.nim index 020cd6c6d..a63379f12 100644 --- a/waku/api/api.nim +++ b/waku/api/api.nim @@ -3,7 +3,7 @@ import chronicles, chronos, results import waku/factory/waku import waku/[requests/health_request, waku_core, waku_node] import waku/node/delivery_service/send_service -import ./[api_conf, types], ./subscribe/subscribe +import ./[api_conf, types] logScope: topics = "api" @@ -26,7 +26,7 @@ proc checkApiAvailability(w: Waku): Result[void, string] = # check if health is satisfactory # If Node is not healthy, return err("Waku node is not healthy") - let healthStatus = waitFor RequestNodeHealth.request() + let healthStatus = RequestNodeHealth.request() if healthStatus.isErr(): warn "Failed to get Waku node health status: ", error = healthStatus.error @@ -37,17 +37,6 @@ proc checkApiAvailability(w: Waku): Result[void, string] = return ok() -proc subscribe*( - w: Waku, contentTopic: ContentTopic -): Future[Result[RequestId, string]] {.async.} = - ?checkApiAvailability(w) - - let requestId = newRequestId(w.rng) - - asyncSpawn w.subscribeImpl(requestId, contentTopic) - - return ok(requestId) - proc send*( w: Waku, envelope: MessageEnvelope ): Future[Result[RequestId, string]] {.async.} = diff --git a/waku/api/subscribe/subscribe.nim b/waku/api/subscribe/subscribe.nim deleted file mode 100644 index 9283936cf..000000000 --- a/waku/api/subscribe/subscribe.nim +++ /dev/null @@ -1,12 +0,0 @@ -# import chronicles, chronos, results -import chronos -import waku/waku_core -import waku/api/types -import waku/factory/waku - -proc subscribeImpl*( - w: Waku, requestId: RequestId, contentTopic: ContentTopic -): Future[void] {.async.} = - ## Implementation of the subscribe API - ## This is a placeholder implementation - await sleepAsync(1000) # Simulate async work diff --git a/waku/node/delivery_service/send_service/lightpush_processor.nim b/waku/node/delivery_service/send_service/lightpush_processor.nim index 8d7ab5a5e..1233b3bc6 100644 --- a/waku/node/delivery_service/send_service/lightpush_processor.nim +++ b/waku/node/delivery_service/send_service/lightpush_processor.nim @@ -30,7 +30,7 @@ proc isLightpushPeerAvailable( method isValidProcessor*( self: LightpushSendProcessor, task: DeliveryTask -): Future[bool] {.async.} = +): bool {.gcsafe.} = return self.isLightpushPeerAvailable(task.pubsubTopic) method sendImpl*( diff --git a/waku/node/delivery_service/send_service/relay_processor.nim b/waku/node/delivery_service/send_service/relay_processor.nim index 7f7fdc8dc..51a68c839 100644 --- a/waku/node/delivery_service/send_service/relay_processor.nim +++ b/waku/node/delivery_service/send_service/relay_processor.nim @@ -26,8 +26,8 @@ proc new*( return RelaySendProcessor(publishProc: publishProc, fallbackStateToSet: fallbackStateToSet) -proc isTopicHealthy(topic: PubsubTopic): Future[bool] {.async.} = - let healthReport = (await RequestRelayTopicsHealth.request(@[topic])).valueOr: +proc isTopicHealthy(topic: PubsubTopic): bool {.gcsafe.} = + let healthReport = RequestRelayTopicsHealth.request(@[topic]).valueOr: return false if healthReport.topicHealth.len() < 1: @@ -37,8 +37,8 @@ proc isTopicHealthy(topic: PubsubTopic): Future[bool] {.async.} = method isValidProcessor*( self: RelaySendProcessor, task: DeliveryTask -): Future[bool] {.async.} = - return await isTopicHealthy(task.pubsubTopic) +): bool {.gcsafe.} = + return isTopicHealthy(task.pubsubTopic) method sendImpl*(self: RelaySendProcessor, task: DeliveryTask): Future[void] {.async.} = task.tryCount.inc() diff --git a/waku/node/delivery_service/send_service/send_processor.nim b/waku/node/delivery_service/send_service/send_processor.nim index 3969560c5..9de425d9f 100644 --- a/waku/node/delivery_service/send_service/send_processor.nim +++ b/waku/node/delivery_service/send_service/send_processor.nim @@ -1,6 +1,8 @@ import chronos import ./delivery_task +{.push raises: [].} + type BaseSendProcessor* = ref object of RootObj fallbackProcessor*: BaseSendProcessor @@ -9,7 +11,7 @@ proc chain*(self: BaseSendProcessor, next: BaseSendProcessor) = method isValidProcessor*( self: BaseSendProcessor, task: DeliveryTask -): Future[bool] {.async, base.} = +): bool {.base, gcsafe.} = return false method sendImpl*( @@ -23,7 +25,7 @@ method process*( var currentProcessor: BaseSendProcessor = self var keepTrying = true while not currentProcessor.isNil() and keepTrying: - if await currentProcessor.isValidProcessor(task): + if currentProcessor.isValidProcessor(task): await currentProcessor.sendImpl(task) currentProcessor = currentProcessor.fallbackProcessor keepTrying = task.state == DeliveryState.FallbackRetry diff --git a/waku/requests/health_request.nim b/waku/requests/health_request.nim index 6b3bc786c..9f98eba67 100644 --- a/waku/requests/health_request.nim +++ b/waku/requests/health_request.nim @@ -6,17 +6,15 @@ import waku/waku_core/topics export protocol_health, topic_health -RequestBroker: +RequestBroker(sync): type RequestNodeHealth* = object healthStatus*: NodeHealth -RequestBroker: +RequestBroker(sync): type RequestRelayTopicsHealth* = object topicHealth*: seq[tuple[topic: PubsubTopic, health: TopicHealth]] - proc signature( - topics: seq[PubsubTopic] - ): Future[Result[RequestRelayTopicsHealth, string]] {.async.} + proc signature(topics: seq[PubsubTopic]): Result[RequestRelayTopicsHealth, string] MultiRequestBroker: type RequestProtocolHealth* = object diff --git a/waku/waku_relay/protocol.nim b/waku/waku_relay/protocol.nim index 47a898b6d..812bf204a 100644 --- a/waku/waku_relay/protocol.nim +++ b/waku/waku_relay/protocol.nim @@ -326,9 +326,7 @@ proc initRelayObservers(w: WakuRelay) = proc initRequestProviders(w: WakuRelay) = RequestRelayTopicsHealth.setProvider( - proc( - topics: seq[PubsubTopic] - ): Future[Result[RequestRelayTopicsHealth, string]] {.async.} = + proc(topics: seq[PubsubTopic]): Result[RequestRelayTopicsHealth, string] = var collectedRes: RequestRelayTopicsHealth for topic in topics: let health = w.topicsHealth.getOrDefault(topic, TopicHealth.NOT_SUBSCRIBED) From 0caef60f489f6a6ca908aa293f43a03af6f1bb2c Mon Sep 17 00:00:00 2001 From: NagyZoltanPeter <113987313+NagyZoltanPeter@users.noreply.github.com> Date: Mon, 15 Dec 2025 15:53:39 +0100 Subject: [PATCH 7/8] add api_example app to examples2 --- waku.nimble | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/waku.nimble b/waku.nimble index 405ee7cf9..e94412b89 100644 --- a/waku.nimble +++ b/waku.nimble @@ -133,7 +133,7 @@ task testwakunode2, "Build & run wakunode2 app tests": test "all_tests_wakunode2" task example2, "Build Waku examples": - buildBinary "waku_example", "examples/" + buildBinary "api_example", "examples/api_example/" buildBinary "publisher", "examples/" buildBinary "subscriber", "examples/" buildBinary "filter_subscriber", "examples/" From 2ce8bdee66f41b66a918bc55a51dcd76f329a3c2 Mon Sep 17 00:00:00 2001 From: NagyZoltanPeter <113987313+NagyZoltanPeter@users.noreply.github.com> Date: Mon, 15 Dec 2025 16:05:19 +0100 Subject: [PATCH 8/8] Adapt after merge from master --- tests/api/test_api_send.nim | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/api/test_api_send.nim b/tests/api/test_api_send.nim index f14d57d44..71a305b38 100644 --- a/tests/api/test_api_send.nim +++ b/tests/api/test_api_send.nim @@ -67,7 +67,8 @@ suite "Waku API - Send": (await lightpushNode.mountRelay()).isOkOr: raiseAssert "Failed to mount relay" - await lightpushNode.mountLightPush() + (await lightpushNode.mountLightPush()).isOkOr: + raiseAssert "Failed to mount lightpush" (await storeNode.mountRelay()).isOkOr: raiseAssert "Failed to mount relay"