AsyncEventBus: fix multiple listeners (#271)

Co-authored-by: dbrignoli <dbrignoli@audioscience.com>
This commit is contained in:
Tanguy 2022-05-20 11:57:37 +02:00 committed by GitHub
parent 1233f8fb4b
commit c5894bae1b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 29 additions and 10 deletions

View File

@ -561,6 +561,13 @@ proc emit[T](bus: AsyncEventBus, event: string, data: T, loc: ptr SrcLoc) =
var data = EventPayload[T](value: data, loc: loc) var data = EventPayload[T](value: data, loc: loc)
cast[EventPayloadBase](data) cast[EventPayloadBase](data)
# Used to capture the "subscriber" variable in the loops
# sugar.capture doesn't work in Nim <1.6
proc triggerSubscriberCallback(subscriber: EventBusKey) =
callSoon(proc(udata: pointer) =
subscriber.cb(bus, event, subscriber, payload)
)
bus.events.withValue(eventKey, item): bus.events.withValue(eventKey, item):
# Schedule waiters which are waiting for the event ``event``. # Schedule waiters which are waiting for the event ``event``.
for waiter in item.waiters: for waiter in item.waiters:
@ -571,11 +578,7 @@ proc emit[T](bus: AsyncEventBus, event: string, data: T, loc: ptr SrcLoc) =
# Schedule subscriber's callbacks, which are subscribed to the event. # Schedule subscriber's callbacks, which are subscribed to the event.
for subscriber in item.subscribers: for subscriber in item.subscribers:
# Nim-1.6 says: "'subscriber' is of type <lent EventBusKey> which cannot be captured as it would violate memory safety" triggerSubscriberCallback(subscriber)
let subscriber_copy = subscriber
callSoon(proc(udata: pointer) =
subscriber_copy.cb(bus, event, subscriber_copy, payload)
)
# Schedule waiters which are waiting all events # Schedule waiters which are waiting all events
for waiter in bus.waiters: for waiter in bus.waiters:
@ -585,11 +588,7 @@ proc emit[T](bus: AsyncEventBus, event: string, data: T, loc: ptr SrcLoc) =
# Schedule subscriber's callbacks which are subscribed to all events. # Schedule subscriber's callbacks which are subscribed to all events.
for subscriber in bus.subscribers: for subscriber in bus.subscribers:
# Nim-1.6 says: "'subscriber' is of type <lent EventBusKey> which cannot be captured as it would violate memory safety" triggerSubscriberCallback(subscriber)
let subscriber_copy = subscriber
callSoon(proc(udata: pointer) =
subscriber_copy.cb(bus, event, subscriber_copy, payload)
)
template emit*[T](bus: AsyncEventBus, event: string, data: T) = template emit*[T](bus: AsyncEventBus, event: string, data: T) =
## Emit new event ``event`` to the eventbus ``bus`` with payload ``data``. ## Emit new event ``event`` to the eventbus ``bus`` with payload ``data``.

View File

@ -511,3 +511,23 @@ suite "Asynchronous sync primitives test suite":
flagInt == high(int) flagInt == high(int)
waitFor(test()) waitFor(test())
test "AsyncEventBus() multiple subscribers test":
let
eventBus = newAsyncEventBus()
futA = newFuture[void]()
futB = newFuture[void]()
proc eventEV1(bus: AsyncEventBus, payload: EventPayload[int]) {.async.} =
futA.complete()
proc eventEV2(bus: AsyncEventBus, payload: EventPayload[int]) {.async.} =
futB.complete()
proc test() {.async.} =
discard eventBus.subscribe("EV", eventEV1)
discard eventBus.subscribe("EV", eventEV2)
eventBus.emit("EV", 5)
await allFutures(futA, futB).wait(1.seconds)
waitFor test()