Implements and tests nodestore

This commit is contained in:
Ben 2025-02-11 12:42:20 +01:00
parent b6b7624a05
commit 5fa90c5c2f
No known key found for this signature in database
GPG Key ID: 0F16E812E736C24B
7 changed files with 119 additions and 35 deletions

View File

@ -1,4 +1,5 @@
import std/os import std/os
import pkg/datastore
import pkg/datastore/typedds import pkg/datastore/typedds
import pkg/questionable/results import pkg/questionable/results
import pkg/chronicles import pkg/chronicles
@ -11,13 +12,16 @@ import ../state
import ../utils/datastoreutils import ../utils/datastoreutils
import ../utils/asyncdataevent import ../utils/asyncdataevent
type const
OnNodeId = proc(item: Nid): Future[?!void] {.async: (raises: []), gcsafe.} nodestoreName = "nodestore"
type
NodeEntry* = object NodeEntry* = object
id*: Nid id*: Nid
lastVisit*: uint64 lastVisit*: uint64
OnNodeEntry = proc(item: NodeEntry): Future[?!void] {.async: (raises: []), gcsafe.}
NodeStore* = ref object of Component NodeStore* = ref object of Component
state: State state: State
store: TypedDatastore store: TypedDatastore
@ -55,17 +59,52 @@ proc decode*(T: type NodeEntry, bytes: seq[byte]): ?!T =
return success(NodeEntry(id: Nid.fromStr("0"), lastVisit: 0.uint64)) return success(NodeEntry(id: Nid.fromStr("0"), lastVisit: 0.uint64))
return NodeEntry.fromBytes(bytes) return NodeEntry.fromBytes(bytes)
proc storeNodeIsNew(s: NodeStore, nid: Nid): Future[?!bool] {.async.} =
without key =? Key.init(nodestoreName / $nid), err:
return failure(err)
without exists =? (await s.store.has(key)), err:
return failure(err)
if not exists:
let entry = NodeEntry(
id: nid,
lastVisit: 0
)
?await s.store.put(key, entry)
return success(not exists)
proc fireNewNodesDiscovered(s: NodeStore, nids: seq[Nid]): Future[?!void] {.async.} =
await s.state.events.newNodesDiscovered.fire(nids)
proc processFoundNodes(s: NodeStore, nids: seq[Nid]): Future[?!void] {.async.} = proc processFoundNodes(s: NodeStore, nids: seq[Nid]): Future[?!void] {.async.} =
# put the nodes in the store. var newNodes = newSeq[Nid]()
# track all new ones, if any, raise newNodes event.
for nid in nids:
without isNew =? (await s.storeNodeIsNew(nid)), err:
return failure(err)
if isNew:
newNodes.add(nid)
if newNodes.len > 0:
? await s.fireNewNodesDiscovered(newNodes)
return success() return success()
proc iterateAll*(s: NodeStore, onNodeId: OnNodeId) {.async.} = proc iterateAll*(s: NodeStore, onNode: OnNodeEntry): Future[?!void] {.async.} =
discard without queryKey =? Key.init(nodestoreName), err:
# query iterator, yield items to callback. return failure(err)
# for item in this.items: without iter =? (await query[NodeEntry](s.store, Query.init(queryKey))), err:
# onItem(item) return failure(err)
# await sleepAsync(1.millis)
while not iter.finished:
without item =? (await iter.next()), err:
return failure(err)
without value =? item.value, err:
return failure(err)
?await onNode(value)
return success()
method start*(s: NodeStore): Future[?!void] {.async.} = method start*(s: NodeStore): Future[?!void] {.async.} =
info "Starting nodestore..." info "Starting nodestore..."

View File

@ -43,10 +43,6 @@ proc saveItem(this: List, item: Nid): Future[?!void] {.async.} =
return success() return success()
proc load*(this: List): Future[?!void] {.async.} = proc load*(this: List): Future[?!void] {.async.} =
let id = Nid.fromStr("0")
let bytes = newSeq[byte]()
let ne = Nid.fromBytes(bytes)
without queryKey =? Key.init(this.name), err: without queryKey =? Key.init(this.name), err:
return failure(err) return failure(err)
without iter =? (await query[Nid](this.store, Query.init(queryKey))), err: without iter =? (await query[Nid](this.store, Query.init(queryKey))), err:

View File

@ -5,10 +5,11 @@ import pkg/chronos
type type
AsyncDataEventSubscription* = ref object AsyncDataEventSubscription* = ref object
key: EventQueueKey key: EventQueueKey
isRunning: bool listenFuture: Future[void]
fireEvent: AsyncEvent fireEvent: AsyncEvent
stopEvent: AsyncEvent
lastResult: ?!void lastResult: ?!void
inHandler: bool
delayedUnsubscribe: bool
AsyncDataEvent*[T] = ref object AsyncDataEvent*[T] = ref object
queue: AsyncEventQueue[?T] queue: AsyncEventQueue[?T]
@ -21,47 +22,64 @@ proc newAsyncDataEvent*[T](): AsyncDataEvent[T] =
queue: newAsyncEventQueue[?T](), subscriptions: newSeq[AsyncDataEventSubscription]() queue: newAsyncEventQueue[?T](), subscriptions: newSeq[AsyncDataEventSubscription]()
) )
proc performUnsubscribe[T](event: AsyncDataEvent[T], subscription: AsyncDataEventSubscription) {.async.} =
if subscription in event.subscriptions:
await subscription.listenFuture.cancelAndWait()
event.subscriptions.delete(event.subscriptions.find(subscription))
proc subscribe*[T]( proc subscribe*[T](
event: AsyncDataEvent[T], handler: AsyncDataEventHandler[T] event: AsyncDataEvent[T], handler: AsyncDataEventHandler[T]
): AsyncDataEventSubscription = ): AsyncDataEventSubscription =
let subscription = AsyncDataEventSubscription( var subscription = AsyncDataEventSubscription(
key: event.queue.register(), key: event.queue.register(),
isRunning: true, listenFuture: newFuture[void](),
fireEvent: newAsyncEvent(), fireEvent: newAsyncEvent(),
stopEvent: newAsyncEvent(), inHandler: false,
delayedUnsubscribe: false
) )
proc listener() {.async.} = proc listener() {.async.} =
while subscription.isRunning: while true:
let items = await event.queue.waitEvents(subscription.key) let items = await event.queue.waitEvents(subscription.key)
for item in items: for item in items:
if data =? item: if data =? item:
subscription.inHandler = true
subscription.lastResult = (await handler(data)) subscription.lastResult = (await handler(data))
subscription.inHandler = false
subscription.fireEvent.fire() subscription.fireEvent.fire()
subscription.stopEvent.fire()
asyncSpawn listener() subscription.listenFuture = listener()
event.subscriptions.add(subscription) event.subscriptions.add(subscription)
subscription subscription
proc fire*[T](event: AsyncDataEvent[T], data: T): Future[?!void] {.async.} = proc fire*[T](event: AsyncDataEvent[T], data: T): Future[?!void] {.async.} =
event.queue.emit(data.some) event.queue.emit(data.some)
for subscription in event.subscriptions: var toUnsubscribe = newSeq[AsyncDataEventSubscription]()
await subscription.fireEvent.wait() for sub in event.subscriptions:
if err =? subscription.lastResult.errorOption: await sub.fireEvent.wait()
if err =? sub.lastResult.errorOption:
return failure(err) return failure(err)
if sub.delayedUnsubscribe:
toUnsubscribe.add(sub)
for sub in toUnsubscribe:
await event.unsubscribe(sub)
success() success()
proc unsubscribe*[T]( proc unsubscribe*[T](
event: AsyncDataEvent[T], subscription: AsyncDataEventSubscription event: AsyncDataEvent[T], subscription: AsyncDataEventSubscription
) {.async.} = ) {.async.} =
subscription.isRunning = false if subscription.inHandler:
event.queue.emit(T.none) subscription.delayedUnsubscribe = true
await subscription.stopEvent.wait() else:
event.subscriptions.delete(event.subscriptions.find(subscription)) await event.performUnsubscribe(subscription)
proc unsubscribeAll*[T](event: AsyncDataEvent[T]) {.async.} = proc unsubscribeAll*[T](event: AsyncDataEvent[T]) {.async.} =
let all = event.subscriptions let all = event.subscriptions
for subscription in all: for subscription in all:
await event.unsubscribe(subscription) await event.unsubscribe(subscription)
proc listeners*[T](event: AsyncDataEvent[T]): int =
event.subscriptions.len

View File

@ -29,9 +29,12 @@ suite "Nodestore":
state, ds state, ds
) )
(await store.start()).tryGet()
teardown: teardown:
(await store.stop()).tryGet()
(await ds.close()).tryGet() (await ds.close()).tryGet()
# state.cleanupMock() state.checkAllUnsubscribed()
removeDir(dsPath) removeDir(dsPath)
test "nodeEntry encoding": test "nodeEntry encoding":
@ -115,11 +118,11 @@ suite "Nodestore":
(await state.events.nodesFound.fire(@[nid1, nid2, nid3])).tryGet() (await state.events.nodesFound.fire(@[nid1, nid2, nid3])).tryGet()
var iterNodes = newSeq[Nid]() var iterNodes = newSeq[Nid]()
proc onNodeId(nid: Nid): Future[?!void] {.async: (raises: []), gcsafe.} = proc onNode(entry: NodeEntry): Future[?!void] {.async: (raises: []), gcsafe.} =
iterNodes.add(nid) iterNodes.add(entry.id)
return success() return success()
await store.iterateAll(onNodeId) (await store.iterateAll(onNode)).tryGet()
check: check:
nid1 in iterNodes nid1 in iterNodes

View File

@ -1,3 +1,4 @@
import pkg/asynctest/chronos/unittest
import ../../codexcrawler/state import ../../codexcrawler/state
import ../../codexcrawler/utils/asyncdataevent import ../../codexcrawler/utils/asyncdataevent
import ../../codexcrawler/types import ../../codexcrawler/types
@ -20,5 +21,9 @@ proc createMockState*(): MockState =
), ),
) )
proc cleanupMock*(this: MockState) = proc checkAllUnsubscribed*(this: MockState) =
discard check:
this.events.nodesFound.listeners == 0
this.events.newNodesDiscovered.listeners == 0
this.events.dhtNodeCheck.listeners == 0
this.events.nodesExpired.listeners == 0

View File

@ -79,3 +79,25 @@ suite "AsyncDataEvent":
await event.unsubscribe(s1) await event.unsubscribe(s1)
await event.unsubscribe(s2) await event.unsubscribe(s2)
await event.unsubscribe(s3) await event.unsubscribe(s3)
test "Can unsubscribe in handler":
proc doNothing() {.async, closure.} =
await sleepAsync(1.millis)
var callback = doNothing
proc eventHandler(e: ExampleData): Future[?!void] {.async.} =
await callback()
success()
let s = event.subscribe(eventHandler)
proc doUnsubscribe() {.async.} =
await event.unsubscribe(s)
callback = doUnsubscribe
check:
isOK(await event.fire(ExampleData(s: msg)))
await event.unsubscribe(s)

1
tests/config.nims Normal file
View File

@ -0,0 +1 @@
switch("define", "chronicles_log_level=ERROR")