From 07a35f8bd07577126ad6965c4280660c3c53e220 Mon Sep 17 00:00:00 2001 From: NagyZoltanPeter <113987313+NagyZoltanPeter@users.noreply.github.com> Date: Wed, 17 Dec 2025 00:36:03 +0100 Subject: [PATCH 1/8] Refactor RequestBroker to support context aware use - introduction of BrokerContext --- tests/common/test_request_broker.nim | 163 ++++++++++++ waku/common/broker/broker_context.nim | 26 ++ waku/common/broker/request_broker.nim | 349 +++++++++++++++++++++++--- 3 files changed, 497 insertions(+), 41 deletions(-) create mode 100644 waku/common/broker/broker_context.nim diff --git a/tests/common/test_request_broker.nim b/tests/common/test_request_broker.nim index a534216dc..87065a916 100644 --- a/tests/common/test_request_broker.nim +++ b/tests/common/test_request_broker.nim @@ -203,6 +203,104 @@ suite "RequestBroker macro (async mode)": DualResponse.clearProvider() + test "supports keyed providers (async, zero-arg)": + SimpleResponse.clearProvider() + + check SimpleResponse + .setProvider( + proc(): Future[Result[SimpleResponse, string]] {.async.} = + ok(SimpleResponse(value: "default")) + ) + .isOk() + + check SimpleResponse + .setProvider( + BrokerContext(0x11111111'u32), + proc(): Future[Result[SimpleResponse, string]] {.async.} = + ok(SimpleResponse(value: "one")), + ) + .isOk() + + check SimpleResponse + .setProvider( + BrokerContext(0x22222222'u32), + proc(): Future[Result[SimpleResponse, string]] {.async.} = + ok(SimpleResponse(value: "two")), + ) + .isOk() + + let defaultRes = waitFor SimpleResponse.request() + check defaultRes.isOk() + check defaultRes.value.value == "default" + + let res1 = waitFor SimpleResponse.request(BrokerContext(0x11111111'u32)) + check res1.isOk() + check res1.value.value == "one" + + let res2 = waitFor SimpleResponse.request(BrokerContext(0x22222222'u32)) + check res2.isOk() + check res2.value.value == "two" + + let missing = waitFor SimpleResponse.request(BrokerContext(0x33333333'u32)) + check missing.isErr() + check missing.error.contains("no provider registered for broker context") + + check SimpleResponse + .setProvider( + BrokerContext(0x11111111'u32), + proc(): Future[Result[SimpleResponse, string]] {.async.} = + ok(SimpleResponse(value: "dup")), + ) + .isErr() + + SimpleResponse.clearProvider() + + test "supports keyed providers (async, with args)": + KeyedResponse.clearProvider() + + check KeyedResponse + .setProvider( + proc(key: string, subKey: int): Future[Result[KeyedResponse, string]] {.async.} = + ok(KeyedResponse(key: "default-" & key, payload: $subKey)) + ) + .isOk() + + check KeyedResponse + .setProvider( + BrokerContext(0xABCDEF01'u32), + proc(key: string, subKey: int): Future[Result[KeyedResponse, string]] {.async.} = + ok(KeyedResponse(key: "k1-" & key, payload: "p" & $subKey)), + ) + .isOk() + + check KeyedResponse + .setProvider( + BrokerContext(0xABCDEF02'u32), + proc(key: string, subKey: int): Future[Result[KeyedResponse, string]] {.async.} = + ok(KeyedResponse(key: "k2-" & key, payload: "q" & $subKey)), + ) + .isOk() + + let d = waitFor KeyedResponse.request("topic", 7) + check d.isOk() + check d.value.key == "default-topic" + + let k1 = waitFor KeyedResponse.request(BrokerContext(0xABCDEF01'u32), "topic", 7) + check k1.isOk() + check k1.value.key == "k1-topic" + check k1.value.payload == "p7" + + let k2 = waitFor KeyedResponse.request(BrokerContext(0xABCDEF02'u32), "topic", 7) + check k2.isOk() + check k2.value.key == "k2-topic" + check k2.value.payload == "q7" + + let miss = waitFor KeyedResponse.request(BrokerContext(0xDEADBEEF'u32), "topic", 7) + check miss.isErr() + check miss.error.contains("no provider registered for broker context") + + KeyedResponse.clearProvider() + ## --------------------------------------------------------------------------- ## Sync-mode brokers + tests ## --------------------------------------------------------------------------- @@ -370,6 +468,71 @@ suite "RequestBroker macro (sync mode)": ImplicitResponseSync.clearProvider() + test "supports keyed providers (sync, zero-arg)": + SimpleResponseSync.clearProvider() + + check SimpleResponseSync + .setProvider( + proc(): Result[SimpleResponseSync, string] = + ok(SimpleResponseSync(value: "default")) + ) + .isOk() + + check SimpleResponseSync + .setProvider( + BrokerContext(0x10101010'u32), + proc(): Result[SimpleResponseSync, string] = + ok(SimpleResponseSync(value: "ten")), + ) + .isOk() + + let defaultRes = SimpleResponseSync.request() + check defaultRes.isOk() + check defaultRes.value.value == "default" + + let keyedRes = SimpleResponseSync.request(BrokerContext(0x10101010'u32)) + check keyedRes.isOk() + check keyedRes.value.value == "ten" + + let miss = SimpleResponseSync.request(BrokerContext(0x20202020'u32)) + check miss.isErr() + check miss.error.contains("no provider registered for broker context") + + SimpleResponseSync.clearProvider() + + test "supports keyed providers (sync, with args)": + KeyedResponseSync.clearProvider() + + check KeyedResponseSync + .setProvider( + proc(key: string, subKey: int): Result[KeyedResponseSync, string] = + ok(KeyedResponseSync(key: "default-" & key, payload: $subKey)) + ) + .isOk() + + check KeyedResponseSync + .setProvider( + BrokerContext(0xA0A0A0A0'u32), + proc(key: string, subKey: int): Result[KeyedResponseSync, string] = + ok(KeyedResponseSync(key: "k-" & key, payload: "p" & $subKey)), + ) + .isOk() + + let d = KeyedResponseSync.request("topic", 2) + check d.isOk() + check d.value.key == "default-topic" + + let keyed = KeyedResponseSync.request(BrokerContext(0xA0A0A0A0'u32), "topic", 2) + check keyed.isOk() + check keyed.value.key == "k-topic" + check keyed.value.payload == "p2" + + let miss = KeyedResponseSync.request(BrokerContext(0xB0B0B0B0'u32), "topic", 2) + check miss.isErr() + check miss.error.contains("no provider registered for broker context") + + KeyedResponseSync.clearProvider() + ## --------------------------------------------------------------------------- ## POD / external type brokers + tests (distinct/alias behavior) ## --------------------------------------------------------------------------- diff --git a/waku/common/broker/broker_context.nim b/waku/common/broker/broker_context.nim new file mode 100644 index 000000000..1b8235f6a --- /dev/null +++ b/waku/common/broker/broker_context.nim @@ -0,0 +1,26 @@ +import std/[strutils, sysrand] + +type BrokerContext* = distinct uint32 + +func `==`*(a, b: BrokerContext): bool {.borrow.} + +func `$`*(bc: BrokerContext): string = + toHex(uint32(bc), 8) + +const DefaultBrokerContext* = BrokerContext(0xCAFFE14E'u32) + +proc NewBrokerContext*(): BrokerContext = + ## Generates a random non-default broker context (as a raw uint32). + ## + ## The default broker context is reserved for the provider at index 0. + ## This helper never returns that value. + for _ in 0 ..< 16: + let b = urandom(4) + if b.len != 4: + continue + let key = + (uint32(b[0]) shl 24) or (uint32(b[1]) shl 16) or (uint32(b[2]) shl 8) or + uint32(b[3]) + if key != uint32(DefaultBrokerContext): + return BrokerContext(key) + BrokerContext(1'u32) diff --git a/waku/common/broker/request_broker.nim b/waku/common/broker/request_broker.nim index dece77381..c71f90229 100644 --- a/waku/common/broker/request_broker.nim +++ b/waku/common/broker/request_broker.nim @@ -16,6 +16,18 @@ ## `async` mode is better to be used when you request date that may involve some long IO operation ## or action. ## +## Default vs. context aware use: +## Every generated broker is a thread-local global instance. This means each RequestBroker enables decoupled +## data exchange threadwise. Sometimes we use brokers inside a context - like inside a component that has many modules or subsystems. +## In case you would instantiate multiple such components in a single thread, and each component must has its own provider for the same RequestBroker type, +## in order to avoid provider collision, you can use context aware RequestBroker. +## Context awareness is supported through the `BrokerContext` argument for `setProvider`, `request`, `clearProvider` interfaces. +## Suce use requires generating a new unique `BrokerContext` value per component instance, and spread it to all modules using the brokers. +## Example, store the `BrokerContext` as a field inside the top level component instance, and spread around at initialization of the subcomponents.. +## +## Default broker context is defined as `DefaultBrokerContext` constant. But if you don't need context awareness, you can use the +## interfaces without context argument. +## ## Usage: ## Declare your desired request type inside a `RequestBroker` macro, add any number of fields. ## Define the provider signature, that is enforced at compile time. @@ -89,7 +101,13 @@ ## After this, you can register a provider anywhere in your code with ## `TypeName.setProvider(...)`, which returns error if already having a provider. ## Providers are async procs/lambdas in default mode and sync procs in sync mode. -## Only one provider can be registered at a time per signature type (zero arg and/or multi arg). +## +## Providers are stored as a broker-context keyed list: +## - the default provider is always stored at index 0 (reserved broker context: 0) +## - additional providers can be registered under arbitrary non-zero broker contexts +## +## The original `setProvider(handler)` / `request(...)` APIs continue to operate +## on the default provider (broker context 0) for backward compatibility. ## ## Requests can be made from anywhere with no direct dependency on the provider by ## calling `TypeName.request()` - with arguments respecting the signature(s). @@ -139,11 +157,12 @@ ## automatically, so the caller only needs to provide the type definition. import std/[macros, strutils] +from std/sequtils import keepItIf import chronos import results -import ./helper/broker_utils +import ./helper/broker_utils, broker_context -export results, chronos +export results, chronos, keepItIf, broker_context proc errorFuture[T](message: string): Future[Result[T, string]] {.inline.} = ## Build a future that is already completed with an error result. @@ -329,11 +348,9 @@ proc generateRequestBroker(body: NimNode, mode: RequestBrokerMode): NimNode = let typeNameLit = newLit(typeDisplayName) var zeroArgSig: NimNode = nil var zeroArgProviderName: NimNode = nil - var zeroArgFieldName: NimNode = nil var argSig: NimNode = nil var argParams: seq[NimNode] = @[] var argProviderName: NimNode = nil - var argFieldName: NimNode = nil for stmt in body: case stmt.kind @@ -368,7 +385,6 @@ proc generateRequestBroker(body: NimNode, mode: RequestBrokerMode): NimNode = error("Only one zero-argument signature is allowed", stmt) zeroArgSig = stmt zeroArgProviderName = ident(sanitizeIdentName(typeIdent) & "ProviderNoArgs") - zeroArgFieldName = ident("providerNoArgs") elif paramCount >= 1: if argSig != nil: error("Only one argument-based signature is allowed", stmt) @@ -391,7 +407,6 @@ proc generateRequestBroker(body: NimNode, mode: RequestBrokerMode): NimNode = error("Signature parameter must declare a name", paramDef) argParams.add(copyNimTree(paramDef)) argProviderName = ident(sanitizeIdentName(typeIdent) & "ProviderWithArgs") - argFieldName = ident("providerWithArgs") of nnkTypeSection, nnkEmpty: discard else: @@ -400,7 +415,6 @@ proc generateRequestBroker(body: NimNode, mode: RequestBrokerMode): NimNode = if zeroArgSig.isNil() and argSig.isNil(): zeroArgSig = newEmptyNode() zeroArgProviderName = ident(sanitizeIdentName(typeIdent) & "ProviderNoArgs") - zeroArgFieldName = ident("providerNoArgs") var typeSection = newTree(nnkTypeSection) typeSection.add(newTree(nnkTypeDef, exportedTypeIdent, newEmptyNode(), objectDef)) @@ -423,12 +437,29 @@ proc generateRequestBroker(body: NimNode, mode: RequestBrokerMode): NimNode = var brokerRecList = newTree(nnkRecList) if not zeroArgSig.isNil(): + let zeroArgProvidersFieldName = ident("providersNoArgs") + let zeroArgProvidersTupleTy = newTree( + nnkTupleTy, + newTree(nnkIdentDefs, ident("brokerCtx"), ident("BrokerContext"), newEmptyNode()), + newTree(nnkIdentDefs, ident("handler"), zeroArgProviderName, newEmptyNode()), + ) + let zeroArgProvidersSeqTy = + newTree(nnkBracketExpr, ident("seq"), zeroArgProvidersTupleTy) brokerRecList.add( - newTree(nnkIdentDefs, zeroArgFieldName, zeroArgProviderName, newEmptyNode()) + newTree( + nnkIdentDefs, zeroArgProvidersFieldName, zeroArgProvidersSeqTy, newEmptyNode() + ) ) if not argSig.isNil(): + let argProvidersFieldName = ident("providersWithArgs") + let argProvidersTupleTy = newTree( + nnkTupleTy, + newTree(nnkIdentDefs, ident("brokerCtx"), ident("BrokerContext"), newEmptyNode()), + newTree(nnkIdentDefs, ident("handler"), argProviderName, newEmptyNode()), + ) + let argProvidersSeqTy = newTree(nnkBracketExpr, ident("seq"), argProvidersTupleTy) brokerRecList.add( - newTree(nnkIdentDefs, argFieldName, argProviderName, newEmptyNode()) + newTree(nnkIdentDefs, argProvidersFieldName, argProvidersSeqTy, newEmptyNode()) ) let brokerTypeIdent = ident(sanitizeIdentName(typeIdent) & "Broker") let brokerTypeDef = newTree( @@ -443,31 +474,97 @@ proc generateRequestBroker(body: NimNode, mode: RequestBrokerMode): NimNode = let globalVarIdent = ident("g" & sanitizeIdentName(typeIdent) & "Broker") let accessProcIdent = ident("access" & sanitizeIdentName(typeIdent) & "Broker") + + var brokerNewBody = newStmtList() + if not zeroArgSig.isNil(): + brokerNewBody.add( + quote do: + result.providersNoArgs = + @[(brokerCtx: DefaultBrokerContext, handler: default(`zeroArgProviderName`))] + ) + if not argSig.isNil(): + brokerNewBody.add( + quote do: + result.providersWithArgs = + @[(brokerCtx: DefaultBrokerContext, handler: default(`argProviderName`))] + ) + + var brokerInitChecks = newStmtList() + if not zeroArgSig.isNil(): + brokerInitChecks.add( + quote do: + if `globalVarIdent`.providersNoArgs.len == 0: + `globalVarIdent` = `brokerTypeIdent`.new() + ) + if not argSig.isNil(): + brokerInitChecks.add( + quote do: + if `globalVarIdent`.providersWithArgs.len == 0: + `globalVarIdent` = `brokerTypeIdent`.new() + ) + result.add( quote do: var `globalVarIdent` {.threadvar.}: `brokerTypeIdent` + proc new(_: type `brokerTypeIdent`): `brokerTypeIdent` = + result = `brokerTypeIdent`() + `brokerNewBody` + proc `accessProcIdent`(): var `brokerTypeIdent` = + `brokerInitChecks` `globalVarIdent` ) - var clearBody = newStmtList() + var clearBodyKeyed = newStmtList() + let brokerCtxParamIdent = ident("brokerCtx") if not zeroArgSig.isNil(): + let zeroArgProvidersFieldName = ident("providersNoArgs") result.add( quote do: proc setProvider*( _: typedesc[`typeIdent`], handler: `zeroArgProviderName` ): Result[void, string] = - if not `accessProcIdent`().`zeroArgFieldName`.isNil(): + if not `accessProcIdent`().`zeroArgProvidersFieldName`[0].handler.isNil(): return err("Zero-arg provider already set") - `accessProcIdent`().`zeroArgFieldName` = handler + `accessProcIdent`().`zeroArgProvidersFieldName`[0].handler = handler return ok() ) - clearBody.add( + + result.add( quote do: - `accessProcIdent`().`zeroArgFieldName` = nil + proc setProvider*( + _: typedesc[`typeIdent`], + brokerCtx: BrokerContext, + handler: `zeroArgProviderName`, + ): Result[void, string] = + if brokerCtx == DefaultBrokerContext: + return setProvider(`typeIdent`, handler) + + for entry in `accessProcIdent`().`zeroArgProvidersFieldName`: + if entry.brokerCtx == brokerCtx: + return err( + "RequestBroker(" & `typeNameLit` & + "): provider already set for broker context " & $brokerCtx + ) + + `accessProcIdent`().`zeroArgProvidersFieldName`.add( + (brokerCtx: brokerCtx, handler: handler) + ) + return ok() + + ) + clearBodyKeyed.add( + quote do: + if `brokerCtxParamIdent` == DefaultBrokerContext: + `accessProcIdent`().`zeroArgProvidersFieldName`[0].handler = + default(`zeroArgProviderName`) + else: + `accessProcIdent`().`zeroArgProvidersFieldName`.keepItIf( + it.brokerCtx != `brokerCtxParamIdent` + ) ) case mode of rbAsync: @@ -476,11 +573,34 @@ proc generateRequestBroker(body: NimNode, mode: RequestBrokerMode): NimNode = proc request*( _: typedesc[`typeIdent`] ): Future[Result[`typeIdent`, string]] {.async: (raises: []).} = - let provider = `accessProcIdent`().`zeroArgFieldName` + return await request(`typeIdent`, DefaultBrokerContext) + + ) + + result.add( + quote do: + proc request*( + _: typedesc[`typeIdent`], brokerCtx: BrokerContext + ): Future[Result[`typeIdent`, string]] {.async: (raises: []).} = + var provider: `zeroArgProviderName` + if brokerCtx == DefaultBrokerContext: + provider = `accessProcIdent`().`zeroArgProvidersFieldName`[0].handler + else: + for entry in `accessProcIdent`().`zeroArgProvidersFieldName`: + if entry.brokerCtx == brokerCtx: + provider = entry.handler + break + if provider.isNil(): + if brokerCtx == DefaultBrokerContext: + return err( + "RequestBroker(" & `typeNameLit` & "): no zero-arg provider registered" + ) return err( - "RequestBroker(" & `typeNameLit` & "): no zero-arg provider registered" + "RequestBroker(" & `typeNameLit` & + "): no provider registered for broker context " & $brokerCtx ) + let catchedRes = catch: await provider() @@ -507,10 +627,32 @@ proc generateRequestBroker(body: NimNode, mode: RequestBrokerMode): NimNode = proc request*( _: typedesc[`typeIdent`] ): Result[`typeIdent`, string] {.gcsafe, raises: [].} = - let provider = `accessProcIdent`().`zeroArgFieldName` + return request(`typeIdent`, DefaultBrokerContext) + + ) + + result.add( + quote do: + proc request*( + _: typedesc[`typeIdent`], brokerCtx: BrokerContext + ): Result[`typeIdent`, string] {.gcsafe, raises: [].} = + var provider: `zeroArgProviderName` + if brokerCtx == DefaultBrokerContext: + provider = `accessProcIdent`().`zeroArgProvidersFieldName`[0].handler + else: + for entry in `accessProcIdent`().`zeroArgProvidersFieldName`: + if entry.brokerCtx == brokerCtx: + provider = entry.handler + break + if provider.isNil(): + if brokerCtx == DefaultBrokerContext: + return err( + "RequestBroker(" & `typeNameLit` & "): no zero-arg provider registered" + ) return err( - "RequestBroker(" & `typeNameLit` & "): no zero-arg provider registered" + "RequestBroker(" & `typeNameLit` & + "): no provider registered for broker context " & $brokerCtx ) var providerRes: Result[`typeIdent`, string] @@ -533,24 +675,54 @@ proc generateRequestBroker(body: NimNode, mode: RequestBrokerMode): NimNode = ) if not argSig.isNil(): + let argProvidersFieldName = ident("providersWithArgs") result.add( quote do: proc setProvider*( _: typedesc[`typeIdent`], handler: `argProviderName` ): Result[void, string] = - if not `accessProcIdent`().`argFieldName`.isNil(): + if not `accessProcIdent`().`argProvidersFieldName`[0].handler.isNil(): return err("Provider already set") - `accessProcIdent`().`argFieldName` = handler + `accessProcIdent`().`argProvidersFieldName`[0].handler = handler return ok() ) - clearBody.add( + + result.add( quote do: - `accessProcIdent`().`argFieldName` = nil + proc setProvider*( + _: typedesc[`typeIdent`], + brokerCtx: BrokerContext, + handler: `argProviderName`, + ): Result[void, string] = + if brokerCtx == DefaultBrokerContext: + return setProvider(`typeIdent`, handler) + + for entry in `accessProcIdent`().`argProvidersFieldName`: + if entry.brokerCtx == brokerCtx: + return err( + "RequestBroker(" & `typeNameLit` & + "): provider already set for broker context " & $brokerCtx + ) + + `accessProcIdent`().`argProvidersFieldName`.add( + (brokerCtx: brokerCtx, handler: handler) + ) + return ok() + + ) + clearBodyKeyed.add( + quote do: + if `brokerCtxParamIdent` == DefaultBrokerContext: + `accessProcIdent`().`argProvidersFieldName`[0].handler = + default(`argProviderName`) + else: + `accessProcIdent`().`argProvidersFieldName`.keepItIf( + it.brokerCtx != `brokerCtxParamIdent` + ) ) let requestParamDefs = cloneParams(argParams) let argNameIdents = collectParamNames(requestParamDefs) - let providerSym = genSym(nskLet, "provider") var formalParams = newTree(nnkFormalParams) formalParams.add(copyNimTree(returnType)) formalParams.add( @@ -572,29 +744,96 @@ proc generateRequestBroker(body: NimNode, mode: RequestBrokerMode): NimNode = of rbSync: quote: {.gcsafe, raises: [].} - var providerCall = newCall(providerSym) + + var forwardCall = newCall(ident("request")) + forwardCall.add(copyNimTree(typeIdent)) + forwardCall.add(ident("DefaultBrokerContext")) for argName in argNameIdents: - providerCall.add(argName) + forwardCall.add(argName) + var requestBody = newStmtList() - requestBody.add( - quote do: - let `providerSym` = `accessProcIdent`().`argFieldName` + case mode + of rbAsync: + requestBody.add( + quote do: + return await `forwardCall` + ) + of rbSync: + requestBody.add( + quote do: + return `forwardCall` + ) + + result.add( + newTree( + nnkProcDef, + postfix(ident("request"), "*"), + newEmptyNode(), + newEmptyNode(), + formalParams, + requestPragmas, + newEmptyNode(), + requestBody, + ) ) - requestBody.add( + + # Keyed request variant for the argument-based signature. + let requestParamDefsKeyed = cloneParams(argParams) + let argNameIdentsKeyed = collectParamNames(requestParamDefsKeyed) + let providerSymKeyed = genSym(nskVar, "provider") + var formalParamsKeyed = newTree(nnkFormalParams) + formalParamsKeyed.add(copyNimTree(returnType)) + formalParamsKeyed.add( + newTree( + nnkIdentDefs, + ident("_"), + newTree(nnkBracketExpr, ident("typedesc"), copyNimTree(typeIdent)), + newEmptyNode(), + ) + ) + formalParamsKeyed.add( + newTree(nnkIdentDefs, ident("brokerCtx"), ident("BrokerContext"), newEmptyNode()) + ) + for paramDef in requestParamDefsKeyed: + formalParamsKeyed.add(paramDef) + + let requestPragmasKeyed = requestPragmas + var providerCallKeyed = newCall(providerSymKeyed) + for argName in argNameIdentsKeyed: + providerCallKeyed.add(argName) + + var requestBodyKeyed = newStmtList() + requestBodyKeyed.add( quote do: - if `providerSym`.isNil(): + var `providerSymKeyed`: `argProviderName` + if brokerCtx == DefaultBrokerContext: + `providerSymKeyed` = `accessProcIdent`().`argProvidersFieldName`[0].handler + else: + for entry in `accessProcIdent`().`argProvidersFieldName`: + if entry.brokerCtx == brokerCtx: + `providerSymKeyed` = entry.handler + break + ) + requestBodyKeyed.add( + quote do: + if `providerSymKeyed`.isNil(): + if brokerCtx == DefaultBrokerContext: + return err( + "RequestBroker(" & `typeNameLit` & + "): no provider registered for input signature" + ) return err( "RequestBroker(" & `typeNameLit` & - "): no provider registered for input signature" + "): no provider registered for broker context " & $brokerCtx ) ) case mode of rbAsync: - requestBody.add( + requestBodyKeyed.add( quote do: let catchedRes = catch: - await `providerCall` + await `providerCallKeyed` if catchedRes.isErr(): return err( "RequestBroker(" & `typeNameLit` & "): provider threw exception: " & @@ -612,11 +851,11 @@ proc generateRequestBroker(body: NimNode, mode: RequestBrokerMode): NimNode = return providerRes ) of rbSync: - requestBody.add( + requestBodyKeyed.add( quote do: var providerRes: Result[`typeIdent`, string] try: - providerRes = `providerCall` + providerRes = `providerCallKeyed` except CatchableError as e: return err( "RequestBroker(" & `typeNameLit` & "): provider threw exception: " & e.msg @@ -631,24 +870,52 @@ proc generateRequestBroker(body: NimNode, mode: RequestBrokerMode): NimNode = ) return providerRes ) - # requestBody.add(providerCall) + result.add( newTree( nnkProcDef, postfix(ident("request"), "*"), newEmptyNode(), newEmptyNode(), - formalParams, - requestPragmas, + formalParamsKeyed, + requestPragmasKeyed, newEmptyNode(), - requestBody, + requestBodyKeyed, + ) + ) + + block: + var formalParamsClearKeyed = newTree(nnkFormalParams) + formalParamsClearKeyed.add(newEmptyNode()) + formalParamsClearKeyed.add( + newTree( + nnkIdentDefs, + ident("_"), + newTree(nnkBracketExpr, ident("typedesc"), copyNimTree(typeIdent)), + newEmptyNode(), + ) + ) + formalParamsClearKeyed.add( + newTree(nnkIdentDefs, brokerCtxParamIdent, ident("BrokerContext"), newEmptyNode()) + ) + + result.add( + newTree( + nnkProcDef, + postfix(ident("clearProvider"), "*"), + newEmptyNode(), + newEmptyNode(), + formalParamsClearKeyed, + newEmptyNode(), + newEmptyNode(), + clearBodyKeyed, ) ) result.add( quote do: proc clearProvider*(_: typedesc[`typeIdent`]) = - `clearBody` + clearProvider(`typeIdent`, DefaultBrokerContext) ) From bc4ff10ea921c7aae9f116d2b06508173e5452f3 Mon Sep 17 00:00:00 2001 From: NagyZoltanPeter <113987313+NagyZoltanPeter@users.noreply.github.com> Date: Wed, 17 Dec 2025 01:44:23 +0100 Subject: [PATCH 2/8] Context aware extension for EventBroker, EventBoker support for native or external types --- tests/common/test_event_broker.nim | 76 ++++++ waku/common/broker/event_broker.nim | 398 +++++++++++++++++++++------- 2 files changed, 371 insertions(+), 103 deletions(-) diff --git a/tests/common/test_event_broker.nim b/tests/common/test_event_broker.nim index cead1277f..bcd081f4f 100644 --- a/tests/common/test_event_broker.nim +++ b/tests/common/test_event_broker.nim @@ -4,6 +4,15 @@ import testutils/unittests import waku/common/broker/event_broker +type ExternalDefinedEventType = object + label*: string + +EventBroker: + type IntEvent = int + +EventBroker: + type ExternalAliasEvent = distinct ExternalDefinedEventType + EventBroker: type SampleEvent = object value*: int @@ -123,3 +132,70 @@ suite "EventBroker": check counter == 21 # 1+2+3 + 4+5+6 RefEvent.dropAllListeners() + + test "supports BrokerContext-scoped listeners": + SampleEvent.dropAllListeners() + + let ctxA = NewBrokerContext() + let ctxB = NewBrokerContext() + + var seenA: seq[int] = @[] + var seenB: seq[int] = @[] + + discard SampleEvent.listen( + ctxA, + proc(evt: SampleEvent): Future[void] {.async: (raises: []).} = + seenA.add(evt.value), + ) + + discard SampleEvent.listen( + ctxB, + proc(evt: SampleEvent): Future[void] {.async: (raises: []).} = + seenB.add(evt.value), + ) + + SampleEvent.emit(ctxA, SampleEvent(value: 1, label: "a")) + SampleEvent.emit(ctxB, SampleEvent(value: 2, label: "b")) + waitForListeners() + + check seenA == @[1] + check seenB == @[2] + + SampleEvent.dropAllListeners(ctxA) + SampleEvent.emit(ctxA, SampleEvent(value: 3, label: "a2")) + SampleEvent.emit(ctxB, SampleEvent(value: 4, label: "b2")) + waitForListeners() + + check seenA == @[1] + check seenB == @[2, 4] + + SampleEvent.dropAllListeners(ctxB) + + test "supports non-object event types (auto-distinct)": + var seen: seq[int] = @[] + + discard IntEvent.listen( + proc(evt: IntEvent): Future[void] {.async: (raises: []).} = + seen.add(int(evt)) + ) + + IntEvent.emit(IntEvent(42)) + waitForListeners() + + check seen == @[42] + IntEvent.dropAllListeners() + + test "supports externally-defined type aliases (auto-distinct)": + var seen: seq[string] = @[] + + discard ExternalAliasEvent.listen( + proc(evt: ExternalAliasEvent): Future[void] {.async: (raises: []).} = + let base = ExternalDefinedEventType(evt) + seen.add(base.label) + ) + + ExternalAliasEvent.emit(ExternalAliasEvent(ExternalDefinedEventType(label: "x"))) + waitForListeners() + + check seen == @["x"] + ExternalAliasEvent.dropAllListeners() diff --git a/waku/common/broker/event_broker.nim b/waku/common/broker/event_broker.nim index 05d7b50ab..f694c2c98 100644 --- a/waku/common/broker/event_broker.nim +++ b/waku/common/broker/event_broker.nim @@ -5,10 +5,35 @@ ## need for direct dependencies in between emitters and listeners. ## Worth considering using it in a single or many emitters to many listeners scenario. ## -## Generates a standalone, type-safe event broker for the declared object type. +## Generates a standalone, type-safe event broker for the declared type. ## The macro exports the value type itself plus a broker companion that manages ## listeners via thread-local storage. ## +## Type definitions: +## - Inline `object` / `ref object` definitions are supported. +## - Native types, aliases, and externally-defined types are also supported. +## In that case, EventBroker will automatically wrap the declared RHS type in +## `distinct` unless you already used `distinct`. +## This keeps event types unique even when multiple brokers share the same +## underlying base type. +## +## Default vs. context aware use: +## Every generated broker is a thread-local global instance. This means EventBroker +## enables decoupled event exchange threadwise. +## +## Sometimes we use brokers inside a context (e.g. within a component that has many +## modules or subsystems). If you instantiate multiple such components in a single +## thread, and each component must have its own listener set for the same EventBroker +## type, you can use context-aware EventBroker. +## +## Context awareness is supported through the `BrokerContext` argument for +## `listen`, `emit`, `dropListener`, and `dropAllListeners`. +## Listener stores are kept separate per broker context. +## +## Default broker context is defined as `DefaultBrokerContext`. If you don't need +## context awareness, you can keep using the interfaces without the context +## argument, which operate on `DefaultBrokerContext`. +## ## Usage: ## Declare your desired event type inside an `EventBroker` macro, add any number of fields.: ## ```nim @@ -47,11 +72,30 @@ ## GreetingEvent.dropListener(handle) ## ``` +## Example (non-object event type): +## ```nim +## EventBroker: +## type CounterEvent = int # exported as: `distinct int` +## +## discard CounterEvent.listen( +## proc(evt: CounterEvent): Future[void] {.async.} = +## echo int(evt) +## ) +## CounterEvent.emit(CounterEvent(42)) +## ``` + import std/[macros, tables] import chronos, chronicles, results -import ./helper/broker_utils +import ./helper/broker_utils, broker_context -export chronicles, results, chronos +export chronicles, results, chronos, broker_context + +proc ensureDistinctType(rhs: NimNode): NimNode = + ## For PODs / aliases / externally-defined types, wrap in `distinct` unless + ## it's already distinct. + if rhs.kind == nnkDistinctTy: + return rhs + newTree(nnkDistinctTy, copyNimTree(rhs)) macro EventBroker*(body: untyped): untyped = when defined(eventBrokerDebug): @@ -60,74 +104,102 @@ macro EventBroker*(body: untyped): untyped = var objectDef: NimNode = nil var fieldNames: seq[NimNode] = @[] var fieldTypes: seq[NimNode] = @[] - var isRefObject = false + var hasInlineFields = false for stmt in body: if stmt.kind == nnkTypeSection: for def in stmt: if def.kind != nnkTypeDef: continue let rhs = def[2] - var objectType: NimNode + if not typeIdent.isNil(): + error("Only one type may be declared inside EventBroker", def) + typeIdent = baseTypeIdent(def[0]) + + # Inline object/ref object definitions. case rhs.kind of nnkObjectTy: - objectType = rhs + let recList = rhs[2] + if recList.kind != nnkRecList: + error("EventBroker object must declare a standard field list", rhs) + var exportedRecList = newTree(nnkRecList) + for field in recList: + case field.kind + of nnkIdentDefs: + ensureFieldDef(field) + let fieldTypeNode = field[field.len - 2] + for i in 0 ..< field.len - 2: + let baseFieldIdent = baseTypeIdent(field[i]) + fieldNames.add(copyNimTree(baseFieldIdent)) + fieldTypes.add(copyNimTree(fieldTypeNode)) + var cloned = copyNimTree(field) + for i in 0 ..< cloned.len - 2: + cloned[i] = exportIdentNode(cloned[i]) + exportedRecList.add(cloned) + of nnkEmpty: + discard + else: + error( + "EventBroker object definition only supports simple field declarations", + field, + ) + let exportedObjectType = newTree( + nnkObjectTy, copyNimTree(rhs[0]), copyNimTree(rhs[1]), exportedRecList + ) + objectDef = exportedObjectType + hasInlineFields = true of nnkRefTy: - isRefObject = true if rhs.len != 1 or rhs[0].kind != nnkObjectTy: error("EventBroker ref object must wrap a concrete object definition", rhs) - objectType = rhs[0] - else: - continue - if not typeIdent.isNil(): - error("Only one object type may be declared inside EventBroker", def) - typeIdent = baseTypeIdent(def[0]) - let recList = objectType[2] - if recList.kind != nnkRecList: - error("EventBroker object must declare a standard field list", objectType) - var exportedRecList = newTree(nnkRecList) - for field in recList: - case field.kind - of nnkIdentDefs: - ensureFieldDef(field) - let fieldTypeNode = field[field.len - 2] - for i in 0 ..< field.len - 2: - let baseFieldIdent = baseTypeIdent(field[i]) - fieldNames.add(copyNimTree(baseFieldIdent)) - fieldTypes.add(copyNimTree(fieldTypeNode)) - var cloned = copyNimTree(field) - for i in 0 ..< cloned.len - 2: - cloned[i] = exportIdentNode(cloned[i]) - exportedRecList.add(cloned) - of nnkEmpty: - discard - else: - error( - "EventBroker object definition only supports simple field declarations", - field, - ) - let exportedObjectType = newTree( - nnkObjectTy, - copyNimTree(objectType[0]), - copyNimTree(objectType[1]), - exportedRecList, - ) - if isRefObject: + let obj = rhs[0] + let recList = obj[2] + if recList.kind != nnkRecList: + error("EventBroker object must declare a standard field list", obj) + var exportedRecList = newTree(nnkRecList) + for field in recList: + case field.kind + of nnkIdentDefs: + ensureFieldDef(field) + let fieldTypeNode = field[field.len - 2] + for i in 0 ..< field.len - 2: + let baseFieldIdent = baseTypeIdent(field[i]) + fieldNames.add(copyNimTree(baseFieldIdent)) + fieldTypes.add(copyNimTree(fieldTypeNode)) + var cloned = copyNimTree(field) + for i in 0 ..< cloned.len - 2: + cloned[i] = exportIdentNode(cloned[i]) + exportedRecList.add(cloned) + of nnkEmpty: + discard + else: + error( + "EventBroker object definition only supports simple field declarations", + field, + ) + let exportedObjectType = newTree( + nnkObjectTy, copyNimTree(obj[0]), copyNimTree(obj[1]), exportedRecList + ) objectDef = newTree(nnkRefTy, exportedObjectType) + hasInlineFields = true else: - objectDef = exportedObjectType + # Native type / alias / externally-defined type. + # Ensure we create a unique event type by wrapping in `distinct` unless + # the user already did. + objectDef = ensureDistinctType(rhs) if typeIdent.isNil(): - error("EventBroker body must declare exactly one object type", body) + error("EventBroker body must declare exactly one type", body) let exportedTypeIdent = postfix(copyNimTree(typeIdent), "*") let sanitized = sanitizeIdentName(typeIdent) let typeNameLit = newLit($typeIdent) - let isRefObjectLit = newLit(isRefObject) let handlerProcIdent = ident(sanitized & "ListenerProc") let listenerHandleIdent = ident(sanitized & "Listener") let brokerTypeIdent = ident(sanitized & "Broker") let exportedHandlerProcIdent = postfix(copyNimTree(handlerProcIdent), "*") let exportedListenerHandleIdent = postfix(copyNimTree(listenerHandleIdent), "*") let exportedBrokerTypeIdent = postfix(copyNimTree(brokerTypeIdent), "*") + let bucketTypeIdent = ident(sanitized & "CtxBucket") + let findBucketIdxIdent = ident(sanitized & "FindBucketIdx") + let getOrCreateBucketIdxIdent = ident(sanitized & "GetOrCreateBucketIdx") let accessProcIdent = ident("access" & sanitized & "Broker") let globalVarIdent = ident("g" & sanitized & "Broker") let listenImplIdent = ident("register" & sanitized & "Listener") @@ -147,10 +219,14 @@ macro EventBroker*(body: untyped): untyped = `exportedHandlerProcIdent` = proc(event: `typeIdent`): Future[void] {.async: (raises: []), gcsafe.} - `exportedBrokerTypeIdent` = ref object + `bucketTypeIdent` = object + brokerCtx: BrokerContext listeners: Table[uint64, `handlerProcIdent`] nextId: uint64 + `exportedBrokerTypeIdent` = ref object + buckets: seq[`bucketTypeIdent`] + ) result.add( @@ -163,49 +239,102 @@ macro EventBroker*(body: untyped): untyped = proc `accessProcIdent`(): `brokerTypeIdent` = if `globalVarIdent`.isNil(): new(`globalVarIdent`) - `globalVarIdent`.listeners = initTable[uint64, `handlerProcIdent`]() + `globalVarIdent`.buckets = + @[ + `bucketTypeIdent`( + brokerCtx: DefaultBrokerContext, + listeners: initTable[uint64, `handlerProcIdent`](), + nextId: 1'u64, + ) + ] `globalVarIdent` ) result.add( quote do: + proc `findBucketIdxIdent`( + broker: `brokerTypeIdent`, brokerCtx: BrokerContext + ): int = + if brokerCtx == DefaultBrokerContext: + return 0 + for i in 1 ..< broker.buckets.len: + if broker.buckets[i].brokerCtx == brokerCtx: + return i + return -1 + + proc `getOrCreateBucketIdxIdent`( + broker: `brokerTypeIdent`, brokerCtx: BrokerContext + ): int = + let idx = `findBucketIdxIdent`(broker, brokerCtx) + if idx >= 0: + return idx + broker.buckets.add( + `bucketTypeIdent`( + brokerCtx: brokerCtx, + listeners: initTable[uint64, `handlerProcIdent`](), + nextId: 1'u64, + ) + ) + return broker.buckets.high + proc `listenImplIdent`( - handler: `handlerProcIdent` + brokerCtx: BrokerContext, handler: `handlerProcIdent` ): Result[`listenerHandleIdent`, string] = if handler.isNil(): return err("Must provide a non-nil event handler") var broker = `accessProcIdent`() - if broker.nextId == 0'u64: - broker.nextId = 1'u64 - if broker.nextId == high(uint64): - error "Cannot add more listeners: ID space exhausted", nextId = $broker.nextId + + let bucketIdx = `getOrCreateBucketIdxIdent`(broker, brokerCtx) + if broker.buckets[bucketIdx].nextId == 0'u64: + broker.buckets[bucketIdx].nextId = 1'u64 + + if broker.buckets[bucketIdx].nextId == high(uint64): + error "Cannot add more listeners: ID space exhausted", + nextId = $broker.buckets[bucketIdx].nextId return err("Cannot add more listeners, listener ID space exhausted") - let newId = broker.nextId - inc broker.nextId - broker.listeners[newId] = handler + + let newId = broker.buckets[bucketIdx].nextId + inc broker.buckets[bucketIdx].nextId + broker.buckets[bucketIdx].listeners[newId] = handler return ok(`listenerHandleIdent`(id: newId)) ) result.add( quote do: - proc `dropListenerImplIdent`(handle: `listenerHandleIdent`) = + proc `dropListenerImplIdent`( + brokerCtx: BrokerContext, handle: `listenerHandleIdent` + ) = if handle.id == 0'u64: return var broker = `accessProcIdent`() - if broker.listeners.len == 0: + + let bucketIdx = `findBucketIdxIdent`(broker, brokerCtx) + if bucketIdx < 0: return - broker.listeners.del(handle.id) + + if broker.buckets[bucketIdx].listeners.len == 0: + return + broker.buckets[bucketIdx].listeners.del(handle.id) + if brokerCtx != DefaultBrokerContext and + broker.buckets[bucketIdx].listeners.len == 0: + broker.buckets.delete(bucketIdx) ) result.add( quote do: - proc `dropAllListenersImplIdent`() = + proc `dropAllListenersImplIdent`(brokerCtx: BrokerContext) = var broker = `accessProcIdent`() - if broker.listeners.len > 0: - broker.listeners.clear() + + let bucketIdx = `findBucketIdxIdent`(broker, brokerCtx) + if bucketIdx < 0: + return + if broker.buckets[bucketIdx].listeners.len > 0: + broker.buckets[bucketIdx].listeners.clear() + if brokerCtx != DefaultBrokerContext: + broker.buckets.delete(bucketIdx) ) @@ -214,17 +343,34 @@ macro EventBroker*(body: untyped): untyped = proc listen*( _: typedesc[`typeIdent`], handler: `handlerProcIdent` ): Result[`listenerHandleIdent`, string] = - return `listenImplIdent`(handler) + return `listenImplIdent`(DefaultBrokerContext, handler) + + proc listen*( + _: typedesc[`typeIdent`], + brokerCtx: BrokerContext, + handler: `handlerProcIdent`, + ): Result[`listenerHandleIdent`, string] = + return `listenImplIdent`(brokerCtx, handler) ) result.add( quote do: proc dropListener*(_: typedesc[`typeIdent`], handle: `listenerHandleIdent`) = - `dropListenerImplIdent`(handle) + `dropListenerImplIdent`(DefaultBrokerContext, handle) + + proc dropListener*( + _: typedesc[`typeIdent`], + brokerCtx: BrokerContext, + handle: `listenerHandleIdent`, + ) = + `dropListenerImplIdent`(brokerCtx, handle) proc dropAllListeners*(_: typedesc[`typeIdent`]) = - `dropAllListenersImplIdent`() + `dropAllListenersImplIdent`(DefaultBrokerContext) + + proc dropAllListeners*(_: typedesc[`typeIdent`], brokerCtx: BrokerContext) = + `dropAllListenersImplIdent`(brokerCtx) ) @@ -241,68 +387,114 @@ macro EventBroker*(body: untyped): untyped = error "Failed to execute event listener", error = getCurrentExceptionMsg() proc `emitImplIdent`( - event: `typeIdent` + brokerCtx: BrokerContext, event: `typeIdent` ): Future[void] {.async: (raises: []), gcsafe.} = - when `isRefObjectLit`: + when compiles(event.isNil()): if event.isNil(): error "Cannot emit uninitialized event object", eventType = `typeNameLit` return let broker = `accessProcIdent`() - if broker.listeners.len == 0: + let bucketIdx = `findBucketIdxIdent`(broker, brokerCtx) + if bucketIdx < 0: # nothing to do as nobody is listening return + if broker.buckets[bucketIdx].listeners.len == 0: + return var callbacks: seq[`handlerProcIdent`] = @[] - for cb in broker.listeners.values: + for cb in broker.buckets[bucketIdx].listeners.values: callbacks.add(cb) for cb in callbacks: asyncSpawn `listenerTaskIdent`(cb, event) proc emit*(event: `typeIdent`) = - asyncSpawn `emitImplIdent`(event) + asyncSpawn `emitImplIdent`(DefaultBrokerContext, event) proc emit*(_: typedesc[`typeIdent`], event: `typeIdent`) = - asyncSpawn `emitImplIdent`(event) + asyncSpawn `emitImplIdent`(DefaultBrokerContext, event) + + proc emit*( + _: typedesc[`typeIdent`], brokerCtx: BrokerContext, event: `typeIdent` + ) = + asyncSpawn `emitImplIdent`(brokerCtx, event) ) - var emitCtorParams = newTree(nnkFormalParams, newEmptyNode()) - let typedescParamType = - newTree(nnkBracketExpr, ident("typedesc"), copyNimTree(typeIdent)) - emitCtorParams.add( - newTree(nnkIdentDefs, ident("_"), typedescParamType, newEmptyNode()) - ) - for i in 0 ..< fieldNames.len: + if hasInlineFields: + # Typedesc emit constructor overloads for inline object/ref object types. + var emitCtorParams = newTree(nnkFormalParams, newEmptyNode()) + let typedescParamType = + newTree(nnkBracketExpr, ident("typedesc"), copyNimTree(typeIdent)) emitCtorParams.add( - newTree( - nnkIdentDefs, - copyNimTree(fieldNames[i]), - copyNimTree(fieldTypes[i]), - newEmptyNode(), + newTree(nnkIdentDefs, ident("_"), typedescParamType, newEmptyNode()) + ) + for i in 0 ..< fieldNames.len: + emitCtorParams.add( + newTree( + nnkIdentDefs, + copyNimTree(fieldNames[i]), + copyNimTree(fieldTypes[i]), + newEmptyNode(), + ) ) + + var emitCtorExpr = newTree(nnkObjConstr, copyNimTree(typeIdent)) + for i in 0 ..< fieldNames.len: + emitCtorExpr.add( + newTree( + nnkExprColonExpr, copyNimTree(fieldNames[i]), copyNimTree(fieldNames[i]) + ) + ) + + let emitCtorCallDefault = + newCall(copyNimTree(emitImplIdent), ident("DefaultBrokerContext"), emitCtorExpr) + let emitCtorBodyDefault = quote: + asyncSpawn `emitCtorCallDefault` + + let typedescEmitProcDefault = newTree( + nnkProcDef, + postfix(ident("emit"), "*"), + newEmptyNode(), + newEmptyNode(), + emitCtorParams, + newEmptyNode(), + newEmptyNode(), + emitCtorBodyDefault, ) + result.add(typedescEmitProcDefault) - var emitCtorExpr = newTree(nnkObjConstr, copyNimTree(typeIdent)) - for i in 0 ..< fieldNames.len: - emitCtorExpr.add( - newTree(nnkExprColonExpr, copyNimTree(fieldNames[i]), copyNimTree(fieldNames[i])) + var emitCtorParamsCtx = newTree(nnkFormalParams, newEmptyNode()) + emitCtorParamsCtx.add( + newTree(nnkIdentDefs, ident("_"), typedescParamType, newEmptyNode()) ) + emitCtorParamsCtx.add( + newTree(nnkIdentDefs, ident("brokerCtx"), ident("BrokerContext"), newEmptyNode()) + ) + for i in 0 ..< fieldNames.len: + emitCtorParamsCtx.add( + newTree( + nnkIdentDefs, + copyNimTree(fieldNames[i]), + copyNimTree(fieldTypes[i]), + newEmptyNode(), + ) + ) - let emitCtorCall = newCall(copyNimTree(emitImplIdent), emitCtorExpr) - let emitCtorBody = quote: - asyncSpawn `emitCtorCall` + let emitCtorCallCtx = + newCall(copyNimTree(emitImplIdent), ident("brokerCtx"), copyNimTree(emitCtorExpr)) + let emitCtorBodyCtx = quote: + asyncSpawn `emitCtorCallCtx` - let typedescEmitProc = newTree( - nnkProcDef, - postfix(ident("emit"), "*"), - newEmptyNode(), - newEmptyNode(), - emitCtorParams, - newEmptyNode(), - newEmptyNode(), - emitCtorBody, - ) - - result.add(typedescEmitProc) + let typedescEmitProcCtx = newTree( + nnkProcDef, + postfix(ident("emit"), "*"), + newEmptyNode(), + newEmptyNode(), + emitCtorParamsCtx, + newEmptyNode(), + newEmptyNode(), + emitCtorBodyCtx, + ) + result.add(typedescEmitProcCtx) when defined(eventBrokerDebug): echo result.repr From ca96391255d45b240f48c4f72ac931304b8c64e8 Mon Sep 17 00:00:00 2001 From: NagyZoltanPeter <113987313+NagyZoltanPeter@users.noreply.github.com> Date: Wed, 17 Dec 2025 08:37:53 +0100 Subject: [PATCH 3/8] Enhance MultiRequestBroker - similar to RequestBroker and EventBroker - with support for native and external types and context aware execution. --- tests/common/test_multi_request_broker.nim | 110 ++++ waku/common/broker/multi_request_broker.nim | 591 ++++++++++++++------ 2 files changed, 540 insertions(+), 161 deletions(-) diff --git a/tests/common/test_multi_request_broker.nim b/tests/common/test_multi_request_broker.nim index 3bf10a54d..d45de478a 100644 --- a/tests/common/test_multi_request_broker.nim +++ b/tests/common/test_multi_request_broker.nim @@ -31,6 +31,23 @@ MultiRequestBroker: suffix: string ): Future[Result[DualResponse, string]] {.async.} +type ExternalBaseType = string + +MultiRequestBroker: + type NativeIntResponse = int + + proc signatureFetch*(): Future[Result[NativeIntResponse, string]] {.async.} + +MultiRequestBroker: + type ExternalAliasResponse = ExternalBaseType + + proc signatureFetch*(): Future[Result[ExternalAliasResponse, string]] {.async.} + +MultiRequestBroker: + type AlreadyDistinctResponse = distinct int + + proc signatureFetch*(): Future[Result[AlreadyDistinctResponse, string]] {.async.} + suite "MultiRequestBroker": test "aggregates zero-argument providers": discard NoArgResponse.setProvider( @@ -211,6 +228,99 @@ suite "MultiRequestBroker": test "ref providers returning nil fail request": DualResponse.clearProviders() + test "supports native request types": + NativeIntResponse.clearProviders() + + discard NativeIntResponse.setProvider( + proc(): Future[Result[NativeIntResponse, string]] {.async.} = + ok(NativeIntResponse(1)) + ) + + discard NativeIntResponse.setProvider( + proc(): Future[Result[NativeIntResponse, string]] {.async.} = + ok(NativeIntResponse(2)) + ) + + let res = waitFor NativeIntResponse.request() + check res.isOk() + check res.get().len == 2 + check res.get().anyIt(int(it) == 1) + check res.get().anyIt(int(it) == 2) + + NativeIntResponse.clearProviders() + + test "supports external request types": + ExternalAliasResponse.clearProviders() + + discard ExternalAliasResponse.setProvider( + proc(): Future[Result[ExternalAliasResponse, string]] {.async.} = + ok(ExternalAliasResponse("hello")) + ) + + let res = waitFor ExternalAliasResponse.request() + check res.isOk() + check res.get().len == 1 + check ExternalBaseType(res.get()[0]) == "hello" + + ExternalAliasResponse.clearProviders() + + test "supports already-distinct request types": + AlreadyDistinctResponse.clearProviders() + + discard AlreadyDistinctResponse.setProvider( + proc(): Future[Result[AlreadyDistinctResponse, string]] {.async.} = + ok(AlreadyDistinctResponse(7)) + ) + + let res = waitFor AlreadyDistinctResponse.request() + check res.isOk() + check res.get().len == 1 + check int(res.get()[0]) == 7 + + AlreadyDistinctResponse.clearProviders() + + test "context-aware providers are isolated": + NoArgResponse.clearProviders() + let ctxA = NewBrokerContext() + let ctxB = NewBrokerContext() + + discard NoArgResponse.setProvider( + ctxA, + proc(): Future[Result[NoArgResponse, string]] {.async.} = + ok(NoArgResponse(label: "a")), + ) + discard NoArgResponse.setProvider( + ctxB, + proc(): Future[Result[NoArgResponse, string]] {.async.} = + ok(NoArgResponse(label: "b")), + ) + + let resA = waitFor NoArgResponse.request(ctxA) + check resA.isOk() + check resA.get().len == 1 + check resA.get()[0].label == "a" + + let resB = waitFor NoArgResponse.request(ctxB) + check resB.isOk() + check resB.get().len == 1 + check resB.get()[0].label == "b" + + let resDefault = waitFor NoArgResponse.request() + check resDefault.isOk() + check resDefault.get().len == 0 + + NoArgResponse.clearProviders(ctxA) + let clearedA = waitFor NoArgResponse.request(ctxA) + check clearedA.isOk() + check clearedA.get().len == 0 + + let stillB = waitFor NoArgResponse.request(ctxB) + check stillB.isOk() + check stillB.get().len == 1 + check stillB.get()[0].label == "b" + + NoArgResponse.clearProviders(ctxB) + discard DualResponse.setProvider( proc(): Future[Result[DualResponse, string]] {.async.} = let nilResponse: DualResponse = nil diff --git a/waku/common/broker/multi_request_broker.nim b/waku/common/broker/multi_request_broker.nim index 7f4161f5a..c2b9cf66f 100644 --- a/waku/common/broker/multi_request_broker.nim +++ b/waku/common/broker/multi_request_broker.nim @@ -5,12 +5,35 @@ ## need for direct dependencies in between. ## Worth considering using it for use cases where you need to collect data from multiple providers. ## -## Provides a declarative way to define an immutable value type together with a -## thread-local broker that can register multiple asynchronous providers, dispatch -## typed requests, and clear handlers. Unlike `RequestBroker`, -## every call to `request` fan-outs to every registered provider and returns with -## collected responses. -## Request succeeds if all providers succeed, otherwise fails with an error. +## Generates a standalone, type-safe request broker for the declared type. +## The macro exports the value type itself plus a broker companion that manages +## providers via thread-local storage. +## +## Unlike `RequestBroker`, every call to `request` fan-outs to every registered +## provider and returns all collected responses. +## The request succeeds only if all providers succeed, otherwise it fails. +## +## Type definitions: +## - Inline `object` / `ref object` definitions are supported. +## - Native types, aliases, and externally-defined types are also supported. +## In that case, MultiRequestBroker will automatically wrap the declared RHS +## type in `distinct` unless you already used `distinct`. +## This keeps request types unique even when multiple brokers share the same +## underlying base type. +## +## Default vs. context aware use: +## Every generated broker is a thread-local global instance. +## Sometimes you want multiple independent provider sets for the same request +## type within the same thread (e.g. multiple components). For that, you can use +## context-aware MultiRequestBroker. +## +## Context awareness is supported through the `BrokerContext` argument for +## `setProvider`, `request`, `removeProvider`, and `clearProviders`. +## Provider stores are kept separate per broker context. +## +## Default broker context is defined as `DefaultBrokerContext`. If you don't +## need context awareness, you can keep using the interfaces without the context +## argument, which operate on `DefaultBrokerContext`. ## ## Usage: ## @@ -29,14 +52,17 @@ ## ## ``` ## -## You regiser request processor (proveder) at any place of the code without the need to know of who ever may request. -## Respectively to the defined signatures register provider functions with `TypeName.setProvider(...)`. -## Providers are async procs or lambdas that return with a Future[Result[seq[TypeName], string]]. -## Notice MultiRequestBroker's `setProvider` return with a handler that can be used to remove the provider later (or error). +## You can register a request processor (provider) anywhere without the need to +## know who will request. +## Register provider functions with `TypeName.setProvider(...)`. +## Providers are async procs or lambdas that return `Future[Result[TypeName, string]]`. +## `setProvider` returns a handle (or an error) that can later be used to remove +## the provider. -## Requests can be made from anywhere with no direct dependency on the provider(s) by -## calling `TypeName.request()` - with arguments respecting the signature(s). -## This will asynchronously call the registered provider and return the collected data, in form of `Future[Result[seq[TypeName], string]]`. +## Requests can be made from anywhere with no direct dependency on the provider(s) +## by calling `TypeName.request()` (with arguments respecting the declared signature). +## This will asynchronously call all registered providers and return the collected +## responses as `Future[Result[seq[TypeName], string]]`. ## ## Whenever you don't want to process requests anymore (or your object instance that provides the request goes out of scope), ## you can remove it from the broker with `TypeName.removeProvider(handle)`. @@ -77,8 +103,9 @@ import std/[macros, strutils, tables, sugar] import chronos import results import ./helper/broker_utils +import ./broker_context -export results, chronos +export results, chronos, broker_context proc isReturnTypeValid(returnType, typeIdent: NimNode): bool = ## Accept Future[Result[TypeIdent, string]] as the contract. @@ -123,6 +150,13 @@ proc makeProcType(returnType: NimNode, params: seq[NimNode]): NimNode = newTree(nnkProcTy, formal, pragmas) +proc ensureDistinctType(rhs: NimNode): NimNode = + ## For PODs / aliases / externally-defined types, wrap in `distinct` unless + ## it's already distinct. + if rhs.kind == nnkDistinctTy: + return copyNimTree(rhs) + newTree(nnkDistinctTy, copyNimTree(rhs)) + macro MultiRequestBroker*(body: untyped): untyped = when defined(requestBrokerDebug): echo body.treeRepr @@ -134,11 +168,42 @@ macro MultiRequestBroker*(body: untyped): untyped = for def in stmt: if def.kind != nnkTypeDef: continue + if not typeIdent.isNil(): + error("Only one type may be declared inside MultiRequestBroker", def) + + typeIdent = baseTypeIdent(def[0]) let rhs = def[2] - var objectType: NimNode + case rhs.kind of nnkObjectTy: - objectType = rhs + let objectType = rhs + let recList = objectType[2] + if recList.kind != nnkRecList: + error( + "MultiRequestBroker object must declare a standard field list", objectType + ) + var exportedRecList = newTree(nnkRecList) + for field in recList: + case field.kind + of nnkIdentDefs: + ensureFieldDef(field) + var cloned = copyNimTree(field) + for i in 0 ..< cloned.len - 2: + cloned[i] = exportIdentNode(cloned[i]) + exportedRecList.add(cloned) + of nnkEmpty: + discard + else: + error( + "MultiRequestBroker object definition only supports simple field declarations", + field, + ) + objectDef = newTree( + nnkObjectTy, + copyNimTree(objectType[0]), + copyNimTree(objectType[1]), + exportedRecList, + ) of nnkRefTy: isRefObject = true if rhs.len != 1 or rhs[0].kind != nnkObjectTy: @@ -146,45 +211,44 @@ macro MultiRequestBroker*(body: untyped): untyped = "MultiRequestBroker ref object must wrap a concrete object definition", rhs, ) - objectType = rhs[0] - else: - continue - if not typeIdent.isNil(): - error("Only one object type may be declared inside MultiRequestBroker", def) - typeIdent = baseTypeIdent(def[0]) - let recList = objectType[2] - if recList.kind != nnkRecList: - error( - "MultiRequestBroker object must declare a standard field list", objectType - ) - var exportedRecList = newTree(nnkRecList) - for field in recList: - case field.kind - of nnkIdentDefs: - ensureFieldDef(field) - var cloned = copyNimTree(field) - for i in 0 ..< cloned.len - 2: - cloned[i] = exportIdentNode(cloned[i]) - exportedRecList.add(cloned) - of nnkEmpty: - discard - else: + let objectType = rhs[0] + let recList = objectType[2] + if recList.kind != nnkRecList: error( - "MultiRequestBroker object definition only supports simple field declarations", - field, + "MultiRequestBroker object must declare a standard field list", objectType ) - let exportedObjectType = newTree( - nnkObjectTy, - copyNimTree(objectType[0]), - copyNimTree(objectType[1]), - exportedRecList, - ) - if isRefObject: + var exportedRecList = newTree(nnkRecList) + for field in recList: + case field.kind + of nnkIdentDefs: + ensureFieldDef(field) + var cloned = copyNimTree(field) + for i in 0 ..< cloned.len - 2: + cloned[i] = exportIdentNode(cloned[i]) + exportedRecList.add(cloned) + of nnkEmpty: + discard + else: + error( + "MultiRequestBroker object definition only supports simple field declarations", + field, + ) + let exportedObjectType = newTree( + nnkObjectTy, + copyNimTree(objectType[0]), + copyNimTree(objectType[1]), + exportedRecList, + ) objectDef = newTree(nnkRefTy, exportedObjectType) else: - objectDef = exportedObjectType + # Native type / alias / externally-defined type. + # Ensure we create a unique request type by wrapping in `distinct` unless + # the user already did. + objectDef = ensureDistinctType(rhs) + isRefObject = false + if typeIdent.isNil(): - error("MultiRequestBroker body must declare exactly one object type", body) + error("MultiRequestBroker body must declare exactly one type", body) when defined(requestBrokerDebug): echo "MultiRequestBroker generating type: ", $typeIdent @@ -193,12 +257,13 @@ macro MultiRequestBroker*(body: untyped): untyped = let sanitized = sanitizeIdentName(typeIdent) let typeNameLit = newLit($typeIdent) let isRefObjectLit = newLit(isRefObject) - let tableSym = bindSym"Table" - let initTableSym = bindSym"initTable" let uint64Ident = ident("uint64") let providerKindIdent = ident(sanitized & "ProviderKind") let providerHandleIdent = ident(sanitized & "ProviderHandle") let exportedProviderHandleIdent = postfix(copyNimTree(providerHandleIdent), "*") + let bucketTypeIdent = ident(sanitized & "CtxBucket") + let findBucketIdxIdent = ident(sanitized & "FindBucketIdx") + let getOrCreateBucketIdxIdent = ident(sanitized & "GetOrCreateBucketIdx") let zeroKindIdent = ident("pk" & sanitized & "NoArgs") let argKindIdent = ident("pk" & sanitized & "WithArgs") var zeroArgSig: NimNode = nil @@ -306,63 +371,90 @@ macro MultiRequestBroker*(body: untyped): untyped = let procType = makeProcType(returnType, cloneParams(argParams)) typeSection.add(newTree(nnkTypeDef, argProviderName, newEmptyNode(), procType)) - var brokerRecList = newTree(nnkRecList) + var bucketRecList = newTree(nnkRecList) + bucketRecList.add( + newTree(nnkIdentDefs, ident("brokerCtx"), ident("BrokerContext"), newEmptyNode()) + ) if not zeroArgSig.isNil(): - brokerRecList.add( + bucketRecList.add( newTree( nnkIdentDefs, zeroArgFieldName, - newTree(nnkBracketExpr, tableSym, uint64Ident, zeroArgProviderName), + newTree(nnkBracketExpr, ident("seq"), zeroArgProviderName), newEmptyNode(), ) ) if not argSig.isNil(): - brokerRecList.add( + bucketRecList.add( newTree( nnkIdentDefs, argFieldName, - newTree(nnkBracketExpr, tableSym, uint64Ident, argProviderName), + newTree(nnkBracketExpr, ident("seq"), argProviderName), newEmptyNode(), ) ) - brokerRecList.add(newTree(nnkIdentDefs, ident("nextId"), uint64Ident, newEmptyNode())) - let brokerTypeIdent = ident(sanitizeIdentName(typeIdent) & "Broker") - let brokerTypeDef = newTree( - nnkTypeDef, - brokerTypeIdent, - newEmptyNode(), + typeSection.add( newTree( - nnkRefTy, newTree(nnkObjectTy, newEmptyNode(), newEmptyNode(), brokerRecList) - ), + nnkTypeDef, + bucketTypeIdent, + newEmptyNode(), + newTree(nnkObjectTy, newEmptyNode(), newEmptyNode(), bucketRecList), + ) + ) + + var brokerRecList = newTree(nnkRecList) + brokerRecList.add( + newTree( + nnkIdentDefs, + ident("buckets"), + newTree(nnkBracketExpr, ident("seq"), bucketTypeIdent), + newEmptyNode(), + ) + ) + let brokerTypeIdent = ident(sanitizeIdentName(typeIdent) & "Broker") + typeSection.add( + newTree( + nnkTypeDef, + brokerTypeIdent, + newEmptyNode(), + newTree( + nnkRefTy, newTree(nnkObjectTy, newEmptyNode(), newEmptyNode(), brokerRecList) + ), + ) ) - typeSection.add(brokerTypeDef) result = newStmtList() result.add(typeSection) let globalVarIdent = ident("g" & sanitizeIdentName(typeIdent) & "Broker") let accessProcIdent = ident("access" & sanitizeIdentName(typeIdent) & "Broker") - var initStatements = newStmtList() - if not zeroArgSig.isNil(): - initStatements.add( - quote do: - `globalVarIdent`.`zeroArgFieldName` = - `initTableSym`[`uint64Ident`, `zeroArgProviderName`]() - ) - if not argSig.isNil(): - initStatements.add( - quote do: - `globalVarIdent`.`argFieldName` = - `initTableSym`[`uint64Ident`, `argProviderName`]() - ) result.add( quote do: var `globalVarIdent` {.threadvar.}: `brokerTypeIdent` + proc `findBucketIdxIdent`( + broker: `brokerTypeIdent`, brokerCtx: BrokerContext + ): int = + if brokerCtx == DefaultBrokerContext: + return 0 + for i in 1 ..< broker.buckets.len: + if broker.buckets[i].brokerCtx == brokerCtx: + return i + return -1 + + proc `getOrCreateBucketIdxIdent`( + broker: `brokerTypeIdent`, brokerCtx: BrokerContext + ): int = + let idx = `findBucketIdxIdent`(broker, brokerCtx) + if idx >= 0: + return idx + broker.buckets.add(`bucketTypeIdent`(brokerCtx: brokerCtx)) + return broker.buckets.high + proc `accessProcIdent`(): `brokerTypeIdent` = if `globalVarIdent`.isNil(): new(`globalVarIdent`) - `globalVarIdent`.nextId = 1'u64 - `initStatements` + `globalVarIdent`.buckets = + @[`bucketTypeIdent`(brokerCtx: DefaultBrokerContext)] return `globalVarIdent` ) @@ -372,40 +464,47 @@ macro MultiRequestBroker*(body: untyped): untyped = result.add( quote do: proc setProvider*( - _: typedesc[`typeIdent`], handler: `zeroArgProviderName` + _: typedesc[`typeIdent`], + brokerCtx: BrokerContext, + handler: `zeroArgProviderName`, ): Result[`providerHandleIdent`, string] = if handler.isNil(): return err("Provider handler must be provided") let broker = `accessProcIdent`() - if broker.nextId == 0'u64: - broker.nextId = 1'u64 - for existingId, existing in broker.`zeroArgFieldName`.pairs: - if existing == handler: - return ok(`providerHandleIdent`(id: existingId, kind: `zeroKindIdent`)) - let newId = broker.nextId - inc broker.nextId - broker.`zeroArgFieldName`[newId] = handler - return ok(`providerHandleIdent`(id: newId, kind: `zeroKindIdent`)) + let bucketIdx = `getOrCreateBucketIdxIdent`(broker, brokerCtx) + for i, existing in broker.buckets[bucketIdx].`zeroArgFieldName`: + if not existing.isNil() and existing == handler: + return ok(`providerHandleIdent`(id: uint64(i + 1), kind: `zeroKindIdent`)) + broker.buckets[bucketIdx].`zeroArgFieldName`.add(handler) + return ok( + `providerHandleIdent`( + id: uint64(broker.buckets[bucketIdx].`zeroArgFieldName`.len), + kind: `zeroKindIdent`, + ) + ) + + proc setProvider*( + _: typedesc[`typeIdent`], handler: `zeroArgProviderName` + ): Result[`providerHandleIdent`, string] = + return setProvider(`typeIdent`, DefaultBrokerContext, handler) - ) - clearBody.add( - quote do: - let broker = `accessProcIdent`() - if not broker.isNil() and broker.`zeroArgFieldName`.len > 0: - broker.`zeroArgFieldName`.clear() ) result.add( quote do: proc request*( - _: typedesc[`typeIdent`] + _: typedesc[`typeIdent`], brokerCtx: BrokerContext ): Future[Result[seq[`typeIdent`], string]] {.async: (raises: []), gcsafe.} = var aggregated: seq[`typeIdent`] = @[] - let providers = `accessProcIdent`().`zeroArgFieldName` + let broker = `accessProcIdent`() + let bucketIdx = `findBucketIdxIdent`(broker, brokerCtx) + if bucketIdx < 0: + return ok(aggregated) + let providers = broker.buckets[bucketIdx].`zeroArgFieldName` if providers.len == 0: return ok(aggregated) # var providersFut: seq[Future[Result[`typeIdent`, string]]] = collect: var providersFut = collect(newSeq): - for provider in providers.values: + for provider in providers: if provider.isNil(): continue provider() @@ -435,32 +534,40 @@ macro MultiRequestBroker*(body: untyped): untyped = return ok(aggregated) + proc request*( + _: typedesc[`typeIdent`] + ): Future[Result[seq[`typeIdent`], string]] = + return request(`typeIdent`, DefaultBrokerContext) + ) if not argSig.isNil(): result.add( quote do: proc setProvider*( - _: typedesc[`typeIdent`], handler: `argProviderName` + _: typedesc[`typeIdent`], + brokerCtx: BrokerContext, + handler: `argProviderName`, ): Result[`providerHandleIdent`, string] = if handler.isNil(): return err("Provider handler must be provided") let broker = `accessProcIdent`() - if broker.nextId == 0'u64: - broker.nextId = 1'u64 - for existingId, existing in broker.`argFieldName`.pairs: - if existing == handler: - return ok(`providerHandleIdent`(id: existingId, kind: `argKindIdent`)) - let newId = broker.nextId - inc broker.nextId - broker.`argFieldName`[newId] = handler - return ok(`providerHandleIdent`(id: newId, kind: `argKindIdent`)) + let bucketIdx = `getOrCreateBucketIdxIdent`(broker, brokerCtx) + for i, existing in broker.buckets[bucketIdx].`argFieldName`: + if not existing.isNil() and existing == handler: + return ok(`providerHandleIdent`(id: uint64(i + 1), kind: `argKindIdent`)) + broker.buckets[bucketIdx].`argFieldName`.add(handler) + return ok( + `providerHandleIdent`( + id: uint64(broker.buckets[bucketIdx].`argFieldName`.len), + kind: `argKindIdent`, + ) + ) + + proc setProvider*( + _: typedesc[`typeIdent`], handler: `argProviderName` + ): Result[`providerHandleIdent`, string] = + return setProvider(`typeIdent`, DefaultBrokerContext, handler) - ) - clearBody.add( - quote do: - let broker = `accessProcIdent`() - if not broker.isNil() and broker.`argFieldName`.len > 0: - broker.`argFieldName`.clear() ) let requestParamDefs = cloneParams(argParams) let argNameIdents = collectParamNames(requestParamDefs) @@ -481,17 +588,24 @@ macro MultiRequestBroker*(body: untyped): untyped = newEmptyNode(), ) ) + formalParams.add( + newTree(nnkIdentDefs, ident("brokerCtx"), ident("BrokerContext"), newEmptyNode()) + ) for paramDef in requestParamDefs: formalParams.add(paramDef) let requestPragmas = quote: {.async: (raises: []), gcsafe.} let requestBody = quote: var aggregated: seq[`typeIdent`] = @[] - let providers = `accessProcIdent`().`argFieldName` + let broker = `accessProcIdent`() + let bucketIdx = `findBucketIdxIdent`(broker, brokerCtx) + if bucketIdx < 0: + return ok(aggregated) + let providers = broker.buckets[bucketIdx].`argFieldName` if providers.len == 0: return ok(aggregated) var providersFut = collect(newSeq): - for provider in providers.values: + for provider in providers: if provider.isNil(): continue let `providerSym` = provider @@ -531,53 +645,208 @@ macro MultiRequestBroker*(body: untyped): untyped = ) ) - result.add( - quote do: - proc clearProviders*(_: typedesc[`typeIdent`]) = - `clearBody` - let broker = `accessProcIdent`() - if not broker.isNil(): - broker.nextId = 1'u64 - - ) - - let removeHandleSym = genSym(nskParam, "handle") - let removeBrokerSym = genSym(nskLet, "broker") - var removeBody = newStmtList() - removeBody.add( - quote do: - if `removeHandleSym`.id == 0'u64: - return - let `removeBrokerSym` = `accessProcIdent`() - if `removeBrokerSym`.isNil(): - return - ) - if not zeroArgSig.isNil(): - removeBody.add( + # Backward-compatible default-context overload (no brokerCtx parameter). + var formalParamsDefault = newTree(nnkFormalParams) + formalParamsDefault.add( quote do: - if `removeHandleSym`.kind == `zeroKindIdent`: - `removeBrokerSym`.`zeroArgFieldName`.del(`removeHandleSym`.id) - return + Future[Result[seq[`typeIdent`], string]] ) - if not argSig.isNil(): - removeBody.add( - quote do: - if `removeHandleSym`.kind == `argKindIdent`: - `removeBrokerSym`.`argFieldName`.del(`removeHandleSym`.id) - return + formalParamsDefault.add( + newTree( + nnkIdentDefs, + ident("_"), + newTree(nnkBracketExpr, ident("typedesc"), copyNimTree(typeIdent)), + newEmptyNode(), + ) ) - removeBody.add( - quote do: - discard - ) - result.add( - quote do: - proc removeProvider*( - _: typedesc[`typeIdent`], `removeHandleSym`: `providerHandleIdent` - ) = - `removeBody` + for paramDef in requestParamDefs: + formalParamsDefault.add(copyNimTree(paramDef)) - ) + var wrapperCall = newCall(ident("request")) + wrapperCall.add(copyNimTree(typeIdent)) + wrapperCall.add(ident("DefaultBrokerContext")) + for argName in argNameIdents: + wrapperCall.add(copyNimTree(argName)) + + result.add( + newTree( + nnkProcDef, + postfix(ident("request"), "*"), + newEmptyNode(), + newEmptyNode(), + formalParamsDefault, + newEmptyNode(), + newEmptyNode(), + newStmtList(newTree(nnkReturnStmt, wrapperCall)), + ) + ) + let removeHandleCtxSym = genSym(nskParam, "handle") + let removeHandleDefaultSym = genSym(nskParam, "handle") + + when true: + # Generate clearProviders / removeProvider with macro-time knowledge about which + # provider lists exist (zero-arg and/or arg providers). + if not zeroArgSig.isNil() and not argSig.isNil(): + result.add( + quote do: + proc clearProviders*(_: typedesc[`typeIdent`], brokerCtx: BrokerContext) = + let broker = `accessProcIdent`() + if broker.isNil(): + return + let bucketIdx = `findBucketIdxIdent`(broker, brokerCtx) + if bucketIdx < 0: + return + broker.buckets[bucketIdx].`zeroArgFieldName`.setLen(0) + broker.buckets[bucketIdx].`argFieldName`.setLen(0) + if brokerCtx != DefaultBrokerContext: + broker.buckets.delete(bucketIdx) + + proc clearProviders*(_: typedesc[`typeIdent`]) = + clearProviders(`typeIdent`, DefaultBrokerContext) + + proc removeProvider*( + _: typedesc[`typeIdent`], + brokerCtx: BrokerContext, + `removeHandleCtxSym`: `providerHandleIdent`, + ) = + if `removeHandleCtxSym`.id == 0'u64: + return + let broker = `accessProcIdent`() + if broker.isNil(): + return + let bucketIdx = `findBucketIdxIdent`(broker, brokerCtx) + if bucketIdx < 0: + return + + if `removeHandleCtxSym`.kind == `zeroKindIdent`: + let idx = int(`removeHandleCtxSym`.id) - 1 + if idx >= 0 and idx < broker.buckets[bucketIdx].`zeroArgFieldName`.len: + broker.buckets[bucketIdx].`zeroArgFieldName`[idx] = nil + elif `removeHandleCtxSym`.kind == `argKindIdent`: + let idx = int(`removeHandleCtxSym`.id) - 1 + if idx >= 0 and idx < broker.buckets[bucketIdx].`argFieldName`.len: + broker.buckets[bucketIdx].`argFieldName`[idx] = nil + + if brokerCtx != DefaultBrokerContext: + var hasAny = false + for p in broker.buckets[bucketIdx].`zeroArgFieldName`: + if not p.isNil(): + hasAny = true + break + if not hasAny: + for p in broker.buckets[bucketIdx].`argFieldName`: + if not p.isNil(): + hasAny = true + break + if not hasAny: + broker.buckets.delete(bucketIdx) + + proc removeProvider*( + _: typedesc[`typeIdent`], `removeHandleDefaultSym`: `providerHandleIdent` + ) = + removeProvider(`typeIdent`, DefaultBrokerContext, `removeHandleDefaultSym`) + + ) + elif not zeroArgSig.isNil(): + result.add( + quote do: + proc clearProviders*(_: typedesc[`typeIdent`], brokerCtx: BrokerContext) = + let broker = `accessProcIdent`() + if broker.isNil(): + return + let bucketIdx = `findBucketIdxIdent`(broker, brokerCtx) + if bucketIdx < 0: + return + broker.buckets[bucketIdx].`zeroArgFieldName`.setLen(0) + if brokerCtx != DefaultBrokerContext: + broker.buckets.delete(bucketIdx) + + proc clearProviders*(_: typedesc[`typeIdent`]) = + clearProviders(`typeIdent`, DefaultBrokerContext) + + proc removeProvider*( + _: typedesc[`typeIdent`], + brokerCtx: BrokerContext, + `removeHandleCtxSym`: `providerHandleIdent`, + ) = + if `removeHandleCtxSym`.id == 0'u64: + return + let broker = `accessProcIdent`() + if broker.isNil(): + return + let bucketIdx = `findBucketIdxIdent`(broker, brokerCtx) + if bucketIdx < 0: + return + if `removeHandleCtxSym`.kind != `zeroKindIdent`: + return + let idx = int(`removeHandleCtxSym`.id) - 1 + if idx >= 0 and idx < broker.buckets[bucketIdx].`zeroArgFieldName`.len: + broker.buckets[bucketIdx].`zeroArgFieldName`[idx] = nil + if brokerCtx != DefaultBrokerContext: + var hasAny = false + for p in broker.buckets[bucketIdx].`zeroArgFieldName`: + if not p.isNil(): + hasAny = true + break + if not hasAny: + broker.buckets.delete(bucketIdx) + + proc removeProvider*( + _: typedesc[`typeIdent`], `removeHandleDefaultSym`: `providerHandleIdent` + ) = + removeProvider(`typeIdent`, DefaultBrokerContext, `removeHandleDefaultSym`) + + ) + else: + result.add( + quote do: + proc clearProviders*(_: typedesc[`typeIdent`], brokerCtx: BrokerContext) = + let broker = `accessProcIdent`() + if broker.isNil(): + return + let bucketIdx = `findBucketIdxIdent`(broker, brokerCtx) + if bucketIdx < 0: + return + broker.buckets[bucketIdx].`argFieldName`.setLen(0) + if brokerCtx != DefaultBrokerContext: + broker.buckets.delete(bucketIdx) + + proc clearProviders*(_: typedesc[`typeIdent`]) = + clearProviders(`typeIdent`, DefaultBrokerContext) + + proc removeProvider*( + _: typedesc[`typeIdent`], + brokerCtx: BrokerContext, + `removeHandleCtxSym`: `providerHandleIdent`, + ) = + if `removeHandleCtxSym`.id == 0'u64: + return + let broker = `accessProcIdent`() + if broker.isNil(): + return + let bucketIdx = `findBucketIdxIdent`(broker, brokerCtx) + if bucketIdx < 0: + return + if `removeHandleCtxSym`.kind != `argKindIdent`: + return + let idx = int(`removeHandleCtxSym`.id) - 1 + if idx >= 0 and idx < broker.buckets[bucketIdx].`argFieldName`.len: + broker.buckets[bucketIdx].`argFieldName`[idx] = nil + if brokerCtx != DefaultBrokerContext: + var hasAny = false + for p in broker.buckets[bucketIdx].`argFieldName`: + if not p.isNil(): + hasAny = true + break + if not hasAny: + broker.buckets.delete(bucketIdx) + + proc removeProvider*( + _: typedesc[`typeIdent`], `removeHandleDefaultSym`: `providerHandleIdent` + ) = + removeProvider(`typeIdent`, DefaultBrokerContext, `removeHandleDefaultSym`) + + ) when defined(requestBrokerDebug): echo result.repr From 26f6dbefa2cec2a6f09d553a4991a88b29d1df37 Mon Sep 17 00:00:00 2001 From: NagyZoltanPeter <113987313+NagyZoltanPeter@users.noreply.github.com> Date: Wed, 17 Dec 2025 08:58:40 +0100 Subject: [PATCH 4/8] Move duplicated and common code into broker_utils from event- request- and multi_request_brokers --- waku/common/broker/event_broker.nim | 100 +----------- waku/common/broker/helper/broker_utils.nim | 163 ++++++++++++++++++++ waku/common/broker/multi_request_broker.nim | 117 +------------- waku/common/broker/request_broker.nim | 102 +----------- 4 files changed, 176 insertions(+), 306 deletions(-) diff --git a/waku/common/broker/event_broker.nim b/waku/common/broker/event_broker.nim index f694c2c98..779689f88 100644 --- a/waku/common/broker/event_broker.nim +++ b/waku/common/broker/event_broker.nim @@ -90,103 +90,15 @@ import ./helper/broker_utils, broker_context export chronicles, results, chronos, broker_context -proc ensureDistinctType(rhs: NimNode): NimNode = - ## For PODs / aliases / externally-defined types, wrap in `distinct` unless - ## it's already distinct. - if rhs.kind == nnkDistinctTy: - return rhs - newTree(nnkDistinctTy, copyNimTree(rhs)) - macro EventBroker*(body: untyped): untyped = when defined(eventBrokerDebug): echo body.treeRepr - var typeIdent: NimNode = nil - var objectDef: NimNode = nil - var fieldNames: seq[NimNode] = @[] - var fieldTypes: seq[NimNode] = @[] - var hasInlineFields = false - for stmt in body: - if stmt.kind == nnkTypeSection: - for def in stmt: - if def.kind != nnkTypeDef: - continue - let rhs = def[2] - if not typeIdent.isNil(): - error("Only one type may be declared inside EventBroker", def) - typeIdent = baseTypeIdent(def[0]) - - # Inline object/ref object definitions. - case rhs.kind - of nnkObjectTy: - let recList = rhs[2] - if recList.kind != nnkRecList: - error("EventBroker object must declare a standard field list", rhs) - var exportedRecList = newTree(nnkRecList) - for field in recList: - case field.kind - of nnkIdentDefs: - ensureFieldDef(field) - let fieldTypeNode = field[field.len - 2] - for i in 0 ..< field.len - 2: - let baseFieldIdent = baseTypeIdent(field[i]) - fieldNames.add(copyNimTree(baseFieldIdent)) - fieldTypes.add(copyNimTree(fieldTypeNode)) - var cloned = copyNimTree(field) - for i in 0 ..< cloned.len - 2: - cloned[i] = exportIdentNode(cloned[i]) - exportedRecList.add(cloned) - of nnkEmpty: - discard - else: - error( - "EventBroker object definition only supports simple field declarations", - field, - ) - let exportedObjectType = newTree( - nnkObjectTy, copyNimTree(rhs[0]), copyNimTree(rhs[1]), exportedRecList - ) - objectDef = exportedObjectType - hasInlineFields = true - of nnkRefTy: - if rhs.len != 1 or rhs[0].kind != nnkObjectTy: - error("EventBroker ref object must wrap a concrete object definition", rhs) - let obj = rhs[0] - let recList = obj[2] - if recList.kind != nnkRecList: - error("EventBroker object must declare a standard field list", obj) - var exportedRecList = newTree(nnkRecList) - for field in recList: - case field.kind - of nnkIdentDefs: - ensureFieldDef(field) - let fieldTypeNode = field[field.len - 2] - for i in 0 ..< field.len - 2: - let baseFieldIdent = baseTypeIdent(field[i]) - fieldNames.add(copyNimTree(baseFieldIdent)) - fieldTypes.add(copyNimTree(fieldTypeNode)) - var cloned = copyNimTree(field) - for i in 0 ..< cloned.len - 2: - cloned[i] = exportIdentNode(cloned[i]) - exportedRecList.add(cloned) - of nnkEmpty: - discard - else: - error( - "EventBroker object definition only supports simple field declarations", - field, - ) - let exportedObjectType = newTree( - nnkObjectTy, copyNimTree(obj[0]), copyNimTree(obj[1]), exportedRecList - ) - objectDef = newTree(nnkRefTy, exportedObjectType) - hasInlineFields = true - else: - # Native type / alias / externally-defined type. - # Ensure we create a unique event type by wrapping in `distinct` unless - # the user already did. - objectDef = ensureDistinctType(rhs) - if typeIdent.isNil(): - error("EventBroker body must declare exactly one type", body) + let parsed = parseSingleTypeDef(body, "EventBroker", collectFieldInfo = true) + let typeIdent = parsed.typeIdent + let objectDef = parsed.objectDef + let fieldNames = parsed.fieldNames + let fieldTypes = parsed.fieldTypes + let hasInlineFields = parsed.hasInlineFields let exportedTypeIdent = postfix(copyNimTree(typeIdent), "*") let sanitized = sanitizeIdentName(typeIdent) diff --git a/waku/common/broker/helper/broker_utils.nim b/waku/common/broker/helper/broker_utils.nim index ea9f85750..90f2055d3 100644 --- a/waku/common/broker/helper/broker_utils.nim +++ b/waku/common/broker/helper/broker_utils.nim @@ -1,5 +1,21 @@ import std/macros +type ParsedBrokerType* = object + ## Result of parsing the single `type` definition inside a broker macro body. + ## + ## - `typeIdent`: base identifier for the declared type name + ## - `objectDef`: exported type definition RHS (inline object fields exported; + ## non-object types wrapped in `distinct` unless already distinct) + ## - `isRefObject`: true only for inline `ref object` definitions + ## - `hasInlineFields`: true for inline `object` / `ref object` + ## - `fieldNames`/`fieldTypes`: populated only when `collectFieldInfo = true` + typeIdent*: NimNode + objectDef*: NimNode + isRefObject*: bool + hasInlineFields*: bool + fieldNames*: seq[NimNode] + fieldTypes*: seq[NimNode] + proc sanitizeIdentName*(node: NimNode): string = var raw = $node var sanitizedName = newStringOfCap(raw.len) @@ -41,3 +57,150 @@ proc baseTypeIdent*(defName: NimNode): NimNode = baseTypeIdent(defName[0]) else: error("Unsupported type name in broker definition", defName) + +proc ensureDistinctType*(rhs: NimNode): NimNode = + ## For PODs / aliases / externally-defined types, wrap in `distinct` unless + ## it's already distinct. + if rhs.kind == nnkDistinctTy: + return copyNimTree(rhs) + newTree(nnkDistinctTy, copyNimTree(rhs)) + +proc cloneParams*(params: seq[NimNode]): seq[NimNode] = + ## Deep copy parameter definitions so they can be inserted in multiple places. + result = @[] + for param in params: + result.add(copyNimTree(param)) + +proc collectParamNames*(params: seq[NimNode]): seq[NimNode] = + ## Extract all identifier symbols declared across IdentDefs nodes. + result = @[] + for param in params: + assert param.kind == nnkIdentDefs + for i in 0 ..< param.len - 2: + let nameNode = param[i] + if nameNode.kind == nnkEmpty: + continue + result.add(ident($nameNode)) + +proc parseSingleTypeDef*( + body: NimNode, + macroName: string, + allowRefToNonObject = false, + collectFieldInfo = false, +): ParsedBrokerType = + ## Parses exactly one `type` definition from a broker macro body. + ## + ## Supported RHS: + ## - inline `object` / `ref object` (fields are auto-exported) + ## - non-object types / aliases / externally-defined types (wrapped in `distinct`) + ## - optionally: `ref SomeType` when `allowRefToNonObject = true` + var typeIdent: NimNode = nil + var objectDef: NimNode = nil + var isRefObject = false + var hasInlineFields = false + var fieldNames: seq[NimNode] = @[] + var fieldTypes: seq[NimNode] = @[] + + for stmt in body: + if stmt.kind != nnkTypeSection: + continue + for def in stmt: + if def.kind != nnkTypeDef: + continue + if not typeIdent.isNil(): + error("Only one type may be declared inside " & macroName, def) + typeIdent = baseTypeIdent(def[0]) + let rhs = def[2] + + case rhs.kind + of nnkObjectTy: + let recList = rhs[2] + if recList.kind != nnkRecList: + error(macroName & " object must declare a standard field list", rhs) + var exportedRecList = newTree(nnkRecList) + for field in recList: + case field.kind + of nnkIdentDefs: + ensureFieldDef(field) + if collectFieldInfo: + let fieldTypeNode = field[field.len - 2] + for i in 0 ..< field.len - 2: + let baseFieldIdent = baseTypeIdent(field[i]) + fieldNames.add(copyNimTree(baseFieldIdent)) + fieldTypes.add(copyNimTree(fieldTypeNode)) + var cloned = copyNimTree(field) + for i in 0 ..< cloned.len - 2: + cloned[i] = exportIdentNode(cloned[i]) + exportedRecList.add(cloned) + of nnkEmpty: + discard + else: + error( + macroName & " object definition only supports simple field declarations", + field, + ) + objectDef = newTree( + nnkObjectTy, copyNimTree(rhs[0]), copyNimTree(rhs[1]), exportedRecList + ) + isRefObject = false + hasInlineFields = true + of nnkRefTy: + if rhs.len != 1: + error(macroName & " ref type must have a single base", rhs) + if rhs[0].kind == nnkObjectTy: + let obj = rhs[0] + let recList = obj[2] + if recList.kind != nnkRecList: + error(macroName & " object must declare a standard field list", obj) + var exportedRecList = newTree(nnkRecList) + for field in recList: + case field.kind + of nnkIdentDefs: + ensureFieldDef(field) + if collectFieldInfo: + let fieldTypeNode = field[field.len - 2] + for i in 0 ..< field.len - 2: + let baseFieldIdent = baseTypeIdent(field[i]) + fieldNames.add(copyNimTree(baseFieldIdent)) + fieldTypes.add(copyNimTree(fieldTypeNode)) + var cloned = copyNimTree(field) + for i in 0 ..< cloned.len - 2: + cloned[i] = exportIdentNode(cloned[i]) + exportedRecList.add(cloned) + of nnkEmpty: + discard + else: + error( + macroName & " object definition only supports simple field declarations", + field, + ) + let exportedObjectType = newTree( + nnkObjectTy, copyNimTree(obj[0]), copyNimTree(obj[1]), exportedRecList + ) + objectDef = newTree(nnkRefTy, exportedObjectType) + isRefObject = true + hasInlineFields = true + elif allowRefToNonObject: + ## `ref SomeType` (SomeType can be defined elsewhere) + objectDef = ensureDistinctType(rhs) + isRefObject = false + hasInlineFields = false + else: + error(macroName & " ref object must wrap a concrete object definition", rhs) + else: + ## Non-object type / alias. + objectDef = ensureDistinctType(rhs) + isRefObject = false + hasInlineFields = false + + if typeIdent.isNil(): + error(macroName & " body must declare exactly one type", body) + + result = ParsedBrokerType( + typeIdent: typeIdent, + objectDef: objectDef, + isRefObject: isRefObject, + hasInlineFields: hasInlineFields, + fieldNames: fieldNames, + fieldTypes: fieldTypes, + ) diff --git a/waku/common/broker/multi_request_broker.nim b/waku/common/broker/multi_request_broker.nim index c2b9cf66f..2baa19940 100644 --- a/waku/common/broker/multi_request_broker.nim +++ b/waku/common/broker/multi_request_broker.nim @@ -122,23 +122,6 @@ proc isReturnTypeValid(returnType, typeIdent: NimNode): bool = return false inner[2].kind == nnkIdent and inner[2].eqIdent("string") -proc cloneParams(params: seq[NimNode]): seq[NimNode] = - ## Deep copy parameter definitions so they can be reused in generated nodes. - result = @[] - for param in params: - result.add(copyNimTree(param)) - -proc collectParamNames(params: seq[NimNode]): seq[NimNode] = - ## Extract identifiers declared in parameter definitions. - result = @[] - for param in params: - assert param.kind == nnkIdentDefs - for i in 0 ..< param.len - 2: - let nameNode = param[i] - if nameNode.kind == nnkEmpty: - continue - result.add(ident($nameNode)) - proc makeProcType(returnType: NimNode, params: seq[NimNode]): NimNode = var formal = newTree(nnkFormalParams) formal.add(returnType) @@ -150,105 +133,13 @@ proc makeProcType(returnType: NimNode, params: seq[NimNode]): NimNode = newTree(nnkProcTy, formal, pragmas) -proc ensureDistinctType(rhs: NimNode): NimNode = - ## For PODs / aliases / externally-defined types, wrap in `distinct` unless - ## it's already distinct. - if rhs.kind == nnkDistinctTy: - return copyNimTree(rhs) - newTree(nnkDistinctTy, copyNimTree(rhs)) - macro MultiRequestBroker*(body: untyped): untyped = when defined(requestBrokerDebug): echo body.treeRepr - var typeIdent: NimNode = nil - var objectDef: NimNode = nil - var isRefObject = false - for stmt in body: - if stmt.kind == nnkTypeSection: - for def in stmt: - if def.kind != nnkTypeDef: - continue - if not typeIdent.isNil(): - error("Only one type may be declared inside MultiRequestBroker", def) - - typeIdent = baseTypeIdent(def[0]) - let rhs = def[2] - - case rhs.kind - of nnkObjectTy: - let objectType = rhs - let recList = objectType[2] - if recList.kind != nnkRecList: - error( - "MultiRequestBroker object must declare a standard field list", objectType - ) - var exportedRecList = newTree(nnkRecList) - for field in recList: - case field.kind - of nnkIdentDefs: - ensureFieldDef(field) - var cloned = copyNimTree(field) - for i in 0 ..< cloned.len - 2: - cloned[i] = exportIdentNode(cloned[i]) - exportedRecList.add(cloned) - of nnkEmpty: - discard - else: - error( - "MultiRequestBroker object definition only supports simple field declarations", - field, - ) - objectDef = newTree( - nnkObjectTy, - copyNimTree(objectType[0]), - copyNimTree(objectType[1]), - exportedRecList, - ) - of nnkRefTy: - isRefObject = true - if rhs.len != 1 or rhs[0].kind != nnkObjectTy: - error( - "MultiRequestBroker ref object must wrap a concrete object definition", - rhs, - ) - let objectType = rhs[0] - let recList = objectType[2] - if recList.kind != nnkRecList: - error( - "MultiRequestBroker object must declare a standard field list", objectType - ) - var exportedRecList = newTree(nnkRecList) - for field in recList: - case field.kind - of nnkIdentDefs: - ensureFieldDef(field) - var cloned = copyNimTree(field) - for i in 0 ..< cloned.len - 2: - cloned[i] = exportIdentNode(cloned[i]) - exportedRecList.add(cloned) - of nnkEmpty: - discard - else: - error( - "MultiRequestBroker object definition only supports simple field declarations", - field, - ) - let exportedObjectType = newTree( - nnkObjectTy, - copyNimTree(objectType[0]), - copyNimTree(objectType[1]), - exportedRecList, - ) - objectDef = newTree(nnkRefTy, exportedObjectType) - else: - # Native type / alias / externally-defined type. - # Ensure we create a unique request type by wrapping in `distinct` unless - # the user already did. - objectDef = ensureDistinctType(rhs) - isRefObject = false - - if typeIdent.isNil(): - error("MultiRequestBroker body must declare exactly one type", body) + let parsed = parseSingleTypeDef(body, "MultiRequestBroker") + let typeIdent = parsed.typeIdent + let objectDef = parsed.objectDef + let isRefObject = parsed.isRefObject when defined(requestBrokerDebug): echo "MultiRequestBroker generating type: ", $typeIdent diff --git a/waku/common/broker/request_broker.nim b/waku/common/broker/request_broker.nim index c71f90229..46f7d7d16 100644 --- a/waku/common/broker/request_broker.nim +++ b/waku/common/broker/request_broker.nim @@ -206,23 +206,6 @@ proc isReturnTypeValid(returnType, typeIdent: NimNode, mode: RequestBrokerMode): of rbSync: isSyncReturnTypeValid(returnType, typeIdent) -proc cloneParams(params: seq[NimNode]): seq[NimNode] = - ## Deep copy parameter definitions so they can be inserted in multiple places. - result = @[] - for param in params: - result.add(copyNimTree(param)) - -proc collectParamNames(params: seq[NimNode]): seq[NimNode] = - ## Extract all identifier symbols declared across IdentDefs nodes. - result = @[] - for param in params: - assert param.kind == nnkIdentDefs - for i in 0 ..< param.len - 2: - let nameNode = param[i] - if nameNode.kind == nnkEmpty: - continue - result.add(ident($nameNode)) - proc makeProcType( returnType: NimNode, params: seq[NimNode], mode: RequestBrokerMode ): NimNode = @@ -253,92 +236,13 @@ proc parseMode(modeNode: NimNode): RequestBrokerMode = else: error("RequestBroker mode must be `sync` or `async` (default is async)", modeNode) -proc ensureDistinctType(rhs: NimNode): NimNode = - ## For PODs / aliases / externally-defined types, wrap in `distinct` unless - ## it's already distinct. - if rhs.kind == nnkDistinctTy: - return copyNimTree(rhs) - newTree(nnkDistinctTy, copyNimTree(rhs)) - proc generateRequestBroker(body: NimNode, mode: RequestBrokerMode): NimNode = when defined(requestBrokerDebug): echo body.treeRepr echo "RequestBroker mode: ", $mode - var typeIdent: NimNode = nil - var objectDef: NimNode = nil - for stmt in body: - if stmt.kind == nnkTypeSection: - for def in stmt: - if def.kind != nnkTypeDef: - continue - if not typeIdent.isNil(): - error("Only one type may be declared inside RequestBroker", def) - - typeIdent = baseTypeIdent(def[0]) - let rhs = def[2] - - ## Support inline object types (fields are auto-exported) - ## AND non-object types / aliases (e.g. `string`, `int`, `OtherType`). - case rhs.kind - of nnkObjectTy: - let recList = rhs[2] - if recList.kind != nnkRecList: - error("RequestBroker object must declare a standard field list", rhs) - var exportedRecList = newTree(nnkRecList) - for field in recList: - case field.kind - of nnkIdentDefs: - ensureFieldDef(field) - var cloned = copyNimTree(field) - for i in 0 ..< cloned.len - 2: - cloned[i] = exportIdentNode(cloned[i]) - exportedRecList.add(cloned) - of nnkEmpty: - discard - else: - error( - "RequestBroker object definition only supports simple field declarations", - field, - ) - objectDef = newTree( - nnkObjectTy, copyNimTree(rhs[0]), copyNimTree(rhs[1]), exportedRecList - ) - of nnkRefTy: - if rhs.len != 1: - error("RequestBroker ref type must have a single base", rhs) - if rhs[0].kind == nnkObjectTy: - let obj = rhs[0] - let recList = obj[2] - if recList.kind != nnkRecList: - error("RequestBroker object must declare a standard field list", obj) - var exportedRecList = newTree(nnkRecList) - for field in recList: - case field.kind - of nnkIdentDefs: - ensureFieldDef(field) - var cloned = copyNimTree(field) - for i in 0 ..< cloned.len - 2: - cloned[i] = exportIdentNode(cloned[i]) - exportedRecList.add(cloned) - of nnkEmpty: - discard - else: - error( - "RequestBroker object definition only supports simple field declarations", - field, - ) - let exportedObjectType = newTree( - nnkObjectTy, copyNimTree(obj[0]), copyNimTree(obj[1]), exportedRecList - ) - objectDef = newTree(nnkRefTy, exportedObjectType) - else: - ## `ref SomeType` (SomeType can be defined elsewhere) - objectDef = ensureDistinctType(rhs) - else: - ## Non-object type / alias (e.g. `string`, `int`, `SomeExternalType`). - objectDef = ensureDistinctType(rhs) - if typeIdent.isNil(): - error("RequestBroker body must declare exactly one type", body) + let parsed = parseSingleTypeDef(body, "RequestBroker", allowRefToNonObject = true) + let typeIdent = parsed.typeIdent + let objectDef = parsed.objectDef when defined(requestBrokerDebug): echo "RequestBroker generating type: ", $typeIdent From 4b696856c7524ad5be46155df33967087ab85c8b Mon Sep 17 00:00:00 2001 From: NagyZoltanPeter <113987313+NagyZoltanPeter@users.noreply.github.com> Date: Wed, 17 Dec 2025 09:01:35 +0100 Subject: [PATCH 5/8] fix - removed unreachable code from test --- tests/common/test_multi_request_broker.nim | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/common/test_multi_request_broker.nim b/tests/common/test_multi_request_broker.nim index d45de478a..39ed90eea 100644 --- a/tests/common/test_multi_request_broker.nim +++ b/tests/common/test_multi_request_broker.nim @@ -211,7 +211,6 @@ suite "MultiRequestBroker": let firstHandler = NoArgResponse.setProvider( proc(): Future[Result[NoArgResponse, string]] {.async.} = raise newException(ValueError, "first handler raised") - ok(NoArgResponse(label: "any")) ) discard NoArgResponse.setProvider( From 800e4975857648d4069ca81b456a9dabf251a7b1 Mon Sep 17 00:00:00 2001 From: NagyZoltanPeter <113987313+NagyZoltanPeter@users.noreply.github.com> Date: Wed, 17 Dec 2025 14:55:58 +0100 Subject: [PATCH 6/8] Change BrokerContext from random number to counter --- waku/common/broker/broker_context.nim | 22 +++++++--------------- 1 file changed, 7 insertions(+), 15 deletions(-) diff --git a/waku/common/broker/broker_context.nim b/waku/common/broker/broker_context.nim index 1b8235f6a..bdc5fc73a 100644 --- a/waku/common/broker/broker_context.nim +++ b/waku/common/broker/broker_context.nim @@ -1,4 +1,4 @@ -import std/[strutils, sysrand] +import std/[strutils, concurrency/atomics] type BrokerContext* = distinct uint32 @@ -9,18 +9,10 @@ func `$`*(bc: BrokerContext): string = const DefaultBrokerContext* = BrokerContext(0xCAFFE14E'u32) +var gContextCounter: Atomic[uint32] + proc NewBrokerContext*(): BrokerContext = - ## Generates a random non-default broker context (as a raw uint32). - ## - ## The default broker context is reserved for the provider at index 0. - ## This helper never returns that value. - for _ in 0 ..< 16: - let b = urandom(4) - if b.len != 4: - continue - let key = - (uint32(b[0]) shl 24) or (uint32(b[1]) shl 16) or (uint32(b[2]) shl 8) or - uint32(b[3]) - if key != uint32(DefaultBrokerContext): - return BrokerContext(key) - BrokerContext(1'u32) + var nextId = gContextCounter.fetchAdd(1, moRelaxed) + if nextId == uint32(DefaultBrokerContext): + nextId = gContextCounter.fetchAdd(1, moRelaxed) + return BrokerContext(nextId) From 94e00e7e2554bb51b8c575c0a357636701b6aff5 Mon Sep 17 00:00:00 2001 From: NagyZoltanPeter <113987313+NagyZoltanPeter@users.noreply.github.com> Date: Thu, 18 Dec 2025 12:04:41 +0100 Subject: [PATCH 7/8] Apply suggestion from @Ivansete-status Co-authored-by: Ivan FB <128452529+Ivansete-status@users.noreply.github.com> --- waku/common/broker/broker_context.nim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/waku/common/broker/broker_context.nim b/waku/common/broker/broker_context.nim index bdc5fc73a..794e45fc8 100644 --- a/waku/common/broker/broker_context.nim +++ b/waku/common/broker/broker_context.nim @@ -11,7 +11,7 @@ const DefaultBrokerContext* = BrokerContext(0xCAFFE14E'u32) var gContextCounter: Atomic[uint32] -proc NewBrokerContext*(): BrokerContext = +proc newBrokerContext*(): BrokerContext = var nextId = gContextCounter.fetchAdd(1, moRelaxed) if nextId == uint32(DefaultBrokerContext): nextId = gContextCounter.fetchAdd(1, moRelaxed) From f13a613bbb8059b87d9aee0bbeab51346a8fd14d Mon Sep 17 00:00:00 2001 From: NagyZoltanPeter <113987313+NagyZoltanPeter@users.noreply.github.com> Date: Thu, 18 Dec 2025 15:13:53 +0100 Subject: [PATCH 8/8] Adjust naming in broker tests --- tests/common/test_event_broker.nim | 4 ++-- tests/common/test_multi_request_broker.nim | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/common/test_event_broker.nim b/tests/common/test_event_broker.nim index bcd081f4f..7d38666cb 100644 --- a/tests/common/test_event_broker.nim +++ b/tests/common/test_event_broker.nim @@ -136,8 +136,8 @@ suite "EventBroker": test "supports BrokerContext-scoped listeners": SampleEvent.dropAllListeners() - let ctxA = NewBrokerContext() - let ctxB = NewBrokerContext() + let ctxA = newBrokerContext() + let ctxB = newBrokerContext() var seenA: seq[int] = @[] var seenB: seq[int] = @[] diff --git a/tests/common/test_multi_request_broker.nim b/tests/common/test_multi_request_broker.nim index 39ed90eea..76e7db253 100644 --- a/tests/common/test_multi_request_broker.nim +++ b/tests/common/test_multi_request_broker.nim @@ -280,8 +280,8 @@ suite "MultiRequestBroker": test "context-aware providers are isolated": NoArgResponse.clearProviders() - let ctxA = NewBrokerContext() - let ctxB = NewBrokerContext() + let ctxA = newBrokerContext() + let ctxB = newBrokerContext() discard NoArgResponse.setProvider( ctxA,