Context aware extension for EventBroker, EventBoker support for native or external types

This commit is contained in:
NagyZoltanPeter 2025-12-17 01:44:23 +01:00
parent 07a35f8bd0
commit bc4ff10ea9
No known key found for this signature in database
GPG Key ID: 3E1F97CF4A7B6F42
2 changed files with 371 additions and 103 deletions

View File

@ -4,6 +4,15 @@ import testutils/unittests
import waku/common/broker/event_broker
type ExternalDefinedEventType = object
label*: string
EventBroker:
type IntEvent = int
EventBroker:
type ExternalAliasEvent = distinct ExternalDefinedEventType
EventBroker:
type SampleEvent = object
value*: int
@ -123,3 +132,70 @@ suite "EventBroker":
check counter == 21 # 1+2+3 + 4+5+6
RefEvent.dropAllListeners()
test "supports BrokerContext-scoped listeners":
SampleEvent.dropAllListeners()
let ctxA = NewBrokerContext()
let ctxB = NewBrokerContext()
var seenA: seq[int] = @[]
var seenB: seq[int] = @[]
discard SampleEvent.listen(
ctxA,
proc(evt: SampleEvent): Future[void] {.async: (raises: []).} =
seenA.add(evt.value),
)
discard SampleEvent.listen(
ctxB,
proc(evt: SampleEvent): Future[void] {.async: (raises: []).} =
seenB.add(evt.value),
)
SampleEvent.emit(ctxA, SampleEvent(value: 1, label: "a"))
SampleEvent.emit(ctxB, SampleEvent(value: 2, label: "b"))
waitForListeners()
check seenA == @[1]
check seenB == @[2]
SampleEvent.dropAllListeners(ctxA)
SampleEvent.emit(ctxA, SampleEvent(value: 3, label: "a2"))
SampleEvent.emit(ctxB, SampleEvent(value: 4, label: "b2"))
waitForListeners()
check seenA == @[1]
check seenB == @[2, 4]
SampleEvent.dropAllListeners(ctxB)
test "supports non-object event types (auto-distinct)":
var seen: seq[int] = @[]
discard IntEvent.listen(
proc(evt: IntEvent): Future[void] {.async: (raises: []).} =
seen.add(int(evt))
)
IntEvent.emit(IntEvent(42))
waitForListeners()
check seen == @[42]
IntEvent.dropAllListeners()
test "supports externally-defined type aliases (auto-distinct)":
var seen: seq[string] = @[]
discard ExternalAliasEvent.listen(
proc(evt: ExternalAliasEvent): Future[void] {.async: (raises: []).} =
let base = ExternalDefinedEventType(evt)
seen.add(base.label)
)
ExternalAliasEvent.emit(ExternalAliasEvent(ExternalDefinedEventType(label: "x")))
waitForListeners()
check seen == @["x"]
ExternalAliasEvent.dropAllListeners()

View File

@ -5,10 +5,35 @@
## need for direct dependencies in between emitters and listeners.
## Worth considering using it in a single or many emitters to many listeners scenario.
##
## Generates a standalone, type-safe event broker for the declared object type.
## Generates a standalone, type-safe event broker for the declared type.
## The macro exports the value type itself plus a broker companion that manages
## listeners via thread-local storage.
##
## Type definitions:
## - Inline `object` / `ref object` definitions are supported.
## - Native types, aliases, and externally-defined types are also supported.
## In that case, EventBroker will automatically wrap the declared RHS type in
## `distinct` unless you already used `distinct`.
## This keeps event types unique even when multiple brokers share the same
## underlying base type.
##
## Default vs. context aware use:
## Every generated broker is a thread-local global instance. This means EventBroker
## enables decoupled event exchange threadwise.
##
## Sometimes we use brokers inside a context (e.g. within a component that has many
## modules or subsystems). If you instantiate multiple such components in a single
## thread, and each component must have its own listener set for the same EventBroker
## type, you can use context-aware EventBroker.
##
## Context awareness is supported through the `BrokerContext` argument for
## `listen`, `emit`, `dropListener`, and `dropAllListeners`.
## Listener stores are kept separate per broker context.
##
## Default broker context is defined as `DefaultBrokerContext`. If you don't need
## context awareness, you can keep using the interfaces without the context
## argument, which operate on `DefaultBrokerContext`.
##
## Usage:
## Declare your desired event type inside an `EventBroker` macro, add any number of fields.:
## ```nim
@ -47,11 +72,30 @@
## GreetingEvent.dropListener(handle)
## ```
## Example (non-object event type):
## ```nim
## EventBroker:
## type CounterEvent = int # exported as: `distinct int`
##
## discard CounterEvent.listen(
## proc(evt: CounterEvent): Future[void] {.async.} =
## echo int(evt)
## )
## CounterEvent.emit(CounterEvent(42))
## ```
import std/[macros, tables]
import chronos, chronicles, results
import ./helper/broker_utils
import ./helper/broker_utils, broker_context
export chronicles, results, chronos
export chronicles, results, chronos, broker_context
proc ensureDistinctType(rhs: NimNode): NimNode =
## For PODs / aliases / externally-defined types, wrap in `distinct` unless
## it's already distinct.
if rhs.kind == nnkDistinctTy:
return rhs
newTree(nnkDistinctTy, copyNimTree(rhs))
macro EventBroker*(body: untyped): untyped =
when defined(eventBrokerDebug):
@ -60,30 +104,23 @@ macro EventBroker*(body: untyped): untyped =
var objectDef: NimNode = nil
var fieldNames: seq[NimNode] = @[]
var fieldTypes: seq[NimNode] = @[]
var isRefObject = false
var hasInlineFields = false
for stmt in body:
if stmt.kind == nnkTypeSection:
for def in stmt:
if def.kind != nnkTypeDef:
continue
let rhs = def[2]
var objectType: NimNode
if not typeIdent.isNil():
error("Only one type may be declared inside EventBroker", def)
typeIdent = baseTypeIdent(def[0])
# Inline object/ref object definitions.
case rhs.kind
of nnkObjectTy:
objectType = rhs
of nnkRefTy:
isRefObject = true
if rhs.len != 1 or rhs[0].kind != nnkObjectTy:
error("EventBroker ref object must wrap a concrete object definition", rhs)
objectType = rhs[0]
else:
continue
if not typeIdent.isNil():
error("Only one object type may be declared inside EventBroker", def)
typeIdent = baseTypeIdent(def[0])
let recList = objectType[2]
let recList = rhs[2]
if recList.kind != nnkRecList:
error("EventBroker object must declare a standard field list", objectType)
error("EventBroker object must declare a standard field list", rhs)
var exportedRecList = newTree(nnkRecList)
for field in recList:
case field.kind
@ -106,28 +143,63 @@ macro EventBroker*(body: untyped): untyped =
field,
)
let exportedObjectType = newTree(
nnkObjectTy,
copyNimTree(objectType[0]),
copyNimTree(objectType[1]),
exportedRecList,
nnkObjectTy, copyNimTree(rhs[0]), copyNimTree(rhs[1]), exportedRecList
)
if isRefObject:
objectDef = newTree(nnkRefTy, exportedObjectType)
else:
objectDef = exportedObjectType
hasInlineFields = true
of nnkRefTy:
if rhs.len != 1 or rhs[0].kind != nnkObjectTy:
error("EventBroker ref object must wrap a concrete object definition", rhs)
let obj = rhs[0]
let recList = obj[2]
if recList.kind != nnkRecList:
error("EventBroker object must declare a standard field list", obj)
var exportedRecList = newTree(nnkRecList)
for field in recList:
case field.kind
of nnkIdentDefs:
ensureFieldDef(field)
let fieldTypeNode = field[field.len - 2]
for i in 0 ..< field.len - 2:
let baseFieldIdent = baseTypeIdent(field[i])
fieldNames.add(copyNimTree(baseFieldIdent))
fieldTypes.add(copyNimTree(fieldTypeNode))
var cloned = copyNimTree(field)
for i in 0 ..< cloned.len - 2:
cloned[i] = exportIdentNode(cloned[i])
exportedRecList.add(cloned)
of nnkEmpty:
discard
else:
error(
"EventBroker object definition only supports simple field declarations",
field,
)
let exportedObjectType = newTree(
nnkObjectTy, copyNimTree(obj[0]), copyNimTree(obj[1]), exportedRecList
)
objectDef = newTree(nnkRefTy, exportedObjectType)
hasInlineFields = true
else:
# Native type / alias / externally-defined type.
# Ensure we create a unique event type by wrapping in `distinct` unless
# the user already did.
objectDef = ensureDistinctType(rhs)
if typeIdent.isNil():
error("EventBroker body must declare exactly one object type", body)
error("EventBroker body must declare exactly one type", body)
let exportedTypeIdent = postfix(copyNimTree(typeIdent), "*")
let sanitized = sanitizeIdentName(typeIdent)
let typeNameLit = newLit($typeIdent)
let isRefObjectLit = newLit(isRefObject)
let handlerProcIdent = ident(sanitized & "ListenerProc")
let listenerHandleIdent = ident(sanitized & "Listener")
let brokerTypeIdent = ident(sanitized & "Broker")
let exportedHandlerProcIdent = postfix(copyNimTree(handlerProcIdent), "*")
let exportedListenerHandleIdent = postfix(copyNimTree(listenerHandleIdent), "*")
let exportedBrokerTypeIdent = postfix(copyNimTree(brokerTypeIdent), "*")
let bucketTypeIdent = ident(sanitized & "CtxBucket")
let findBucketIdxIdent = ident(sanitized & "FindBucketIdx")
let getOrCreateBucketIdxIdent = ident(sanitized & "GetOrCreateBucketIdx")
let accessProcIdent = ident("access" & sanitized & "Broker")
let globalVarIdent = ident("g" & sanitized & "Broker")
let listenImplIdent = ident("register" & sanitized & "Listener")
@ -147,10 +219,14 @@ macro EventBroker*(body: untyped): untyped =
`exportedHandlerProcIdent` =
proc(event: `typeIdent`): Future[void] {.async: (raises: []), gcsafe.}
`exportedBrokerTypeIdent` = ref object
`bucketTypeIdent` = object
brokerCtx: BrokerContext
listeners: Table[uint64, `handlerProcIdent`]
nextId: uint64
`exportedBrokerTypeIdent` = ref object
buckets: seq[`bucketTypeIdent`]
)
result.add(
@ -163,49 +239,102 @@ macro EventBroker*(body: untyped): untyped =
proc `accessProcIdent`(): `brokerTypeIdent` =
if `globalVarIdent`.isNil():
new(`globalVarIdent`)
`globalVarIdent`.listeners = initTable[uint64, `handlerProcIdent`]()
`globalVarIdent`.buckets =
@[
`bucketTypeIdent`(
brokerCtx: DefaultBrokerContext,
listeners: initTable[uint64, `handlerProcIdent`](),
nextId: 1'u64,
)
]
`globalVarIdent`
)
result.add(
quote do:
proc `findBucketIdxIdent`(
broker: `brokerTypeIdent`, brokerCtx: BrokerContext
): int =
if brokerCtx == DefaultBrokerContext:
return 0
for i in 1 ..< broker.buckets.len:
if broker.buckets[i].brokerCtx == brokerCtx:
return i
return -1
proc `getOrCreateBucketIdxIdent`(
broker: `brokerTypeIdent`, brokerCtx: BrokerContext
): int =
let idx = `findBucketIdxIdent`(broker, brokerCtx)
if idx >= 0:
return idx
broker.buckets.add(
`bucketTypeIdent`(
brokerCtx: brokerCtx,
listeners: initTable[uint64, `handlerProcIdent`](),
nextId: 1'u64,
)
)
return broker.buckets.high
proc `listenImplIdent`(
handler: `handlerProcIdent`
brokerCtx: BrokerContext, handler: `handlerProcIdent`
): Result[`listenerHandleIdent`, string] =
if handler.isNil():
return err("Must provide a non-nil event handler")
var broker = `accessProcIdent`()
if broker.nextId == 0'u64:
broker.nextId = 1'u64
if broker.nextId == high(uint64):
error "Cannot add more listeners: ID space exhausted", nextId = $broker.nextId
let bucketIdx = `getOrCreateBucketIdxIdent`(broker, brokerCtx)
if broker.buckets[bucketIdx].nextId == 0'u64:
broker.buckets[bucketIdx].nextId = 1'u64
if broker.buckets[bucketIdx].nextId == high(uint64):
error "Cannot add more listeners: ID space exhausted",
nextId = $broker.buckets[bucketIdx].nextId
return err("Cannot add more listeners, listener ID space exhausted")
let newId = broker.nextId
inc broker.nextId
broker.listeners[newId] = handler
let newId = broker.buckets[bucketIdx].nextId
inc broker.buckets[bucketIdx].nextId
broker.buckets[bucketIdx].listeners[newId] = handler
return ok(`listenerHandleIdent`(id: newId))
)
result.add(
quote do:
proc `dropListenerImplIdent`(handle: `listenerHandleIdent`) =
proc `dropListenerImplIdent`(
brokerCtx: BrokerContext, handle: `listenerHandleIdent`
) =
if handle.id == 0'u64:
return
var broker = `accessProcIdent`()
if broker.listeners.len == 0:
let bucketIdx = `findBucketIdxIdent`(broker, brokerCtx)
if bucketIdx < 0:
return
broker.listeners.del(handle.id)
if broker.buckets[bucketIdx].listeners.len == 0:
return
broker.buckets[bucketIdx].listeners.del(handle.id)
if brokerCtx != DefaultBrokerContext and
broker.buckets[bucketIdx].listeners.len == 0:
broker.buckets.delete(bucketIdx)
)
result.add(
quote do:
proc `dropAllListenersImplIdent`() =
proc `dropAllListenersImplIdent`(brokerCtx: BrokerContext) =
var broker = `accessProcIdent`()
if broker.listeners.len > 0:
broker.listeners.clear()
let bucketIdx = `findBucketIdxIdent`(broker, brokerCtx)
if bucketIdx < 0:
return
if broker.buckets[bucketIdx].listeners.len > 0:
broker.buckets[bucketIdx].listeners.clear()
if brokerCtx != DefaultBrokerContext:
broker.buckets.delete(bucketIdx)
)
@ -214,17 +343,34 @@ macro EventBroker*(body: untyped): untyped =
proc listen*(
_: typedesc[`typeIdent`], handler: `handlerProcIdent`
): Result[`listenerHandleIdent`, string] =
return `listenImplIdent`(handler)
return `listenImplIdent`(DefaultBrokerContext, handler)
proc listen*(
_: typedesc[`typeIdent`],
brokerCtx: BrokerContext,
handler: `handlerProcIdent`,
): Result[`listenerHandleIdent`, string] =
return `listenImplIdent`(brokerCtx, handler)
)
result.add(
quote do:
proc dropListener*(_: typedesc[`typeIdent`], handle: `listenerHandleIdent`) =
`dropListenerImplIdent`(handle)
`dropListenerImplIdent`(DefaultBrokerContext, handle)
proc dropListener*(
_: typedesc[`typeIdent`],
brokerCtx: BrokerContext,
handle: `listenerHandleIdent`,
) =
`dropListenerImplIdent`(brokerCtx, handle)
proc dropAllListeners*(_: typedesc[`typeIdent`]) =
`dropAllListenersImplIdent`()
`dropAllListenersImplIdent`(DefaultBrokerContext)
proc dropAllListeners*(_: typedesc[`typeIdent`], brokerCtx: BrokerContext) =
`dropAllListenersImplIdent`(brokerCtx)
)
@ -241,30 +387,40 @@ macro EventBroker*(body: untyped): untyped =
error "Failed to execute event listener", error = getCurrentExceptionMsg()
proc `emitImplIdent`(
event: `typeIdent`
brokerCtx: BrokerContext, event: `typeIdent`
): Future[void] {.async: (raises: []), gcsafe.} =
when `isRefObjectLit`:
when compiles(event.isNil()):
if event.isNil():
error "Cannot emit uninitialized event object", eventType = `typeNameLit`
return
let broker = `accessProcIdent`()
if broker.listeners.len == 0:
let bucketIdx = `findBucketIdxIdent`(broker, brokerCtx)
if bucketIdx < 0:
# nothing to do as nobody is listening
return
if broker.buckets[bucketIdx].listeners.len == 0:
return
var callbacks: seq[`handlerProcIdent`] = @[]
for cb in broker.listeners.values:
for cb in broker.buckets[bucketIdx].listeners.values:
callbacks.add(cb)
for cb in callbacks:
asyncSpawn `listenerTaskIdent`(cb, event)
proc emit*(event: `typeIdent`) =
asyncSpawn `emitImplIdent`(event)
asyncSpawn `emitImplIdent`(DefaultBrokerContext, event)
proc emit*(_: typedesc[`typeIdent`], event: `typeIdent`) =
asyncSpawn `emitImplIdent`(event)
asyncSpawn `emitImplIdent`(DefaultBrokerContext, event)
proc emit*(
_: typedesc[`typeIdent`], brokerCtx: BrokerContext, event: `typeIdent`
) =
asyncSpawn `emitImplIdent`(brokerCtx, event)
)
if hasInlineFields:
# Typedesc emit constructor overloads for inline object/ref object types.
var emitCtorParams = newTree(nnkFormalParams, newEmptyNode())
let typedescParamType =
newTree(nnkBracketExpr, ident("typedesc"), copyNimTree(typeIdent))
@ -284,14 +440,17 @@ macro EventBroker*(body: untyped): untyped =
var emitCtorExpr = newTree(nnkObjConstr, copyNimTree(typeIdent))
for i in 0 ..< fieldNames.len:
emitCtorExpr.add(
newTree(nnkExprColonExpr, copyNimTree(fieldNames[i]), copyNimTree(fieldNames[i]))
newTree(
nnkExprColonExpr, copyNimTree(fieldNames[i]), copyNimTree(fieldNames[i])
)
)
let emitCtorCall = newCall(copyNimTree(emitImplIdent), emitCtorExpr)
let emitCtorBody = quote:
asyncSpawn `emitCtorCall`
let emitCtorCallDefault =
newCall(copyNimTree(emitImplIdent), ident("DefaultBrokerContext"), emitCtorExpr)
let emitCtorBodyDefault = quote:
asyncSpawn `emitCtorCallDefault`
let typedescEmitProc = newTree(
let typedescEmitProcDefault = newTree(
nnkProcDef,
postfix(ident("emit"), "*"),
newEmptyNode(),
@ -299,10 +458,43 @@ macro EventBroker*(body: untyped): untyped =
emitCtorParams,
newEmptyNode(),
newEmptyNode(),
emitCtorBody,
emitCtorBodyDefault,
)
result.add(typedescEmitProcDefault)
var emitCtorParamsCtx = newTree(nnkFormalParams, newEmptyNode())
emitCtorParamsCtx.add(
newTree(nnkIdentDefs, ident("_"), typedescParamType, newEmptyNode())
)
emitCtorParamsCtx.add(
newTree(nnkIdentDefs, ident("brokerCtx"), ident("BrokerContext"), newEmptyNode())
)
for i in 0 ..< fieldNames.len:
emitCtorParamsCtx.add(
newTree(
nnkIdentDefs,
copyNimTree(fieldNames[i]),
copyNimTree(fieldTypes[i]),
newEmptyNode(),
)
)
result.add(typedescEmitProc)
let emitCtorCallCtx =
newCall(copyNimTree(emitImplIdent), ident("brokerCtx"), copyNimTree(emitCtorExpr))
let emitCtorBodyCtx = quote:
asyncSpawn `emitCtorCallCtx`
let typedescEmitProcCtx = newTree(
nnkProcDef,
postfix(ident("emit"), "*"),
newEmptyNode(),
newEmptyNode(),
emitCtorParamsCtx,
newEmptyNode(),
newEmptyNode(),
emitCtorBodyCtx,
)
result.add(typedescEmitProcCtx)
when defined(eventBrokerDebug):
echo result.repr