diff --git a/chronos/asyncsync.nim b/chronos/asyncsync.nim index 824c0df3..4d782beb 100644 --- a/chronos/asyncsync.nim +++ b/chronos/asyncsync.nim @@ -12,7 +12,7 @@ {.push raises: [Defect].} -import std/[sequtils, deques, tables, typetraits] +import std/[sequtils, math, deques, tables, typetraits] import ./asyncloop export asyncloop @@ -55,11 +55,11 @@ type queue: Deque[T] maxsize: int - AsyncQueueEmptyError* = object of CatchableError + AsyncQueueEmptyError* = object of AsyncError ## ``AsyncQueue`` is empty. - AsyncQueueFullError* = object of CatchableError + AsyncQueueFullError* = object of AsyncError ## ``AsyncQueue`` is full. - AsyncLockError* = object of CatchableError + AsyncLockError* = object of AsyncError ## ``AsyncLock`` is either locked or unlocked. EventBusSubscription*[T] = proc(bus: AsyncEventBus, @@ -106,6 +106,23 @@ type eventName: string payload: EventPayloadBase + AsyncEventQueueFullError* = object of AsyncError + + EventQueueKey* = distinct uint64 + + EventQueueReader* = object + key: EventQueueKey + offset: int + waiter: Future[void] + overflow: bool + + AsyncEventQueue*[T] = ref object of RootObj + readers: seq[EventQueueReader] + queue: Deque[T] + counter: uint64 + limit: int + offset: int + proc newAsyncLock*(): AsyncLock = ## Creates new asynchronous lock ``AsyncLock``. ## @@ -448,7 +465,9 @@ proc `$`*[T](aq: AsyncQueue[T]): string = template generateKey(typeName, eventName: string): string = "type[" & typeName & "]-key[" & eventName & "]" -proc newAsyncEventBus*(): AsyncEventBus = +proc newAsyncEventBus*(): AsyncEventBus {. + deprecated: "Implementation has unfixable flaws, please use" & + "AsyncEventQueue[T] instead".} = ## Creates new ``AsyncEventBus``. AsyncEventBus(counter: 0'u64, events: initTable[string, EventItem]()) @@ -460,7 +479,9 @@ template location*(payload: EventPayloadBase): SrcLoc = ## Returns source location address of event emitter. payload.loc[] -proc get*(event: AwaitableEvent, T: typedesc): T = +proc get*(event: AwaitableEvent, T: typedesc): T {. + deprecated: "Implementation has unfixable flaws, please use " & + "AsyncEventQueue[T] instead".} = ## Returns event's payload of type ``T`` from event ``event``. cast[EventPayload[T]](event.payload).value @@ -472,7 +493,9 @@ template location*(event: AwaitableEvent): SrcLoc = ## Returns source location address of event emitter. event.payload.loc[] -proc waitEvent*(bus: AsyncEventBus, T: typedesc, event: string): Future[T] = +proc waitEvent*(bus: AsyncEventBus, T: typedesc, event: string): Future[T] {. + deprecated: "Implementation has unfixable flaws, please use " & + "AsyncEventQueue[T] instead".} = ## Wait for the event from AsyncEventBus ``bus`` with name ``event``. ## ## Returned ``Future[T]`` will hold event's payload of type ``T``. @@ -488,7 +511,9 @@ proc waitEvent*(bus: AsyncEventBus, T: typedesc, event: string): Future[T] = bus.events.mgetOrPut(eventKey, default).waiters.add(baseFuture) retFuture -proc waitAllEvents*(bus: AsyncEventBus): Future[AwaitableEvent] = +proc waitAllEvents*(bus: AsyncEventBus): Future[AwaitableEvent] {. + deprecated: "Implementation has unfixable flaws, please use " & + "AsyncEventQueue[T] instead".} = ## Wait for any event from AsyncEventBus ``bus``. ## ## Returns ``Future`` which holds helper object. Using this object you can @@ -502,7 +527,9 @@ proc waitAllEvents*(bus: AsyncEventBus): Future[AwaitableEvent] = retFuture proc subscribe*[T](bus: AsyncEventBus, event: string, - callback: EventBusSubscription[T]): EventBusKey = + callback: EventBusSubscription[T]): EventBusKey {. + deprecated: "Implementation has unfixable flaws, please use " & + "AsyncEventQueue[T] instead".} = ## Subscribe to the event ``event`` passed through eventbus ``bus`` with ## callback ``callback``. ## @@ -524,7 +551,9 @@ proc subscribe*[T](bus: AsyncEventBus, event: string, subkey proc subscribeAll*(bus: AsyncEventBus, - callback: EventBusAllSubscription): EventBusKey = + callback: EventBusAllSubscription): EventBusKey {. + deprecated: "Implementation has unfixable flaws, please use " & + "AsyncEventQueue instead".} = ## Subscribe to all events passed through eventbus ``bus`` with callback ## ``callback``. ## @@ -542,7 +571,9 @@ proc subscribeAll*(bus: AsyncEventBus, bus.subscribers.add(subkey) subkey -proc unsubscribe*(bus: AsyncEventBus, key: EventBusKey) = +proc unsubscribe*(bus: AsyncEventBus, key: EventBusKey) {. + deprecated: "Implementation has unfixable flaws, please use " & + "AsyncEventQueue instead".} = ## Cancel subscription of subscriber with key ``key`` from eventbus ``bus``. let eventKey = generateKey(key.typeName, key.eventName) @@ -590,7 +621,9 @@ proc emit[T](bus: AsyncEventBus, event: string, data: T, loc: ptr SrcLoc) = for subscriber in bus.subscribers: triggerSubscriberCallback(subscriber) -template emit*[T](bus: AsyncEventBus, event: string, data: T) = +template emit*[T](bus: AsyncEventBus, event: string, data: T) {. + deprecated: "Implementation has unfixable flaws, please use " & + "AsyncEventQueue instead".} = ## Emit new event ``event`` to the eventbus ``bus`` with payload ``data``. emit(bus, event, data, getSrcLocation()) @@ -605,8 +638,205 @@ proc emitWait[T](bus: AsyncEventBus, event: string, data: T, return retFuture template emitWait*[T](bus: AsyncEventBus, event: string, - data: T): Future[void] = + data: T): Future[void] {. + deprecated: "Implementation has unfixable flaws, please use " & + "AsyncEventQueue instead".} = ## Emit new event ``event`` to the eventbus ``bus`` with payload ``data`` and ## wait until all the subscribers/waiters will receive notification about ## event. emitWait(bus, event, data, getSrcLocation()) + +proc `==`(a, b: EventQueueKey): bool {.borrow.} + +proc compact(ab: AsyncEventQueue) {.raises: [Defect].} = + if len(ab.readers) > 0: + let minOffset = + block: + var res = -1 + for reader in ab.readers.items(): + if not(reader.overflow): + res = reader.offset + break + res + + if minOffset == -1: + ab.offset += len(ab.queue) + ab.queue.clear() + else: + doAssert(minOffset >= ab.offset) + if minOffset > ab.offset: + let delta = minOffset - ab.offset + ab.queue.shrink(fromFirst = delta) + ab.offset += delta + else: + ab.queue.clear() + +proc getReaderIndex(ab: AsyncEventQueue, key: EventQueueKey): int {. + raises: [Defect].} = + for index, value in ab.readers.pairs(): + if value.key == key: + return index + -1 + +proc newAsyncEventQueue*[T](limitSize = 0): AsyncEventQueue[T] {. + raises: [Defect].} = + ## Creates new ``AsyncEventBus`` maximum size of ``limitSize`` (default is + ## ``0`` which means that there no limits). + ## + ## When number of events emitted exceeds ``limitSize`` - emit() procedure + ## will discard new events, consumers which has number of pending events + ## more than ``limitSize`` will get ``AsyncEventQueueFullError`` + ## error. + doAssert(limitSize >= 0, "Limit size should be non-negative integer") + let queue = + if limitSize == 0: + initDeque[T]() + elif isPowerOfTwo(limitSize + 1): + initDeque[T](limitSize + 1) + else: + initDeque[T](nextPowerOfTwo(limitSize + 1)) + AsyncEventQueue[T](counter: 0'u64, queue: queue, limit: limitSize) + +proc len*(ab: AsyncEventQueue): int {.raises: [Defect].} = + len(ab.queue) + +proc register*(ab: AsyncEventQueue): EventQueueKey {.raises: [Defect].} = + inc(ab.counter) + let reader = EventQueueReader(key: EventQueueKey(ab.counter), + offset: ab.offset + len(ab.queue), + overflow: false) + ab.readers.add(reader) + EventQueueKey(ab.counter) + +proc unregister*(ab: AsyncEventQueue, key: EventQueueKey) {. + raises: [Defect] .} = + let index = ab.getReaderIndex(key) + if index >= 0: + let reader = ab.readers[index] + # Completing pending Future to avoid deadlock. + if not(isNil(reader.waiter)) and not(reader.waiter.finished()): + reader.waiter.complete() + ab.readers.delete(index) + ab.compact() + +proc close*(ab: AsyncEventQueue) {.raises: [Defect].} = + for reader in ab.readers.items(): + if not(isNil(reader.waiter)) and not(reader.waiter.finished()): + reader.waiter.complete() + ab.readers.reset() + ab.queue.clear() + +proc closeWait*(ab: AsyncEventQueue): Future[void] {.raises: [Defect].} = + var retFuture = newFuture[void]("AsyncEventQueue.closeWait()") + proc continuation(udata: pointer) {.gcsafe.} = + if not(retFuture.finished()): + retFuture.complete() + ab.close() + # Schedule `continuation` to be called only after all the `reader` + # notifications will be scheduled and processed. + callSoon(continuation) + retFuture + +template readerOverflow*(ab: AsyncEventQueue, + reader: EventQueueReader): bool = + ab.limit + (reader.offset - ab.offset) <= len(ab.queue) + +proc emit*[T](ab: AsyncEventQueue[T], data: T) {.raises: [Defect].} = + if len(ab.readers) > 0: + # We enqueue `data` only if there active reader present. + var changesPresent = false + let couldEmit = + if ab.limit == 0: + true + else: + # Because ab.readers is sequence sorted by `offset`, we will apply our + # limit to the most recent consumer. + if ab.readerOverflow(ab.readers[^1]): + false + else: + true + + if couldEmit: + if ab.limit != 0: + for reader in ab.readers.mitems(): + if not(reader.overflow): + if ab.readerOverflow(reader): + reader.overflow = true + changesPresent = true + ab.queue.addLast(data) + for reader in ab.readers.mitems(): + if not(isNil(reader.waiter)) and not(reader.waiter.finished()): + reader.waiter.complete() + else: + for reader in ab.readers.mitems(): + if not(reader.overflow): + reader.overflow = true + changesPresent = true + + if changesPresent: + ab.compact() + +proc waitEvents*[T](ab: AsyncEventQueue[T], + key: EventQueueKey, + eventsCount = -1): Future[seq[T]] {.async.} = + ## Wait for events + var + events: seq[T] + resetFuture = false + + while true: + # We need to obtain reader index at every iteration, because `ab.readers` + # sequence could be changed after `await waitFuture` call. + let index = ab.getReaderIndex(key) + if index < 0: + # We going to return everything we have in `events`. + break + + if resetFuture: + resetFuture = false + ab.readers[index].waiter = nil + + let reader = ab.readers[index] + doAssert(isNil(reader.waiter), + "Concurrent waits on same key are not allowed!") + + if reader.overflow: + raise newException(AsyncEventQueueFullError, + "AsyncEventQueue size exceeds limits") + + let length = len(ab.queue) + ab.offset + doAssert(length >= ab.readers[index].offset) + if length == ab.readers[index].offset: + # We are at the end of queue, it means that we should wait for new events. + let waitFuture = newFuture[void]("AsyncEventQueue.waitEvents") + ab.readers[index].waiter = waitFuture + resetFuture = true + await waitFuture + else: + let + itemsInQueue = length - ab.readers[index].offset + itemsOffset = ab.readers[index].offset - ab.offset + itemsCount = + if eventsCount <= 0: + itemsInQueue + else: + min(itemsInQueue, eventsCount - len(events)) + + for i in 0 ..< itemsCount: + events.add(ab.queue[itemsOffset + i]) + ab.readers[index].offset += itemsCount + + # Keep readers sequence sorted by `offset` field. + var slider = index + while (slider + 1 < len(ab.readers)) and + (ab.readers[slider].offset > ab.readers[slider + 1].offset): + swap(ab.readers[slider], ab.readers[slider + 1]) + inc(slider) + + # Shrink data queue. + ab.compact() + + if (eventsCount <= 0) or (len(events) == eventsCount): + break + + return events diff --git a/tests/testsync.nim b/tests/testsync.nim index e1e0491f..9acea50d 100644 --- a/tests/testsync.nim +++ b/tests/testsync.nim @@ -352,182 +352,534 @@ suite "Asynchronous sync primitives test suite": check test8() == true test "AsyncQueue() contains test": check test9() == true - test "AsyncEventBus() awaitable primitives test": - const TestsCount = 10 - var bus = newAsyncEventBus() - var flag = "" - proc waiter(bus: AsyncEventBus) {.async.} = - for i in 0 ..< TestsCount: - let payload = await bus.waitEvent(string, "event") - flag = flag & payload + test "AsyncEventQueue() behavior test": + let eventQueue = newAsyncEventQueue[int]() + let key = eventQueue.register() + eventQueue.emit(100) + eventQueue.emit(200) + eventQueue.emit(300) - proc sender(bus: AsyncEventBus) {.async.} = - for i in 0 ..< (TestsCount + (TestsCount div 2)): - await bus.emitWait("event", $i) - - waitFor allFutures(waiter(bus), sender(bus)) - check flag == "0123456789" - test "AsyncEventBus() waiters test": - var bus = newAsyncEventBus() - let fut11 = bus.waitEvent(int, "event") - let fut12 = bus.waitEvent(int, "event") - let fut13 = bus.waitEvent(int, "event") - let fut21 = bus.waitEvent(string, "event") - let fut22 = bus.waitEvent(string, "event") - let fut23 = bus.waitEvent(string, "event") - bus.emit("event", 65535) - check: - fut11.done() == true - fut12.done() == true - fut13.done() == true - fut21.finished() == false - fut22.finished() == false - fut23.finished() == false - bus.emit("event", "data") - check: - fut21.done() == true - fut22.done() == true - fut23.done() == true - test "AsyncEventBus() subscribers test": - const TestsCount = 10 - var bus = newAsyncEventBus() - var flagInt = 0 - var flagStr = "" - proc eventIntCallback(bus: AsyncEventBus, - payload: EventPayload[int]) {.async.} = - flagInt = payload.get() - proc eventStrCallback(bus: AsyncEventBus, - payload: EventPayload[string]) {.async.} = - flagStr = payload.get() - - let key1 = bus.subscribe("event", eventIntCallback) - let key2 = bus.subscribe("event", eventStrCallback) - - proc test() {.async.} = + proc test1() = + let dataFut = eventQueue.waitEvents(key) check: - flagInt == 0 - flagStr == "" - for i in 0 ..< TestsCount: - await bus.emitWait("event", i) - check: - flagInt == i - flagStr == "" - flagInt = 0 - for i in 0 ..< TestsCount: - await bus.emitWait("event", $i) - check: - flagInt == 0 - flagStr == $i - flagInt = 0 - flagStr = "" - bus.unsubscribe(key1) - for i in 0 ..< TestsCount: - await bus.emitWait("event", i) - check: - flagInt == 0 - flagStr == "" - flagInt = 0 - flagStr = "" - bus.unsubscribe(key2) - for i in 0 ..< TestsCount: - await bus.emitWait("event", $i) - check: - flagInt == 0 - flagStr == "" - waitFor(test()) - test "AsyncEventBus() waiters for all events test": - var bus = newAsyncEventBus() - let fut11 = bus.waitAllEvents() - let fut12 = bus.waitAllEvents() - bus.emit("intevent", 65535) - check: - fut11.done() == true - fut12.done() == true - let event11 = fut11.read() - let event12 = fut12.read() - check: - event11.event() == "intevent" - event12.event() == "intevent" - event11.get(int) == 65535 - event12.get(int) == 65535 + dataFut.finished() == true + dataFut.read() == @[100, 200, 300] - let fut21 = bus.waitAllEvents() - let fut22 = bus.waitAllEvents() - bus.emit("strevent", "hello") - check: - fut21.done() == true - fut22.done() == true - let event21 = fut21.read() - let event22 = fut22.read() - check: - event21.event() == "strevent" - event22.event() == "strevent" - event21.get(string) == "hello" - event22.get(string) == "hello" - test "AsyncEventBus() subscribers to all events test": - const TestsCount = 10 - var - bus = newAsyncEventBus() - flagInt = 0 - flagStr = "" + proc test2() = + let dataFut = eventQueue.waitEvents(key) + check: + dataFut.finished() == false + eventQueue.emit(400) + eventQueue.emit(500) + poll() + check: + dataFut.finished() == true + dataFut.read() == @[400, 500] - proc eventCallback(bus: AsyncEventBus, event: AwaitableEvent) {.async.} = - case event.event() - of "event1": - flagStr = "" - flagInt = event.get(int) - of "event2": - flagInt = 0 - flagStr = event.get(string) - else: - flagInt = -1 - flagStr = "error" + test1() + test2() + waitFor eventQueue.closeWait() + test "AsyncEventQueue() concurrency test": + let eventQueue = newAsyncEventQueue[int]() + let key0 = eventQueue.register() + let key1 = eventQueue.register() + eventQueue.emit(100) + let key2 = eventQueue.register() + eventQueue.emit(200) + eventQueue.emit(300) + let key3 = eventQueue.register() + eventQueue.emit(400) + eventQueue.emit(500) + eventQueue.emit(600) + let key4 = eventQueue.register() + eventQueue.emit(700) + eventQueue.emit(800) + eventQueue.emit(900) + eventQueue.emit(1000) + let key5 = eventQueue.register() + let key6 = eventQueue.register() + + let dataFut1 = eventQueue.waitEvents(key1) + let dataFut2 = eventQueue.waitEvents(key2) + let dataFut3 = eventQueue.waitEvents(key3) + let dataFut4 = eventQueue.waitEvents(key4) + let dataFut5 = eventQueue.waitEvents(key5) + let dataFut6 = eventQueue.waitEvents(key6) + check: + dataFut1.finished() == true + dataFut1.read() == @[100, 200, 300, 400, 500, 600, 700, 800, 900, 1000] + dataFut2.finished() == true + dataFut2.read() == @[200, 300, 400, 500, 600, 700, 800, 900, 1000] + dataFut3.finished() == true + dataFut3.read() == @[400, 500, 600, 700, 800, 900, 1000] + dataFut4.finished() == true + dataFut4.read() == @[700, 800, 900, 1000] + dataFut5.finished() == false + dataFut6.finished() == false + + eventQueue.emit(2000) + poll() + let dataFut0 = eventQueue.waitEvents(key0) + check: + dataFut5.finished() == true + dataFut5.read() == @[2000] + dataFut6.finished() == true + dataFut6.read() == @[2000] + dataFut0.finished() == true + dataFut0.read() == @[100, 200, 300, 400, 500, 600, 700, 800, 900, 1000, + 2000] + + waitFor eventQueue.closeWait() + + test "AsyncEventQueue() specific number test": + let eventQueue = newAsyncEventQueue[int]() + let key = eventQueue.register() + + let dataFut1 = eventQueue.waitEvents(key, 1) + eventQueue.emit(100) + eventQueue.emit(200) + eventQueue.emit(300) + eventQueue.emit(400) + check dataFut1.finished() == false + poll() + check: + dataFut1.finished() == true + dataFut1.read() == @[100] + + let dataFut2 = eventQueue.waitEvents(key, 2) + check: + dataFut2.finished() == true + dataFut2.read() == @[200, 300] + + let dataFut3 = eventQueue.waitEvents(key, 5) + check dataFut3.finished() == false + eventQueue.emit(500) + eventQueue.emit(600) + eventQueue.emit(700) + eventQueue.emit(800) + check dataFut3.finished() == false + poll() + check: + dataFut3.finished() == true + dataFut3.read() == @[400, 500, 600, 700, 800] + + let dataFut4 = eventQueue.waitEvents(key, -1) + check dataFut4.finished() == false + eventQueue.emit(900) + eventQueue.emit(1000) + eventQueue.emit(1100) + eventQueue.emit(1200) + eventQueue.emit(1300) + eventQueue.emit(1400) + eventQueue.emit(1500) + eventQueue.emit(1600) + check dataFut4.finished() == false + poll() + check: + dataFut4.finished() == true + dataFut4.read() == @[900, 1000, 1100, 1200, 1300, 1400, 1500, 1600] + + waitFor eventQueue.closeWait() + + test "AsyncEventQueue() register()/unregister() test": + var emptySeq: seq[int] + let eventQueue = newAsyncEventQueue[int]() + let key1 = eventQueue.register() + + let dataFut1 = eventQueue.waitEvents(key1, 1) + check dataFut1.finished() == false + eventQueue.unregister(key1) + check dataFut1.finished() == false + poll() + check: + dataFut1.finished() == true + dataFut1.read() == emptySeq + + let key2 = eventQueue.register() + let dataFut2 = eventQueue.waitEvents(key2, 5) + check dataFut2.finished() == false + eventQueue.emit(100) + eventQueue.emit(200) + eventQueue.emit(300) + eventQueue.emit(400) + eventQueue.emit(500) + check dataFut2.finished() == false + eventQueue.unregister(key2) + poll() + check: + dataFut2.finished() == true + dataFut2.read() == emptySeq + + let key3 = eventQueue.register() + let dataFut3 = eventQueue.waitEvents(key3, 5) + check dataFut3.finished() == false + eventQueue.emit(100) + eventQueue.emit(200) + eventQueue.emit(300) + check dataFut3.finished() == false + poll() + eventQueue.unregister(key3) + eventQueue.emit(400) + check dataFut3.finished() == false + poll() + check: + dataFut3.finished() == true + dataFut3.read() == @[100, 200, 300] + + waitFor eventQueue.closeWait() + + test "AsyncEventQueue() garbage collection test": + let eventQueue = newAsyncEventQueue[int]() + let key1 = eventQueue.register() + check len(eventQueue) == 0 + eventQueue.emit(100) + eventQueue.emit(200) + eventQueue.emit(300) + check len(eventQueue) == 3 + let key2 = eventQueue.register() + eventQueue.emit(400) + eventQueue.emit(500) + eventQueue.emit(600) + eventQueue.emit(700) + check len(eventQueue) == 7 + let key3 = eventQueue.register() + eventQueue.emit(800) + eventQueue.emit(900) + eventQueue.emit(1000) + eventQueue.emit(1100) + eventQueue.emit(1200) + check len(eventQueue) == 12 + let dataFut1 = eventQueue.waitEvents(key1) + check: + dataFut1.finished() == true + dataFut1.read() == @[100, 200, 300, 400, 500, 600, 700, 800, 900, 1000, + 1100, 1200] + len(eventQueue) == 9 + + let dataFut3 = eventQueue.waitEvents(key3) + check: + dataFut3.finished() == true + dataFut3.read() == @[800, 900, 1000, 1100, 1200] + len(eventQueue) == 9 + + let dataFut2 = eventQueue.waitEvents(key2) + check: + dataFut2.finished() == true + dataFut2.read() == @[400, 500, 600, 700, 800, 900, 1000, 1100, 1200] + len(eventQueue) == 0 + + waitFor eventQueue.closeWait() + + test "AsyncEventQueue() 1,000,000 of events to 10 clients test": proc test() {.async.} = - let key = bus.subscribeAll(eventCallback) - for i in 0 ..< TestsCount: - await bus.emitWait("event1", i) - check: - flagStr == "" - flagInt == i - await bus.emitWait("event2", $i) - check: - flagStr == $i - flagInt == 0 + let eventQueue = newAsyncEventQueue[int]() + var keys = @[ + eventQueue.register(), eventQueue.register(), + eventQueue.register(), eventQueue.register(), + eventQueue.register(), eventQueue.register(), + eventQueue.register(), eventQueue.register(), + eventQueue.register(), eventQueue.register() + ] - bus.unsubscribe(key) + proc clientTask(queue: AsyncEventQueue[int], + key: EventQueueKey): Future[seq[int]] {.async.} = + var events: seq[int] + while true: + let res = await queue.waitEvents(key) + if len(res) == 0: + break + events.add(res) + queue.unregister(key) + return events - flagInt = high(int) - flagStr = "empty" - for i in 0 ..< TestsCount: - await bus.emitWait("event1", i) - check: - flagStr == "empty" - flagInt == high(int) - await bus.emitWait("event2", $i) - check: - flagStr == "empty" - flagInt == high(int) + var futs = @[ + clientTask(eventQueue, keys[0]), clientTask(eventQueue, keys[1]), + clientTask(eventQueue, keys[2]), clientTask(eventQueue, keys[3]), + clientTask(eventQueue, keys[4]), clientTask(eventQueue, keys[5]), + clientTask(eventQueue, keys[6]), clientTask(eventQueue, keys[7]), + clientTask(eventQueue, keys[8]), clientTask(eventQueue, keys[9]) + ] - waitFor(test()) + for i in 1 .. 1_000_000: + if (i mod 1000) == 0: + # Give some CPU for clients. + await sleepAsync(0.milliseconds) + eventQueue.emit(i) - test "AsyncEventBus() multiple subscribers test": - let - eventBus = newAsyncEventBus() - futA = newFuture[void]() - futB = newFuture[void]() + await eventQueue.closeWait() - proc eventEV1(bus: AsyncEventBus, payload: EventPayload[int]) {.async.} = - futA.complete() + await allFutures(futs) + for index in 0 ..< len(futs): + let fut = futs[index] + check fut.finished() == true + let data = fut.read() + var counter = 1 + for item in data: + check item == counter + inc(counter) + futs[index] = nil - proc eventEV2(bus: AsyncEventBus, payload: EventPayload[int]) {.async.} = - futB.complete() + waitFor test() + test "AsyncEventQueue() one consumer limits test": proc test() {.async.} = - discard eventBus.subscribe("EV", eventEV1) - discard eventBus.subscribe("EV", eventEV2) - eventBus.emit("EV", 5) - await allFutures(futA, futB).wait(1.seconds) + let eventQueue = newAsyncEventQueue[int](4) + check len(eventQueue) == 0 + eventQueue.emit(100) + eventQueue.emit(200) + eventQueue.emit(300) + eventQueue.emit(400) + # There no consumers, so all the items should be discarded + check len(eventQueue) == 0 + let key1 = eventQueue.register() + check len(eventQueue) == 0 + eventQueue.emit(500) + eventQueue.emit(600) + eventQueue.emit(700) + eventQueue.emit(800) + # So exact `limit` number of items added, consumer should receive all of + # them. + check len(eventQueue) == 4 + let dataFut1 = eventQueue.waitEvents(key1) + check: + dataFut1.finished() == true + dataFut1.read() == @[500, 600, 700, 800] + len(eventQueue) == 0 - waitFor test() \ No newline at end of file + eventQueue.emit(900) + eventQueue.emit(1000) + eventQueue.emit(1100) + eventQueue.emit(1200) + check len(eventQueue) == 4 + # Overfilling queue + eventQueue.emit(1300) + # Because overfill for single consumer happend, whole queue should become + # empty. + check len(eventQueue) == 0 + eventQueue.emit(1400) + eventQueue.emit(1500) + eventQueue.emit(1600) + eventQueue.emit(1700) + eventQueue.emit(1800) + check len(eventQueue) == 0 + let errorFut1 = eventQueue.waitEvents(key1) + check errorFut1.finished() == true + let checkException = + try: + let res {.used.} = await errorFut1 + false + except AsyncEventQueueFullError: + true + except CatchableError: + false + check checkException == true + # There should be no items because consumer was overflowed. + check len(eventQueue) == 0 + eventQueue.unregister(key1) + # All items should be garbage collected after unregister. + check len(eventQueue) == 0 + await eventQueue.closeWait() + + waitFor test() + + test "AsyncEventQueue() many consumers limits test": + proc test() {.async.} = + let eventQueue = newAsyncEventQueue[int](4) + block: + let key1 = eventQueue.register() + eventQueue.emit(100) + check len(eventQueue) == 1 + let key2 = eventQueue.register() + eventQueue.emit(200) + check len(eventQueue) == 2 + let key3 = eventQueue.register() + eventQueue.emit(300) + check len(eventQueue) == 3 + let key4 = eventQueue.register() + eventQueue.emit(400) + check len(eventQueue) == 4 + let key5 = eventQueue.register() + eventQueue.emit(500) + # At this point consumer with `key1` is overfilled, so after `emit()` + # queue length should be decreased by one item. + # So queue should look like this: [200, 300, 400, 500] + check len(eventQueue) == 4 + eventQueue.emit(600) + # At this point consumers with `key2` is overfilled, so after `emit()` + # queue length should be decreased by one item. + # So queue should look like this: [300, 400, 500, 600] + check len(eventQueue) == 4 + eventQueue.emit(700) + # At this point consumers with `key3` is overfilled, so after `emit()` + # queue length should be decreased by one item. + # So queue should look like this: [400, 500, 600, 700] + check len(eventQueue) == 4 + eventQueue.emit(800) + # At this point consumers with `key4` is overfilled, so after `emit()` + # queue length should be decreased by one item. + # So queue should look like this: [500, 600, 700, 800] + check len(eventQueue) == 4 + # Consumer with key5 is not overfilled. + let dataFut5 = eventQueue.waitEvents(key5) + check: + dataFut5.finished() == true + dataFut5.read() == @[500, 600, 700, 800] + # No more items should be left because all other consumers are overfilled. + check len(eventQueue) == 0 + eventQueue.unregister(key5) + check len(eventQueue) == 0 + + let dataFut2 = eventQueue.waitEvents(key2) + check dataFut2.finished() == true + expect AsyncEventQueueFullError: + let res {.used.} = dataFut2.read() + check len(eventQueue) == 0 + eventQueue.unregister(key2) + check len(eventQueue) == 0 + + let dataFut4 = eventQueue.waitEvents(key4) + check dataFut4.finished() == true + expect AsyncEventQueueFullError: + let res {.used.} = dataFut4.read() + check len(eventQueue) == 0 + eventQueue.unregister(key4) + check len(eventQueue) == 0 + + let dataFut3 = eventQueue.waitEvents(key3) + check dataFut3.finished() == true + expect AsyncEventQueueFullError: + let res {.used.} = dataFut3.read() + check len(eventQueue) == 0 + eventQueue.unregister(key3) + check len(eventQueue) == 0 + + let dataFut1 = eventQueue.waitEvents(key1) + check dataFut1.finished() == true + expect AsyncEventQueueFullError: + let res {.used.} = dataFut1.read() + check len(eventQueue) == 0 + eventQueue.unregister(key1) + check len(eventQueue) == 0 + + block: + let key1 = eventQueue.register() + eventQueue.emit(100) + check len(eventQueue) == 1 + let key2 = eventQueue.register() + eventQueue.emit(200) + check len(eventQueue) == 2 + let key3 = eventQueue.register() + eventQueue.emit(300) + check len(eventQueue) == 3 + let key4 = eventQueue.register() + eventQueue.emit(400) + check len(eventQueue) == 4 + let key5 = eventQueue.register() + eventQueue.emit(500) + # At this point consumer with `key1` is overfilled, so after `emit()` + # queue length should be decreased by one item. + # So queue should look like this: [200, 300, 400, 500] + check len(eventQueue) == 4 + eventQueue.emit(600) + # At this point consumer with `key2` is overfilled, so after `emit()` + # queue length should be decreased by one item. + # So queue should look like this: [300, 400, 500, 600] + check len(eventQueue) == 4 + eventQueue.emit(700) + # At this point consumer with `key3` is overfilled, so after `emit()` + # queue length should be decreased by one item. + # So queue should look like this: [400, 500, 600, 700] + check len(eventQueue) == 4 + eventQueue.emit(800) + # At this point consumer with `key4` is overfilled, so after `emit()` + # queue length should be decreased by one item. + # So queue should look like this: [500, 600, 700, 800] + check len(eventQueue) == 4 + eventQueue.emit(900) + # At this point all consumers are overfilled, so after `emit()` + # queue length should become 0. + check len(eventQueue) == 0 + eventQueue.emit(1000) + eventQueue.emit(1100) + eventQueue.emit(1200) + eventQueue.emit(1300) + eventQueue.emit(1400) + eventQueue.emit(1500) + eventQueue.emit(1600) + eventQueue.emit(1700) + eventQueue.emit(1800) + eventQueue.emit(1900) + # No more events should be accepted. + check len(eventQueue) == 0 + + let dataFut1 = eventQueue.waitEvents(key1) + check dataFut1.finished() == true + expect AsyncEventQueueFullError: + let res {.used.} = dataFut1.read() + check len(eventQueue) == 0 + eventQueue.unregister(key1) + check len(eventQueue) == 0 + + let dataFut2 = eventQueue.waitEvents(key2) + check dataFut2.finished() == true + expect AsyncEventQueueFullError: + let res {.used.} = dataFut2.read() + check len(eventQueue) == 0 + eventQueue.unregister(key2) + check len(eventQueue) == 0 + + let dataFut3 = eventQueue.waitEvents(key3) + check dataFut3.finished() == true + expect AsyncEventQueueFullError: + let res {.used.} = dataFut3.read() + check len(eventQueue) == 0 + eventQueue.unregister(key3) + check len(eventQueue) == 0 + + let dataFut4 = eventQueue.waitEvents(key4) + check dataFut4.finished() == true + expect AsyncEventQueueFullError: + let res {.used.} = dataFut4.read() + check len(eventQueue) == 0 + eventQueue.unregister(key4) + check len(eventQueue) == 0 + + let dataFut5 = eventQueue.waitEvents(key5) + check dataFut5.finished() == true + expect AsyncEventQueueFullError: + let res {.used.} = dataFut5.read() + check len(eventQueue) == 0 + eventQueue.unregister(key5) + check len(eventQueue) == 0 + await eventQueue.closeWait() + + waitFor test() + + test "AsyncEventQueue() slow and fast consumer test": + proc test() {.async.} = + let eventQueue = newAsyncEventQueue[int](1) + let + fastConsumer = eventQueue.register() + slowConsumer = eventQueue.register() + slowFut = eventQueue.waitEvents(slowConsumer) + + for i in 0 ..< 1000: + eventQueue.emit(i) + let fastData {.used.} = await eventQueue.waitEvents(fastConsumer) + + check len(eventQueue) == 0 + await allFutures(slowFut) + check len(eventQueue) == 0 + expect AsyncEventQueueFullError: + let res {.used.} = slowFut.read() + + check len(eventQueue) == 0 + eventQueue.unregister(fastConsumer) + check len(eventQueue) == 0 + eventQueue.unregister(slowConsumer) + check len(eventQueue) == 0 + await eventQueue.closeWait() + + waitFor test()