From 769c61efaac01375ba1c037c6e969827f32f8ad3 Mon Sep 17 00:00:00 2001 From: Ben Date: Wed, 24 Apr 2024 11:41:12 +0200 Subject: [PATCH] setting up data-event that preserves error results --- codex/utils/asyncdataevent.nim | 72 ++++++++++++++++++++++++ tests/codex/utils/testasyncdataevent.nim | 40 +++++++++++++ 2 files changed, 112 insertions(+) create mode 100644 codex/utils/asyncdataevent.nim create mode 100644 tests/codex/utils/testasyncdataevent.nim diff --git a/codex/utils/asyncdataevent.nim b/codex/utils/asyncdataevent.nim new file mode 100644 index 00000000..71ff5491 --- /dev/null +++ b/codex/utils/asyncdataevent.nim @@ -0,0 +1,72 @@ +import pkg/questionable +import pkg/questionable/results +import pkg/chronos + +type + AsyncDataEventSubscription* = ref object + key: EventQueueKey + isRunning: bool + fireEvent: AsyncEvent + stopEvent: AsyncEvent + lastResult: ?!void + + AsyncDataEvent*[T] = ref object + queue: AsyncEventQueue[?T] + subscriptions: seq[AsyncDataEventSubscription] + + AsyncDataEventHandler*[T] = proc(data: T): Future[?!void] + +proc newAsyncDataEvent*[T]: AsyncDataEvent[T] = + echo "new event" + AsyncDataEvent[T]( + queue: newAsyncEventQueue[?T](), + subscriptions: newSeq[AsyncDataEventSubscription]() + ) + +proc subscribeA*[T](event: AsyncDataEvent[T], handler: AsyncDataEventHandler[T]): AsyncDataEventSubscription = + echo "subscribing..." + let subscription = AsyncDataEventSubscription( + key: event.queue.register(), + isRunning: true, + fireEvent: newAsyncEvent(), + stopEvent: newAsyncEvent() + ) + + proc listener() {.async.} = + echo " >>> listener starting!" + while subscription.isRunning: + echo " >>> waiting for event" + let items = await event.queue.waitEvents(subscription.key) + for item in items: + if data =? item: + echo " >>> got data" + subscription.lastResult = (await handler(data)) + subscription.fireEvent.fire() + echo " >>> stopping..." + subscription.stopEvent.fire() + + asyncSpawn listener() + + event.subscriptions.add(subscription) + echo "subscribed" + subscription + +proc fireA*[T](event: AsyncDataEvent[T], data: T): Future[?!void] {.async.} = + echo "firing..." + event.queue.emit(data.some) + echo "checking results:" + for subscription in event.subscriptions: + await subscription.fireEvent.wait() + if err =? subscription.lastResult.errorOption: + return failure(err) + echo "ok, fired" + success() + +proc unsubscribeA*[T](event: AsyncDataEvent[T], subscription: AsyncDataEventSubscription) {.async.} = + echo "unsubscribing..." + subscription.isRunning = false + event.queue.emit(T.none) + echo "waiting for stop event" + await subscription.stopEvent.wait() + echo "all done" + diff --git a/tests/codex/utils/testasyncdataevent.nim b/tests/codex/utils/testasyncdataevent.nim new file mode 100644 index 00000000..1dad361f --- /dev/null +++ b/tests/codex/utils/testasyncdataevent.nim @@ -0,0 +1,40 @@ +import pkg/chronos + +import codex/utils/asyncdataevent + +import ../../asynctest +import ../helpers + +asyncchecksuite "AsyncDataEvent": + test "Successful event": + let event = newAsyncDataEvent[int]() + + var data = 0 + proc eventHandler(d: int): Future[?!void] {.async.} = + data = d + success() + + let handle = event.subscribeA(eventHandler) + + check: + isOK(await event.fireA(123)) + data == 123 + + await event.unsubscribeA(handle) + + test "Failed event preserves error message": + let + event = newAsyncDataEvent[int]() + msg = "Error message!" + + proc eventHandler(d: int): Future[?!void] {.async.} = + failure(msg) + + let handle = event.subscribeA(eventHandler) + let fireResult = await event.fireA(123) + + check: + fireResult.isErr + fireResult.error.msg == msg + + await event.unsubscribeA(handle) \ No newline at end of file