Adapt code for using broker context

This commit is contained in:
NagyZoltanPeter 2026-01-12 10:14:09 +01:00
parent 0f2574e1ec
commit 02c821ce32
No known key found for this signature in database
GPG Key ID: 3E1F97CF4A7B6F42
12 changed files with 143 additions and 51 deletions

View File

@ -26,7 +26,7 @@ proc checkApiAvailability(w: Waku): Result[void, string] =
# check if health is satisfactory
# If Node is not healthy, return err("Waku node is not healthy")
let healthStatus = RequestNodeHealth.request()
let healthStatus = RequestNodeHealth.request(w.brokerCtx)
if healthStatus.isErr():
warn "Failed to get Waku node health status: ", error = healthStatus.error
@ -44,7 +44,7 @@ proc send*(
let requestId = newRequestId(w.rng)
let deliveryTask = DeliveryTask.create(requestId, envelope).valueOr:
let deliveryTask = DeliveryTask.create(requestId, envelope, w.brokerCtx).valueOr:
return err("Failed to create delivery task: " & error)
asyncSpawn w.deliveryService.sendService.send(deliveryTask)

View File

@ -51,14 +51,15 @@ proc toWakuMessage*(envelope: MessageEnvelope): WakuMessage =
timestamp: getNanosecondTime(getTime().toUnixFloat()),
)
# TODO: First find out if proof is needed at all
let requestedProof = (
waitFor RequestGenerateRlnProof.request(wm, getTime().toUnixFloat())
).valueOr:
warn "Failed to add RLN proof to WakuMessage: ", error = error
return wm
## TODO: First find out if proof is needed at all
## Follow up: left it to the send logic to add RLN proof if needed and possible
# let requestedProof = (
# waitFor RequestGenerateRlnProof.request(wm, getTime().toUnixFloat())
# ).valueOr:
# warn "Failed to add RLN proof to WakuMessage: ", error = error
# return wm
wm.proof = requestedProof.proof
# wm.proof = requestedProof.proof
return wm
{.pop.}

View File

@ -1,26 +1,68 @@
import std/[strutils, sysrand]
{.push raises: [].}
import std/[strutils, concurrency/atomics], chronos
type BrokerContext* = distinct uint32
func `==`*(a, b: BrokerContext): bool {.borrow.}
func `==`*(a, b: BrokerContext): bool =
uint32(a) == uint32(b)
func `!=`*(a, b: BrokerContext): bool =
uint32(a) != uint32(b)
func `$`*(bc: BrokerContext): string =
toHex(uint32(bc), 8)
const DefaultBrokerContext* = BrokerContext(0xCAFFE14E'u32)
proc NewBrokerContext*(): BrokerContext =
## Generates a random non-default broker context (as a raw uint32).
# Global broker context accessor.
#
# NOTE: This intentionally creates a *single* active BrokerContext per process
# (per event loop thread). Use only if you accept serialization of all broker
# context usage through the lock.
var globalBrokerContextLock {.threadvar.}: AsyncLock
globalBrokerContextLock = newAsyncLock()
var globalBrokerContextValue {.threadvar.}: BrokerContext
globalBrokerContextValue = DefaultBrokerContext
proc globalBrokerContext*(): BrokerContext =
## Returns the currently active global broker context.
##
## The default broker context is reserved for the provider at index 0.
## This helper never returns that value.
for _ in 0 ..< 16:
let b = urandom(4)
if b.len != 4:
continue
let key =
(uint32(b[0]) shl 24) or (uint32(b[1]) shl 16) or (uint32(b[2]) shl 8) or
uint32(b[3])
if key != uint32(DefaultBrokerContext):
return BrokerContext(key)
BrokerContext(1'u32)
## This is intentionally lock-free; callers should use it inside
## `withNewGlobalBrokerContext` / `withGlobalBrokerContext`.
globalBrokerContextValue
var gContextCounter: Atomic[uint32]
proc NewBrokerContext*(): BrokerContext =
var nextId = gContextCounter.fetchAdd(1, moRelaxed)
if nextId == uint32(DefaultBrokerContext):
nextId = gContextCounter.fetchAdd(1, moRelaxed)
return BrokerContext(nextId)
template lockGlobalBrokerContext*(brokerCtx: BrokerContext, body: untyped): untyped =
## Runs `body` while holding the global broker context lock with the provided
## `brokerCtx` installed as the globally accessible context.
##
## This template is intended for use from within `chronos` async procs.
block:
await noCancel(globalBrokerContextLock.acquire())
let previousBrokerCtx = globalBrokerContextValue
globalBrokerContextValue = brokerCtx
try:
body
finally:
globalBrokerContextValue = previousBrokerCtx
try:
globalBrokerContextLock.release()
except AsyncLockError:
doAssert false, "globalBrokerContextLock.release(): lock not held"
template lockNewGlobalBrokerContext*(body: untyped): untyped =
## Runs `body` while holding the global broker context lock with a freshly
## generated broker context installed as the global accessor.
##
## The previous global broker context (if any) is restored on exit.
lockGlobalBrokerContext(NewBrokerContext()):
body
{.pop.}

View File

@ -44,7 +44,8 @@ import
../factory/internal_config,
../factory/app_callbacks,
../waku_enr/multiaddr,
./waku_conf
./waku_conf,
../common/broker/broker_context
logScope:
topics = "wakunode waku"
@ -75,6 +76,8 @@ type Waku* = ref object
metricsServer*: MetricsHttpServerRef
appCallbacks*: AppCallbacks
brokerCtx*: BrokerContext
func version*(waku: Waku): string =
waku.version
@ -163,6 +166,7 @@ proc new*(
T: type Waku, wakuConf: WakuConf, appCallbacks: AppCallbacks = nil
): Future[Result[Waku, string]] {.async.} =
let rng = crypto.newRng()
let brokerCtx = globalBrokerContext()
logging.setupLog(wakuConf.logLevel, wakuConf.logFormat)
@ -213,6 +217,7 @@ proc new*(
deliveryService: deliveryService,
appCallbacks: appCallbacks,
restServer: restServer,
brokerCtx: brokerCtx,
)
waku.setupSwitchServices(wakuConf, relay, rng)

View File

@ -13,6 +13,7 @@ import
waku_core/topics,
events/delivery_events,
waku_node,
common/broker/broker_context,
]
const StoreCheckPeriod = chronos.minutes(5) ## How often to perform store queries
@ -32,6 +33,7 @@ type RecvMessage = object
## timestamp of the rx message. We will not keep the rx messages forever
type RecvService* = ref object of RootObj
brokerCtx: BrokerContext
topicsInterest: Table[PubsubTopic, seq[ContentTopic]]
## Tracks message verification requests and when was the last time a
## pubsub topic was verified for missing messages
@ -76,7 +78,12 @@ proc performDeliveryFeedback(
success, dir, comment, msg_hash = shortLog(msgHash)
DeliveryFeedbackEvent.emit(
success = success, dir = dir, comment = comment, msgHash = msgHash, msg = msg
brokerCtx = self.brokerCtx,
success = success,
dir = dir,
comment = comment,
msgHash = msgHash,
msg = msg,
)
proc msgChecker(self: RecvService) {.async.} =
@ -153,7 +160,8 @@ proc new*(T: type RecvService, node: WakuNode): T =
## The storeClient will help to acquire any possible missed messages
let now = getNowInNanosecondTime()
var recvService = RecvService(node: node, startTimeToCheck: now)
var recvService =
RecvService(node: node, startTimeToCheck: now, brokerCtx: node.brokerCtx)
if not node.wakuFilterClient.isNil():
let filterPushHandler = proc(
@ -180,22 +188,24 @@ proc startRecvService*(self: RecvService) =
self.msgPrunerHandler = self.loopPruneOldMessages()
self.onSubscribeListener = OnFilterSubscribeEvent.listen(
self.brokerCtx,
proc(subsEv: OnFilterSubscribeEvent): Future[void] {.async: (raises: []).} =
self.onSubscribe(subsEv.pubsubTopic, subsEv.contentTopics)
self.onSubscribe(subsEv.pubsubTopic, subsEv.contentTopics),
).valueOr:
error "Failed to set OnFilterSubscribeEvent listener", error = error
quit(QuitFailure)
self.onUnsubscribeListener = OnFilterUnsubscribeEvent.listen(
self.brokerCtx,
proc(subsEv: OnFilterUnsubscribeEvent): Future[void] {.async: (raises: []).} =
self.onUnsubscribe(subsEv.pubsubTopic, subsEv.contentTopics)
self.onUnsubscribe(subsEv.pubsubTopic, subsEv.contentTopics),
).valueOr:
error "Failed to set OnFilterUnsubscribeEvent listener", error = error
quit(QuitFailure)
proc stopRecvService*(self: RecvService) {.async.} =
OnFilterSubscribeEvent.dropListener(self.onSubscribeListener)
OnFilterUnSubscribeEvent.dropListener(self.onUnsubscribeListener)
OnFilterSubscribeEvent.dropListener(self.brokerCtx, self.onSubscribeListener)
OnFilterUnsubscribeEvent.dropListener(self.brokerCtx, self.onUnsubscribeListener)
if not self.msgCheckerHandler.isNil():
await self.msgCheckerHandler.cancelAndWait()
if not self.msgPrunerHandler.isNil():

View File

@ -1,5 +1,6 @@
import std/[options, times], chronos
import waku/waku_core, waku/api/types, waku/requests/node_requests
import waku/common/broker/broker_context
type DeliveryState* {.pure.} = enum
Entry
@ -20,12 +21,17 @@ type DeliveryTask* = ref object
errorDesc*: string
proc create*(
T: type DeliveryTask, requestId: RequestId, envelop: MessageEnvelope
T: type DeliveryTask,
requestId: RequestId,
envelop: MessageEnvelope,
brokerCtx: BrokerContext,
): Result[T, string] =
let msg = envelop.toWakuMessage()
# TODO: use sync request for such as soon as available
let relayShardRes = (
waitFor RequestRelayShard.request(none[PubsubTopic](), envelop.contentTopic)
waitFor RequestRelayShard.request(
brokerCtx, none[PubsubTopic](), envelop.contentTopic
)
).valueOr:
return err($error)

View File

@ -5,7 +5,8 @@ import
waku/waku_node,
waku/waku_core,
waku/node/peer_manager,
waku/waku_lightpush/[callbacks, common, client, rpc]
waku/waku_lightpush/[callbacks, common, client, rpc],
waku/common/broker/broker_context
import ./[delivery_task, send_processor]
@ -20,8 +21,10 @@ proc new*(
T: type LightpushSendProcessor,
peerManager: PeerManager,
lightpushClient: WakuLightPushClient,
brokerCtx: BrokerContext,
): T =
return T(peerManager: peerManager, lightpushClient: lightpushClient)
return
T(peerManager: peerManager, lightpushClient: lightpushClient, brokerCtx: brokerCtx)
proc isLightpushPeerAvailable(
self: LightpushSendProcessor, pubsubTopic: PubsubTopic

View File

@ -2,6 +2,7 @@ import chronos, chronicles
import std/options
import waku/[waku_node, waku_core], waku/waku_lightpush/[common, callbacks, rpc]
import waku/requests/health_request
import waku/common/broker/broker_context
import waku/api/types
import ./[delivery_task, send_processor]
@ -16,6 +17,7 @@ proc new*(
T: type RelaySendProcessor,
lightpushAvailable: bool,
publishProc: PushMessageHandler,
brokerCtx: BrokerContext,
): RelaySendProcessor =
let fallbackStateToSet =
if lightpushAvailable:
@ -23,11 +25,14 @@ proc new*(
else:
DeliveryState.FailedToDeliver
return
RelaySendProcessor(publishProc: publishProc, fallbackStateToSet: fallbackStateToSet)
return RelaySendProcessor(
publishProc: publishProc,
fallbackStateToSet: fallbackStateToSet,
brokerCtx: brokerCtx,
)
proc isTopicHealthy(topic: PubsubTopic): bool {.gcsafe.} =
let healthReport = RequestRelayTopicsHealth.request(@[topic]).valueOr:
proc isTopicHealthy(self: RelaySendProcessor, topic: PubsubTopic): bool {.gcsafe.} =
let healthReport = RequestRelayTopicsHealth.request(self.brokerCtx, @[topic]).valueOr:
return false
if healthReport.topicHealth.len() < 1:
@ -38,7 +43,7 @@ proc isTopicHealthy(topic: PubsubTopic): bool {.gcsafe.} =
method isValidProcessor*(
self: RelaySendProcessor, task: DeliveryTask
): bool {.gcsafe.} =
return isTopicHealthy(task.pubsubTopic)
return self.isTopicHealthy(task.pubsubTopic)
method sendImpl*(self: RelaySendProcessor, task: DeliveryTask): Future[void] {.async.} =
task.tryCount.inc()

View File

@ -1,10 +1,12 @@
import chronos
import ./delivery_task
import waku/common/broker/broker_context
{.push raises: [].}
type BaseSendProcessor* = ref object of RootObj
fallbackProcessor*: BaseSendProcessor
brokerCtx*: BrokerContext
proc chain*(self: BaseSendProcessor, next: BaseSendProcessor) =
self.fallbackProcessor = next

View File

@ -18,6 +18,7 @@ import
waku_lightpush/callbacks,
events/delivery_events,
events/message_events,
common/broker/broker_context,
]
logScope:
@ -48,6 +49,7 @@ const ArchiveTime = chronos.seconds(3)
## received and archived by a store node
type SendService* = ref object of RootObj
brokerCtx: BrokerContext
taskCache: seq[DeliveryTask]
## Cache that contains the delivery task per message hash.
## This is needed to make sure the published messages are properly published
@ -63,6 +65,7 @@ proc setupSendProcessorChain(
lightpushClient: WakuLightPushClient,
relay: WakuRelay,
rlnRelay: WakuRLNRelay,
brokerCtx: BrokerContext,
): Result[BaseSendProcessor, string] =
let isRelayAvail = not relay.isNil()
let isLightPushAvail = not lightpushClient.isNil()
@ -80,9 +83,9 @@ proc setupSendProcessorChain(
some(rlnRelay)
let publishProc = getRelayPushHandler(relay, rln)
processors.add(RelaySendProcessor.new(isLightPushAvail, publishProc))
processors.add(RelaySendProcessor.new(isLightPushAvail, publishProc, brokerCtx))
if isLightPushAvail:
processors.add(LightpushSendProcessor.new(peerManager, lightpushClient))
processors.add(LightpushSendProcessor.new(peerManager, lightpushClient, brokerCtx))
var currentProcessor: BaseSendProcessor = processors[0]
for i in 1 ..< processors.len():
@ -102,11 +105,12 @@ proc new*(
let checkStoreForMessages = preferP2PReliability and not w.wakuStoreClient.isNil()
let sendProcessorChain = setupSendProcessorChain(
w.peerManager, w.wakuLightPushClient, w.wakuRelay, w.wakuRlnRelay
w.peerManager, w.wakuLightPushClient, w.wakuRelay, w.wakuRlnRelay, w.brokerCtx
).valueOr:
return err(error)
let sendService = SendService(
brokerCtx: w.brokerCtx,
taskCache: newSeq[DeliveryTask](),
serviceLoopHandle: nil,
sendProcessor: sendProcessorChain,
@ -170,16 +174,18 @@ proc reportTaskResult(self: SendService, task: DeliveryTask) =
# TODO: in case of of unable to strore check messages shall we report success instead?
info "Message successfully propagated",
requestId = task.requestId, msgHash = task.msgHash
MessagePropagatedEvent.emit(task.requestId, task.msgHash.toString())
MessagePropagatedEvent.emit(self.brokerCtx, task.requestId, task.msgHash.toString())
return
of DeliveryState.SuccessfullyValidated:
info "Message successfully sent", requestId = task.requestId, msgHash = task.msgHash
MessageSentEvent.emit(task.requestId, task.msgHash.toString())
MessageSentEvent.emit(self.brokerCtx, task.requestId, task.msgHash.toString())
return
of DeliveryState.FailedToDeliver:
error "Failed to send message",
requestId = task.requestId, msgHash = task.msgHash, error = task.errorDesc
MessageErrorEvent.emit(task.requestId, task.msgHash.toString(), task.errorDesc)
MessageErrorEvent.emit(
self.brokerCtx, task.requestId, task.msgHash.toString(), task.errorDesc
)
return
else:
# rest of the states are intermediate and does not translate to event
@ -190,7 +196,10 @@ proc reportTaskResult(self: SendService, task: DeliveryTask) =
requestId = task.requestId, msgHash = task.msgHash, error = "Message too old"
task.state = DeliveryState.FailedToDeliver
MessageErrorEvent.emit(
task.requestId, task.msgHash.toString(), "Unable to send within retry time window"
self.brokerCtx,
task.requestId,
task.msgHash.toString(),
"Unable to send within retry time window",
)
proc evaluateAndCleanUp(self: SendService) =

View File

@ -57,6 +57,7 @@ import
common/rate_limit/setting,
common/callbacks,
common/nimchronos,
common/broker/broker_context,
waku_mix,
requests/node_requests,
],
@ -126,6 +127,7 @@ type
enr*: enr.Record
libp2pPing*: Ping
rng*: ref rand.HmacDrbgContext
brokerCtx*: BrokerContext
wakuRendezvous*: WakuRendezVous
wakuRendezvousClient*: rendezvous_client.WakuRendezVousClient
announcedAddresses*: seq[MultiAddress]
@ -192,11 +194,14 @@ proc new*(
info "Initializing networking", addrs = $netConfig.announcedAddresses
let brokerCtx = globalBrokerContext()
let queue = newAsyncEventQueue[SubscriptionEvent](0)
let node = WakuNode(
peerManager: peerManager,
switch: switch,
rng: rng,
brokerCtx: brokerCtx,
enr: enr,
announcedAddresses: netConfig.announcedAddresses,
topicSubscriptionQueue: queue,

View File

@ -10,7 +10,7 @@ import
bearssl/rand,
stew/byteutils
import
waku/[node/peer_manager, waku_core, events/delivery_events],
waku/[node/peer_manager, waku_core, events/delivery_events, common/broker/broker_context],
./common,
./protocol_metrics,
./rpc_codec,
@ -20,6 +20,7 @@ logScope:
topics = "waku filter client"
type WakuFilterClient* = ref object of LPProtocol
brokerCtx: BrokerContext
rng: ref HmacDrbgContext
peerManager: PeerManager
pushHandlers: seq[FilterPushHandler]
@ -126,7 +127,7 @@ proc subscribe*(
?await wfc.sendSubscribeRequest(servicePeer, filterSubscribeRequest)
OnFilterSubscribeEvent.emit(pubSubTopic, contentTopicSeq)
OnFilterSubscribeEvent.emit(wfc.brokerCtx, pubsubTopic, contentTopicSeq)
return ok()
@ -202,6 +203,9 @@ proc initProtocolHandler(wfc: WakuFilterClient) =
proc new*(
T: type WakuFilterClient, peerManager: PeerManager, rng: ref HmacDrbgContext
): T =
let wfc = WakuFilterClient(rng: rng, peerManager: peerManager, pushHandlers: @[])
let brokerCtx = globalBrokerContext()
let wfc = WakuFilterClient(
brokerCtx: brokerCtx, rng: rng, peerManager: peerManager, pushHandlers: @[]
)
wfc.initProtocolHandler()
wfc