2025-02-10 13:51:26 +01:00
|
|
|
import pkg/questionable
|
|
|
|
|
import pkg/questionable/results
|
|
|
|
|
import pkg/chronos
|
|
|
|
|
|
|
|
|
|
type
|
|
|
|
|
AsyncDataEventSubscription* = ref object
|
|
|
|
|
key: EventQueueKey
|
2025-02-11 12:42:20 +01:00
|
|
|
listenFuture: Future[void]
|
2025-02-10 13:51:26 +01:00
|
|
|
fireEvent: AsyncEvent
|
|
|
|
|
lastResult: ?!void
|
2025-02-11 12:42:20 +01:00
|
|
|
inHandler: bool
|
|
|
|
|
delayedUnsubscribe: bool
|
2025-02-10 13:51:26 +01:00
|
|
|
|
|
|
|
|
AsyncDataEvent*[T] = ref object
|
|
|
|
|
queue: AsyncEventQueue[?T]
|
|
|
|
|
subscriptions: seq[AsyncDataEventSubscription]
|
|
|
|
|
|
2025-06-02 16:16:41 +02:00
|
|
|
AsyncDataEventHandler*[T] =
|
|
|
|
|
proc(data: T): Future[?!void] {.gcsafe, async: (raises: [CancelledError]).}
|
2025-02-10 13:51:26 +01:00
|
|
|
|
|
|
|
|
proc newAsyncDataEvent*[T](): AsyncDataEvent[T] =
|
|
|
|
|
AsyncDataEvent[T](
|
2025-02-10 15:34:41 +01:00
|
|
|
queue: newAsyncEventQueue[?T](), subscriptions: newSeq[AsyncDataEventSubscription]()
|
2025-02-10 13:51:26 +01:00
|
|
|
)
|
|
|
|
|
|
2025-02-11 12:43:55 +01:00
|
|
|
proc performUnsubscribe[T](
|
|
|
|
|
event: AsyncDataEvent[T], subscription: AsyncDataEventSubscription
|
2025-06-02 14:30:28 +02:00
|
|
|
) {.async: (raises: [CancelledError]).} =
|
2025-02-11 12:42:20 +01:00
|
|
|
if subscription in event.subscriptions:
|
|
|
|
|
await subscription.listenFuture.cancelAndWait()
|
|
|
|
|
event.subscriptions.delete(event.subscriptions.find(subscription))
|
|
|
|
|
|
2025-02-10 15:34:41 +01:00
|
|
|
proc subscribe*[T](
|
|
|
|
|
event: AsyncDataEvent[T], handler: AsyncDataEventHandler[T]
|
|
|
|
|
): AsyncDataEventSubscription =
|
2025-02-11 12:42:20 +01:00
|
|
|
var subscription = AsyncDataEventSubscription(
|
2025-02-10 13:51:26 +01:00
|
|
|
key: event.queue.register(),
|
2025-02-11 12:42:20 +01:00
|
|
|
listenFuture: newFuture[void](),
|
2025-02-10 13:51:26 +01:00
|
|
|
fireEvent: newAsyncEvent(),
|
2025-02-11 12:42:20 +01:00
|
|
|
inHandler: false,
|
2025-02-11 12:43:55 +01:00
|
|
|
delayedUnsubscribe: false,
|
2025-02-10 13:51:26 +01:00
|
|
|
)
|
|
|
|
|
|
2025-06-02 14:30:28 +02:00
|
|
|
proc listener() {.async: (raises: [CancelledError]).} =
|
2025-06-02 15:00:32 +02:00
|
|
|
try:
|
|
|
|
|
while true:
|
|
|
|
|
let items = await event.queue.waitEvents(subscription.key)
|
|
|
|
|
for item in items:
|
|
|
|
|
if data =? item:
|
|
|
|
|
subscription.inHandler = true
|
|
|
|
|
subscription.lastResult = (await handler(data))
|
|
|
|
|
subscription.inHandler = false
|
|
|
|
|
subscription.fireEvent.fire()
|
|
|
|
|
except AsyncEventQueueFullError as err:
|
|
|
|
|
raiseAssert("AsyncEventQueueFullError in asyncdataevent.listener()")
|
2025-02-10 13:51:26 +01:00
|
|
|
|
2025-02-11 12:42:20 +01:00
|
|
|
subscription.listenFuture = listener()
|
2025-02-10 13:51:26 +01:00
|
|
|
|
|
|
|
|
event.subscriptions.add(subscription)
|
|
|
|
|
subscription
|
|
|
|
|
|
2025-02-11 16:31:23 +01:00
|
|
|
proc fire*[T](
|
|
|
|
|
event: AsyncDataEvent[T], data: T
|
|
|
|
|
): Future[?!void] {.async: (raises: []).} =
|
2025-02-10 13:51:26 +01:00
|
|
|
event.queue.emit(data.some)
|
2025-02-11 12:42:20 +01:00
|
|
|
var toUnsubscribe = newSeq[AsyncDataEventSubscription]()
|
|
|
|
|
for sub in event.subscriptions:
|
2025-02-11 16:31:23 +01:00
|
|
|
try:
|
|
|
|
|
await sub.fireEvent.wait()
|
2025-02-13 14:55:45 +01:00
|
|
|
sub.fireEvent.clear()
|
2025-02-11 16:31:23 +01:00
|
|
|
except CancelledError:
|
|
|
|
|
discard
|
2025-02-11 12:42:20 +01:00
|
|
|
if err =? sub.lastResult.errorOption:
|
2025-02-10 13:51:26 +01:00
|
|
|
return failure(err)
|
2025-02-11 12:42:20 +01:00
|
|
|
if sub.delayedUnsubscribe:
|
|
|
|
|
toUnsubscribe.add(sub)
|
2025-02-11 12:43:55 +01:00
|
|
|
|
2025-02-11 12:42:20 +01:00
|
|
|
for sub in toUnsubscribe:
|
2025-02-11 16:31:23 +01:00
|
|
|
try:
|
|
|
|
|
await event.unsubscribe(sub)
|
|
|
|
|
except CatchableError as exc:
|
|
|
|
|
return failure(exc.msg)
|
2025-02-11 12:42:20 +01:00
|
|
|
|
2025-02-10 13:51:26 +01:00
|
|
|
success()
|
|
|
|
|
|
2025-02-10 15:34:41 +01:00
|
|
|
proc unsubscribe*[T](
|
|
|
|
|
event: AsyncDataEvent[T], subscription: AsyncDataEventSubscription
|
2025-06-02 14:30:28 +02:00
|
|
|
) {.async: (raises: [CancelledError]).} =
|
2025-02-11 12:42:20 +01:00
|
|
|
if subscription.inHandler:
|
|
|
|
|
subscription.delayedUnsubscribe = true
|
|
|
|
|
else:
|
|
|
|
|
await event.performUnsubscribe(subscription)
|
2025-02-10 13:51:26 +01:00
|
|
|
|
2025-06-02 16:16:41 +02:00
|
|
|
proc unsubscribeAll*[T](
|
|
|
|
|
event: AsyncDataEvent[T]
|
|
|
|
|
) {.async: (raises: [CancelledError]).} =
|
2025-02-10 13:51:26 +01:00
|
|
|
let all = event.subscriptions
|
|
|
|
|
for subscription in all:
|
|
|
|
|
await event.unsubscribe(subscription)
|
2025-02-11 12:42:20 +01:00
|
|
|
|
|
|
|
|
proc listeners*[T](event: AsyncDataEvent[T]): int =
|
|
|
|
|
event.subscriptions.len
|