cleanup
This commit is contained in:
parent
769c61efaa
commit
94186abd0c
|
@ -1,6 +1,7 @@
|
|||
import pkg/questionable
|
||||
import pkg/questionable/results
|
||||
import pkg/chronos
|
||||
import pkg/chronicles
|
||||
|
||||
type
|
||||
AsyncDataEventSubscription* = ref object
|
||||
|
@ -16,15 +17,13 @@ type
|
|||
|
||||
AsyncDataEventHandler*[T] = proc(data: T): Future[?!void]
|
||||
|
||||
proc newAsyncDataEvent*[T]: AsyncDataEvent[T] =
|
||||
echo "new event"
|
||||
proc newAsyncDataEvent*[T](): AsyncDataEvent[T] =
|
||||
AsyncDataEvent[T](
|
||||
queue: newAsyncEventQueue[?T](),
|
||||
subscriptions: newSeq[AsyncDataEventSubscription]()
|
||||
)
|
||||
|
||||
proc subscribeA*[T](event: AsyncDataEvent[T], handler: AsyncDataEventHandler[T]): AsyncDataEventSubscription =
|
||||
echo "subscribing..."
|
||||
proc subscribe*[T](event: AsyncDataEvent[T], handler: AsyncDataEventHandler[T]): AsyncDataEventSubscription =
|
||||
let subscription = AsyncDataEventSubscription(
|
||||
key: event.queue.register(),
|
||||
isRunning: true,
|
||||
|
@ -33,40 +32,34 @@ proc subscribeA*[T](event: AsyncDataEvent[T], handler: AsyncDataEventHandler[T])
|
|||
)
|
||||
|
||||
proc listener() {.async.} =
|
||||
echo " >>> listener starting!"
|
||||
while subscription.isRunning:
|
||||
echo " >>> waiting for event"
|
||||
let items = await event.queue.waitEvents(subscription.key)
|
||||
for item in items:
|
||||
if data =? item:
|
||||
echo " >>> got data"
|
||||
subscription.lastResult = (await handler(data))
|
||||
subscription.fireEvent.fire()
|
||||
echo " >>> stopping..."
|
||||
subscription.stopEvent.fire()
|
||||
|
||||
asyncSpawn listener()
|
||||
|
||||
event.subscriptions.add(subscription)
|
||||
echo "subscribed"
|
||||
subscription
|
||||
|
||||
proc fireA*[T](event: AsyncDataEvent[T], data: T): Future[?!void] {.async.} =
|
||||
echo "firing..."
|
||||
proc fire*[T](event: AsyncDataEvent[T], data: T): Future[?!void] {.async.} =
|
||||
event.queue.emit(data.some)
|
||||
echo "checking results:"
|
||||
for subscription in event.subscriptions:
|
||||
await subscription.fireEvent.wait()
|
||||
if err =? subscription.lastResult.errorOption:
|
||||
return failure(err)
|
||||
echo "ok, fired"
|
||||
success()
|
||||
|
||||
proc unsubscribeA*[T](event: AsyncDataEvent[T], subscription: AsyncDataEventSubscription) {.async.} =
|
||||
echo "unsubscribing..."
|
||||
proc unsubscribe*[T](event: AsyncDataEvent[T], subscription: AsyncDataEventSubscription) {.async.} =
|
||||
subscription.isRunning = false
|
||||
event.queue.emit(T.none)
|
||||
echo "waiting for stop event"
|
||||
await subscription.stopEvent.wait()
|
||||
echo "all done"
|
||||
event.subscriptions.delete(event.subscriptions.find(subscription))
|
||||
|
||||
proc unsubscribeAll*[T](event: AsyncDataEvent[T]) {.async.} =
|
||||
let all = event.subscriptions
|
||||
for subscription in all:
|
||||
await event.unsubscribe(subscription)
|
||||
|
|
|
@ -5,36 +5,73 @@ import codex/utils/asyncdataevent
|
|||
import ../../asynctest
|
||||
import ../helpers
|
||||
|
||||
asyncchecksuite "AsyncDataEvent":
|
||||
test "Successful event":
|
||||
let event = newAsyncDataEvent[int]()
|
||||
type
|
||||
ExampleData = object
|
||||
s: string
|
||||
|
||||
var data = 0
|
||||
proc eventHandler(d: int): Future[?!void] {.async.} =
|
||||
data = d
|
||||
asyncchecksuite "AsyncDataEvent":
|
||||
var event: AsyncDataEvent[ExampleData]
|
||||
let msg = "Yeah!"
|
||||
|
||||
setup:
|
||||
event = newAsyncDataEvent[ExampleData]()
|
||||
|
||||
teardown:
|
||||
await event.unsubscribeAll()
|
||||
|
||||
test "Successful event":
|
||||
var data = ""
|
||||
proc eventHandler(e: ExampleData): Future[?!void] {.async.} =
|
||||
data = e.s
|
||||
success()
|
||||
|
||||
let handle = event.subscribeA(eventHandler)
|
||||
event.subscribe(eventHandler)
|
||||
|
||||
check:
|
||||
isOK(await event.fireA(123))
|
||||
data == 123
|
||||
|
||||
await event.unsubscribeA(handle)
|
||||
isOK(await event.fire(ExampleData(
|
||||
s: msg
|
||||
)))
|
||||
data == msg
|
||||
|
||||
test "Failed event preserves error message":
|
||||
let
|
||||
event = newAsyncDataEvent[int]()
|
||||
msg = "Error message!"
|
||||
|
||||
proc eventHandler(d: int): Future[?!void] {.async.} =
|
||||
proc eventHandler(e: ExampleData): Future[?!void] {.async.} =
|
||||
failure(msg)
|
||||
|
||||
let handle = event.subscribeA(eventHandler)
|
||||
let fireResult = await event.fireA(123)
|
||||
event.subscribe(eventHandler)
|
||||
let fireResult = await event.fire(ExampleData(
|
||||
s: "a"
|
||||
))
|
||||
|
||||
check:
|
||||
fireResult.isErr
|
||||
fireResult.error.msg == msg
|
||||
|
||||
await event.unsubscribeA(handle)
|
||||
test "Emits data to multiple subscribers":
|
||||
var
|
||||
data1 = ""
|
||||
data2 = ""
|
||||
data3 = ""
|
||||
|
||||
proc handler1(e: ExampleData): Future[?!void] {.async.} =
|
||||
data1 = e.s
|
||||
success()
|
||||
proc handler2(e: ExampleData): Future[?!void] {.async.} =
|
||||
data2 = e.s
|
||||
success()
|
||||
proc handler3(e: ExampleData): Future[?!void] {.async.} =
|
||||
data3 = e.s
|
||||
success()
|
||||
|
||||
event.subscribe(handler1)
|
||||
event.subscribe(handler2)
|
||||
event.subscribe(handler3)
|
||||
|
||||
let fireResult = await event.fire(ExampleData(
|
||||
s: msg
|
||||
))
|
||||
|
||||
check:
|
||||
fireResult.isOK
|
||||
data1 == msg
|
||||
data2 == msg
|
||||
data3 == msg
|
||||
|
|
Loading…
Reference in New Issue