From 5fa90c5c2fb546b37423039c3fdbb4b1bc06d0ce Mon Sep 17 00:00:00 2001 From: Ben Date: Tue, 11 Feb 2025 12:42:20 +0100 Subject: [PATCH] Implements and tests nodestore --- codexcrawler/components/nodestore.nim | 59 +++++++++++++++---- codexcrawler/list.nim | 4 -- codexcrawler/utils/asyncdataevent.nim | 48 ++++++++++----- .../codexcrawler/components/testnodestore.nim | 11 ++-- tests/codexcrawler/mockstate.nim | 9 ++- .../codexcrawler/utils/testasyncdataevent.nim | 22 +++++++ tests/config.nims | 1 + 7 files changed, 119 insertions(+), 35 deletions(-) create mode 100644 tests/config.nims diff --git a/codexcrawler/components/nodestore.nim b/codexcrawler/components/nodestore.nim index 1eeb0d8..37ccb7e 100644 --- a/codexcrawler/components/nodestore.nim +++ b/codexcrawler/components/nodestore.nim @@ -1,4 +1,5 @@ import std/os +import pkg/datastore import pkg/datastore/typedds import pkg/questionable/results import pkg/chronicles @@ -11,13 +12,16 @@ import ../state import ../utils/datastoreutils import ../utils/asyncdataevent -type - OnNodeId = proc(item: Nid): Future[?!void] {.async: (raises: []), gcsafe.} +const + nodestoreName = "nodestore" +type NodeEntry* = object id*: Nid lastVisit*: uint64 + OnNodeEntry = proc(item: NodeEntry): Future[?!void] {.async: (raises: []), gcsafe.} + NodeStore* = ref object of Component state: State store: TypedDatastore @@ -55,18 +59,53 @@ proc decode*(T: type NodeEntry, bytes: seq[byte]): ?!T = return success(NodeEntry(id: Nid.fromStr("0"), lastVisit: 0.uint64)) return NodeEntry.fromBytes(bytes) +proc storeNodeIsNew(s: NodeStore, nid: Nid): Future[?!bool] {.async.} = + without key =? Key.init(nodestoreName / $nid), err: + return failure(err) + without exists =? (await s.store.has(key)), err: + return failure(err) + + if not exists: + let entry = NodeEntry( + id: nid, + lastVisit: 0 + ) + ?await s.store.put(key, entry) + + return success(not exists) + +proc fireNewNodesDiscovered(s: NodeStore, nids: seq[Nid]): Future[?!void] {.async.} = + await s.state.events.newNodesDiscovered.fire(nids) + proc processFoundNodes(s: NodeStore, nids: seq[Nid]): Future[?!void] {.async.} = - # put the nodes in the store. - # track all new ones, if any, raise newNodes event. + var newNodes = newSeq[Nid]() + + for nid in nids: + without isNew =? (await s.storeNodeIsNew(nid)), err: + return failure(err) + + if isNew: + newNodes.add(nid) + + if newNodes.len > 0: + ? await s.fireNewNodesDiscovered(newNodes) return success() -proc iterateAll*(s: NodeStore, onNodeId: OnNodeId) {.async.} = - discard - # query iterator, yield items to callback. - # for item in this.items: - # onItem(item) - # await sleepAsync(1.millis) +proc iterateAll*(s: NodeStore, onNode: OnNodeEntry): Future[?!void] {.async.} = + without queryKey =? Key.init(nodestoreName), err: + return failure(err) + without iter =? (await query[NodeEntry](s.store, Query.init(queryKey))), err: + return failure(err) + while not iter.finished: + without item =? (await iter.next()), err: + return failure(err) + without value =? item.value, err: + return failure(err) + + ?await onNode(value) + return success() + method start*(s: NodeStore): Future[?!void] {.async.} = info "Starting nodestore..." diff --git a/codexcrawler/list.nim b/codexcrawler/list.nim index 638042b..e7cc444 100644 --- a/codexcrawler/list.nim +++ b/codexcrawler/list.nim @@ -43,10 +43,6 @@ proc saveItem(this: List, item: Nid): Future[?!void] {.async.} = return success() proc load*(this: List): Future[?!void] {.async.} = - let id = Nid.fromStr("0") - let bytes = newSeq[byte]() - let ne = Nid.fromBytes(bytes) - without queryKey =? Key.init(this.name), err: return failure(err) without iter =? (await query[Nid](this.store, Query.init(queryKey))), err: diff --git a/codexcrawler/utils/asyncdataevent.nim b/codexcrawler/utils/asyncdataevent.nim index cb2ec75..86aec1d 100644 --- a/codexcrawler/utils/asyncdataevent.nim +++ b/codexcrawler/utils/asyncdataevent.nim @@ -5,10 +5,11 @@ import pkg/chronos type AsyncDataEventSubscription* = ref object key: EventQueueKey - isRunning: bool + listenFuture: Future[void] fireEvent: AsyncEvent - stopEvent: AsyncEvent lastResult: ?!void + inHandler: bool + delayedUnsubscribe: bool AsyncDataEvent*[T] = ref object queue: AsyncEventQueue[?T] @@ -21,47 +22,64 @@ proc newAsyncDataEvent*[T](): AsyncDataEvent[T] = queue: newAsyncEventQueue[?T](), subscriptions: newSeq[AsyncDataEventSubscription]() ) +proc performUnsubscribe[T](event: AsyncDataEvent[T], subscription: AsyncDataEventSubscription) {.async.} = + if subscription in event.subscriptions: + await subscription.listenFuture.cancelAndWait() + event.subscriptions.delete(event.subscriptions.find(subscription)) + proc subscribe*[T]( event: AsyncDataEvent[T], handler: AsyncDataEventHandler[T] ): AsyncDataEventSubscription = - let subscription = AsyncDataEventSubscription( + var subscription = AsyncDataEventSubscription( key: event.queue.register(), - isRunning: true, + listenFuture: newFuture[void](), fireEvent: newAsyncEvent(), - stopEvent: newAsyncEvent(), + inHandler: false, + delayedUnsubscribe: false ) proc listener() {.async.} = - while subscription.isRunning: + while true: let items = await event.queue.waitEvents(subscription.key) for item in items: if data =? item: + subscription.inHandler = true subscription.lastResult = (await handler(data)) + subscription.inHandler = false subscription.fireEvent.fire() - subscription.stopEvent.fire() - asyncSpawn listener() + subscription.listenFuture = listener() event.subscriptions.add(subscription) subscription proc fire*[T](event: AsyncDataEvent[T], data: T): Future[?!void] {.async.} = event.queue.emit(data.some) - for subscription in event.subscriptions: - await subscription.fireEvent.wait() - if err =? subscription.lastResult.errorOption: + var toUnsubscribe = newSeq[AsyncDataEventSubscription]() + for sub in event.subscriptions: + await sub.fireEvent.wait() + if err =? sub.lastResult.errorOption: return failure(err) + if sub.delayedUnsubscribe: + toUnsubscribe.add(sub) + + for sub in toUnsubscribe: + await event.unsubscribe(sub) + success() proc unsubscribe*[T]( event: AsyncDataEvent[T], subscription: AsyncDataEventSubscription ) {.async.} = - subscription.isRunning = false - event.queue.emit(T.none) - await subscription.stopEvent.wait() - event.subscriptions.delete(event.subscriptions.find(subscription)) + if subscription.inHandler: + subscription.delayedUnsubscribe = true + else: + await event.performUnsubscribe(subscription) proc unsubscribeAll*[T](event: AsyncDataEvent[T]) {.async.} = let all = event.subscriptions for subscription in all: await event.unsubscribe(subscription) + +proc listeners*[T](event: AsyncDataEvent[T]): int = + event.subscriptions.len diff --git a/tests/codexcrawler/components/testnodestore.nim b/tests/codexcrawler/components/testnodestore.nim index 2034e9f..554a966 100644 --- a/tests/codexcrawler/components/testnodestore.nim +++ b/tests/codexcrawler/components/testnodestore.nim @@ -29,9 +29,12 @@ suite "Nodestore": state, ds ) + (await store.start()).tryGet() + teardown: + (await store.stop()).tryGet() (await ds.close()).tryGet() - # state.cleanupMock() + state.checkAllUnsubscribed() removeDir(dsPath) test "nodeEntry encoding": @@ -115,11 +118,11 @@ suite "Nodestore": (await state.events.nodesFound.fire(@[nid1, nid2, nid3])).tryGet() var iterNodes = newSeq[Nid]() - proc onNodeId(nid: Nid): Future[?!void] {.async: (raises: []), gcsafe.} = - iterNodes.add(nid) + proc onNode(entry: NodeEntry): Future[?!void] {.async: (raises: []), gcsafe.} = + iterNodes.add(entry.id) return success() - await store.iterateAll(onNodeId) + (await store.iterateAll(onNode)).tryGet() check: nid1 in iterNodes diff --git a/tests/codexcrawler/mockstate.nim b/tests/codexcrawler/mockstate.nim index af302b7..1974069 100644 --- a/tests/codexcrawler/mockstate.nim +++ b/tests/codexcrawler/mockstate.nim @@ -1,3 +1,4 @@ +import pkg/asynctest/chronos/unittest import ../../codexcrawler/state import ../../codexcrawler/utils/asyncdataevent import ../../codexcrawler/types @@ -20,5 +21,9 @@ proc createMockState*(): MockState = ), ) -proc cleanupMock*(this: MockState) = - discard +proc checkAllUnsubscribed*(this: MockState) = + check: + this.events.nodesFound.listeners == 0 + this.events.newNodesDiscovered.listeners == 0 + this.events.dhtNodeCheck.listeners == 0 + this.events.nodesExpired.listeners == 0 diff --git a/tests/codexcrawler/utils/testasyncdataevent.nim b/tests/codexcrawler/utils/testasyncdataevent.nim index 7e47a7b..8c1efd8 100644 --- a/tests/codexcrawler/utils/testasyncdataevent.nim +++ b/tests/codexcrawler/utils/testasyncdataevent.nim @@ -79,3 +79,25 @@ suite "AsyncDataEvent": await event.unsubscribe(s1) await event.unsubscribe(s2) await event.unsubscribe(s3) + + test "Can unsubscribe in handler": + proc doNothing() {.async, closure.} = + await sleepAsync(1.millis) + + var callback = doNothing + + proc eventHandler(e: ExampleData): Future[?!void] {.async.} = + await callback() + success() + + let s = event.subscribe(eventHandler) + + proc doUnsubscribe() {.async.} = + await event.unsubscribe(s) + + callback = doUnsubscribe + + check: + isOK(await event.fire(ExampleData(s: msg))) + + await event.unsubscribe(s) diff --git a/tests/config.nims b/tests/config.nims new file mode 100644 index 0000000..da6438a --- /dev/null +++ b/tests/config.nims @@ -0,0 +1 @@ +switch("define", "chronicles_log_level=ERROR")