From 15f0ab9e5277c91f3ecc9fa70e32d10c93ffab4c Mon Sep 17 00:00:00 2001 From: NagyZoltanPeter <113987313+NagyZoltanPeter@users.noreply.github.com> Date: Mon, 15 Dec 2025 14:20:25 +0100 Subject: [PATCH] Utlize sync RequestBroker, adapt to non-async broker usage and gcsafe where appropriate, removed leftover --- examples/api_example/api_example.nim | 12 +++++++++--- waku/api/api.nim | 15 ++------------- waku/api/subscribe/subscribe.nim | 12 ------------ .../send_service/lightpush_processor.nim | 2 +- .../send_service/relay_processor.nim | 8 ++++---- .../send_service/send_processor.nim | 6 ++++-- waku/requests/health_request.nim | 8 +++----- waku/waku_relay/protocol.nim | 4 +--- 8 files changed, 24 insertions(+), 43 deletions(-) delete mode 100644 waku/api/subscribe/subscribe.nim diff --git a/examples/api_example/api_example.nim b/examples/api_example/api_example.nim index adfb5bbd7..37dd5d34b 100644 --- a/examples/api_example/api_example.nim +++ b/examples/api_example/api_example.nim @@ -12,19 +12,25 @@ proc periodicSender(w: Waku): Future[void] {.async.} = proc(event: MessageSentEvent) {.async: (raises: []).} = echo "Message sent with request ID: ", event.requestId, " hash: ", event.messageHash - ) + ).valueOr: + echo "Failed to listen to message sent event: ", error + return let errorListener = MessageErrorEvent.listen( proc(event: MessageErrorEvent) {.async: (raises: []).} = echo "Message failed to send with request ID: ", event.requestId, " error: ", event.error - ) + ).valueOr: + echo "Failed to listen to message error event: ", error + return let propagatedListener = MessagePropagatedEvent.listen( proc(event: MessagePropagatedEvent) {.async: (raises: []).} = echo "Message propagated with request ID: ", event.requestId, " hash: ", event.messageHash - ) + ).valueOr: + echo "Failed to listen to message propagated event: ", error + return defer: MessageSentEvent.dropListener(sentListener) diff --git a/waku/api/api.nim b/waku/api/api.nim index 020cd6c6d..a63379f12 100644 --- a/waku/api/api.nim +++ b/waku/api/api.nim @@ -3,7 +3,7 @@ import chronicles, chronos, results import waku/factory/waku import waku/[requests/health_request, waku_core, waku_node] import waku/node/delivery_service/send_service -import ./[api_conf, types], ./subscribe/subscribe +import ./[api_conf, types] logScope: topics = "api" @@ -26,7 +26,7 @@ proc checkApiAvailability(w: Waku): Result[void, string] = # check if health is satisfactory # If Node is not healthy, return err("Waku node is not healthy") - let healthStatus = waitFor RequestNodeHealth.request() + let healthStatus = RequestNodeHealth.request() if healthStatus.isErr(): warn "Failed to get Waku node health status: ", error = healthStatus.error @@ -37,17 +37,6 @@ proc checkApiAvailability(w: Waku): Result[void, string] = return ok() -proc subscribe*( - w: Waku, contentTopic: ContentTopic -): Future[Result[RequestId, string]] {.async.} = - ?checkApiAvailability(w) - - let requestId = newRequestId(w.rng) - - asyncSpawn w.subscribeImpl(requestId, contentTopic) - - return ok(requestId) - proc send*( w: Waku, envelope: MessageEnvelope ): Future[Result[RequestId, string]] {.async.} = diff --git a/waku/api/subscribe/subscribe.nim b/waku/api/subscribe/subscribe.nim deleted file mode 100644 index 9283936cf..000000000 --- a/waku/api/subscribe/subscribe.nim +++ /dev/null @@ -1,12 +0,0 @@ -# import chronicles, chronos, results -import chronos -import waku/waku_core -import waku/api/types -import waku/factory/waku - -proc subscribeImpl*( - w: Waku, requestId: RequestId, contentTopic: ContentTopic -): Future[void] {.async.} = - ## Implementation of the subscribe API - ## This is a placeholder implementation - await sleepAsync(1000) # Simulate async work diff --git a/waku/node/delivery_service/send_service/lightpush_processor.nim b/waku/node/delivery_service/send_service/lightpush_processor.nim index 8d7ab5a5e..1233b3bc6 100644 --- a/waku/node/delivery_service/send_service/lightpush_processor.nim +++ b/waku/node/delivery_service/send_service/lightpush_processor.nim @@ -30,7 +30,7 @@ proc isLightpushPeerAvailable( method isValidProcessor*( self: LightpushSendProcessor, task: DeliveryTask -): Future[bool] {.async.} = +): bool {.gcsafe.} = return self.isLightpushPeerAvailable(task.pubsubTopic) method sendImpl*( diff --git a/waku/node/delivery_service/send_service/relay_processor.nim b/waku/node/delivery_service/send_service/relay_processor.nim index 7f7fdc8dc..51a68c839 100644 --- a/waku/node/delivery_service/send_service/relay_processor.nim +++ b/waku/node/delivery_service/send_service/relay_processor.nim @@ -26,8 +26,8 @@ proc new*( return RelaySendProcessor(publishProc: publishProc, fallbackStateToSet: fallbackStateToSet) -proc isTopicHealthy(topic: PubsubTopic): Future[bool] {.async.} = - let healthReport = (await RequestRelayTopicsHealth.request(@[topic])).valueOr: +proc isTopicHealthy(topic: PubsubTopic): bool {.gcsafe.} = + let healthReport = RequestRelayTopicsHealth.request(@[topic]).valueOr: return false if healthReport.topicHealth.len() < 1: @@ -37,8 +37,8 @@ proc isTopicHealthy(topic: PubsubTopic): Future[bool] {.async.} = method isValidProcessor*( self: RelaySendProcessor, task: DeliveryTask -): Future[bool] {.async.} = - return await isTopicHealthy(task.pubsubTopic) +): bool {.gcsafe.} = + return isTopicHealthy(task.pubsubTopic) method sendImpl*(self: RelaySendProcessor, task: DeliveryTask): Future[void] {.async.} = task.tryCount.inc() diff --git a/waku/node/delivery_service/send_service/send_processor.nim b/waku/node/delivery_service/send_service/send_processor.nim index 3969560c5..9de425d9f 100644 --- a/waku/node/delivery_service/send_service/send_processor.nim +++ b/waku/node/delivery_service/send_service/send_processor.nim @@ -1,6 +1,8 @@ import chronos import ./delivery_task +{.push raises: [].} + type BaseSendProcessor* = ref object of RootObj fallbackProcessor*: BaseSendProcessor @@ -9,7 +11,7 @@ proc chain*(self: BaseSendProcessor, next: BaseSendProcessor) = method isValidProcessor*( self: BaseSendProcessor, task: DeliveryTask -): Future[bool] {.async, base.} = +): bool {.base, gcsafe.} = return false method sendImpl*( @@ -23,7 +25,7 @@ method process*( var currentProcessor: BaseSendProcessor = self var keepTrying = true while not currentProcessor.isNil() and keepTrying: - if await currentProcessor.isValidProcessor(task): + if currentProcessor.isValidProcessor(task): await currentProcessor.sendImpl(task) currentProcessor = currentProcessor.fallbackProcessor keepTrying = task.state == DeliveryState.FallbackRetry diff --git a/waku/requests/health_request.nim b/waku/requests/health_request.nim index 6b3bc786c..9f98eba67 100644 --- a/waku/requests/health_request.nim +++ b/waku/requests/health_request.nim @@ -6,17 +6,15 @@ import waku/waku_core/topics export protocol_health, topic_health -RequestBroker: +RequestBroker(sync): type RequestNodeHealth* = object healthStatus*: NodeHealth -RequestBroker: +RequestBroker(sync): type RequestRelayTopicsHealth* = object topicHealth*: seq[tuple[topic: PubsubTopic, health: TopicHealth]] - proc signature( - topics: seq[PubsubTopic] - ): Future[Result[RequestRelayTopicsHealth, string]] {.async.} + proc signature(topics: seq[PubsubTopic]): Result[RequestRelayTopicsHealth, string] MultiRequestBroker: type RequestProtocolHealth* = object diff --git a/waku/waku_relay/protocol.nim b/waku/waku_relay/protocol.nim index 47a898b6d..812bf204a 100644 --- a/waku/waku_relay/protocol.nim +++ b/waku/waku_relay/protocol.nim @@ -326,9 +326,7 @@ proc initRelayObservers(w: WakuRelay) = proc initRequestProviders(w: WakuRelay) = RequestRelayTopicsHealth.setProvider( - proc( - topics: seq[PubsubTopic] - ): Future[Result[RequestRelayTopicsHealth, string]] {.async.} = + proc(topics: seq[PubsubTopic]): Result[RequestRelayTopicsHealth, string] = var collectedRes: RequestRelayTopicsHealth for topic in topics: let health = w.topicsHealth.getOrDefault(topic, TopicHealth.NOT_SUBSCRIBED)