103 lines
3.0 KiB
Nim
Raw Permalink Normal View History

2025-02-10 13:51:26 +01:00
import pkg/questionable
import pkg/questionable/results
import pkg/chronos
type
AsyncDataEventSubscription* = ref object
key: EventQueueKey
2025-02-11 12:42:20 +01:00
listenFuture: Future[void]
2025-02-10 13:51:26 +01:00
fireEvent: AsyncEvent
lastResult: ?!void
2025-02-11 12:42:20 +01:00
inHandler: bool
delayedUnsubscribe: bool
2025-02-10 13:51:26 +01:00
AsyncDataEvent*[T] = ref object
queue: AsyncEventQueue[?T]
subscriptions: seq[AsyncDataEventSubscription]
2025-06-02 16:16:41 +02:00
AsyncDataEventHandler*[T] =
proc(data: T): Future[?!void] {.gcsafe, async: (raises: [CancelledError]).}
2025-02-10 13:51:26 +01:00
proc newAsyncDataEvent*[T](): AsyncDataEvent[T] =
AsyncDataEvent[T](
2025-02-10 15:34:41 +01:00
queue: newAsyncEventQueue[?T](), subscriptions: newSeq[AsyncDataEventSubscription]()
2025-02-10 13:51:26 +01:00
)
2025-02-11 12:43:55 +01:00
proc performUnsubscribe[T](
event: AsyncDataEvent[T], subscription: AsyncDataEventSubscription
2025-06-02 14:30:28 +02:00
) {.async: (raises: [CancelledError]).} =
2025-02-11 12:42:20 +01:00
if subscription in event.subscriptions:
await subscription.listenFuture.cancelAndWait()
event.subscriptions.delete(event.subscriptions.find(subscription))
2025-02-10 15:34:41 +01:00
proc subscribe*[T](
event: AsyncDataEvent[T], handler: AsyncDataEventHandler[T]
): AsyncDataEventSubscription =
2025-02-11 12:42:20 +01:00
var subscription = AsyncDataEventSubscription(
2025-02-10 13:51:26 +01:00
key: event.queue.register(),
2025-02-11 12:42:20 +01:00
listenFuture: newFuture[void](),
2025-02-10 13:51:26 +01:00
fireEvent: newAsyncEvent(),
2025-02-11 12:42:20 +01:00
inHandler: false,
2025-02-11 12:43:55 +01:00
delayedUnsubscribe: false,
2025-02-10 13:51:26 +01:00
)
2025-06-02 14:30:28 +02:00
proc listener() {.async: (raises: [CancelledError]).} =
2025-06-02 15:00:32 +02:00
try:
while true:
let items = await event.queue.waitEvents(subscription.key)
for item in items:
if data =? item:
subscription.inHandler = true
subscription.lastResult = (await handler(data))
subscription.inHandler = false
subscription.fireEvent.fire()
except AsyncEventQueueFullError as err:
raiseAssert("AsyncEventQueueFullError in asyncdataevent.listener()")
2025-02-10 13:51:26 +01:00
2025-02-11 12:42:20 +01:00
subscription.listenFuture = listener()
2025-02-10 13:51:26 +01:00
event.subscriptions.add(subscription)
subscription
2025-02-11 16:31:23 +01:00
proc fire*[T](
event: AsyncDataEvent[T], data: T
): Future[?!void] {.async: (raises: []).} =
2025-02-10 13:51:26 +01:00
event.queue.emit(data.some)
2025-02-11 12:42:20 +01:00
var toUnsubscribe = newSeq[AsyncDataEventSubscription]()
for sub in event.subscriptions:
2025-02-11 16:31:23 +01:00
try:
await sub.fireEvent.wait()
sub.fireEvent.clear()
2025-02-11 16:31:23 +01:00
except CancelledError:
discard
2025-02-11 12:42:20 +01:00
if err =? sub.lastResult.errorOption:
2025-02-10 13:51:26 +01:00
return failure(err)
2025-02-11 12:42:20 +01:00
if sub.delayedUnsubscribe:
toUnsubscribe.add(sub)
2025-02-11 12:43:55 +01:00
2025-02-11 12:42:20 +01:00
for sub in toUnsubscribe:
2025-02-11 16:31:23 +01:00
try:
await event.unsubscribe(sub)
except CatchableError as exc:
return failure(exc.msg)
2025-02-11 12:42:20 +01:00
2025-02-10 13:51:26 +01:00
success()
2025-02-10 15:34:41 +01:00
proc unsubscribe*[T](
event: AsyncDataEvent[T], subscription: AsyncDataEventSubscription
2025-06-02 14:30:28 +02:00
) {.async: (raises: [CancelledError]).} =
2025-02-11 12:42:20 +01:00
if subscription.inHandler:
subscription.delayedUnsubscribe = true
else:
await event.performUnsubscribe(subscription)
2025-02-10 13:51:26 +01:00
2025-06-02 16:16:41 +02:00
proc unsubscribeAll*[T](
event: AsyncDataEvent[T]
) {.async: (raises: [CancelledError]).} =
2025-02-10 13:51:26 +01:00
let all = event.subscriptions
for subscription in all:
await event.unsubscribe(subscription)
2025-02-11 12:42:20 +01:00
proc listeners*[T](event: AsyncDataEvent[T]): int =
event.subscriptions.len