From cc32e91074598b013c35b66af62d26691db1f6b7 Mon Sep 17 00:00:00 2001 From: NagyZoltanPeter <113987313+NagyZoltanPeter@users.noreply.github.com> Date: Wed, 28 Jan 2026 20:36:03 +0100 Subject: [PATCH] Addressing all remaining review findings. Removed leftovers. Fixed loggings and typos --- waku/api/api.nim | 5 +++-- waku/api/send_api.md | 12 +++++------ waku/api/types.nim | 5 +---- waku/factory/waku.nim | 18 ++++++++--------- .../recv_service/recv_service.nim | 4 ++-- .../send_service/delivery_task.nim | 14 +++++++------ .../send_service/lightpush_processor.nim | 17 +++++++++------- .../send_service/relay_processor.nim | 8 ++++---- .../send_service/send_service.nim | 12 +++++++---- waku/node/kernel_api/relay.nim | 2 +- waku/node/waku_node.nim | 7 +++---- waku/waku_core/event/event_emitter.nim | 20 ------------------- waku/waku_filter_v2/client.nim | 5 +++-- waku/waku_lightpush/client.nim | 2 -- waku/waku_rln_relay/rln_relay.nim | 2 +- 15 files changed, 58 insertions(+), 75 deletions(-) delete mode 100644 waku/waku_core/event/event_emitter.nim diff --git a/waku/api/api.nim b/waku/api/api.nim index d9ec208b8..41f4fd240 100644 --- a/waku/api/api.nim +++ b/waku/api/api.nim @@ -57,14 +57,15 @@ proc send*( let requestId = RequestId.new(w.rng) - let deliveryTask = DeliveryTask.create(requestId, envelope, w.brokerCtx).valueOr: + let deliveryTask = DeliveryTask.new(requestId, envelope, w.brokerCtx).valueOr: return err("API send: Failed to create delivery task: " & error) info "API send: scheduling delivery task", requestId = $requestId, pubsubTopic = deliveryTask.pubsubTopic, contentTopic = deliveryTask.msg.contentTopic, - msgHash = deliveryTask.msgHash.shortLog() + msgHash = deliveryTask.msgHash.to0xHex(), + myPeerId = w.node.peerId() asyncSpawn w.deliveryService.sendService.send(deliveryTask) diff --git a/waku/api/send_api.md b/waku/api/send_api.md index 69f55dc9e..2a5a2f8a4 100644 --- a/waku/api/send_api.md +++ b/waku/api/send_api.md @@ -7,19 +7,19 @@ This document collects logic and todo's around the Send API. ## Overview Send api hides the complex logic of using raw protocols for reliable message delivery. -The delivery method is chosen based on the node configuration and actual availabilites of peers. +The delivery method is chosen based on the node configuration and actual availabilities of peers. ## Delivery task -Each message send request is boundled into a task that not just holds the composed message but also the state of the delivery. +Each message send request is bundled into a task that not just holds the composed message but also the state of the delivery. ## Delivery methods Depending on the configuration and the availability of store client protocol + actual configured and/or discovered store nodes: -- P2PReliability validation - checking network store node wheather the message is reached at least a store node. +- P2PReliability validation - checking network store node whether the message is reached at least a store node. - Simple retry until message is propagated to the network - Relay says >0 peers as publish result - - LightushClient returns with success + - LightpushClient returns with success Depending on node config: - Relay @@ -28,9 +28,9 @@ Depending on node config: These methods are used in combination to achieve the best reliability. Fallback mechanism is used to switch between methods if the current one fails. -Relay+StoreCheck -> Relay+simeple retry -> Lightpush+StoreCheck -> Lightpush simple retry -> Error +Relay+StoreCheck -> Relay+simple retry -> Lightpush+StoreCheck -> Lightpush simple retry -> Error -Combination is dynamicly chosen on node configuration. Levels can be skipped depending on actual connectivity. +Combination is dynamically chosen on node configuration. Levels can be skipped depending on actual connectivity. Actual connectivity is checked: - Relay's topic health check - at least dLow peers in the mesh for the topic - Store nodes availability - at least one store service node is available in peer manager diff --git a/waku/api/types.nim b/waku/api/types.nim index 390c18230..a0626e98c 100644 --- a/waku/api/types.nim +++ b/waku/api/types.nim @@ -1,14 +1,11 @@ {.push raises: [].} -import bearssl/rand, std/times, chronos, chronicles +import bearssl/rand, std/times, chronos import stew/byteutils import waku/utils/requests as request_utils import waku/waku_core/[topics/content_topic, message/message, time] import waku/requests/requests -logScope: - topics = "message envelope" - type MessageEnvelope* = object contentTopic*: ContentTopic diff --git a/waku/factory/waku.nim b/waku/factory/waku.nim index 9bc28b2d5..c452d44c5 100644 --- a/waku/factory/waku.nim +++ b/waku/factory/waku.nim @@ -367,9 +367,9 @@ proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async: (raises: await waku_dnsdisc.retrieveDynamicBootstrapNodes( dnsDiscoveryConf.enrTreeUrl, dnsDiscoveryConf.nameServers ) - except CatchableError: + except CatchableError as exc: Result[seq[RemotePeerInfo], string].err( - "Retrieving dynamic bootstrap nodes failed: " & getCurrentExceptionMsg() + "Retrieving dynamic bootstrap nodes failed: " & exc.msg ) if dynamicBootstrapNodesRes.isErr(): @@ -388,7 +388,7 @@ proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async: (raises: (await updateWaku(waku)).isOkOr: return err("Error in updateApp: " & $error) except CatchableError: - return err("Error in updateApp: " & getCurrentExceptionMsg()) + return err("Caught exception in updateApp: " & getCurrentExceptionMsg()) ## Discv5 if conf.discv5Conf.isSome(): @@ -464,9 +464,9 @@ proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async: (raises: filterClientReady = filterClientReady, details = $(healthReport) - ok(RequestNodeHealth(healthStatus: nodeHealth)) - except CatchableError: - err("Failed to read health report: " & getCurrentExceptionMsg()), + return ok(RequestNodeHealth(healthStatus: nodeHealth)) + except CatchableError as exc: + err("Failed to read health report: " & exc.msg), ).isOkOr: error "Failed to set RequestNodeHealth provider", error = error @@ -496,7 +496,8 @@ proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async: (raises: return err("Starting monitoring and external interfaces failed: " & error) except CatchableError: return err( - "Starting monitoring and external interfaces failed: " & getCurrentExceptionMsg() + "Caught exception starting monitoring and external interfaces failed: " & + getCurrentExceptionMsg() ) waku[].healthMonitor.setOverallHealth(HealthStatus.READY) @@ -545,7 +546,4 @@ proc isModeEdgeAvailable*(waku: Waku): bool = waku.node.wakuRelay.isNil() and not waku.node.wakuStoreClient.isNil() and not waku.node.wakuFilterClient.isNil() and not waku.node.wakuLightPushClient.isNil() -proc isP2PReliabilityEnabled*(waku: Waku): bool = - return not waku.deliveryService.isNil() - {.pop.} diff --git a/waku/node/delivery_service/recv_service/recv_service.nim b/waku/node/delivery_service/recv_service/recv_service.nim index 46c35201e..12780033a 100644 --- a/waku/node/delivery_service/recv_service/recv_service.nim +++ b/waku/node/delivery_service/recv_service/recv_service.nim @@ -196,7 +196,7 @@ proc startRecvService*(self: RecvService) = self.onSubscribeListener = OnFilterSubscribeEvent.listen( self.brokerCtx, - proc(subsEv: OnFilterSubscribeEvent): Future[void] {.async: (raises: []).} = + proc(subsEv: OnFilterSubscribeEvent) {.async: (raises: []).} = self.onSubscribe(subsEv.pubsubTopic, subsEv.contentTopics), ).valueOr: error "Failed to set OnFilterSubscribeEvent listener", error = error @@ -204,7 +204,7 @@ proc startRecvService*(self: RecvService) = self.onUnsubscribeListener = OnFilterUnsubscribeEvent.listen( self.brokerCtx, - proc(subsEv: OnFilterUnsubscribeEvent): Future[void] {.async: (raises: []).} = + proc(subsEv: OnFilterUnsubscribeEvent) {.async: (raises: []).} = self.onUnsubscribe(subsEv.pubsubTopic, subsEv.contentTopics), ).valueOr: error "Failed to set OnFilterUnsubscribeEvent listener", error = error diff --git a/waku/node/delivery_service/send_service/delivery_task.nim b/waku/node/delivery_service/send_service/delivery_task.nim index 0a32a9858..0ff151f6e 100644 --- a/waku/node/delivery_service/send_service/delivery_task.nim +++ b/waku/node/delivery_service/send_service/delivery_task.nim @@ -5,10 +5,12 @@ import waku/common/broker/broker_context type DeliveryState* {.pure.} = enum Entry SuccessfullyPropagated + # message is known to be sent to the network but not yet validated SuccessfullyValidated - FallbackRetry - NextRoundRetry - FailedToDeliver + # message is known to be stored at least on one store node, thus validated + FallbackRetry # retry sending with fallback processor if available + NextRoundRetry # try sending in next loop + FailedToDeliver # final state of failed delivery type DeliveryTask* = ref object requestId*: RequestId @@ -21,7 +23,7 @@ type DeliveryTask* = ref object propagateEventEmitted*: bool errorDesc*: string -proc create*( +proc new*( T: typedesc[DeliveryTask], requestId: RequestId, envelop: MessageEnvelope, @@ -32,8 +34,8 @@ proc create*( let relayShardRes = ( RequestRelayShard.request(brokerCtx, none[PubsubTopic](), envelop.contentTopic) ).valueOr: - echo "RequestRelayShard.request error", $error - return err($error) + error "RequestRelayShard.request failed", error = error + return err("Failed create DeliveryTask: " & $error) let pubsubTopic = relayShardRes.relayShard.toPubsubTopic() let msgHash = computeMessageHash(pubsubTopic, msg) diff --git a/waku/node/delivery_service/send_service/lightpush_processor.nim b/waku/node/delivery_service/send_service/lightpush_processor.nim index 5311ed267..40a754757 100644 --- a/waku/node/delivery_service/send_service/lightpush_processor.nim +++ b/waku/node/delivery_service/send_service/lightpush_processor.nim @@ -45,15 +45,16 @@ method sendImpl*( tryCount = task.tryCount let peer = self.peerManager.selectPeer(WakuLightPushCodec, some(task.pubsubTopic)).valueOr: + debug "No peer available for Lightpush, request pushed back for next round", + requestId = task.requestId task.state = DeliveryState.NextRoundRetry return - let pushResult = + let numLightpushServers = ( await self.lightpushClient.publish(some(task.pubsubTopic), task.msg, peer) - if pushResult.isErr: - error "LightpushSendProcessor sendImpl failed", - error = pushResult.error.desc.get($pushResult.error.code) - case pushResult.error.code + ).valueOr: + error "LightpushSendProcessor.sendImpl failed", error = error.desc.get($error.code) + case error.code of LightPushErrorCode.NO_PEERS_TO_RELAY, LightPushErrorCode.TOO_MANY_REQUESTS, LightPushErrorCode.OUT_OF_RLN_PROOF, LightPushErrorCode.SERVICE_NOT_AVAILABLE, LightPushErrorCode.INTERNAL_SERVER_ERROR: @@ -61,11 +62,11 @@ method sendImpl*( else: # the message is malformed, send error task.state = DeliveryState.FailedToDeliver - task.errorDesc = pushResult.error.desc.get($pushResult.error.code) + task.errorDesc = error.desc.get($error.code) task.deliveryTime = Moment.now() return - if pushResult.isOk and pushResult.get() > 0: + if numLightpushServers > 0: info "Message propagated via Lightpush", requestId = task.requestId, msgHash = task.msgHash.to0xHex() task.state = DeliveryState.SuccessfullyPropagated @@ -73,6 +74,8 @@ method sendImpl*( # TODO: with a simple retry processor it might be more accurate to say `Sent` else: # Controversial state, publish says ok but no peer. It should not happen. + debug "Lightpush publish returned zero peers, request pushed back for next round", + requestId = task.requestId task.state = DeliveryState.NextRoundRetry return diff --git a/waku/node/delivery_service/send_service/relay_processor.nim b/waku/node/delivery_service/send_service/relay_processor.nim index addb8ee36..94cb63776 100644 --- a/waku/node/delivery_service/send_service/relay_processor.nim +++ b/waku/node/delivery_service/send_service/relay_processor.nim @@ -1,5 +1,5 @@ -import chronos, chronicles import std/options +import chronos, chronicles import waku/[waku_core], waku/waku_lightpush/[common, rpc] import waku/requests/health_request import waku/common/broker/broker_context @@ -14,7 +14,7 @@ type RelaySendProcessor* = ref object of BaseSendProcessor fallbackStateToSet: DeliveryState proc new*( - T: type RelaySendProcessor, + T: typedesc[RelaySendProcessor], lightpushAvailable: bool, publishProc: PushMessageHandler, brokerCtx: BrokerContext, @@ -50,7 +50,7 @@ method isValidProcessor*( # return self.isTopicHealthy(task.pubsubTopic) return true -method sendImpl*(self: RelaySendProcessor, task: DeliveryTask): Future[void] {.async.} = +method sendImpl*(self: RelaySendProcessor, task: DeliveryTask) {.async.} = task.tryCount.inc() info "Trying message delivery via Relay", requestId = task.requestId, @@ -70,7 +70,7 @@ method sendImpl*(self: RelaySendProcessor, task: DeliveryTask): Future[void] {.a if noOfPublishedPeers > 0: info "Message propagated via Relay", - requestId = task.requestId, msgHash = task.msgHash + requestId = task.requestId, msgHash = task.msgHash.to0xHex(), noOfPeers = noOfPublishedPeers task.state = DeliveryState.SuccessfullyPropagated task.deliveryTime = Moment.now() else: diff --git a/waku/node/delivery_service/send_service/send_service.nim b/waku/node/delivery_service/send_service/send_service.nim index 9d71cac96..f6a6ac94c 100644 --- a/waku/node/delivery_service/send_service/send_service.nim +++ b/waku/node/delivery_service/send_service/send_service.nim @@ -95,7 +95,10 @@ proc setupSendProcessorChain( return ok(processors[0]) proc new*( - T: type SendService, preferP2PReliability: bool, w: WakuNode, s: SubscriptionService + T: typedesc[SendService], + preferP2PReliability: bool, + w: WakuNode, + s: SubscriptionService, ): Result[T, string] = if w.wakuRelay.isNil() and w.wakuLightpushClient.isNil(): return err( @@ -107,7 +110,7 @@ proc new*( let sendProcessorChain = setupSendProcessorChain( w.peerManager, w.wakuLightPushClient, w.wakuRelay, w.wakuRlnRelay, w.brokerCtx ).valueOr: - return err(error) + return err("failed to setup SendProcessorChain: " & $error) let sendService = SendService( brokerCtx: w.brokerCtx, @@ -137,6 +140,7 @@ proc checkMsgsInStore(self: SendService, tasksToValidate: seq[DeliveryTask]) {.a return var hashesToValidate = tasksToValidate.mapIt(it.msgHash) + # TODO: confirm hash format for store query!!! let storeResp: StoreQueryResponse = ( await self.node.wakuStoreClient.queryToAny( @@ -172,7 +176,7 @@ proc checkStoredMessages(self: SendService) {.async.} = proc reportTaskResult(self: SendService, task: DeliveryTask) = case task.state of DeliveryState.SuccessfullyPropagated: - # TODO: in case of of unable to strore check messages shall we report success instead? + # TODO: in case of unable to strore check messages shall we report success instead? if not task.propagateEventEmitted: info "Message successfully propagated", requestId = task.requestId, msgHash = task.msgHash.to0xHex() @@ -239,7 +243,7 @@ proc serviceLoop(self: SendService) {.async.} = await self.checkStoredMessages() self.evaluateAndCleanUp() ## TODO: add circuit breaker to avoid infinite looping in case of persistent failures - ## Use OnlienStateChange observers to pause/resume the loop + ## Use OnlineStateChange observers to pause/resume the loop await sleepAsync(ServiceLoopInterval) proc startSendService*(self: SendService) = diff --git a/waku/node/kernel_api/relay.nim b/waku/node/kernel_api/relay.nim index a2940db0c..a0a128449 100644 --- a/waku/node/kernel_api/relay.nim +++ b/waku/node/kernel_api/relay.nim @@ -194,7 +194,7 @@ proc publish*( let numPeers = (await node.wakuRelay.publish(pubsubTopic, message)).valueOr: warn "waku.relay did not publish", error = error # Todo: If NoPeersToPublish, we might want to return ok(0) instead!!! - return err($error) + return err("publish failed in relay: " & $error) notice "waku.relay published", peerId = node.peerId, diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 47e8bb2ad..d556811ac 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -57,7 +57,6 @@ import common/rate_limit/setting, common/callbacks, common/nimchronos, - common/broker/broker_context, waku_mix, requests/node_requests, common/broker/broker_context, @@ -470,7 +469,7 @@ proc updateAnnouncedAddrWithPrimaryIpAddr*(node: WakuNode): Result[void, string] return ok() -proc startProvidersAndListeners*(node: WakuNode) = +proc startProvidersAndListeners(node: WakuNode) = RequestRelayShard.setProvider( node.brokerCtx, proc( @@ -480,9 +479,9 @@ proc startProvidersAndListeners*(node: WakuNode) = return err($error) return ok(RequestRelayShard(relayShard: shard)), ).isOkOr: - error "Can't set proveder for RequestRelayShard", error = error + error "Can't set provider for RequestRelayShard", error = error -proc stopProvidersAndListeners*(node: WakuNode) = +proc stopProvidersAndListeners(node: WakuNode) = RequestRelayShard.clearProvider(node.brokerCtx) proc start*(node: WakuNode) {.async.} = diff --git a/waku/waku_core/event/event_emitter.nim b/waku/waku_core/event/event_emitter.nim deleted file mode 100644 index ba6fd481c..000000000 --- a/waku/waku_core/event/event_emitter.nim +++ /dev/null @@ -1,20 +0,0 @@ - - -type - EventEmitter* = object - # Placeholder for future event emitter implementation - observers*: seq[proc (data: EventData): void] - - -proc initEventEmitter*(): EventEmitter = - EventEmitter(observers: @[]) - -proc emitEvent*(emitter: var EventEmitter, data: EventData) = - for observer in emitter.observers: - asyncSpawn observer(data) - -proc subscribeToEvent*(emitter: var EventEmitter, observer: proc (data: EventData): void) = - emitter.observers.add(observer) - -proc unsubscribeFromEvent*(emitter: var EventEmitter, observer: proc (data: EventData): void) = - emitter.observers = emitter.observers.filterIt(it != observer) diff --git a/waku/waku_filter_v2/client.nim b/waku/waku_filter_v2/client.nim index 816b6a83a..323cc6da8 100644 --- a/waku/waku_filter_v2/client.nim +++ b/waku/waku_filter_v2/client.nim @@ -10,7 +10,8 @@ import bearssl/rand, stew/byteutils import - waku/[node/peer_manager, waku_core, events/delivery_events, common/broker/broker_context], + waku/ + [node/peer_manager, waku_core, events/delivery_events, common/broker/broker_context], ./common, ./protocol_metrics, ./rpc_codec, @@ -150,7 +151,7 @@ proc unsubscribe*( ?await wfc.sendSubscribeRequest(servicePeer, filterSubscribeRequest) - OnFilterUnSubscribeEvent.emit(pubSubTopic, contentTopicSeq) + OnFilterUnSubscribeEvent.emit(wfc.brokerCtx, pubsubTopic, contentTopicSeq) return ok() diff --git a/waku/waku_lightpush/client.nim b/waku/waku_lightpush/client.nim index f0e20d51f..fd12c49d2 100644 --- a/waku/waku_lightpush/client.nim +++ b/waku/waku_lightpush/client.nim @@ -106,8 +106,6 @@ proc publish*( let relayPeerCount = when dest is Connection: ?await wl.sendPushRequest(request, dest.peerId, some(dest)) - elif dest is RemotePeerInfo: - ?await wl.sendPushRequest(request, dest) else: ?await wl.sendPushRequest(request, dest) diff --git a/waku/waku_rln_relay/rln_relay.nim b/waku/waku_rln_relay/rln_relay.nim index a5f0850b4..7ece93ad5 100644 --- a/waku/waku_rln_relay/rln_relay.nim +++ b/waku/waku_rln_relay/rln_relay.nim @@ -459,7 +459,7 @@ proc mount( return ok(RequestGenerateRlnProof(proof: proof)) ).isOkOr: - return err("Proof generator provider cannot be set") + return err("Proof generator provider cannot be set: " & $error) return ok(wakuRlnRelay)