Eventbus implementation (#214)

* Initial commit.

* Move AsyncEventBus implementation to asyncsync.nim
Add subscriptAll(), waitAllEvents() primitives.
Add emitWait() primitive.
Add emitter source location implementation.
Add tests.
This commit is contained in:
Eugene Kabanov 2021-09-15 16:55:15 +03:00 committed by GitHub
parent bbbcb55493
commit 80102a3b6a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 387 additions and 11 deletions

View File

@ -8,11 +8,11 @@
# Apache License, version 2.0, (LICENSE-APACHEv2) # Apache License, version 2.0, (LICENSE-APACHEv2)
# MIT license (LICENSE-MIT) # MIT license (LICENSE-MIT)
## This module implements some core synchronization primitives ## This module implements some core synchronization primitives.
{.push raises: [Defect].} {.push raises: [Defect].}
import std/[sequtils, deques] import std/[sequtils, deques, tables, typetraits]
import ./asyncloop import ./asyncloop
export asyncloop export asyncloop
@ -62,6 +62,50 @@ type
AsyncLockError* = object of CatchableError AsyncLockError* = object of CatchableError
## ``AsyncLock`` is either locked or unlocked. ## ``AsyncLock`` is either locked or unlocked.
EventBusSubscription*[T] = proc(bus: AsyncEventBus,
payload: EventPayload[T]): Future[void] {.
gcsafe, raises: [Defect].}
## EventBus subscription callback type.
EventBusAllSubscription* = proc(bus: AsyncEventBus,
event: AwaitableEvent): Future[void] {.
gcsafe, raises: [Defect].}
## EventBus subscription callback type.
EventBusCallback = proc(bus: AsyncEventBus, event: string, key: EventBusKey,
data: EventPayloadBase) {.
gcsafe, raises: [Defect].}
EventBusKey* = object
## Unique subscription key.
eventName: string
typeName: string
unique: uint64
cb: EventBusCallback
EventItem = object
waiters: seq[FutureBase]
subscribers: seq[EventBusKey]
AsyncEventBus* = ref object of RootObj
## An eventbus object.
counter: uint64
events: Table[string, EventItem]
subscribers: seq[EventBusKey]
waiters: seq[Future[AwaitableEvent]]
EventPayloadBase* = ref object of RootObj
loc: ptr SrcLoc
EventPayload*[T] = ref object of EventPayloadBase
## Eventbus' event payload object
value: T
AwaitableEvent* = object
## Eventbus' event payload object
eventName: string
payload: EventPayloadBase
proc newAsyncLock*(): AsyncLock = proc newAsyncLock*(): AsyncLock =
## Creates new asynchronous lock ``AsyncLock``. ## Creates new asynchronous lock ``AsyncLock``.
## ##
@ -400,3 +444,166 @@ proc `$`*[T](aq: AsyncQueue[T]): string =
res.addQuoted(item) res.addQuoted(item)
res.add("]") res.add("]")
res res
template generateKey(typeName, eventName: string): string =
"type[" & typeName & "]-key[" & eventName & "]"
proc newAsyncEventBus*(): AsyncEventBus =
## Creates new ``AsyncEventBus``.
AsyncEventBus(counter: 0'u64, events: initTable[string, EventItem]())
template get*[T](payload: EventPayload[T]): T =
## Returns event payload data.
payload.value
template location*(payload: EventPayloadBase): SrcLoc =
## Returns source location address of event emitter.
payload.loc[]
proc get*(event: AwaitableEvent, T: typedesc): T =
## Returns event's payload of type ``T`` from event ``event``.
cast[EventPayload[T]](event.payload).value
template event*(event: AwaitableEvent): string =
## Returns event's name from event ``event``.
event.eventName
template location*(event: AwaitableEvent): SrcLoc =
## Returns source location address of event emitter.
event.payload.loc[]
proc waitEvent*(bus: AsyncEventBus, T: typedesc, event: string): Future[T] =
## Wait for the event from AsyncEventBus ``bus`` with name ``event``.
##
## Returned ``Future[T]`` will hold event's payload of type ``T``.
var default: EventItem
var retFuture = newFuture[T]("AsyncEventBus.waitEvent")
let eventKey = generateKey(T.name, event)
proc cancellation(udata: pointer) {.gcsafe, raises: [Defect].} =
if not(retFuture.finished()):
bus.events.withValue(eventKey, item):
item.waiters.keepItIf(it != cast[FutureBase](retFuture))
retFuture.cancelCallback = cancellation
let baseFuture = cast[FutureBase](retFuture)
bus.events.mgetOrPut(eventKey, default).waiters.add(baseFuture)
retFuture
proc waitAllEvents*(bus: AsyncEventBus): Future[AwaitableEvent] =
## Wait for any event from AsyncEventBus ``bus``.
##
## Returns ``Future`` which holds helper object. Using this object you can
## retrieve event's name and payload.
var retFuture = newFuture[AwaitableEvent]("AsyncEventBus.waitAllEvents")
proc cancellation(udata: pointer) {.gcsafe, raises: [Defect].} =
if not(retFuture.finished()):
bus.waiters.keepItIf(it != retFuture)
retFuture.cancelCallback = cancellation
bus.waiters.add(retFuture)
retFuture
proc subscribe*[T](bus: AsyncEventBus, event: string,
callback: EventBusSubscription[T]): EventBusKey =
## Subscribe to the event ``event`` passed through eventbus ``bus`` with
## callback ``callback``.
##
## Returns key that can be used to unsubscribe.
proc trampoline(tbus: AsyncEventBus, event: string, key: EventBusKey,
data: EventPayloadBase) {.gcsafe, raises: [Defect].} =
let payload = cast[EventPayload[T]](data)
asyncSpawn callback(bus, payload)
let subkey =
block:
inc(bus.counter)
EventBusKey(eventName: event, typeName: T.name, unique: bus.counter,
cb: trampoline)
var default: EventItem
let eventKey = generateKey(T.name, event)
bus.events.mgetOrPut(eventKey, default).subscribers.add(subkey)
subkey
proc subscribeAll*(bus: AsyncEventBus,
callback: EventBusAllSubscription): EventBusKey =
## Subscribe to all events passed through eventbus ``bus`` with callback
## ``callback``.
##
## Returns key that can be used to unsubscribe.
proc trampoline(tbus: AsyncEventBus, event: string, key: EventBusKey,
data: EventPayloadBase) {.gcsafe, raises: [Defect].} =
let event = AwaitableEvent(eventName: event, payload: data)
asyncSpawn callback(bus, event)
let subkey =
block:
inc(bus.counter)
EventBusKey(eventName: "", typeName: "", unique: bus.counter,
cb: trampoline)
bus.subscribers.add(subkey)
subkey
proc unsubscribe*(bus: AsyncEventBus, key: EventBusKey) =
## Cancel subscription of subscriber with key ``key`` from eventbus ``bus``.
let eventKey = generateKey(key.typeName, key.eventName)
# Clean event's subscribers.
bus.events.withValue(eventKey, item):
item.subscribers.keepItIf(it.unique != key.unique)
# Clean subscribers subscribed to all events.
bus.subscribers.keepItIf(it.unique != key.unique)
proc emit[T](bus: AsyncEventBus, event: string, data: T, loc: ptr SrcLoc) =
let
eventKey = generateKey(T.name, event)
payload =
block:
var data = EventPayload[T](value: data, loc: loc)
cast[EventPayloadBase](data)
bus.events.withValue(eventKey, item):
# Schedule waiters which are waiting for the event ``event``.
for waiter in item.waiters:
var fut = cast[Future[T]](waiter)
fut.complete(data)
# Clear all the waiters.
item.waiters.setLen(0)
# Schedule subscriber's callbacks, which are subscribed to the event.
for subscriber in item.subscribers:
callSoon(proc(udata: pointer) =
subscriber.cb(bus, event, subscriber, payload)
)
# Schedule waiters which are waiting all events
for waiter in bus.waiters:
waiter.complete(AwaitableEvent(eventName: event, payload: payload))
# Clear all the waiters.
bus.waiters.setLen(0)
# Schedule subscriber's callbacks which are subscribed to all events.
for subscriber in bus.subscribers:
callSoon(proc(udata: pointer) =
subscriber.cb(bus, event, subscriber, payload)
)
template emit*[T](bus: AsyncEventBus, event: string, data: T) =
## Emit new event ``event`` to the eventbus ``bus`` with payload ``data``.
emit(bus, event, data, getSrcLocation())
proc emitWait[T](bus: AsyncEventBus, event: string, data: T,
loc: ptr SrcLoc): Future[void] =
var retFuture = newFuture[void]("AsyncEventBus.emitWait")
proc continuation(udata: pointer) {.gcsafe.} =
if not(retFuture.finished()):
retFuture.complete()
emit(bus, event, data, loc)
callSoon(continuation)
return retFuture
template emitWait*[T](bus: AsyncEventBus, event: string,
data: T): Future[void] =
## Emit new event ``event`` to the eventbus ``bus`` with payload ``data`` and
## wait until all the subscribers/waiters will receive notification about
## event.
emitWait(bus, event, data, getSrcLocation())

View File

@ -1,4 +1,13 @@
{.push raises: [].} #
# Chronos source location utilities
# (c) Copyright 2018-Present
# Status Research & Development GmbH
#
# Licensed under either of
# Apache License, version 2.0, (LICENSE-APACHEv2)
# MIT license (LICENSE-MIT)
{.push raises: [Defect].}
import stew/base10
type type
SrcLoc* = object SrcLoc* = object
@ -7,15 +16,16 @@ type
line*: int line*: int
proc `$`*(loc: ptr SrcLoc): string = proc `$`*(loc: ptr SrcLoc): string =
result.add loc.file var res = $loc.file
result.add "(" res.add("(")
result.add $loc.line res.add(Base10.toString(uint64(loc.line)))
result.add ")" res.add(")")
result.add " " res.add(" ")
if len(loc.procedure) == 0: if len(loc.procedure) == 0:
result.add "[unspecified]" res.add("[unspecified]")
else: else:
result.add loc.procedure res.add($loc.procedure)
res
proc srcLocImpl(procedure: static string, proc srcLocImpl(procedure: static string,
file: static string, line: static int): ptr SrcLoc = file: static string, line: static int): ptr SrcLoc =

View File

@ -396,7 +396,7 @@ proc resolveTAddress*(address: string, port: Port,
domain: Domain): seq[TransportAddress] {. domain: Domain): seq[TransportAddress] {.
raises: [Defect, TransportAddressError].} = raises: [Defect, TransportAddressError].} =
var res: seq[TransportAddress] var res: seq[TransportAddress]
let aiList = getAddrInfo(address, Port(port), domain) let aiList = getAddrInfo(address, port, domain)
var it = aiList var it = aiList
while not(isNil(it)): while not(isNil(it)):
var ta: TransportAddress var ta: TransportAddress

View File

@ -352,3 +352,162 @@ suite "Asynchronous sync primitives test suite":
check test8() == true check test8() == true
test "AsyncQueue() contains test": test "AsyncQueue() contains test":
check test9() == true check test9() == true
test "AsyncEventBus() awaitable primitives test":
const TestsCount = 10
var bus = newAsyncEventBus()
var flag = ""
proc waiter(bus: AsyncEventBus) {.async.} =
for i in 0 ..< TestsCount:
let payload = await bus.waitEvent(string, "event")
flag = flag & payload
proc sender(bus: AsyncEventBus) {.async.} =
for i in 0 ..< (TestsCount + (TestsCount div 2)):
await bus.emitWait("event", $i)
waitFor allFutures(waiter(bus), sender(bus))
check flag == "0123456789"
test "AsyncEventBus() waiters test":
var bus = newAsyncEventBus()
let fut11 = bus.waitEvent(int, "event")
let fut12 = bus.waitEvent(int, "event")
let fut13 = bus.waitEvent(int, "event")
let fut21 = bus.waitEvent(string, "event")
let fut22 = bus.waitEvent(string, "event")
let fut23 = bus.waitEvent(string, "event")
bus.emit("event", 65535)
check:
fut11.done() == true
fut12.done() == true
fut13.done() == true
fut21.finished() == false
fut22.finished() == false
fut23.finished() == false
bus.emit("event", "data")
check:
fut21.done() == true
fut22.done() == true
fut23.done() == true
test "AsyncEventBus() subscribers test":
const TestsCount = 10
var bus = newAsyncEventBus()
var flagInt = 0
var flagStr = ""
proc eventIntCallback(bus: AsyncEventBus,
payload: EventPayload[int]) {.async.} =
flagInt = payload.get()
proc eventStrCallback(bus: AsyncEventBus,
payload: EventPayload[string]) {.async.} =
flagStr = payload.get()
let key1 = bus.subscribe("event", eventIntCallback)
let key2 = bus.subscribe("event", eventStrCallback)
proc test() {.async.} =
check:
flagInt == 0
flagStr == ""
for i in 0 ..< TestsCount:
await bus.emitWait("event", i)
check:
flagInt == i
flagStr == ""
flagInt = 0
for i in 0 ..< TestsCount:
await bus.emitWait("event", $i)
check:
flagInt == 0
flagStr == $i
flagInt = 0
flagStr = ""
bus.unsubscribe(key1)
for i in 0 ..< TestsCount:
await bus.emitWait("event", i)
check:
flagInt == 0
flagStr == ""
flagInt = 0
flagStr = ""
bus.unsubscribe(key2)
for i in 0 ..< TestsCount:
await bus.emitWait("event", $i)
check:
flagInt == 0
flagStr == ""
waitFor(test())
test "AsyncEventBus() waiters for all events test":
var bus = newAsyncEventBus()
let fut11 = bus.waitAllEvents()
let fut12 = bus.waitAllEvents()
bus.emit("intevent", 65535)
check:
fut11.done() == true
fut12.done() == true
let event11 = fut11.read()
let event12 = fut12.read()
check:
event11.event() == "intevent"
event12.event() == "intevent"
event11.get(int) == 65535
event12.get(int) == 65535
let fut21 = bus.waitAllEvents()
let fut22 = bus.waitAllEvents()
bus.emit("strevent", "hello")
check:
fut21.done() == true
fut22.done() == true
let event21 = fut21.read()
let event22 = fut22.read()
check:
event21.event() == "strevent"
event22.event() == "strevent"
event21.get(string) == "hello"
event22.get(string) == "hello"
test "AsyncEventBus() subscribers to all events test":
const TestsCount = 10
var
bus = newAsyncEventBus()
flagInt = 0
flagStr = ""
proc eventCallback(bus: AsyncEventBus, event: AwaitableEvent) {.async.} =
case event.event()
of "event1":
flagStr = ""
flagInt = event.get(int)
of "event2":
flagInt = 0
flagStr = event.get(string)
else:
flagInt = -1
flagStr = "error"
proc test() {.async.} =
let key = bus.subscribeAll(eventCallback)
for i in 0 ..< TestsCount:
await bus.emitWait("event1", i)
check:
flagStr == ""
flagInt == i
await bus.emitWait("event2", $i)
check:
flagStr == $i
flagInt == 0
bus.unsubscribe(key)
flagInt = high(int)
flagStr = "empty"
for i in 0 ..< TestsCount:
await bus.emitWait("event1", i)
check:
flagStr == "empty"
flagInt == high(int)
await bus.emitWait("event2", $i)
check:
flagStr == "empty"
flagInt == high(int)
waitFor(test())