diff --git a/chronos/asyncsync.nim b/chronos/asyncsync.nim index 0feb51e..fa23471 100644 --- a/chronos/asyncsync.nim +++ b/chronos/asyncsync.nim @@ -62,50 +62,6 @@ type AsyncLockError* = object of AsyncError ## ``AsyncLock`` is either locked or unlocked. - EventBusSubscription*[T] = proc(bus: AsyncEventBus, - payload: EventPayload[T]): Future[void] {. - gcsafe, raises: [].} - ## EventBus subscription callback type. - - EventBusAllSubscription* = proc(bus: AsyncEventBus, - event: AwaitableEvent): Future[void] {. - gcsafe, raises: [].} - ## EventBus subscription callback type. - - EventBusCallback = proc(bus: AsyncEventBus, event: string, key: EventBusKey, - data: EventPayloadBase) {. - gcsafe, raises: [].} - - EventBusKey* = object - ## Unique subscription key. - eventName: string - typeName: string - unique: uint64 - cb: EventBusCallback - - EventItem = object - waiters: seq[FutureBase] - subscribers: seq[EventBusKey] - - AsyncEventBus* = ref object of RootObj - ## An eventbus object. - counter: uint64 - events: Table[string, EventItem] - subscribers: seq[EventBusKey] - waiters: seq[Future[AwaitableEvent]] - - EventPayloadBase* = ref object of RootObj - loc: ptr SrcLoc - - EventPayload*[T] = ref object of EventPayloadBase - ## Eventbus' event payload object - value: T - - AwaitableEvent* = object - ## Eventbus' event payload object - eventName: string - payload: EventPayloadBase - AsyncEventQueueFullError* = object of AsyncError EventQueueKey* = distinct uint64 @@ -471,190 +427,6 @@ proc `$`*[T](aq: AsyncQueue[T]): string = res.add("]") res -template generateKey(typeName, eventName: string): string = - "type[" & typeName & "]-key[" & eventName & "]" - -proc newAsyncEventBus*(): AsyncEventBus {. - deprecated: "Implementation has unfixable flaws, please use" & - "AsyncEventQueue[T] instead".} = - ## Creates new ``AsyncEventBus``. - AsyncEventBus(counter: 0'u64, events: initTable[string, EventItem]()) - -template get*[T](payload: EventPayload[T]): T = - ## Returns event payload data. - payload.value - -template location*(payload: EventPayloadBase): SrcLoc = - ## Returns source location address of event emitter. - payload.loc[] - -proc get*(event: AwaitableEvent, T: typedesc): T {. - deprecated: "Implementation has unfixable flaws, please use " & - "AsyncEventQueue[T] instead".} = - ## Returns event's payload of type ``T`` from event ``event``. - cast[EventPayload[T]](event.payload).value - -template event*(event: AwaitableEvent): string = - ## Returns event's name from event ``event``. - event.eventName - -template location*(event: AwaitableEvent): SrcLoc = - ## Returns source location address of event emitter. - event.payload.loc[] - -proc waitEvent*(bus: AsyncEventBus, T: typedesc, event: string): Future[T] {. - deprecated: "Implementation has unfixable flaws, please use " & - "AsyncEventQueue[T] instead".} = - ## Wait for the event from AsyncEventBus ``bus`` with name ``event``. - ## - ## Returned ``Future[T]`` will hold event's payload of type ``T``. - var default: EventItem - var retFuture = newFuture[T]("AsyncEventBus.waitEvent") - let eventKey = generateKey(T.name, event) - proc cancellation(udata: pointer) {.gcsafe, raises: [].} = - if not(retFuture.finished()): - bus.events.withValue(eventKey, item): - item.waiters.keepItIf(it != cast[FutureBase](retFuture)) - retFuture.cancelCallback = cancellation - let baseFuture = cast[FutureBase](retFuture) - bus.events.mgetOrPut(eventKey, default).waiters.add(baseFuture) - retFuture - -proc waitAllEvents*(bus: AsyncEventBus): Future[AwaitableEvent] {. - deprecated: "Implementation has unfixable flaws, please use " & - "AsyncEventQueue[T] instead".} = - ## Wait for any event from AsyncEventBus ``bus``. - ## - ## Returns ``Future`` which holds helper object. Using this object you can - ## retrieve event's name and payload. - var retFuture = newFuture[AwaitableEvent]("AsyncEventBus.waitAllEvents") - proc cancellation(udata: pointer) {.gcsafe, raises: [].} = - if not(retFuture.finished()): - bus.waiters.keepItIf(it != retFuture) - retFuture.cancelCallback = cancellation - bus.waiters.add(retFuture) - retFuture - -proc subscribe*[T](bus: AsyncEventBus, event: string, - callback: EventBusSubscription[T]): EventBusKey {. - deprecated: "Implementation has unfixable flaws, please use " & - "AsyncEventQueue[T] instead".} = - ## Subscribe to the event ``event`` passed through eventbus ``bus`` with - ## callback ``callback``. - ## - ## Returns key that can be used to unsubscribe. - proc trampoline(tbus: AsyncEventBus, event: string, key: EventBusKey, - data: EventPayloadBase) {.gcsafe, raises: [].} = - let payload = cast[EventPayload[T]](data) - asyncSpawn callback(bus, payload) - - let subkey = - block: - inc(bus.counter) - EventBusKey(eventName: event, typeName: T.name, unique: bus.counter, - cb: trampoline) - - var default: EventItem - let eventKey = generateKey(T.name, event) - bus.events.mgetOrPut(eventKey, default).subscribers.add(subkey) - subkey - -proc subscribeAll*(bus: AsyncEventBus, - callback: EventBusAllSubscription): EventBusKey {. - deprecated: "Implementation has unfixable flaws, please use " & - "AsyncEventQueue instead".} = - ## Subscribe to all events passed through eventbus ``bus`` with callback - ## ``callback``. - ## - ## Returns key that can be used to unsubscribe. - proc trampoline(tbus: AsyncEventBus, event: string, key: EventBusKey, - data: EventPayloadBase) {.gcsafe, raises: [].} = - let event = AwaitableEvent(eventName: event, payload: data) - asyncSpawn callback(bus, event) - - let subkey = - block: - inc(bus.counter) - EventBusKey(eventName: "", typeName: "", unique: bus.counter, - cb: trampoline) - bus.subscribers.add(subkey) - subkey - -proc unsubscribe*(bus: AsyncEventBus, key: EventBusKey) {. - deprecated: "Implementation has unfixable flaws, please use " & - "AsyncEventQueue instead".} = - ## Cancel subscription of subscriber with key ``key`` from eventbus ``bus``. - let eventKey = generateKey(key.typeName, key.eventName) - - # Clean event's subscribers. - bus.events.withValue(eventKey, item): - item.subscribers.keepItIf(it.unique != key.unique) - - # Clean subscribers subscribed to all events. - bus.subscribers.keepItIf(it.unique != key.unique) - -proc emit[T](bus: AsyncEventBus, event: string, data: T, loc: ptr SrcLoc) = - let - eventKey = generateKey(T.name, event) - payload = - block: - var data = EventPayload[T](value: data, loc: loc) - cast[EventPayloadBase](data) - - # Used to capture the "subscriber" variable in the loops - # sugar.capture doesn't work in Nim <1.6 - proc triggerSubscriberCallback(subscriber: EventBusKey) = - callSoon(proc(udata: pointer) = - subscriber.cb(bus, event, subscriber, payload) - ) - - bus.events.withValue(eventKey, item): - # Schedule waiters which are waiting for the event ``event``. - for waiter in item.waiters: - var fut = cast[Future[T]](waiter) - fut.complete(data) - # Clear all the waiters. - item.waiters.setLen(0) - - # Schedule subscriber's callbacks, which are subscribed to the event. - for subscriber in item.subscribers: - triggerSubscriberCallback(subscriber) - - # Schedule waiters which are waiting all events - for waiter in bus.waiters: - waiter.complete(AwaitableEvent(eventName: event, payload: payload)) - # Clear all the waiters. - bus.waiters.setLen(0) - - # Schedule subscriber's callbacks which are subscribed to all events. - for subscriber in bus.subscribers: - triggerSubscriberCallback(subscriber) - -template emit*[T](bus: AsyncEventBus, event: string, data: T) {. - deprecated: "Implementation has unfixable flaws, please use " & - "AsyncEventQueue instead".} = - ## Emit new event ``event`` to the eventbus ``bus`` with payload ``data``. - emit(bus, event, data, getSrcLocation()) - -proc emitWait[T](bus: AsyncEventBus, event: string, data: T, - loc: ptr SrcLoc): Future[void] = - var retFuture = newFuture[void]("AsyncEventBus.emitWait") - proc continuation(udata: pointer) {.gcsafe.} = - if not(retFuture.finished()): - retFuture.complete() - emit(bus, event, data, loc) - callSoon(continuation) - return retFuture - -template emitWait*[T](bus: AsyncEventBus, event: string, - data: T): Future[void] {. - deprecated: "Implementation has unfixable flaws, please use " & - "AsyncEventQueue instead".} = - ## Emit new event ``event`` to the eventbus ``bus`` with payload ``data`` and - ## wait until all the subscribers/waiters will receive notification about - ## event. - emitWait(bus, event, data, getSrcLocation()) - proc `==`(a, b: EventQueueKey): bool {.borrow.} proc compact(ab: AsyncEventQueue) {.raises: [].} =