# # Chronos synchronization primitives # # (c) Copyright 2018-Present Eugene Kabanov # (c) Copyright 2018-Present Status Research & Development GmbH # # Licensed under either of # Apache License, version 2.0, (LICENSE-APACHEv2) # MIT license (LICENSE-MIT) ## This module implements some core synchronization primitives. {.push raises: [].} import std/[sequtils, math, deques, tables, typetraits] import ./asyncloop export asyncloop type AsyncLock* = ref object of RootRef ## A primitive lock is a synchronization primitive that is not owned by ## a particular coroutine when locked. A primitive lock is in one of two ## states, ``locked`` or ``unlocked``. ## ## When more than one coroutine is blocked in ``acquire()`` waiting for ## the state to turn to unlocked, only one coroutine proceeds when a ## ``release()`` call resets the state to unlocked; first coroutine which ## is blocked in ``acquire()`` is being processed. locked: bool acquired: bool waiters: seq[Future[void]] AsyncEvent* = ref object of RootRef ## A primitive event object. ## ## An event manages a flag that can be set to `true` with the ``fire()`` ## procedure and reset to `false` with the ``clear()`` procedure. ## The ``wait()`` coroutine blocks until the flag is `false`. ## ## If more than one coroutine blocked in ``wait()`` waiting for event ## state to be signaled, when event get fired, then all coroutines ## continue proceeds in order, they have entered waiting state. flag: bool waiters: seq[Future[void]] AsyncQueue*[T] = ref object of RootRef ## A queue, useful for coordinating producer and consumer coroutines. ## ## If ``maxsize`` is less than or equal to zero, the queue size is ## infinite. If it is an integer greater than ``0``, then "await put()" ## will block when the queue reaches ``maxsize``, until an item is ## removed by "await get()". getters: seq[Future[void]] putters: seq[Future[void]] queue: Deque[T] maxsize: int AsyncQueueEmptyError* = object of AsyncError ## ``AsyncQueue`` is empty. AsyncQueueFullError* = object of AsyncError ## ``AsyncQueue`` is full. AsyncLockError* = object of AsyncError ## ``AsyncLock`` is either locked or unlocked. EventBusSubscription*[T] = proc(bus: AsyncEventBus, payload: EventPayload[T]): Future[void] {. gcsafe, raises: [].} ## EventBus subscription callback type. EventBusAllSubscription* = proc(bus: AsyncEventBus, event: AwaitableEvent): Future[void] {. gcsafe, raises: [].} ## EventBus subscription callback type. EventBusCallback = proc(bus: AsyncEventBus, event: string, key: EventBusKey, data: EventPayloadBase) {. gcsafe, raises: [].} EventBusKey* = object ## Unique subscription key. eventName: string typeName: string unique: uint64 cb: EventBusCallback EventItem = object waiters: seq[FutureBase] subscribers: seq[EventBusKey] AsyncEventBus* = ref object of RootObj ## An eventbus object. counter: uint64 events: Table[string, EventItem] subscribers: seq[EventBusKey] waiters: seq[Future[AwaitableEvent]] EventPayloadBase* = ref object of RootObj loc: ptr SrcLoc EventPayload*[T] = ref object of EventPayloadBase ## Eventbus' event payload object value: T AwaitableEvent* = object ## Eventbus' event payload object eventName: string payload: EventPayloadBase AsyncEventQueueFullError* = object of AsyncError EventQueueKey* = distinct uint64 EventQueueReader* = object key: EventQueueKey offset: int waiter: Future[void] overflow: bool AsyncEventQueue*[T] = ref object of RootObj readers: seq[EventQueueReader] queue: Deque[T] counter: uint64 limit: int offset: int proc newAsyncLock*(): AsyncLock = ## Creates new asynchronous lock ``AsyncLock``. ## ## Lock is created in the unlocked state. When the state is unlocked, ## ``acquire()`` changes the state to locked and returns immediately. ## When the state is locked, ``acquire()`` blocks until a call to ## ``release()`` in another coroutine changes it to unlocked. ## ## The ``release()`` procedure changes the state to unlocked and returns ## immediately. # Workaround for callSoon() not worked correctly before # getThreadDispatcher() call. discard getThreadDispatcher() AsyncLock(waiters: newSeq[Future[void]](), locked: false, acquired: false) proc wakeUpFirst(lock: AsyncLock): bool {.inline.} = ## Wake up the first waiter if it isn't done. var i = 0 var res = false while i < len(lock.waiters): var waiter = lock.waiters[i] inc(i) if not(waiter.finished()): waiter.complete() res = true break if i > 0: when compiles(lock.waiters.delete(0 .. (i - 1))): lock.waiters.delete(0 .. (i - 1)) else: lock.waiters.delete(0, i - 1) res proc checkAll(lock: AsyncLock): bool {.inline.} = ## Returns ``true`` if waiters array is empty or full of cancelled futures. for fut in lock.waiters.mitems(): if not(fut.cancelled()): return false return true proc acquire*(lock: AsyncLock) {.async.} = ## Acquire a lock ``lock``. ## ## This procedure blocks until the lock ``lock`` is unlocked, then sets it ## to locked and returns. if not(lock.locked) and lock.checkAll(): lock.acquired = true lock.locked = true else: var w = newFuture[void]("AsyncLock.acquire") lock.waiters.add(w) await w lock.acquired = true lock.locked = true proc locked*(lock: AsyncLock): bool = ## Return `true` if the lock ``lock`` is acquired, `false` otherwise. lock.locked proc release*(lock: AsyncLock) {.raises: [AsyncLockError].} = ## Release a lock ``lock``. ## ## When the ``lock`` is locked, reset it to unlocked, and return. If any ## other coroutines are blocked waiting for the lock to become unlocked, ## allow exactly one of them to proceed. if lock.locked: # We set ``lock.locked`` to ``false`` only when there no active waiters. # If active waiters are present, then ``lock.locked`` will be set to `true` # in ``acquire()`` procedure's continuation. if not(lock.acquired): raise newException(AsyncLockError, "AsyncLock was already released!") else: lock.acquired = false if not(lock.wakeUpFirst()): lock.locked = false else: raise newException(AsyncLockError, "AsyncLock is not acquired!") proc newAsyncEvent*(): AsyncEvent = ## Creates new asyncronous event ``AsyncEvent``. ## ## An event manages a flag that can be set to `true` with the `fire()` ## procedure and reset to `false` with the `clear()` procedure. ## The `wait()` procedure blocks until the flag is `true`. The flag is ## initially `false`. # Workaround for callSoon() not worked correctly before # getThreadDispatcher() call. discard getThreadDispatcher() AsyncEvent(waiters: newSeq[Future[void]](), flag: false) proc wait*(event: AsyncEvent): Future[void] = ## Block until the internal flag of ``event`` is `true`. ## If the internal flag is `true` on entry, return immediately. Otherwise, ## block until another task calls `fire()` to set the flag to `true`, ## then return. let retFuture = newFuture[void]("AsyncEvent.wait") proc cancellation(udata: pointer) {.gcsafe, raises: [].} = event.waiters.keepItIf(it != retFuture) if not(event.flag): retFuture.cancelCallback = cancellation event.waiters.add(retFuture) else: retFuture.complete() retFuture proc fire*(event: AsyncEvent) = ## Set the internal flag of ``event`` to `true`. All tasks waiting for it ## to become `true` are awakened. Task that call `wait()` once the flag is ## `true` will not block at all. if not(event.flag): event.flag = true for fut in event.waiters: if not(fut.finished()): # Could have been cancelled fut.complete() event.waiters.setLen(0) proc clear*(event: AsyncEvent) = ## Reset the internal flag of ``event`` to `false`. Subsequently, tasks ## calling `wait()` will block until `fire()` is called to set the internal ## flag to `true` again. event.flag = false proc isSet*(event: AsyncEvent): bool = ## Return `true` if and only if the internal flag of ``event`` is `true`. event.flag proc newAsyncQueue*[T](maxsize: int = 0): AsyncQueue[T] = ## Creates a new asynchronous queue ``AsyncQueue``. # Workaround for callSoon() not worked correctly before # getThreadDispatcher() call. discard getThreadDispatcher() AsyncQueue[T]( getters: newSeq[Future[void]](), putters: newSeq[Future[void]](), queue: initDeque[T](), maxsize: maxsize ) proc wakeupNext(waiters: var seq[Future[void]]) {.inline.} = var i = 0 while i < len(waiters): var waiter = waiters[i] inc(i) if not(waiter.finished()): waiter.complete() break if i > 0: when compiles(waiters.delete(0 .. (i - 1))): waiters.delete(0 .. (i - 1)) else: waiters.delete(0, i - 1) proc full*[T](aq: AsyncQueue[T]): bool {.inline.} = ## Return ``true`` if there are ``maxsize`` items in the queue. ## ## Note: If the ``aq`` was initialized with ``maxsize = 0`` (default), ## then ``full()`` is never ``true``. if aq.maxsize <= 0: false else: (len(aq.queue) >= aq.maxsize) proc empty*[T](aq: AsyncQueue[T]): bool {.inline.} = ## Return ``true`` if the queue is empty, ``false`` otherwise. (len(aq.queue) == 0) proc addFirstNoWait*[T](aq: AsyncQueue[T], item: T) {. raises: [AsyncQueueFullError].}= ## Put an item ``item`` to the beginning of the queue ``aq`` immediately. ## ## If queue ``aq`` is full, then ``AsyncQueueFullError`` exception raised. if aq.full(): raise newException(AsyncQueueFullError, "AsyncQueue is full!") aq.queue.addFirst(item) aq.getters.wakeupNext() proc addLastNoWait*[T](aq: AsyncQueue[T], item: T) {. raises: [AsyncQueueFullError].}= ## Put an item ``item`` at the end of the queue ``aq`` immediately. ## ## If queue ``aq`` is full, then ``AsyncQueueFullError`` exception raised. if aq.full(): raise newException(AsyncQueueFullError, "AsyncQueue is full!") aq.queue.addLast(item) aq.getters.wakeupNext() proc popFirstNoWait*[T](aq: AsyncQueue[T]): T {. raises: [AsyncQueueEmptyError].} = ## Get an item from the beginning of the queue ``aq`` immediately. ## ## If queue ``aq`` is empty, then ``AsyncQueueEmptyError`` exception raised. if aq.empty(): raise newException(AsyncQueueEmptyError, "AsyncQueue is empty!") let res = aq.queue.popFirst() aq.putters.wakeupNext() res proc popLastNoWait*[T](aq: AsyncQueue[T]): T {. raises: [AsyncQueueEmptyError].} = ## Get an item from the end of the queue ``aq`` immediately. ## ## If queue ``aq`` is empty, then ``AsyncQueueEmptyError`` exception raised. if aq.empty(): raise newException(AsyncQueueEmptyError, "AsyncQueue is empty!") let res = aq.queue.popLast() aq.putters.wakeupNext() res proc addFirst*[T](aq: AsyncQueue[T], item: T) {.async.} = ## Put an ``item`` to the beginning of the queue ``aq``. If the queue is full, ## wait until a free slot is available before adding item. while aq.full(): var putter = newFuture[void]("AsyncQueue.addFirst") aq.putters.add(putter) try: await putter except CatchableError as exc: if not(aq.full()) and not(putter.cancelled()): aq.putters.wakeupNext() raise exc aq.addFirstNoWait(item) proc addLast*[T](aq: AsyncQueue[T], item: T) {.async.} = ## Put an ``item`` to the end of the queue ``aq``. If the queue is full, ## wait until a free slot is available before adding item. while aq.full(): var putter = newFuture[void]("AsyncQueue.addLast") aq.putters.add(putter) try: await putter except CatchableError as exc: if not(aq.full()) and not(putter.cancelled()): aq.putters.wakeupNext() raise exc aq.addLastNoWait(item) proc popFirst*[T](aq: AsyncQueue[T]): Future[T] {.async.} = ## Remove and return an ``item`` from the beginning of the queue ``aq``. ## If the queue is empty, wait until an item is available. while aq.empty(): var getter = newFuture[void]("AsyncQueue.popFirst") aq.getters.add(getter) try: await getter except CatchableError as exc: if not(aq.empty()) and not(getter.cancelled()): aq.getters.wakeupNext() raise exc return aq.popFirstNoWait() proc popLast*[T](aq: AsyncQueue[T]): Future[T] {.async.} = ## Remove and return an ``item`` from the end of the queue ``aq``. ## If the queue is empty, wait until an item is available. while aq.empty(): var getter = newFuture[void]("AsyncQueue.popLast") aq.getters.add(getter) try: await getter except CatchableError as exc: if not(aq.empty()) and not(getter.cancelled()): aq.getters.wakeupNext() raise exc return aq.popLastNoWait() proc putNoWait*[T](aq: AsyncQueue[T], item: T) {. raises: [AsyncQueueFullError].} = ## Alias of ``addLastNoWait()``. aq.addLastNoWait(item) proc getNoWait*[T](aq: AsyncQueue[T]): T {. raises: [AsyncQueueEmptyError].} = ## Alias of ``popFirstNoWait()``. aq.popFirstNoWait() proc put*[T](aq: AsyncQueue[T], item: T): Future[void] {.inline.} = ## Alias of ``addLast()``. aq.addLast(item) proc get*[T](aq: AsyncQueue[T]): Future[T] {.inline.} = ## Alias of ``popFirst()``. aq.popFirst() proc clear*[T](aq: AsyncQueue[T]) {.inline.} = ## Clears all elements of queue ``aq``. aq.queue.clear() proc len*[T](aq: AsyncQueue[T]): int {.inline.} = ## Return the number of elements in ``aq``. len(aq.queue) proc size*[T](aq: AsyncQueue[T]): int {.inline.} = ## Return the maximum number of elements in ``aq``. len(aq.maxsize) proc `[]`*[T](aq: AsyncQueue[T], i: Natural) : T {.inline.} = ## Access the i-th element of ``aq`` by order from first to last. ## ``aq[0]`` is the first element, ``aq[^1]`` is the last element. aq.queue[i] proc `[]`*[T](aq: AsyncQueue[T], i: BackwardsIndex) : T {.inline.} = ## Access the i-th element of ``aq`` by order from first to last. ## ``aq[0]`` is the first element, ``aq[^1]`` is the last element. aq.queue[len(aq.queue) - int(i)] proc `[]=`* [T](aq: AsyncQueue[T], i: Natural, item: T) {.inline.} = ## Change the i-th element of ``aq``. aq.queue[i] = item proc `[]=`* [T](aq: AsyncQueue[T], i: BackwardsIndex, item: T) {.inline.} = ## Change the i-th element of ``aq``. aq.queue[len(aq.queue) - int(i)] = item iterator items*[T](aq: AsyncQueue[T]): T {.inline.} = ## Yield every element of ``aq``. for item in aq.queue.items(): yield item iterator mitems*[T](aq: AsyncQueue[T]): var T {.inline.} = ## Yield every element of ``aq``. for mitem in aq.queue.mitems(): yield mitem iterator pairs*[T](aq: AsyncQueue[T]): tuple[key: int, val: T] {.inline.} = ## Yield every (position, value) of ``aq``. for pair in aq.queue.pairs(): yield pair proc contains*[T](aq: AsyncQueue[T], item: T): bool {.inline.} = ## Return true if ``item`` is in ``aq`` or false if not found. Usually used ## via the ``in`` operator. for e in aq.queue.items(): if e == item: return true return false proc `$`*[T](aq: AsyncQueue[T]): string = ## Turn an async queue ``aq`` into its string representation. var res = "[" for item in aq.queue.items(): if len(res) > 1: res.add(", ") res.addQuoted(item) res.add("]") res template generateKey(typeName, eventName: string): string = "type[" & typeName & "]-key[" & eventName & "]" proc newAsyncEventBus*(): AsyncEventBus {. deprecated: "Implementation has unfixable flaws, please use" & "AsyncEventQueue[T] instead".} = ## Creates new ``AsyncEventBus``. AsyncEventBus(counter: 0'u64, events: initTable[string, EventItem]()) template get*[T](payload: EventPayload[T]): T = ## Returns event payload data. payload.value template location*(payload: EventPayloadBase): SrcLoc = ## Returns source location address of event emitter. payload.loc[] proc get*(event: AwaitableEvent, T: typedesc): T {. deprecated: "Implementation has unfixable flaws, please use " & "AsyncEventQueue[T] instead".} = ## Returns event's payload of type ``T`` from event ``event``. cast[EventPayload[T]](event.payload).value template event*(event: AwaitableEvent): string = ## Returns event's name from event ``event``. event.eventName template location*(event: AwaitableEvent): SrcLoc = ## Returns source location address of event emitter. event.payload.loc[] proc waitEvent*(bus: AsyncEventBus, T: typedesc, event: string): Future[T] {. deprecated: "Implementation has unfixable flaws, please use " & "AsyncEventQueue[T] instead".} = ## Wait for the event from AsyncEventBus ``bus`` with name ``event``. ## ## Returned ``Future[T]`` will hold event's payload of type ``T``. var default: EventItem var retFuture = newFuture[T]("AsyncEventBus.waitEvent") let eventKey = generateKey(T.name, event) proc cancellation(udata: pointer) {.gcsafe, raises: [].} = if not(retFuture.finished()): bus.events.withValue(eventKey, item): item.waiters.keepItIf(it != cast[FutureBase](retFuture)) retFuture.cancelCallback = cancellation let baseFuture = cast[FutureBase](retFuture) bus.events.mgetOrPut(eventKey, default).waiters.add(baseFuture) retFuture proc waitAllEvents*(bus: AsyncEventBus): Future[AwaitableEvent] {. deprecated: "Implementation has unfixable flaws, please use " & "AsyncEventQueue[T] instead".} = ## Wait for any event from AsyncEventBus ``bus``. ## ## Returns ``Future`` which holds helper object. Using this object you can ## retrieve event's name and payload. var retFuture = newFuture[AwaitableEvent]("AsyncEventBus.waitAllEvents") proc cancellation(udata: pointer) {.gcsafe, raises: [].} = if not(retFuture.finished()): bus.waiters.keepItIf(it != retFuture) retFuture.cancelCallback = cancellation bus.waiters.add(retFuture) retFuture proc subscribe*[T](bus: AsyncEventBus, event: string, callback: EventBusSubscription[T]): EventBusKey {. deprecated: "Implementation has unfixable flaws, please use " & "AsyncEventQueue[T] instead".} = ## Subscribe to the event ``event`` passed through eventbus ``bus`` with ## callback ``callback``. ## ## Returns key that can be used to unsubscribe. proc trampoline(tbus: AsyncEventBus, event: string, key: EventBusKey, data: EventPayloadBase) {.gcsafe, raises: [].} = let payload = cast[EventPayload[T]](data) asyncSpawn callback(bus, payload) let subkey = block: inc(bus.counter) EventBusKey(eventName: event, typeName: T.name, unique: bus.counter, cb: trampoline) var default: EventItem let eventKey = generateKey(T.name, event) bus.events.mgetOrPut(eventKey, default).subscribers.add(subkey) subkey proc subscribeAll*(bus: AsyncEventBus, callback: EventBusAllSubscription): EventBusKey {. deprecated: "Implementation has unfixable flaws, please use " & "AsyncEventQueue instead".} = ## Subscribe to all events passed through eventbus ``bus`` with callback ## ``callback``. ## ## Returns key that can be used to unsubscribe. proc trampoline(tbus: AsyncEventBus, event: string, key: EventBusKey, data: EventPayloadBase) {.gcsafe, raises: [].} = let event = AwaitableEvent(eventName: event, payload: data) asyncSpawn callback(bus, event) let subkey = block: inc(bus.counter) EventBusKey(eventName: "", typeName: "", unique: bus.counter, cb: trampoline) bus.subscribers.add(subkey) subkey proc unsubscribe*(bus: AsyncEventBus, key: EventBusKey) {. deprecated: "Implementation has unfixable flaws, please use " & "AsyncEventQueue instead".} = ## Cancel subscription of subscriber with key ``key`` from eventbus ``bus``. let eventKey = generateKey(key.typeName, key.eventName) # Clean event's subscribers. bus.events.withValue(eventKey, item): item.subscribers.keepItIf(it.unique != key.unique) # Clean subscribers subscribed to all events. bus.subscribers.keepItIf(it.unique != key.unique) proc emit[T](bus: AsyncEventBus, event: string, data: T, loc: ptr SrcLoc) = let eventKey = generateKey(T.name, event) payload = block: var data = EventPayload[T](value: data, loc: loc) cast[EventPayloadBase](data) # Used to capture the "subscriber" variable in the loops # sugar.capture doesn't work in Nim <1.6 proc triggerSubscriberCallback(subscriber: EventBusKey) = callSoon(proc(udata: pointer) = subscriber.cb(bus, event, subscriber, payload) ) bus.events.withValue(eventKey, item): # Schedule waiters which are waiting for the event ``event``. for waiter in item.waiters: var fut = cast[Future[T]](waiter) fut.complete(data) # Clear all the waiters. item.waiters.setLen(0) # Schedule subscriber's callbacks, which are subscribed to the event. for subscriber in item.subscribers: triggerSubscriberCallback(subscriber) # Schedule waiters which are waiting all events for waiter in bus.waiters: waiter.complete(AwaitableEvent(eventName: event, payload: payload)) # Clear all the waiters. bus.waiters.setLen(0) # Schedule subscriber's callbacks which are subscribed to all events. for subscriber in bus.subscribers: triggerSubscriberCallback(subscriber) template emit*[T](bus: AsyncEventBus, event: string, data: T) {. deprecated: "Implementation has unfixable flaws, please use " & "AsyncEventQueue instead".} = ## Emit new event ``event`` to the eventbus ``bus`` with payload ``data``. emit(bus, event, data, getSrcLocation()) proc emitWait[T](bus: AsyncEventBus, event: string, data: T, loc: ptr SrcLoc): Future[void] = var retFuture = newFuture[void]("AsyncEventBus.emitWait") proc continuation(udata: pointer) {.gcsafe.} = if not(retFuture.finished()): retFuture.complete() emit(bus, event, data, loc) callSoon(continuation) return retFuture template emitWait*[T](bus: AsyncEventBus, event: string, data: T): Future[void] {. deprecated: "Implementation has unfixable flaws, please use " & "AsyncEventQueue instead".} = ## Emit new event ``event`` to the eventbus ``bus`` with payload ``data`` and ## wait until all the subscribers/waiters will receive notification about ## event. emitWait(bus, event, data, getSrcLocation()) proc `==`(a, b: EventQueueKey): bool {.borrow.} proc compact(ab: AsyncEventQueue) {.raises: [].} = if len(ab.readers) > 0: let minOffset = block: var res = -1 for reader in ab.readers.items(): if not(reader.overflow): res = reader.offset break res if minOffset == -1: ab.offset += len(ab.queue) ab.queue.clear() else: doAssert(minOffset >= ab.offset) if minOffset > ab.offset: let delta = minOffset - ab.offset ab.queue.shrink(fromFirst = delta) ab.offset += delta else: ab.queue.clear() proc getReaderIndex(ab: AsyncEventQueue, key: EventQueueKey): int {. raises: [].} = for index, value in ab.readers.pairs(): if value.key == key: return index -1 proc newAsyncEventQueue*[T](limitSize = 0): AsyncEventQueue[T] {. raises: [].} = ## Creates new ``AsyncEventBus`` maximum size of ``limitSize`` (default is ## ``0`` which means that there no limits). ## ## When number of events emitted exceeds ``limitSize`` - emit() procedure ## will discard new events, consumers which has number of pending events ## more than ``limitSize`` will get ``AsyncEventQueueFullError`` ## error. doAssert(limitSize >= 0, "Limit size should be non-negative integer") let queue = if limitSize == 0: initDeque[T]() elif isPowerOfTwo(limitSize + 1): initDeque[T](limitSize + 1) else: initDeque[T](nextPowerOfTwo(limitSize + 1)) AsyncEventQueue[T](counter: 0'u64, queue: queue, limit: limitSize) proc len*(ab: AsyncEventQueue): int {.raises: [].} = len(ab.queue) proc register*(ab: AsyncEventQueue): EventQueueKey {.raises: [].} = inc(ab.counter) let reader = EventQueueReader(key: EventQueueKey(ab.counter), offset: ab.offset + len(ab.queue), overflow: false) ab.readers.add(reader) EventQueueKey(ab.counter) proc unregister*(ab: AsyncEventQueue, key: EventQueueKey) {. raises: [] .} = let index = ab.getReaderIndex(key) if index >= 0: let reader = ab.readers[index] # Completing pending Future to avoid deadlock. if not(isNil(reader.waiter)) and not(reader.waiter.finished()): reader.waiter.complete() ab.readers.delete(index) ab.compact() proc close*(ab: AsyncEventQueue) {.raises: [].} = for reader in ab.readers.items(): if not(isNil(reader.waiter)) and not(reader.waiter.finished()): reader.waiter.complete() ab.readers.reset() ab.queue.clear() proc closeWait*(ab: AsyncEventQueue): Future[void] {.raises: [].} = let retFuture = newFuture[void]("AsyncEventQueue.closeWait()", {FutureFlag.OwnCancelSchedule}) proc continuation(udata: pointer) {.gcsafe.} = retFuture.complete() proc cancellation(udata: pointer) {.gcsafe.} = # We are not going to change the state of `retFuture` to cancelled, so we # will prevent the entire sequence of Futures from being cancelled. discard ab.close() # Schedule `continuation` to be called only after all the `reader` # notifications will be scheduled and processed. retFuture.cancelCallback = cancellation callSoon(continuation) retFuture template readerOverflow*(ab: AsyncEventQueue, reader: EventQueueReader): bool = ab.limit + (reader.offset - ab.offset) <= len(ab.queue) proc emit*[T](ab: AsyncEventQueue[T], data: T) {.raises: [].} = if len(ab.readers) > 0: # We enqueue `data` only if there active reader present. var changesPresent = false let couldEmit = if ab.limit == 0: true else: # Because ab.readers is sequence sorted by `offset`, we will apply our # limit to the most recent consumer. if ab.readerOverflow(ab.readers[^1]): false else: true if couldEmit: if ab.limit != 0: for reader in ab.readers.mitems(): if not(reader.overflow): if ab.readerOverflow(reader): reader.overflow = true changesPresent = true ab.queue.addLast(data) for reader in ab.readers.mitems(): if not(isNil(reader.waiter)) and not(reader.waiter.finished()): reader.waiter.complete() else: for reader in ab.readers.mitems(): if not(reader.overflow): reader.overflow = true changesPresent = true if changesPresent: ab.compact() proc waitEvents*[T](ab: AsyncEventQueue[T], key: EventQueueKey, eventsCount = -1): Future[seq[T]] {.async.} = ## Wait for events var events: seq[T] resetFuture = false while true: # We need to obtain reader index at every iteration, because `ab.readers` # sequence could be changed after `await waitFuture` call. let index = ab.getReaderIndex(key) if index < 0: # We going to return everything we have in `events`. break if resetFuture: resetFuture = false ab.readers[index].waiter = nil let reader = ab.readers[index] doAssert(isNil(reader.waiter), "Concurrent waits on same key are not allowed!") if reader.overflow: raise newException(AsyncEventQueueFullError, "AsyncEventQueue size exceeds limits") let length = len(ab.queue) + ab.offset doAssert(length >= ab.readers[index].offset) if length == ab.readers[index].offset: # We are at the end of queue, it means that we should wait for new events. let waitFuture = newFuture[void]("AsyncEventQueue.waitEvents") ab.readers[index].waiter = waitFuture resetFuture = true await waitFuture else: let itemsInQueue = length - ab.readers[index].offset itemsOffset = ab.readers[index].offset - ab.offset itemsCount = if eventsCount <= 0: itemsInQueue else: min(itemsInQueue, eventsCount - len(events)) for i in 0 ..< itemsCount: events.add(ab.queue[itemsOffset + i]) ab.readers[index].offset += itemsCount # Keep readers sequence sorted by `offset` field. var slider = index while (slider + 1 < len(ab.readers)) and (ab.readers[slider].offset > ab.readers[slider + 1].offset): swap(ab.readers[slider], ab.readers[slider + 1]) inc(slider) # Shrink data queue. ab.compact() if (eventsCount <= 0) or (len(events) == eventsCount): break return events