From 94186abd0c950782c379c12d11c2867852a8d626 Mon Sep 17 00:00:00 2001 From: Ben Date: Wed, 24 Apr 2024 12:18:29 +0200 Subject: [PATCH] cleanup --- codex/utils/asyncdataevent.nim | 27 ++++----- tests/codex/utils/testasyncdataevent.nim | 75 ++++++++++++++++++------ 2 files changed, 66 insertions(+), 36 deletions(-) diff --git a/codex/utils/asyncdataevent.nim b/codex/utils/asyncdataevent.nim index 71ff5491..4946ad6f 100644 --- a/codex/utils/asyncdataevent.nim +++ b/codex/utils/asyncdataevent.nim @@ -1,6 +1,7 @@ import pkg/questionable import pkg/questionable/results import pkg/chronos +import pkg/chronicles type AsyncDataEventSubscription* = ref object @@ -16,15 +17,13 @@ type AsyncDataEventHandler*[T] = proc(data: T): Future[?!void] -proc newAsyncDataEvent*[T]: AsyncDataEvent[T] = - echo "new event" +proc newAsyncDataEvent*[T](): AsyncDataEvent[T] = AsyncDataEvent[T]( queue: newAsyncEventQueue[?T](), subscriptions: newSeq[AsyncDataEventSubscription]() ) -proc subscribeA*[T](event: AsyncDataEvent[T], handler: AsyncDataEventHandler[T]): AsyncDataEventSubscription = - echo "subscribing..." +proc subscribe*[T](event: AsyncDataEvent[T], handler: AsyncDataEventHandler[T]): AsyncDataEventSubscription = let subscription = AsyncDataEventSubscription( key: event.queue.register(), isRunning: true, @@ -33,40 +32,34 @@ proc subscribeA*[T](event: AsyncDataEvent[T], handler: AsyncDataEventHandler[T]) ) proc listener() {.async.} = - echo " >>> listener starting!" while subscription.isRunning: - echo " >>> waiting for event" let items = await event.queue.waitEvents(subscription.key) for item in items: if data =? item: - echo " >>> got data" subscription.lastResult = (await handler(data)) subscription.fireEvent.fire() - echo " >>> stopping..." subscription.stopEvent.fire() asyncSpawn listener() event.subscriptions.add(subscription) - echo "subscribed" subscription -proc fireA*[T](event: AsyncDataEvent[T], data: T): Future[?!void] {.async.} = - echo "firing..." +proc fire*[T](event: AsyncDataEvent[T], data: T): Future[?!void] {.async.} = event.queue.emit(data.some) - echo "checking results:" for subscription in event.subscriptions: await subscription.fireEvent.wait() if err =? subscription.lastResult.errorOption: return failure(err) - echo "ok, fired" success() -proc unsubscribeA*[T](event: AsyncDataEvent[T], subscription: AsyncDataEventSubscription) {.async.} = - echo "unsubscribing..." +proc unsubscribe*[T](event: AsyncDataEvent[T], subscription: AsyncDataEventSubscription) {.async.} = subscription.isRunning = false event.queue.emit(T.none) - echo "waiting for stop event" await subscription.stopEvent.wait() - echo "all done" + event.subscriptions.delete(event.subscriptions.find(subscription)) +proc unsubscribeAll*[T](event: AsyncDataEvent[T]) {.async.} = + let all = event.subscriptions + for subscription in all: + await event.unsubscribe(subscription) diff --git a/tests/codex/utils/testasyncdataevent.nim b/tests/codex/utils/testasyncdataevent.nim index 1dad361f..70b3445c 100644 --- a/tests/codex/utils/testasyncdataevent.nim +++ b/tests/codex/utils/testasyncdataevent.nim @@ -5,36 +5,73 @@ import codex/utils/asyncdataevent import ../../asynctest import ../helpers -asyncchecksuite "AsyncDataEvent": - test "Successful event": - let event = newAsyncDataEvent[int]() +type + ExampleData = object + s: string - var data = 0 - proc eventHandler(d: int): Future[?!void] {.async.} = - data = d +asyncchecksuite "AsyncDataEvent": + var event: AsyncDataEvent[ExampleData] + let msg = "Yeah!" + + setup: + event = newAsyncDataEvent[ExampleData]() + + teardown: + await event.unsubscribeAll() + + test "Successful event": + var data = "" + proc eventHandler(e: ExampleData): Future[?!void] {.async.} = + data = e.s success() - let handle = event.subscribeA(eventHandler) + event.subscribe(eventHandler) check: - isOK(await event.fireA(123)) - data == 123 - - await event.unsubscribeA(handle) + isOK(await event.fire(ExampleData( + s: msg + ))) + data == msg test "Failed event preserves error message": - let - event = newAsyncDataEvent[int]() - msg = "Error message!" - - proc eventHandler(d: int): Future[?!void] {.async.} = + proc eventHandler(e: ExampleData): Future[?!void] {.async.} = failure(msg) - let handle = event.subscribeA(eventHandler) - let fireResult = await event.fireA(123) + event.subscribe(eventHandler) + let fireResult = await event.fire(ExampleData( + s: "a" + )) check: fireResult.isErr fireResult.error.msg == msg - await event.unsubscribeA(handle) \ No newline at end of file + test "Emits data to multiple subscribers": + var + data1 = "" + data2 = "" + data3 = "" + + proc handler1(e: ExampleData): Future[?!void] {.async.} = + data1 = e.s + success() + proc handler2(e: ExampleData): Future[?!void] {.async.} = + data2 = e.s + success() + proc handler3(e: ExampleData): Future[?!void] {.async.} = + data3 = e.s + success() + + event.subscribe(handler1) + event.subscribe(handler2) + event.subscribe(handler3) + + let fireResult = await event.fire(ExampleData( + s: msg + )) + + check: + fireResult.isOK + data1 == msg + data2 == msg + data3 == msg