Remove deprecated AsyncEventBus. (#461)
* Remove deprecated AsyncEventBus. Change number of tests for ThreadSignal. * Recover 1000 tests count.
This commit is contained in:
parent
9759f01016
commit
9896316599
|
@ -62,50 +62,6 @@ type
|
|||
AsyncLockError* = object of AsyncError
|
||||
## ``AsyncLock`` is either locked or unlocked.
|
||||
|
||||
EventBusSubscription*[T] = proc(bus: AsyncEventBus,
|
||||
payload: EventPayload[T]): Future[void] {.
|
||||
gcsafe, raises: [].}
|
||||
## EventBus subscription callback type.
|
||||
|
||||
EventBusAllSubscription* = proc(bus: AsyncEventBus,
|
||||
event: AwaitableEvent): Future[void] {.
|
||||
gcsafe, raises: [].}
|
||||
## EventBus subscription callback type.
|
||||
|
||||
EventBusCallback = proc(bus: AsyncEventBus, event: string, key: EventBusKey,
|
||||
data: EventPayloadBase) {.
|
||||
gcsafe, raises: [].}
|
||||
|
||||
EventBusKey* = object
|
||||
## Unique subscription key.
|
||||
eventName: string
|
||||
typeName: string
|
||||
unique: uint64
|
||||
cb: EventBusCallback
|
||||
|
||||
EventItem = object
|
||||
waiters: seq[FutureBase]
|
||||
subscribers: seq[EventBusKey]
|
||||
|
||||
AsyncEventBus* = ref object of RootObj
|
||||
## An eventbus object.
|
||||
counter: uint64
|
||||
events: Table[string, EventItem]
|
||||
subscribers: seq[EventBusKey]
|
||||
waiters: seq[Future[AwaitableEvent]]
|
||||
|
||||
EventPayloadBase* = ref object of RootObj
|
||||
loc: ptr SrcLoc
|
||||
|
||||
EventPayload*[T] = ref object of EventPayloadBase
|
||||
## Eventbus' event payload object
|
||||
value: T
|
||||
|
||||
AwaitableEvent* = object
|
||||
## Eventbus' event payload object
|
||||
eventName: string
|
||||
payload: EventPayloadBase
|
||||
|
||||
AsyncEventQueueFullError* = object of AsyncError
|
||||
|
||||
EventQueueKey* = distinct uint64
|
||||
|
@ -471,190 +427,6 @@ proc `$`*[T](aq: AsyncQueue[T]): string =
|
|||
res.add("]")
|
||||
res
|
||||
|
||||
template generateKey(typeName, eventName: string): string =
|
||||
"type[" & typeName & "]-key[" & eventName & "]"
|
||||
|
||||
proc newAsyncEventBus*(): AsyncEventBus {.
|
||||
deprecated: "Implementation has unfixable flaws, please use" &
|
||||
"AsyncEventQueue[T] instead".} =
|
||||
## Creates new ``AsyncEventBus``.
|
||||
AsyncEventBus(counter: 0'u64, events: initTable[string, EventItem]())
|
||||
|
||||
template get*[T](payload: EventPayload[T]): T =
|
||||
## Returns event payload data.
|
||||
payload.value
|
||||
|
||||
template location*(payload: EventPayloadBase): SrcLoc =
|
||||
## Returns source location address of event emitter.
|
||||
payload.loc[]
|
||||
|
||||
proc get*(event: AwaitableEvent, T: typedesc): T {.
|
||||
deprecated: "Implementation has unfixable flaws, please use " &
|
||||
"AsyncEventQueue[T] instead".} =
|
||||
## Returns event's payload of type ``T`` from event ``event``.
|
||||
cast[EventPayload[T]](event.payload).value
|
||||
|
||||
template event*(event: AwaitableEvent): string =
|
||||
## Returns event's name from event ``event``.
|
||||
event.eventName
|
||||
|
||||
template location*(event: AwaitableEvent): SrcLoc =
|
||||
## Returns source location address of event emitter.
|
||||
event.payload.loc[]
|
||||
|
||||
proc waitEvent*(bus: AsyncEventBus, T: typedesc, event: string): Future[T] {.
|
||||
deprecated: "Implementation has unfixable flaws, please use " &
|
||||
"AsyncEventQueue[T] instead".} =
|
||||
## Wait for the event from AsyncEventBus ``bus`` with name ``event``.
|
||||
##
|
||||
## Returned ``Future[T]`` will hold event's payload of type ``T``.
|
||||
var default: EventItem
|
||||
var retFuture = newFuture[T]("AsyncEventBus.waitEvent")
|
||||
let eventKey = generateKey(T.name, event)
|
||||
proc cancellation(udata: pointer) {.gcsafe, raises: [].} =
|
||||
if not(retFuture.finished()):
|
||||
bus.events.withValue(eventKey, item):
|
||||
item.waiters.keepItIf(it != cast[FutureBase](retFuture))
|
||||
retFuture.cancelCallback = cancellation
|
||||
let baseFuture = cast[FutureBase](retFuture)
|
||||
bus.events.mgetOrPut(eventKey, default).waiters.add(baseFuture)
|
||||
retFuture
|
||||
|
||||
proc waitAllEvents*(bus: AsyncEventBus): Future[AwaitableEvent] {.
|
||||
deprecated: "Implementation has unfixable flaws, please use " &
|
||||
"AsyncEventQueue[T] instead".} =
|
||||
## Wait for any event from AsyncEventBus ``bus``.
|
||||
##
|
||||
## Returns ``Future`` which holds helper object. Using this object you can
|
||||
## retrieve event's name and payload.
|
||||
var retFuture = newFuture[AwaitableEvent]("AsyncEventBus.waitAllEvents")
|
||||
proc cancellation(udata: pointer) {.gcsafe, raises: [].} =
|
||||
if not(retFuture.finished()):
|
||||
bus.waiters.keepItIf(it != retFuture)
|
||||
retFuture.cancelCallback = cancellation
|
||||
bus.waiters.add(retFuture)
|
||||
retFuture
|
||||
|
||||
proc subscribe*[T](bus: AsyncEventBus, event: string,
|
||||
callback: EventBusSubscription[T]): EventBusKey {.
|
||||
deprecated: "Implementation has unfixable flaws, please use " &
|
||||
"AsyncEventQueue[T] instead".} =
|
||||
## Subscribe to the event ``event`` passed through eventbus ``bus`` with
|
||||
## callback ``callback``.
|
||||
##
|
||||
## Returns key that can be used to unsubscribe.
|
||||
proc trampoline(tbus: AsyncEventBus, event: string, key: EventBusKey,
|
||||
data: EventPayloadBase) {.gcsafe, raises: [].} =
|
||||
let payload = cast[EventPayload[T]](data)
|
||||
asyncSpawn callback(bus, payload)
|
||||
|
||||
let subkey =
|
||||
block:
|
||||
inc(bus.counter)
|
||||
EventBusKey(eventName: event, typeName: T.name, unique: bus.counter,
|
||||
cb: trampoline)
|
||||
|
||||
var default: EventItem
|
||||
let eventKey = generateKey(T.name, event)
|
||||
bus.events.mgetOrPut(eventKey, default).subscribers.add(subkey)
|
||||
subkey
|
||||
|
||||
proc subscribeAll*(bus: AsyncEventBus,
|
||||
callback: EventBusAllSubscription): EventBusKey {.
|
||||
deprecated: "Implementation has unfixable flaws, please use " &
|
||||
"AsyncEventQueue instead".} =
|
||||
## Subscribe to all events passed through eventbus ``bus`` with callback
|
||||
## ``callback``.
|
||||
##
|
||||
## Returns key that can be used to unsubscribe.
|
||||
proc trampoline(tbus: AsyncEventBus, event: string, key: EventBusKey,
|
||||
data: EventPayloadBase) {.gcsafe, raises: [].} =
|
||||
let event = AwaitableEvent(eventName: event, payload: data)
|
||||
asyncSpawn callback(bus, event)
|
||||
|
||||
let subkey =
|
||||
block:
|
||||
inc(bus.counter)
|
||||
EventBusKey(eventName: "", typeName: "", unique: bus.counter,
|
||||
cb: trampoline)
|
||||
bus.subscribers.add(subkey)
|
||||
subkey
|
||||
|
||||
proc unsubscribe*(bus: AsyncEventBus, key: EventBusKey) {.
|
||||
deprecated: "Implementation has unfixable flaws, please use " &
|
||||
"AsyncEventQueue instead".} =
|
||||
## Cancel subscription of subscriber with key ``key`` from eventbus ``bus``.
|
||||
let eventKey = generateKey(key.typeName, key.eventName)
|
||||
|
||||
# Clean event's subscribers.
|
||||
bus.events.withValue(eventKey, item):
|
||||
item.subscribers.keepItIf(it.unique != key.unique)
|
||||
|
||||
# Clean subscribers subscribed to all events.
|
||||
bus.subscribers.keepItIf(it.unique != key.unique)
|
||||
|
||||
proc emit[T](bus: AsyncEventBus, event: string, data: T, loc: ptr SrcLoc) =
|
||||
let
|
||||
eventKey = generateKey(T.name, event)
|
||||
payload =
|
||||
block:
|
||||
var data = EventPayload[T](value: data, loc: loc)
|
||||
cast[EventPayloadBase](data)
|
||||
|
||||
# Used to capture the "subscriber" variable in the loops
|
||||
# sugar.capture doesn't work in Nim <1.6
|
||||
proc triggerSubscriberCallback(subscriber: EventBusKey) =
|
||||
callSoon(proc(udata: pointer) =
|
||||
subscriber.cb(bus, event, subscriber, payload)
|
||||
)
|
||||
|
||||
bus.events.withValue(eventKey, item):
|
||||
# Schedule waiters which are waiting for the event ``event``.
|
||||
for waiter in item.waiters:
|
||||
var fut = cast[Future[T]](waiter)
|
||||
fut.complete(data)
|
||||
# Clear all the waiters.
|
||||
item.waiters.setLen(0)
|
||||
|
||||
# Schedule subscriber's callbacks, which are subscribed to the event.
|
||||
for subscriber in item.subscribers:
|
||||
triggerSubscriberCallback(subscriber)
|
||||
|
||||
# Schedule waiters which are waiting all events
|
||||
for waiter in bus.waiters:
|
||||
waiter.complete(AwaitableEvent(eventName: event, payload: payload))
|
||||
# Clear all the waiters.
|
||||
bus.waiters.setLen(0)
|
||||
|
||||
# Schedule subscriber's callbacks which are subscribed to all events.
|
||||
for subscriber in bus.subscribers:
|
||||
triggerSubscriberCallback(subscriber)
|
||||
|
||||
template emit*[T](bus: AsyncEventBus, event: string, data: T) {.
|
||||
deprecated: "Implementation has unfixable flaws, please use " &
|
||||
"AsyncEventQueue instead".} =
|
||||
## Emit new event ``event`` to the eventbus ``bus`` with payload ``data``.
|
||||
emit(bus, event, data, getSrcLocation())
|
||||
|
||||
proc emitWait[T](bus: AsyncEventBus, event: string, data: T,
|
||||
loc: ptr SrcLoc): Future[void] =
|
||||
var retFuture = newFuture[void]("AsyncEventBus.emitWait")
|
||||
proc continuation(udata: pointer) {.gcsafe.} =
|
||||
if not(retFuture.finished()):
|
||||
retFuture.complete()
|
||||
emit(bus, event, data, loc)
|
||||
callSoon(continuation)
|
||||
return retFuture
|
||||
|
||||
template emitWait*[T](bus: AsyncEventBus, event: string,
|
||||
data: T): Future[void] {.
|
||||
deprecated: "Implementation has unfixable flaws, please use " &
|
||||
"AsyncEventQueue instead".} =
|
||||
## Emit new event ``event`` to the eventbus ``bus`` with payload ``data`` and
|
||||
## wait until all the subscribers/waiters will receive notification about
|
||||
## event.
|
||||
emitWait(bus, event, data, getSrcLocation())
|
||||
|
||||
proc `==`(a, b: EventQueueKey): bool {.borrow.}
|
||||
|
||||
proc compact(ab: AsyncEventQueue) {.raises: [].} =
|
||||
|
|
Loading…
Reference in New Issue