From bc4ff10ea921c7aae9f116d2b06508173e5452f3 Mon Sep 17 00:00:00 2001 From: NagyZoltanPeter <113987313+NagyZoltanPeter@users.noreply.github.com> Date: Wed, 17 Dec 2025 01:44:23 +0100 Subject: [PATCH] Context aware extension for EventBroker, EventBoker support for native or external types --- tests/common/test_event_broker.nim | 76 ++++++ waku/common/broker/event_broker.nim | 398 +++++++++++++++++++++------- 2 files changed, 371 insertions(+), 103 deletions(-) diff --git a/tests/common/test_event_broker.nim b/tests/common/test_event_broker.nim index cead1277f..bcd081f4f 100644 --- a/tests/common/test_event_broker.nim +++ b/tests/common/test_event_broker.nim @@ -4,6 +4,15 @@ import testutils/unittests import waku/common/broker/event_broker +type ExternalDefinedEventType = object + label*: string + +EventBroker: + type IntEvent = int + +EventBroker: + type ExternalAliasEvent = distinct ExternalDefinedEventType + EventBroker: type SampleEvent = object value*: int @@ -123,3 +132,70 @@ suite "EventBroker": check counter == 21 # 1+2+3 + 4+5+6 RefEvent.dropAllListeners() + + test "supports BrokerContext-scoped listeners": + SampleEvent.dropAllListeners() + + let ctxA = NewBrokerContext() + let ctxB = NewBrokerContext() + + var seenA: seq[int] = @[] + var seenB: seq[int] = @[] + + discard SampleEvent.listen( + ctxA, + proc(evt: SampleEvent): Future[void] {.async: (raises: []).} = + seenA.add(evt.value), + ) + + discard SampleEvent.listen( + ctxB, + proc(evt: SampleEvent): Future[void] {.async: (raises: []).} = + seenB.add(evt.value), + ) + + SampleEvent.emit(ctxA, SampleEvent(value: 1, label: "a")) + SampleEvent.emit(ctxB, SampleEvent(value: 2, label: "b")) + waitForListeners() + + check seenA == @[1] + check seenB == @[2] + + SampleEvent.dropAllListeners(ctxA) + SampleEvent.emit(ctxA, SampleEvent(value: 3, label: "a2")) + SampleEvent.emit(ctxB, SampleEvent(value: 4, label: "b2")) + waitForListeners() + + check seenA == @[1] + check seenB == @[2, 4] + + SampleEvent.dropAllListeners(ctxB) + + test "supports non-object event types (auto-distinct)": + var seen: seq[int] = @[] + + discard IntEvent.listen( + proc(evt: IntEvent): Future[void] {.async: (raises: []).} = + seen.add(int(evt)) + ) + + IntEvent.emit(IntEvent(42)) + waitForListeners() + + check seen == @[42] + IntEvent.dropAllListeners() + + test "supports externally-defined type aliases (auto-distinct)": + var seen: seq[string] = @[] + + discard ExternalAliasEvent.listen( + proc(evt: ExternalAliasEvent): Future[void] {.async: (raises: []).} = + let base = ExternalDefinedEventType(evt) + seen.add(base.label) + ) + + ExternalAliasEvent.emit(ExternalAliasEvent(ExternalDefinedEventType(label: "x"))) + waitForListeners() + + check seen == @["x"] + ExternalAliasEvent.dropAllListeners() diff --git a/waku/common/broker/event_broker.nim b/waku/common/broker/event_broker.nim index 05d7b50ab..f694c2c98 100644 --- a/waku/common/broker/event_broker.nim +++ b/waku/common/broker/event_broker.nim @@ -5,10 +5,35 @@ ## need for direct dependencies in between emitters and listeners. ## Worth considering using it in a single or many emitters to many listeners scenario. ## -## Generates a standalone, type-safe event broker for the declared object type. +## Generates a standalone, type-safe event broker for the declared type. ## The macro exports the value type itself plus a broker companion that manages ## listeners via thread-local storage. ## +## Type definitions: +## - Inline `object` / `ref object` definitions are supported. +## - Native types, aliases, and externally-defined types are also supported. +## In that case, EventBroker will automatically wrap the declared RHS type in +## `distinct` unless you already used `distinct`. +## This keeps event types unique even when multiple brokers share the same +## underlying base type. +## +## Default vs. context aware use: +## Every generated broker is a thread-local global instance. This means EventBroker +## enables decoupled event exchange threadwise. +## +## Sometimes we use brokers inside a context (e.g. within a component that has many +## modules or subsystems). If you instantiate multiple such components in a single +## thread, and each component must have its own listener set for the same EventBroker +## type, you can use context-aware EventBroker. +## +## Context awareness is supported through the `BrokerContext` argument for +## `listen`, `emit`, `dropListener`, and `dropAllListeners`. +## Listener stores are kept separate per broker context. +## +## Default broker context is defined as `DefaultBrokerContext`. If you don't need +## context awareness, you can keep using the interfaces without the context +## argument, which operate on `DefaultBrokerContext`. +## ## Usage: ## Declare your desired event type inside an `EventBroker` macro, add any number of fields.: ## ```nim @@ -47,11 +72,30 @@ ## GreetingEvent.dropListener(handle) ## ``` +## Example (non-object event type): +## ```nim +## EventBroker: +## type CounterEvent = int # exported as: `distinct int` +## +## discard CounterEvent.listen( +## proc(evt: CounterEvent): Future[void] {.async.} = +## echo int(evt) +## ) +## CounterEvent.emit(CounterEvent(42)) +## ``` + import std/[macros, tables] import chronos, chronicles, results -import ./helper/broker_utils +import ./helper/broker_utils, broker_context -export chronicles, results, chronos +export chronicles, results, chronos, broker_context + +proc ensureDistinctType(rhs: NimNode): NimNode = + ## For PODs / aliases / externally-defined types, wrap in `distinct` unless + ## it's already distinct. + if rhs.kind == nnkDistinctTy: + return rhs + newTree(nnkDistinctTy, copyNimTree(rhs)) macro EventBroker*(body: untyped): untyped = when defined(eventBrokerDebug): @@ -60,74 +104,102 @@ macro EventBroker*(body: untyped): untyped = var objectDef: NimNode = nil var fieldNames: seq[NimNode] = @[] var fieldTypes: seq[NimNode] = @[] - var isRefObject = false + var hasInlineFields = false for stmt in body: if stmt.kind == nnkTypeSection: for def in stmt: if def.kind != nnkTypeDef: continue let rhs = def[2] - var objectType: NimNode + if not typeIdent.isNil(): + error("Only one type may be declared inside EventBroker", def) + typeIdent = baseTypeIdent(def[0]) + + # Inline object/ref object definitions. case rhs.kind of nnkObjectTy: - objectType = rhs + let recList = rhs[2] + if recList.kind != nnkRecList: + error("EventBroker object must declare a standard field list", rhs) + var exportedRecList = newTree(nnkRecList) + for field in recList: + case field.kind + of nnkIdentDefs: + ensureFieldDef(field) + let fieldTypeNode = field[field.len - 2] + for i in 0 ..< field.len - 2: + let baseFieldIdent = baseTypeIdent(field[i]) + fieldNames.add(copyNimTree(baseFieldIdent)) + fieldTypes.add(copyNimTree(fieldTypeNode)) + var cloned = copyNimTree(field) + for i in 0 ..< cloned.len - 2: + cloned[i] = exportIdentNode(cloned[i]) + exportedRecList.add(cloned) + of nnkEmpty: + discard + else: + error( + "EventBroker object definition only supports simple field declarations", + field, + ) + let exportedObjectType = newTree( + nnkObjectTy, copyNimTree(rhs[0]), copyNimTree(rhs[1]), exportedRecList + ) + objectDef = exportedObjectType + hasInlineFields = true of nnkRefTy: - isRefObject = true if rhs.len != 1 or rhs[0].kind != nnkObjectTy: error("EventBroker ref object must wrap a concrete object definition", rhs) - objectType = rhs[0] - else: - continue - if not typeIdent.isNil(): - error("Only one object type may be declared inside EventBroker", def) - typeIdent = baseTypeIdent(def[0]) - let recList = objectType[2] - if recList.kind != nnkRecList: - error("EventBroker object must declare a standard field list", objectType) - var exportedRecList = newTree(nnkRecList) - for field in recList: - case field.kind - of nnkIdentDefs: - ensureFieldDef(field) - let fieldTypeNode = field[field.len - 2] - for i in 0 ..< field.len - 2: - let baseFieldIdent = baseTypeIdent(field[i]) - fieldNames.add(copyNimTree(baseFieldIdent)) - fieldTypes.add(copyNimTree(fieldTypeNode)) - var cloned = copyNimTree(field) - for i in 0 ..< cloned.len - 2: - cloned[i] = exportIdentNode(cloned[i]) - exportedRecList.add(cloned) - of nnkEmpty: - discard - else: - error( - "EventBroker object definition only supports simple field declarations", - field, - ) - let exportedObjectType = newTree( - nnkObjectTy, - copyNimTree(objectType[0]), - copyNimTree(objectType[1]), - exportedRecList, - ) - if isRefObject: + let obj = rhs[0] + let recList = obj[2] + if recList.kind != nnkRecList: + error("EventBroker object must declare a standard field list", obj) + var exportedRecList = newTree(nnkRecList) + for field in recList: + case field.kind + of nnkIdentDefs: + ensureFieldDef(field) + let fieldTypeNode = field[field.len - 2] + for i in 0 ..< field.len - 2: + let baseFieldIdent = baseTypeIdent(field[i]) + fieldNames.add(copyNimTree(baseFieldIdent)) + fieldTypes.add(copyNimTree(fieldTypeNode)) + var cloned = copyNimTree(field) + for i in 0 ..< cloned.len - 2: + cloned[i] = exportIdentNode(cloned[i]) + exportedRecList.add(cloned) + of nnkEmpty: + discard + else: + error( + "EventBroker object definition only supports simple field declarations", + field, + ) + let exportedObjectType = newTree( + nnkObjectTy, copyNimTree(obj[0]), copyNimTree(obj[1]), exportedRecList + ) objectDef = newTree(nnkRefTy, exportedObjectType) + hasInlineFields = true else: - objectDef = exportedObjectType + # Native type / alias / externally-defined type. + # Ensure we create a unique event type by wrapping in `distinct` unless + # the user already did. + objectDef = ensureDistinctType(rhs) if typeIdent.isNil(): - error("EventBroker body must declare exactly one object type", body) + error("EventBroker body must declare exactly one type", body) let exportedTypeIdent = postfix(copyNimTree(typeIdent), "*") let sanitized = sanitizeIdentName(typeIdent) let typeNameLit = newLit($typeIdent) - let isRefObjectLit = newLit(isRefObject) let handlerProcIdent = ident(sanitized & "ListenerProc") let listenerHandleIdent = ident(sanitized & "Listener") let brokerTypeIdent = ident(sanitized & "Broker") let exportedHandlerProcIdent = postfix(copyNimTree(handlerProcIdent), "*") let exportedListenerHandleIdent = postfix(copyNimTree(listenerHandleIdent), "*") let exportedBrokerTypeIdent = postfix(copyNimTree(brokerTypeIdent), "*") + let bucketTypeIdent = ident(sanitized & "CtxBucket") + let findBucketIdxIdent = ident(sanitized & "FindBucketIdx") + let getOrCreateBucketIdxIdent = ident(sanitized & "GetOrCreateBucketIdx") let accessProcIdent = ident("access" & sanitized & "Broker") let globalVarIdent = ident("g" & sanitized & "Broker") let listenImplIdent = ident("register" & sanitized & "Listener") @@ -147,10 +219,14 @@ macro EventBroker*(body: untyped): untyped = `exportedHandlerProcIdent` = proc(event: `typeIdent`): Future[void] {.async: (raises: []), gcsafe.} - `exportedBrokerTypeIdent` = ref object + `bucketTypeIdent` = object + brokerCtx: BrokerContext listeners: Table[uint64, `handlerProcIdent`] nextId: uint64 + `exportedBrokerTypeIdent` = ref object + buckets: seq[`bucketTypeIdent`] + ) result.add( @@ -163,49 +239,102 @@ macro EventBroker*(body: untyped): untyped = proc `accessProcIdent`(): `brokerTypeIdent` = if `globalVarIdent`.isNil(): new(`globalVarIdent`) - `globalVarIdent`.listeners = initTable[uint64, `handlerProcIdent`]() + `globalVarIdent`.buckets = + @[ + `bucketTypeIdent`( + brokerCtx: DefaultBrokerContext, + listeners: initTable[uint64, `handlerProcIdent`](), + nextId: 1'u64, + ) + ] `globalVarIdent` ) result.add( quote do: + proc `findBucketIdxIdent`( + broker: `brokerTypeIdent`, brokerCtx: BrokerContext + ): int = + if brokerCtx == DefaultBrokerContext: + return 0 + for i in 1 ..< broker.buckets.len: + if broker.buckets[i].brokerCtx == brokerCtx: + return i + return -1 + + proc `getOrCreateBucketIdxIdent`( + broker: `brokerTypeIdent`, brokerCtx: BrokerContext + ): int = + let idx = `findBucketIdxIdent`(broker, brokerCtx) + if idx >= 0: + return idx + broker.buckets.add( + `bucketTypeIdent`( + brokerCtx: brokerCtx, + listeners: initTable[uint64, `handlerProcIdent`](), + nextId: 1'u64, + ) + ) + return broker.buckets.high + proc `listenImplIdent`( - handler: `handlerProcIdent` + brokerCtx: BrokerContext, handler: `handlerProcIdent` ): Result[`listenerHandleIdent`, string] = if handler.isNil(): return err("Must provide a non-nil event handler") var broker = `accessProcIdent`() - if broker.nextId == 0'u64: - broker.nextId = 1'u64 - if broker.nextId == high(uint64): - error "Cannot add more listeners: ID space exhausted", nextId = $broker.nextId + + let bucketIdx = `getOrCreateBucketIdxIdent`(broker, brokerCtx) + if broker.buckets[bucketIdx].nextId == 0'u64: + broker.buckets[bucketIdx].nextId = 1'u64 + + if broker.buckets[bucketIdx].nextId == high(uint64): + error "Cannot add more listeners: ID space exhausted", + nextId = $broker.buckets[bucketIdx].nextId return err("Cannot add more listeners, listener ID space exhausted") - let newId = broker.nextId - inc broker.nextId - broker.listeners[newId] = handler + + let newId = broker.buckets[bucketIdx].nextId + inc broker.buckets[bucketIdx].nextId + broker.buckets[bucketIdx].listeners[newId] = handler return ok(`listenerHandleIdent`(id: newId)) ) result.add( quote do: - proc `dropListenerImplIdent`(handle: `listenerHandleIdent`) = + proc `dropListenerImplIdent`( + brokerCtx: BrokerContext, handle: `listenerHandleIdent` + ) = if handle.id == 0'u64: return var broker = `accessProcIdent`() - if broker.listeners.len == 0: + + let bucketIdx = `findBucketIdxIdent`(broker, brokerCtx) + if bucketIdx < 0: return - broker.listeners.del(handle.id) + + if broker.buckets[bucketIdx].listeners.len == 0: + return + broker.buckets[bucketIdx].listeners.del(handle.id) + if brokerCtx != DefaultBrokerContext and + broker.buckets[bucketIdx].listeners.len == 0: + broker.buckets.delete(bucketIdx) ) result.add( quote do: - proc `dropAllListenersImplIdent`() = + proc `dropAllListenersImplIdent`(brokerCtx: BrokerContext) = var broker = `accessProcIdent`() - if broker.listeners.len > 0: - broker.listeners.clear() + + let bucketIdx = `findBucketIdxIdent`(broker, brokerCtx) + if bucketIdx < 0: + return + if broker.buckets[bucketIdx].listeners.len > 0: + broker.buckets[bucketIdx].listeners.clear() + if brokerCtx != DefaultBrokerContext: + broker.buckets.delete(bucketIdx) ) @@ -214,17 +343,34 @@ macro EventBroker*(body: untyped): untyped = proc listen*( _: typedesc[`typeIdent`], handler: `handlerProcIdent` ): Result[`listenerHandleIdent`, string] = - return `listenImplIdent`(handler) + return `listenImplIdent`(DefaultBrokerContext, handler) + + proc listen*( + _: typedesc[`typeIdent`], + brokerCtx: BrokerContext, + handler: `handlerProcIdent`, + ): Result[`listenerHandleIdent`, string] = + return `listenImplIdent`(brokerCtx, handler) ) result.add( quote do: proc dropListener*(_: typedesc[`typeIdent`], handle: `listenerHandleIdent`) = - `dropListenerImplIdent`(handle) + `dropListenerImplIdent`(DefaultBrokerContext, handle) + + proc dropListener*( + _: typedesc[`typeIdent`], + brokerCtx: BrokerContext, + handle: `listenerHandleIdent`, + ) = + `dropListenerImplIdent`(brokerCtx, handle) proc dropAllListeners*(_: typedesc[`typeIdent`]) = - `dropAllListenersImplIdent`() + `dropAllListenersImplIdent`(DefaultBrokerContext) + + proc dropAllListeners*(_: typedesc[`typeIdent`], brokerCtx: BrokerContext) = + `dropAllListenersImplIdent`(brokerCtx) ) @@ -241,68 +387,114 @@ macro EventBroker*(body: untyped): untyped = error "Failed to execute event listener", error = getCurrentExceptionMsg() proc `emitImplIdent`( - event: `typeIdent` + brokerCtx: BrokerContext, event: `typeIdent` ): Future[void] {.async: (raises: []), gcsafe.} = - when `isRefObjectLit`: + when compiles(event.isNil()): if event.isNil(): error "Cannot emit uninitialized event object", eventType = `typeNameLit` return let broker = `accessProcIdent`() - if broker.listeners.len == 0: + let bucketIdx = `findBucketIdxIdent`(broker, brokerCtx) + if bucketIdx < 0: # nothing to do as nobody is listening return + if broker.buckets[bucketIdx].listeners.len == 0: + return var callbacks: seq[`handlerProcIdent`] = @[] - for cb in broker.listeners.values: + for cb in broker.buckets[bucketIdx].listeners.values: callbacks.add(cb) for cb in callbacks: asyncSpawn `listenerTaskIdent`(cb, event) proc emit*(event: `typeIdent`) = - asyncSpawn `emitImplIdent`(event) + asyncSpawn `emitImplIdent`(DefaultBrokerContext, event) proc emit*(_: typedesc[`typeIdent`], event: `typeIdent`) = - asyncSpawn `emitImplIdent`(event) + asyncSpawn `emitImplIdent`(DefaultBrokerContext, event) + + proc emit*( + _: typedesc[`typeIdent`], brokerCtx: BrokerContext, event: `typeIdent` + ) = + asyncSpawn `emitImplIdent`(brokerCtx, event) ) - var emitCtorParams = newTree(nnkFormalParams, newEmptyNode()) - let typedescParamType = - newTree(nnkBracketExpr, ident("typedesc"), copyNimTree(typeIdent)) - emitCtorParams.add( - newTree(nnkIdentDefs, ident("_"), typedescParamType, newEmptyNode()) - ) - for i in 0 ..< fieldNames.len: + if hasInlineFields: + # Typedesc emit constructor overloads for inline object/ref object types. + var emitCtorParams = newTree(nnkFormalParams, newEmptyNode()) + let typedescParamType = + newTree(nnkBracketExpr, ident("typedesc"), copyNimTree(typeIdent)) emitCtorParams.add( - newTree( - nnkIdentDefs, - copyNimTree(fieldNames[i]), - copyNimTree(fieldTypes[i]), - newEmptyNode(), + newTree(nnkIdentDefs, ident("_"), typedescParamType, newEmptyNode()) + ) + for i in 0 ..< fieldNames.len: + emitCtorParams.add( + newTree( + nnkIdentDefs, + copyNimTree(fieldNames[i]), + copyNimTree(fieldTypes[i]), + newEmptyNode(), + ) ) + + var emitCtorExpr = newTree(nnkObjConstr, copyNimTree(typeIdent)) + for i in 0 ..< fieldNames.len: + emitCtorExpr.add( + newTree( + nnkExprColonExpr, copyNimTree(fieldNames[i]), copyNimTree(fieldNames[i]) + ) + ) + + let emitCtorCallDefault = + newCall(copyNimTree(emitImplIdent), ident("DefaultBrokerContext"), emitCtorExpr) + let emitCtorBodyDefault = quote: + asyncSpawn `emitCtorCallDefault` + + let typedescEmitProcDefault = newTree( + nnkProcDef, + postfix(ident("emit"), "*"), + newEmptyNode(), + newEmptyNode(), + emitCtorParams, + newEmptyNode(), + newEmptyNode(), + emitCtorBodyDefault, ) + result.add(typedescEmitProcDefault) - var emitCtorExpr = newTree(nnkObjConstr, copyNimTree(typeIdent)) - for i in 0 ..< fieldNames.len: - emitCtorExpr.add( - newTree(nnkExprColonExpr, copyNimTree(fieldNames[i]), copyNimTree(fieldNames[i])) + var emitCtorParamsCtx = newTree(nnkFormalParams, newEmptyNode()) + emitCtorParamsCtx.add( + newTree(nnkIdentDefs, ident("_"), typedescParamType, newEmptyNode()) ) + emitCtorParamsCtx.add( + newTree(nnkIdentDefs, ident("brokerCtx"), ident("BrokerContext"), newEmptyNode()) + ) + for i in 0 ..< fieldNames.len: + emitCtorParamsCtx.add( + newTree( + nnkIdentDefs, + copyNimTree(fieldNames[i]), + copyNimTree(fieldTypes[i]), + newEmptyNode(), + ) + ) - let emitCtorCall = newCall(copyNimTree(emitImplIdent), emitCtorExpr) - let emitCtorBody = quote: - asyncSpawn `emitCtorCall` + let emitCtorCallCtx = + newCall(copyNimTree(emitImplIdent), ident("brokerCtx"), copyNimTree(emitCtorExpr)) + let emitCtorBodyCtx = quote: + asyncSpawn `emitCtorCallCtx` - let typedescEmitProc = newTree( - nnkProcDef, - postfix(ident("emit"), "*"), - newEmptyNode(), - newEmptyNode(), - emitCtorParams, - newEmptyNode(), - newEmptyNode(), - emitCtorBody, - ) - - result.add(typedescEmitProc) + let typedescEmitProcCtx = newTree( + nnkProcDef, + postfix(ident("emit"), "*"), + newEmptyNode(), + newEmptyNode(), + emitCtorParamsCtx, + newEmptyNode(), + newEmptyNode(), + emitCtorBodyCtx, + ) + result.add(typedescEmitProcCtx) when defined(eventBrokerDebug): echo result.repr