From 80102a3b6a8bc9c205302ec5d0f895d01a4e43df Mon Sep 17 00:00:00 2001 From: Eugene Kabanov Date: Wed, 15 Sep 2021 16:55:15 +0300 Subject: [PATCH] Eventbus implementation (#214) * Initial commit. * Move AsyncEventBus implementation to asyncsync.nim Add subscriptAll(), waitAllEvents() primitives. Add emitWait() primitive. Add emitter source location implementation. Add tests. --- chronos/asyncsync.nim | 211 +++++++++++++++++++++++++++++++++- chronos/srcloc.nim | 26 +++-- chronos/transports/common.nim | 2 +- tests/testsync.nim | 159 +++++++++++++++++++++++++ 4 files changed, 387 insertions(+), 11 deletions(-) diff --git a/chronos/asyncsync.nim b/chronos/asyncsync.nim index c28bc659..eed5f4e4 100644 --- a/chronos/asyncsync.nim +++ b/chronos/asyncsync.nim @@ -8,11 +8,11 @@ # Apache License, version 2.0, (LICENSE-APACHEv2) # MIT license (LICENSE-MIT) -## This module implements some core synchronization primitives +## This module implements some core synchronization primitives. {.push raises: [Defect].} -import std/[sequtils, deques] +import std/[sequtils, deques, tables, typetraits] import ./asyncloop export asyncloop @@ -62,6 +62,50 @@ type AsyncLockError* = object of CatchableError ## ``AsyncLock`` is either locked or unlocked. + EventBusSubscription*[T] = proc(bus: AsyncEventBus, + payload: EventPayload[T]): Future[void] {. + gcsafe, raises: [Defect].} + ## EventBus subscription callback type. + + EventBusAllSubscription* = proc(bus: AsyncEventBus, + event: AwaitableEvent): Future[void] {. + gcsafe, raises: [Defect].} + ## EventBus subscription callback type. + + EventBusCallback = proc(bus: AsyncEventBus, event: string, key: EventBusKey, + data: EventPayloadBase) {. + gcsafe, raises: [Defect].} + + EventBusKey* = object + ## Unique subscription key. + eventName: string + typeName: string + unique: uint64 + cb: EventBusCallback + + EventItem = object + waiters: seq[FutureBase] + subscribers: seq[EventBusKey] + + AsyncEventBus* = ref object of RootObj + ## An eventbus object. + counter: uint64 + events: Table[string, EventItem] + subscribers: seq[EventBusKey] + waiters: seq[Future[AwaitableEvent]] + + EventPayloadBase* = ref object of RootObj + loc: ptr SrcLoc + + EventPayload*[T] = ref object of EventPayloadBase + ## Eventbus' event payload object + value: T + + AwaitableEvent* = object + ## Eventbus' event payload object + eventName: string + payload: EventPayloadBase + proc newAsyncLock*(): AsyncLock = ## Creates new asynchronous lock ``AsyncLock``. ## @@ -400,3 +444,166 @@ proc `$`*[T](aq: AsyncQueue[T]): string = res.addQuoted(item) res.add("]") res + +template generateKey(typeName, eventName: string): string = + "type[" & typeName & "]-key[" & eventName & "]" + +proc newAsyncEventBus*(): AsyncEventBus = + ## Creates new ``AsyncEventBus``. + AsyncEventBus(counter: 0'u64, events: initTable[string, EventItem]()) + +template get*[T](payload: EventPayload[T]): T = + ## Returns event payload data. + payload.value + +template location*(payload: EventPayloadBase): SrcLoc = + ## Returns source location address of event emitter. + payload.loc[] + +proc get*(event: AwaitableEvent, T: typedesc): T = + ## Returns event's payload of type ``T`` from event ``event``. + cast[EventPayload[T]](event.payload).value + +template event*(event: AwaitableEvent): string = + ## Returns event's name from event ``event``. + event.eventName + +template location*(event: AwaitableEvent): SrcLoc = + ## Returns source location address of event emitter. + event.payload.loc[] + +proc waitEvent*(bus: AsyncEventBus, T: typedesc, event: string): Future[T] = + ## Wait for the event from AsyncEventBus ``bus`` with name ``event``. + ## + ## Returned ``Future[T]`` will hold event's payload of type ``T``. + var default: EventItem + var retFuture = newFuture[T]("AsyncEventBus.waitEvent") + let eventKey = generateKey(T.name, event) + proc cancellation(udata: pointer) {.gcsafe, raises: [Defect].} = + if not(retFuture.finished()): + bus.events.withValue(eventKey, item): + item.waiters.keepItIf(it != cast[FutureBase](retFuture)) + retFuture.cancelCallback = cancellation + let baseFuture = cast[FutureBase](retFuture) + bus.events.mgetOrPut(eventKey, default).waiters.add(baseFuture) + retFuture + +proc waitAllEvents*(bus: AsyncEventBus): Future[AwaitableEvent] = + ## Wait for any event from AsyncEventBus ``bus``. + ## + ## Returns ``Future`` which holds helper object. Using this object you can + ## retrieve event's name and payload. + var retFuture = newFuture[AwaitableEvent]("AsyncEventBus.waitAllEvents") + proc cancellation(udata: pointer) {.gcsafe, raises: [Defect].} = + if not(retFuture.finished()): + bus.waiters.keepItIf(it != retFuture) + retFuture.cancelCallback = cancellation + bus.waiters.add(retFuture) + retFuture + +proc subscribe*[T](bus: AsyncEventBus, event: string, + callback: EventBusSubscription[T]): EventBusKey = + ## Subscribe to the event ``event`` passed through eventbus ``bus`` with + ## callback ``callback``. + ## + ## Returns key that can be used to unsubscribe. + proc trampoline(tbus: AsyncEventBus, event: string, key: EventBusKey, + data: EventPayloadBase) {.gcsafe, raises: [Defect].} = + let payload = cast[EventPayload[T]](data) + asyncSpawn callback(bus, payload) + + let subkey = + block: + inc(bus.counter) + EventBusKey(eventName: event, typeName: T.name, unique: bus.counter, + cb: trampoline) + + var default: EventItem + let eventKey = generateKey(T.name, event) + bus.events.mgetOrPut(eventKey, default).subscribers.add(subkey) + subkey + +proc subscribeAll*(bus: AsyncEventBus, + callback: EventBusAllSubscription): EventBusKey = + ## Subscribe to all events passed through eventbus ``bus`` with callback + ## ``callback``. + ## + ## Returns key that can be used to unsubscribe. + proc trampoline(tbus: AsyncEventBus, event: string, key: EventBusKey, + data: EventPayloadBase) {.gcsafe, raises: [Defect].} = + let event = AwaitableEvent(eventName: event, payload: data) + asyncSpawn callback(bus, event) + + let subkey = + block: + inc(bus.counter) + EventBusKey(eventName: "", typeName: "", unique: bus.counter, + cb: trampoline) + bus.subscribers.add(subkey) + subkey + +proc unsubscribe*(bus: AsyncEventBus, key: EventBusKey) = + ## Cancel subscription of subscriber with key ``key`` from eventbus ``bus``. + let eventKey = generateKey(key.typeName, key.eventName) + + # Clean event's subscribers. + bus.events.withValue(eventKey, item): + item.subscribers.keepItIf(it.unique != key.unique) + + # Clean subscribers subscribed to all events. + bus.subscribers.keepItIf(it.unique != key.unique) + +proc emit[T](bus: AsyncEventBus, event: string, data: T, loc: ptr SrcLoc) = + let + eventKey = generateKey(T.name, event) + payload = + block: + var data = EventPayload[T](value: data, loc: loc) + cast[EventPayloadBase](data) + + bus.events.withValue(eventKey, item): + # Schedule waiters which are waiting for the event ``event``. + for waiter in item.waiters: + var fut = cast[Future[T]](waiter) + fut.complete(data) + # Clear all the waiters. + item.waiters.setLen(0) + + # Schedule subscriber's callbacks, which are subscribed to the event. + for subscriber in item.subscribers: + callSoon(proc(udata: pointer) = + subscriber.cb(bus, event, subscriber, payload) + ) + + # Schedule waiters which are waiting all events + for waiter in bus.waiters: + waiter.complete(AwaitableEvent(eventName: event, payload: payload)) + # Clear all the waiters. + bus.waiters.setLen(0) + + # Schedule subscriber's callbacks which are subscribed to all events. + for subscriber in bus.subscribers: + callSoon(proc(udata: pointer) = + subscriber.cb(bus, event, subscriber, payload) + ) + +template emit*[T](bus: AsyncEventBus, event: string, data: T) = + ## Emit new event ``event`` to the eventbus ``bus`` with payload ``data``. + emit(bus, event, data, getSrcLocation()) + +proc emitWait[T](bus: AsyncEventBus, event: string, data: T, + loc: ptr SrcLoc): Future[void] = + var retFuture = newFuture[void]("AsyncEventBus.emitWait") + proc continuation(udata: pointer) {.gcsafe.} = + if not(retFuture.finished()): + retFuture.complete() + emit(bus, event, data, loc) + callSoon(continuation) + return retFuture + +template emitWait*[T](bus: AsyncEventBus, event: string, + data: T): Future[void] = + ## Emit new event ``event`` to the eventbus ``bus`` with payload ``data`` and + ## wait until all the subscribers/waiters will receive notification about + ## event. + emitWait(bus, event, data, getSrcLocation()) diff --git a/chronos/srcloc.nim b/chronos/srcloc.nim index 576fc58a..8652e9dd 100644 --- a/chronos/srcloc.nim +++ b/chronos/srcloc.nim @@ -1,4 +1,13 @@ -{.push raises: [].} +# +# Chronos source location utilities +# (c) Copyright 2018-Present +# Status Research & Development GmbH +# +# Licensed under either of +# Apache License, version 2.0, (LICENSE-APACHEv2) +# MIT license (LICENSE-MIT) +{.push raises: [Defect].} +import stew/base10 type SrcLoc* = object @@ -7,15 +16,16 @@ type line*: int proc `$`*(loc: ptr SrcLoc): string = - result.add loc.file - result.add "(" - result.add $loc.line - result.add ")" - result.add " " + var res = $loc.file + res.add("(") + res.add(Base10.toString(uint64(loc.line))) + res.add(")") + res.add(" ") if len(loc.procedure) == 0: - result.add "[unspecified]" + res.add("[unspecified]") else: - result.add loc.procedure + res.add($loc.procedure) + res proc srcLocImpl(procedure: static string, file: static string, line: static int): ptr SrcLoc = diff --git a/chronos/transports/common.nim b/chronos/transports/common.nim index 305090c7..b6a4d127 100644 --- a/chronos/transports/common.nim +++ b/chronos/transports/common.nim @@ -396,7 +396,7 @@ proc resolveTAddress*(address: string, port: Port, domain: Domain): seq[TransportAddress] {. raises: [Defect, TransportAddressError].} = var res: seq[TransportAddress] - let aiList = getAddrInfo(address, Port(port), domain) + let aiList = getAddrInfo(address, port, domain) var it = aiList while not(isNil(it)): var ta: TransportAddress diff --git a/tests/testsync.nim b/tests/testsync.nim index 75ccd120..98a16b58 100644 --- a/tests/testsync.nim +++ b/tests/testsync.nim @@ -352,3 +352,162 @@ suite "Asynchronous sync primitives test suite": check test8() == true test "AsyncQueue() contains test": check test9() == true + test "AsyncEventBus() awaitable primitives test": + const TestsCount = 10 + var bus = newAsyncEventBus() + var flag = "" + + proc waiter(bus: AsyncEventBus) {.async.} = + for i in 0 ..< TestsCount: + let payload = await bus.waitEvent(string, "event") + flag = flag & payload + + proc sender(bus: AsyncEventBus) {.async.} = + for i in 0 ..< (TestsCount + (TestsCount div 2)): + await bus.emitWait("event", $i) + + waitFor allFutures(waiter(bus), sender(bus)) + check flag == "0123456789" + test "AsyncEventBus() waiters test": + var bus = newAsyncEventBus() + let fut11 = bus.waitEvent(int, "event") + let fut12 = bus.waitEvent(int, "event") + let fut13 = bus.waitEvent(int, "event") + let fut21 = bus.waitEvent(string, "event") + let fut22 = bus.waitEvent(string, "event") + let fut23 = bus.waitEvent(string, "event") + bus.emit("event", 65535) + check: + fut11.done() == true + fut12.done() == true + fut13.done() == true + fut21.finished() == false + fut22.finished() == false + fut23.finished() == false + bus.emit("event", "data") + check: + fut21.done() == true + fut22.done() == true + fut23.done() == true + test "AsyncEventBus() subscribers test": + const TestsCount = 10 + var bus = newAsyncEventBus() + var flagInt = 0 + var flagStr = "" + proc eventIntCallback(bus: AsyncEventBus, + payload: EventPayload[int]) {.async.} = + flagInt = payload.get() + proc eventStrCallback(bus: AsyncEventBus, + payload: EventPayload[string]) {.async.} = + flagStr = payload.get() + + let key1 = bus.subscribe("event", eventIntCallback) + let key2 = bus.subscribe("event", eventStrCallback) + + proc test() {.async.} = + check: + flagInt == 0 + flagStr == "" + for i in 0 ..< TestsCount: + await bus.emitWait("event", i) + check: + flagInt == i + flagStr == "" + flagInt = 0 + for i in 0 ..< TestsCount: + await bus.emitWait("event", $i) + check: + flagInt == 0 + flagStr == $i + flagInt = 0 + flagStr = "" + bus.unsubscribe(key1) + for i in 0 ..< TestsCount: + await bus.emitWait("event", i) + check: + flagInt == 0 + flagStr == "" + flagInt = 0 + flagStr = "" + bus.unsubscribe(key2) + for i in 0 ..< TestsCount: + await bus.emitWait("event", $i) + check: + flagInt == 0 + flagStr == "" + waitFor(test()) + test "AsyncEventBus() waiters for all events test": + var bus = newAsyncEventBus() + let fut11 = bus.waitAllEvents() + let fut12 = bus.waitAllEvents() + bus.emit("intevent", 65535) + check: + fut11.done() == true + fut12.done() == true + let event11 = fut11.read() + let event12 = fut12.read() + check: + event11.event() == "intevent" + event12.event() == "intevent" + event11.get(int) == 65535 + event12.get(int) == 65535 + + let fut21 = bus.waitAllEvents() + let fut22 = bus.waitAllEvents() + bus.emit("strevent", "hello") + check: + fut21.done() == true + fut22.done() == true + let event21 = fut21.read() + let event22 = fut22.read() + check: + event21.event() == "strevent" + event22.event() == "strevent" + event21.get(string) == "hello" + event22.get(string) == "hello" + test "AsyncEventBus() subscribers to all events test": + const TestsCount = 10 + var + bus = newAsyncEventBus() + flagInt = 0 + flagStr = "" + + proc eventCallback(bus: AsyncEventBus, event: AwaitableEvent) {.async.} = + case event.event() + of "event1": + flagStr = "" + flagInt = event.get(int) + of "event2": + flagInt = 0 + flagStr = event.get(string) + else: + flagInt = -1 + flagStr = "error" + + proc test() {.async.} = + let key = bus.subscribeAll(eventCallback) + for i in 0 ..< TestsCount: + await bus.emitWait("event1", i) + check: + flagStr == "" + flagInt == i + await bus.emitWait("event2", $i) + check: + flagStr == $i + flagInt == 0 + + bus.unsubscribe(key) + + flagInt = high(int) + flagStr = "empty" + for i in 0 ..< TestsCount: + await bus.emitWait("event1", i) + check: + flagStr == "empty" + flagInt == high(int) + await bus.emitWait("event2", $i) + check: + flagStr == "empty" + flagInt == high(int) + + waitFor(test())