From 02c821ce327d83b02c4ea591c6541acdbc578c10 Mon Sep 17 00:00:00 2001 From: NagyZoltanPeter <113987313+NagyZoltanPeter@users.noreply.github.com> Date: Mon, 12 Jan 2026 10:14:09 +0100 Subject: [PATCH] Adapt code for using broker context --- waku/api/api.nim | 4 +- waku/api/types.nim | 15 ++-- waku/common/broker/broker_context.nim | 74 +++++++++++++++---- waku/factory/waku.nim | 7 +- .../recv_service/recv_service.nim | 22 ++++-- .../send_service/delivery_task.nim | 10 ++- .../send_service/lightpush_processor.nim | 7 +- .../send_service/relay_processor.nim | 15 ++-- .../send_service/send_processor.nim | 2 + .../send_service/send_service.nim | 23 ++++-- waku/node/waku_node.nim | 5 ++ waku/waku_filter_v2/client.nim | 10 ++- 12 files changed, 143 insertions(+), 51 deletions(-) diff --git a/waku/api/api.nim b/waku/api/api.nim index a63379f12..4079e9b0d 100644 --- a/waku/api/api.nim +++ b/waku/api/api.nim @@ -26,7 +26,7 @@ proc checkApiAvailability(w: Waku): Result[void, string] = # check if health is satisfactory # If Node is not healthy, return err("Waku node is not healthy") - let healthStatus = RequestNodeHealth.request() + let healthStatus = RequestNodeHealth.request(w.brokerCtx) if healthStatus.isErr(): warn "Failed to get Waku node health status: ", error = healthStatus.error @@ -44,7 +44,7 @@ proc send*( let requestId = newRequestId(w.rng) - let deliveryTask = DeliveryTask.create(requestId, envelope).valueOr: + let deliveryTask = DeliveryTask.create(requestId, envelope, w.brokerCtx).valueOr: return err("Failed to create delivery task: " & error) asyncSpawn w.deliveryService.sendService.send(deliveryTask) diff --git a/waku/api/types.nim b/waku/api/types.nim index c42718d1b..e78cd87e3 100644 --- a/waku/api/types.nim +++ b/waku/api/types.nim @@ -51,14 +51,15 @@ proc toWakuMessage*(envelope: MessageEnvelope): WakuMessage = timestamp: getNanosecondTime(getTime().toUnixFloat()), ) - # TODO: First find out if proof is needed at all - let requestedProof = ( - waitFor RequestGenerateRlnProof.request(wm, getTime().toUnixFloat()) - ).valueOr: - warn "Failed to add RLN proof to WakuMessage: ", error = error - return wm + ## TODO: First find out if proof is needed at all + ## Follow up: left it to the send logic to add RLN proof if needed and possible + # let requestedProof = ( + # waitFor RequestGenerateRlnProof.request(wm, getTime().toUnixFloat()) + # ).valueOr: + # warn "Failed to add RLN proof to WakuMessage: ", error = error + # return wm - wm.proof = requestedProof.proof + # wm.proof = requestedProof.proof return wm {.pop.} diff --git a/waku/common/broker/broker_context.nim b/waku/common/broker/broker_context.nim index 1b8235f6a..483a2e3a7 100644 --- a/waku/common/broker/broker_context.nim +++ b/waku/common/broker/broker_context.nim @@ -1,26 +1,68 @@ -import std/[strutils, sysrand] +{.push raises: [].} + +import std/[strutils, concurrency/atomics], chronos type BrokerContext* = distinct uint32 -func `==`*(a, b: BrokerContext): bool {.borrow.} +func `==`*(a, b: BrokerContext): bool = + uint32(a) == uint32(b) + +func `!=`*(a, b: BrokerContext): bool = + uint32(a) != uint32(b) func `$`*(bc: BrokerContext): string = toHex(uint32(bc), 8) const DefaultBrokerContext* = BrokerContext(0xCAFFE14E'u32) -proc NewBrokerContext*(): BrokerContext = - ## Generates a random non-default broker context (as a raw uint32). +# Global broker context accessor. +# +# NOTE: This intentionally creates a *single* active BrokerContext per process +# (per event loop thread). Use only if you accept serialization of all broker +# context usage through the lock. +var globalBrokerContextLock {.threadvar.}: AsyncLock +globalBrokerContextLock = newAsyncLock() +var globalBrokerContextValue {.threadvar.}: BrokerContext +globalBrokerContextValue = DefaultBrokerContext +proc globalBrokerContext*(): BrokerContext = + ## Returns the currently active global broker context. ## - ## The default broker context is reserved for the provider at index 0. - ## This helper never returns that value. - for _ in 0 ..< 16: - let b = urandom(4) - if b.len != 4: - continue - let key = - (uint32(b[0]) shl 24) or (uint32(b[1]) shl 16) or (uint32(b[2]) shl 8) or - uint32(b[3]) - if key != uint32(DefaultBrokerContext): - return BrokerContext(key) - BrokerContext(1'u32) + ## This is intentionally lock-free; callers should use it inside + ## `withNewGlobalBrokerContext` / `withGlobalBrokerContext`. + globalBrokerContextValue + +var gContextCounter: Atomic[uint32] + +proc NewBrokerContext*(): BrokerContext = + var nextId = gContextCounter.fetchAdd(1, moRelaxed) + if nextId == uint32(DefaultBrokerContext): + nextId = gContextCounter.fetchAdd(1, moRelaxed) + return BrokerContext(nextId) + +template lockGlobalBrokerContext*(brokerCtx: BrokerContext, body: untyped): untyped = + ## Runs `body` while holding the global broker context lock with the provided + ## `brokerCtx` installed as the globally accessible context. + ## + ## This template is intended for use from within `chronos` async procs. + block: + await noCancel(globalBrokerContextLock.acquire()) + let previousBrokerCtx = globalBrokerContextValue + globalBrokerContextValue = brokerCtx + try: + body + finally: + globalBrokerContextValue = previousBrokerCtx + try: + globalBrokerContextLock.release() + except AsyncLockError: + doAssert false, "globalBrokerContextLock.release(): lock not held" + +template lockNewGlobalBrokerContext*(body: untyped): untyped = + ## Runs `body` while holding the global broker context lock with a freshly + ## generated broker context installed as the global accessor. + ## + ## The previous global broker context (if any) is restored on exit. + lockGlobalBrokerContext(NewBrokerContext()): + body + +{.pop.} diff --git a/waku/factory/waku.nim b/waku/factory/waku.nim index 40b9110dd..970b36eee 100644 --- a/waku/factory/waku.nim +++ b/waku/factory/waku.nim @@ -44,7 +44,8 @@ import ../factory/internal_config, ../factory/app_callbacks, ../waku_enr/multiaddr, - ./waku_conf + ./waku_conf, + ../common/broker/broker_context logScope: topics = "wakunode waku" @@ -75,6 +76,8 @@ type Waku* = ref object metricsServer*: MetricsHttpServerRef appCallbacks*: AppCallbacks + brokerCtx*: BrokerContext + func version*(waku: Waku): string = waku.version @@ -163,6 +166,7 @@ proc new*( T: type Waku, wakuConf: WakuConf, appCallbacks: AppCallbacks = nil ): Future[Result[Waku, string]] {.async.} = let rng = crypto.newRng() + let brokerCtx = globalBrokerContext() logging.setupLog(wakuConf.logLevel, wakuConf.logFormat) @@ -213,6 +217,7 @@ proc new*( deliveryService: deliveryService, appCallbacks: appCallbacks, restServer: restServer, + brokerCtx: brokerCtx, ) waku.setupSwitchServices(wakuConf, relay, rng) diff --git a/waku/node/delivery_service/recv_service/recv_service.nim b/waku/node/delivery_service/recv_service/recv_service.nim index 9fe98b4bd..91fe7ec4e 100644 --- a/waku/node/delivery_service/recv_service/recv_service.nim +++ b/waku/node/delivery_service/recv_service/recv_service.nim @@ -13,6 +13,7 @@ import waku_core/topics, events/delivery_events, waku_node, + common/broker/broker_context, ] const StoreCheckPeriod = chronos.minutes(5) ## How often to perform store queries @@ -32,6 +33,7 @@ type RecvMessage = object ## timestamp of the rx message. We will not keep the rx messages forever type RecvService* = ref object of RootObj + brokerCtx: BrokerContext topicsInterest: Table[PubsubTopic, seq[ContentTopic]] ## Tracks message verification requests and when was the last time a ## pubsub topic was verified for missing messages @@ -76,7 +78,12 @@ proc performDeliveryFeedback( success, dir, comment, msg_hash = shortLog(msgHash) DeliveryFeedbackEvent.emit( - success = success, dir = dir, comment = comment, msgHash = msgHash, msg = msg + brokerCtx = self.brokerCtx, + success = success, + dir = dir, + comment = comment, + msgHash = msgHash, + msg = msg, ) proc msgChecker(self: RecvService) {.async.} = @@ -153,7 +160,8 @@ proc new*(T: type RecvService, node: WakuNode): T = ## The storeClient will help to acquire any possible missed messages let now = getNowInNanosecondTime() - var recvService = RecvService(node: node, startTimeToCheck: now) + var recvService = + RecvService(node: node, startTimeToCheck: now, brokerCtx: node.brokerCtx) if not node.wakuFilterClient.isNil(): let filterPushHandler = proc( @@ -180,22 +188,24 @@ proc startRecvService*(self: RecvService) = self.msgPrunerHandler = self.loopPruneOldMessages() self.onSubscribeListener = OnFilterSubscribeEvent.listen( + self.brokerCtx, proc(subsEv: OnFilterSubscribeEvent): Future[void] {.async: (raises: []).} = - self.onSubscribe(subsEv.pubsubTopic, subsEv.contentTopics) + self.onSubscribe(subsEv.pubsubTopic, subsEv.contentTopics), ).valueOr: error "Failed to set OnFilterSubscribeEvent listener", error = error quit(QuitFailure) self.onUnsubscribeListener = OnFilterUnsubscribeEvent.listen( + self.brokerCtx, proc(subsEv: OnFilterUnsubscribeEvent): Future[void] {.async: (raises: []).} = - self.onUnsubscribe(subsEv.pubsubTopic, subsEv.contentTopics) + self.onUnsubscribe(subsEv.pubsubTopic, subsEv.contentTopics), ).valueOr: error "Failed to set OnFilterUnsubscribeEvent listener", error = error quit(QuitFailure) proc stopRecvService*(self: RecvService) {.async.} = - OnFilterSubscribeEvent.dropListener(self.onSubscribeListener) - OnFilterUnSubscribeEvent.dropListener(self.onUnsubscribeListener) + OnFilterSubscribeEvent.dropListener(self.brokerCtx, self.onSubscribeListener) + OnFilterUnsubscribeEvent.dropListener(self.brokerCtx, self.onUnsubscribeListener) if not self.msgCheckerHandler.isNil(): await self.msgCheckerHandler.cancelAndWait() if not self.msgPrunerHandler.isNil(): diff --git a/waku/node/delivery_service/send_service/delivery_task.nim b/waku/node/delivery_service/send_service/delivery_task.nim index efd272ad7..0f3c9d902 100644 --- a/waku/node/delivery_service/send_service/delivery_task.nim +++ b/waku/node/delivery_service/send_service/delivery_task.nim @@ -1,5 +1,6 @@ import std/[options, times], chronos import waku/waku_core, waku/api/types, waku/requests/node_requests +import waku/common/broker/broker_context type DeliveryState* {.pure.} = enum Entry @@ -20,12 +21,17 @@ type DeliveryTask* = ref object errorDesc*: string proc create*( - T: type DeliveryTask, requestId: RequestId, envelop: MessageEnvelope + T: type DeliveryTask, + requestId: RequestId, + envelop: MessageEnvelope, + brokerCtx: BrokerContext, ): Result[T, string] = let msg = envelop.toWakuMessage() # TODO: use sync request for such as soon as available let relayShardRes = ( - waitFor RequestRelayShard.request(none[PubsubTopic](), envelop.contentTopic) + waitFor RequestRelayShard.request( + brokerCtx, none[PubsubTopic](), envelop.contentTopic + ) ).valueOr: return err($error) diff --git a/waku/node/delivery_service/send_service/lightpush_processor.nim b/waku/node/delivery_service/send_service/lightpush_processor.nim index 1233b3bc6..f6f077ff7 100644 --- a/waku/node/delivery_service/send_service/lightpush_processor.nim +++ b/waku/node/delivery_service/send_service/lightpush_processor.nim @@ -5,7 +5,8 @@ import waku/waku_node, waku/waku_core, waku/node/peer_manager, - waku/waku_lightpush/[callbacks, common, client, rpc] + waku/waku_lightpush/[callbacks, common, client, rpc], + waku/common/broker/broker_context import ./[delivery_task, send_processor] @@ -20,8 +21,10 @@ proc new*( T: type LightpushSendProcessor, peerManager: PeerManager, lightpushClient: WakuLightPushClient, + brokerCtx: BrokerContext, ): T = - return T(peerManager: peerManager, lightpushClient: lightpushClient) + return + T(peerManager: peerManager, lightpushClient: lightpushClient, brokerCtx: brokerCtx) proc isLightpushPeerAvailable( self: LightpushSendProcessor, pubsubTopic: PubsubTopic diff --git a/waku/node/delivery_service/send_service/relay_processor.nim b/waku/node/delivery_service/send_service/relay_processor.nim index 51a68c839..4b826bd93 100644 --- a/waku/node/delivery_service/send_service/relay_processor.nim +++ b/waku/node/delivery_service/send_service/relay_processor.nim @@ -2,6 +2,7 @@ import chronos, chronicles import std/options import waku/[waku_node, waku_core], waku/waku_lightpush/[common, callbacks, rpc] import waku/requests/health_request +import waku/common/broker/broker_context import waku/api/types import ./[delivery_task, send_processor] @@ -16,6 +17,7 @@ proc new*( T: type RelaySendProcessor, lightpushAvailable: bool, publishProc: PushMessageHandler, + brokerCtx: BrokerContext, ): RelaySendProcessor = let fallbackStateToSet = if lightpushAvailable: @@ -23,11 +25,14 @@ proc new*( else: DeliveryState.FailedToDeliver - return - RelaySendProcessor(publishProc: publishProc, fallbackStateToSet: fallbackStateToSet) + return RelaySendProcessor( + publishProc: publishProc, + fallbackStateToSet: fallbackStateToSet, + brokerCtx: brokerCtx, + ) -proc isTopicHealthy(topic: PubsubTopic): bool {.gcsafe.} = - let healthReport = RequestRelayTopicsHealth.request(@[topic]).valueOr: +proc isTopicHealthy(self: RelaySendProcessor, topic: PubsubTopic): bool {.gcsafe.} = + let healthReport = RequestRelayTopicsHealth.request(self.brokerCtx, @[topic]).valueOr: return false if healthReport.topicHealth.len() < 1: @@ -38,7 +43,7 @@ proc isTopicHealthy(topic: PubsubTopic): bool {.gcsafe.} = method isValidProcessor*( self: RelaySendProcessor, task: DeliveryTask ): bool {.gcsafe.} = - return isTopicHealthy(task.pubsubTopic) + return self.isTopicHealthy(task.pubsubTopic) method sendImpl*(self: RelaySendProcessor, task: DeliveryTask): Future[void] {.async.} = task.tryCount.inc() diff --git a/waku/node/delivery_service/send_service/send_processor.nim b/waku/node/delivery_service/send_service/send_processor.nim index 9de425d9f..0108eacd0 100644 --- a/waku/node/delivery_service/send_service/send_processor.nim +++ b/waku/node/delivery_service/send_service/send_processor.nim @@ -1,10 +1,12 @@ import chronos import ./delivery_task +import waku/common/broker/broker_context {.push raises: [].} type BaseSendProcessor* = ref object of RootObj fallbackProcessor*: BaseSendProcessor + brokerCtx*: BrokerContext proc chain*(self: BaseSendProcessor, next: BaseSendProcessor) = self.fallbackProcessor = next diff --git a/waku/node/delivery_service/send_service/send_service.nim b/waku/node/delivery_service/send_service/send_service.nim index 82e27d637..1a54da4a7 100644 --- a/waku/node/delivery_service/send_service/send_service.nim +++ b/waku/node/delivery_service/send_service/send_service.nim @@ -18,6 +18,7 @@ import waku_lightpush/callbacks, events/delivery_events, events/message_events, + common/broker/broker_context, ] logScope: @@ -48,6 +49,7 @@ const ArchiveTime = chronos.seconds(3) ## received and archived by a store node type SendService* = ref object of RootObj + brokerCtx: BrokerContext taskCache: seq[DeliveryTask] ## Cache that contains the delivery task per message hash. ## This is needed to make sure the published messages are properly published @@ -63,6 +65,7 @@ proc setupSendProcessorChain( lightpushClient: WakuLightPushClient, relay: WakuRelay, rlnRelay: WakuRLNRelay, + brokerCtx: BrokerContext, ): Result[BaseSendProcessor, string] = let isRelayAvail = not relay.isNil() let isLightPushAvail = not lightpushClient.isNil() @@ -80,9 +83,9 @@ proc setupSendProcessorChain( some(rlnRelay) let publishProc = getRelayPushHandler(relay, rln) - processors.add(RelaySendProcessor.new(isLightPushAvail, publishProc)) + processors.add(RelaySendProcessor.new(isLightPushAvail, publishProc, brokerCtx)) if isLightPushAvail: - processors.add(LightpushSendProcessor.new(peerManager, lightpushClient)) + processors.add(LightpushSendProcessor.new(peerManager, lightpushClient, brokerCtx)) var currentProcessor: BaseSendProcessor = processors[0] for i in 1 ..< processors.len(): @@ -102,11 +105,12 @@ proc new*( let checkStoreForMessages = preferP2PReliability and not w.wakuStoreClient.isNil() let sendProcessorChain = setupSendProcessorChain( - w.peerManager, w.wakuLightPushClient, w.wakuRelay, w.wakuRlnRelay + w.peerManager, w.wakuLightPushClient, w.wakuRelay, w.wakuRlnRelay, w.brokerCtx ).valueOr: return err(error) let sendService = SendService( + brokerCtx: w.brokerCtx, taskCache: newSeq[DeliveryTask](), serviceLoopHandle: nil, sendProcessor: sendProcessorChain, @@ -170,16 +174,18 @@ proc reportTaskResult(self: SendService, task: DeliveryTask) = # TODO: in case of of unable to strore check messages shall we report success instead? info "Message successfully propagated", requestId = task.requestId, msgHash = task.msgHash - MessagePropagatedEvent.emit(task.requestId, task.msgHash.toString()) + MessagePropagatedEvent.emit(self.brokerCtx, task.requestId, task.msgHash.toString()) return of DeliveryState.SuccessfullyValidated: info "Message successfully sent", requestId = task.requestId, msgHash = task.msgHash - MessageSentEvent.emit(task.requestId, task.msgHash.toString()) + MessageSentEvent.emit(self.brokerCtx, task.requestId, task.msgHash.toString()) return of DeliveryState.FailedToDeliver: error "Failed to send message", requestId = task.requestId, msgHash = task.msgHash, error = task.errorDesc - MessageErrorEvent.emit(task.requestId, task.msgHash.toString(), task.errorDesc) + MessageErrorEvent.emit( + self.brokerCtx, task.requestId, task.msgHash.toString(), task.errorDesc + ) return else: # rest of the states are intermediate and does not translate to event @@ -190,7 +196,10 @@ proc reportTaskResult(self: SendService, task: DeliveryTask) = requestId = task.requestId, msgHash = task.msgHash, error = "Message too old" task.state = DeliveryState.FailedToDeliver MessageErrorEvent.emit( - task.requestId, task.msgHash.toString(), "Unable to send within retry time window" + self.brokerCtx, + task.requestId, + task.msgHash.toString(), + "Unable to send within retry time window", ) proc evaluateAndCleanUp(self: SendService) = diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index dc3f0b3a3..e946fbb35 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -57,6 +57,7 @@ import common/rate_limit/setting, common/callbacks, common/nimchronos, + common/broker/broker_context, waku_mix, requests/node_requests, ], @@ -126,6 +127,7 @@ type enr*: enr.Record libp2pPing*: Ping rng*: ref rand.HmacDrbgContext + brokerCtx*: BrokerContext wakuRendezvous*: WakuRendezVous wakuRendezvousClient*: rendezvous_client.WakuRendezVousClient announcedAddresses*: seq[MultiAddress] @@ -192,11 +194,14 @@ proc new*( info "Initializing networking", addrs = $netConfig.announcedAddresses + let brokerCtx = globalBrokerContext() + let queue = newAsyncEventQueue[SubscriptionEvent](0) let node = WakuNode( peerManager: peerManager, switch: switch, rng: rng, + brokerCtx: brokerCtx, enr: enr, announcedAddresses: netConfig.announcedAddresses, topicSubscriptionQueue: queue, diff --git a/waku/waku_filter_v2/client.nim b/waku/waku_filter_v2/client.nim index 65aa0c8fa..816b6a83a 100644 --- a/waku/waku_filter_v2/client.nim +++ b/waku/waku_filter_v2/client.nim @@ -10,7 +10,7 @@ import bearssl/rand, stew/byteutils import - waku/[node/peer_manager, waku_core, events/delivery_events], + waku/[node/peer_manager, waku_core, events/delivery_events, common/broker/broker_context], ./common, ./protocol_metrics, ./rpc_codec, @@ -20,6 +20,7 @@ logScope: topics = "waku filter client" type WakuFilterClient* = ref object of LPProtocol + brokerCtx: BrokerContext rng: ref HmacDrbgContext peerManager: PeerManager pushHandlers: seq[FilterPushHandler] @@ -126,7 +127,7 @@ proc subscribe*( ?await wfc.sendSubscribeRequest(servicePeer, filterSubscribeRequest) - OnFilterSubscribeEvent.emit(pubSubTopic, contentTopicSeq) + OnFilterSubscribeEvent.emit(wfc.brokerCtx, pubsubTopic, contentTopicSeq) return ok() @@ -202,6 +203,9 @@ proc initProtocolHandler(wfc: WakuFilterClient) = proc new*( T: type WakuFilterClient, peerManager: PeerManager, rng: ref HmacDrbgContext ): T = - let wfc = WakuFilterClient(rng: rng, peerManager: peerManager, pushHandlers: @[]) + let brokerCtx = globalBrokerContext() + let wfc = WakuFilterClient( + brokerCtx: brokerCtx, rng: rng, peerManager: peerManager, pushHandlers: @[] + ) wfc.initProtocolHandler() wfc