diff --git a/codexcrawler/utils/asyncdataevent.nim b/codexcrawler/utils/asyncdataevent.nim new file mode 100644 index 0000000..2605a30 --- /dev/null +++ b/codexcrawler/utils/asyncdataevent.nim @@ -0,0 +1,64 @@ +import pkg/questionable +import pkg/questionable/results +import pkg/chronos + +type + AsyncDataEventSubscription* = ref object + key: EventQueueKey + isRunning: bool + fireEvent: AsyncEvent + stopEvent: AsyncEvent + lastResult: ?!void + + AsyncDataEvent*[T] = ref object + queue: AsyncEventQueue[?T] + subscriptions: seq[AsyncDataEventSubscription] + + AsyncDataEventHandler*[T] = proc(data: T): Future[?!void] + +proc newAsyncDataEvent*[T](): AsyncDataEvent[T] = + AsyncDataEvent[T]( + queue: newAsyncEventQueue[?T](), + subscriptions: newSeq[AsyncDataEventSubscription]() + ) + +proc subscribe*[T](event: AsyncDataEvent[T], handler: AsyncDataEventHandler[T]): AsyncDataEventSubscription = + let subscription = AsyncDataEventSubscription( + key: event.queue.register(), + isRunning: true, + fireEvent: newAsyncEvent(), + stopEvent: newAsyncEvent() + ) + + proc listener() {.async.} = + while subscription.isRunning: + let items = await event.queue.waitEvents(subscription.key) + for item in items: + if data =? item: + subscription.lastResult = (await handler(data)) + subscription.fireEvent.fire() + subscription.stopEvent.fire() + + asyncSpawn listener() + + event.subscriptions.add(subscription) + subscription + +proc fire*[T](event: AsyncDataEvent[T], data: T): Future[?!void] {.async.} = + event.queue.emit(data.some) + for subscription in event.subscriptions: + await subscription.fireEvent.wait() + if err =? subscription.lastResult.errorOption: + return failure(err) + success() + +proc unsubscribe*[T](event: AsyncDataEvent[T], subscription: AsyncDataEventSubscription) {.async.} = + subscription.isRunning = false + event.queue.emit(T.none) + await subscription.stopEvent.wait() + event.subscriptions.delete(event.subscriptions.find(subscription)) + +proc unsubscribeAll*[T](event: AsyncDataEvent[T]) {.async.} = + let all = event.subscriptions + for subscription in all: + await event.unsubscribe(subscription) diff --git a/tests/codexcrawler/exampletest.nim b/tests/codexcrawler/exampletest.nim deleted file mode 100644 index ca0d9b4..0000000 --- a/tests/codexcrawler/exampletest.nim +++ /dev/null @@ -1,7 +0,0 @@ -import pkg/asynctest/chronos/unittest - -suite "Example tests": - test "Example": - echo "Woo!" - check: - 1 == 1 diff --git a/tests/codexcrawler/testutils.nim b/tests/codexcrawler/testutils.nim new file mode 100644 index 0000000..eaec989 --- /dev/null +++ b/tests/codexcrawler/testutils.nim @@ -0,0 +1,3 @@ +import ./utils/testasyncdataevent + +{.warning[UnusedImport]: off.} diff --git a/tests/codexcrawler/utils/testasyncdataevent.nim b/tests/codexcrawler/utils/testasyncdataevent.nim new file mode 100644 index 0000000..9b034c8 --- /dev/null +++ b/tests/codexcrawler/utils/testasyncdataevent.nim @@ -0,0 +1,86 @@ +import pkg/chronos +import pkg/questionable +import pkg/questionable/results +import pkg/asynctest/chronos/unittest + +import ../../../codexcrawler/utils/asyncdataevent + +type + ExampleData = object + s: string + +suite "AsyncDataEvent": + var event: AsyncDataEvent[ExampleData] + let msg = "Yeah!" + + setup: + event = newAsyncDataEvent[ExampleData]() + + teardown: + await event.unsubscribeAll() + + test "Successful event": + var data = "" + proc eventHandler(e: ExampleData): Future[?!void] {.async.} = + data = e.s + success() + + let s = event.subscribe(eventHandler) + + check: + isOK(await event.fire(ExampleData( + s: msg + ))) + data == msg + + await event.unsubscribe(s) + + test "Failed event preserves error message": + proc eventHandler(e: ExampleData): Future[?!void] {.async.} = + failure(msg) + + let s = event.subscribe(eventHandler) + let fireResult = await event.fire(ExampleData( + s: "a" + )) + + check: + fireResult.isErr + fireResult.error.msg == msg + + await event.unsubscribe(s) + + test "Emits data to multiple subscribers": + var + data1 = "" + data2 = "" + data3 = "" + + proc handler1(e: ExampleData): Future[?!void] {.async.} = + data1 = e.s + success() + proc handler2(e: ExampleData): Future[?!void] {.async.} = + data2 = e.s + success() + proc handler3(e: ExampleData): Future[?!void] {.async.} = + data3 = e.s + success() + + let + s1 = event.subscribe(handler1) + s2 = event.subscribe(handler2) + s3 = event.subscribe(handler3) + + let fireResult = await event.fire(ExampleData( + s: msg + )) + + check: + fireResult.isOK + data1 == msg + data2 == msg + data3 == msg + + await event.unsubscribe(s1) + await event.unsubscribe(s2) + await event.unsubscribe(s3) diff --git a/tests/test.nim b/tests/test.nim index d7c2006..ca1588f 100644 --- a/tests/test.nim +++ b/tests/test.nim @@ -1,3 +1,3 @@ -import ./codexcrawler/exampletest +import ./codexcrawler/testutils {.warning[UnusedImport]: off.}