diff --git a/chronos/asyncsync.nim b/chronos/asyncsync.nim index 007c44ae..824c0df3 100644 --- a/chronos/asyncsync.nim +++ b/chronos/asyncsync.nim @@ -561,6 +561,13 @@ proc emit[T](bus: AsyncEventBus, event: string, data: T, loc: ptr SrcLoc) = var data = EventPayload[T](value: data, loc: loc) cast[EventPayloadBase](data) + # Used to capture the "subscriber" variable in the loops + # sugar.capture doesn't work in Nim <1.6 + proc triggerSubscriberCallback(subscriber: EventBusKey) = + callSoon(proc(udata: pointer) = + subscriber.cb(bus, event, subscriber, payload) + ) + bus.events.withValue(eventKey, item): # Schedule waiters which are waiting for the event ``event``. for waiter in item.waiters: @@ -571,11 +578,7 @@ proc emit[T](bus: AsyncEventBus, event: string, data: T, loc: ptr SrcLoc) = # Schedule subscriber's callbacks, which are subscribed to the event. for subscriber in item.subscribers: - # Nim-1.6 says: "'subscriber' is of type which cannot be captured as it would violate memory safety" - let subscriber_copy = subscriber - callSoon(proc(udata: pointer) = - subscriber_copy.cb(bus, event, subscriber_copy, payload) - ) + triggerSubscriberCallback(subscriber) # Schedule waiters which are waiting all events for waiter in bus.waiters: @@ -585,11 +588,7 @@ proc emit[T](bus: AsyncEventBus, event: string, data: T, loc: ptr SrcLoc) = # Schedule subscriber's callbacks which are subscribed to all events. for subscriber in bus.subscribers: - # Nim-1.6 says: "'subscriber' is of type which cannot be captured as it would violate memory safety" - let subscriber_copy = subscriber - callSoon(proc(udata: pointer) = - subscriber_copy.cb(bus, event, subscriber_copy, payload) - ) + triggerSubscriberCallback(subscriber) template emit*[T](bus: AsyncEventBus, event: string, data: T) = ## Emit new event ``event`` to the eventbus ``bus`` with payload ``data``. diff --git a/tests/testsync.nim b/tests/testsync.nim index 98a16b58..e1e0491f 100644 --- a/tests/testsync.nim +++ b/tests/testsync.nim @@ -511,3 +511,23 @@ suite "Asynchronous sync primitives test suite": flagInt == high(int) waitFor(test()) + + test "AsyncEventBus() multiple subscribers test": + let + eventBus = newAsyncEventBus() + futA = newFuture[void]() + futB = newFuture[void]() + + proc eventEV1(bus: AsyncEventBus, payload: EventPayload[int]) {.async.} = + futA.complete() + + proc eventEV2(bus: AsyncEventBus, payload: EventPayload[int]) {.async.} = + futB.complete() + + proc test() {.async.} = + discard eventBus.subscribe("EV", eventEV1) + discard eventBus.subscribe("EV", eventEV2) + eventBus.emit("EV", 5) + await allFutures(futA, futB).wait(1.seconds) + + waitFor test() \ No newline at end of file