setting up data-event that preserves error results

This commit is contained in:
Ben 2024-04-24 11:41:12 +02:00
parent 3041f5ff5f
commit 769c61efaa
No known key found for this signature in database
GPG Key ID: 541B9D8C9F1426A1
2 changed files with 112 additions and 0 deletions

View File

@ -0,0 +1,72 @@
import pkg/questionable
import pkg/questionable/results
import pkg/chronos
type
AsyncDataEventSubscription* = ref object
key: EventQueueKey
isRunning: bool
fireEvent: AsyncEvent
stopEvent: AsyncEvent
lastResult: ?!void
AsyncDataEvent*[T] = ref object
queue: AsyncEventQueue[?T]
subscriptions: seq[AsyncDataEventSubscription]
AsyncDataEventHandler*[T] = proc(data: T): Future[?!void]
proc newAsyncDataEvent*[T]: AsyncDataEvent[T] =
echo "new event"
AsyncDataEvent[T](
queue: newAsyncEventQueue[?T](),
subscriptions: newSeq[AsyncDataEventSubscription]()
)
proc subscribeA*[T](event: AsyncDataEvent[T], handler: AsyncDataEventHandler[T]): AsyncDataEventSubscription =
echo "subscribing..."
let subscription = AsyncDataEventSubscription(
key: event.queue.register(),
isRunning: true,
fireEvent: newAsyncEvent(),
stopEvent: newAsyncEvent()
)
proc listener() {.async.} =
echo " >>> listener starting!"
while subscription.isRunning:
echo " >>> waiting for event"
let items = await event.queue.waitEvents(subscription.key)
for item in items:
if data =? item:
echo " >>> got data"
subscription.lastResult = (await handler(data))
subscription.fireEvent.fire()
echo " >>> stopping..."
subscription.stopEvent.fire()
asyncSpawn listener()
event.subscriptions.add(subscription)
echo "subscribed"
subscription
proc fireA*[T](event: AsyncDataEvent[T], data: T): Future[?!void] {.async.} =
echo "firing..."
event.queue.emit(data.some)
echo "checking results:"
for subscription in event.subscriptions:
await subscription.fireEvent.wait()
if err =? subscription.lastResult.errorOption:
return failure(err)
echo "ok, fired"
success()
proc unsubscribeA*[T](event: AsyncDataEvent[T], subscription: AsyncDataEventSubscription) {.async.} =
echo "unsubscribing..."
subscription.isRunning = false
event.queue.emit(T.none)
echo "waiting for stop event"
await subscription.stopEvent.wait()
echo "all done"

View File

@ -0,0 +1,40 @@
import pkg/chronos
import codex/utils/asyncdataevent
import ../../asynctest
import ../helpers
asyncchecksuite "AsyncDataEvent":
test "Successful event":
let event = newAsyncDataEvent[int]()
var data = 0
proc eventHandler(d: int): Future[?!void] {.async.} =
data = d
success()
let handle = event.subscribeA(eventHandler)
check:
isOK(await event.fireA(123))
data == 123
await event.unsubscribeA(handle)
test "Failed event preserves error message":
let
event = newAsyncDataEvent[int]()
msg = "Error message!"
proc eventHandler(d: int): Future[?!void] {.async.} =
failure(msg)
let handle = event.subscribeA(eventHandler)
let fireResult = await event.fireA(123)
check:
fireResult.isErr
fireResult.error.msg == msg
await event.unsubscribeA(handle)