implements expiry and delete for old unresponsive nodes

This commit is contained in:
thatben 2025-02-13 14:55:45 +01:00
parent 88197d8748
commit 20896cae3c
No known key found for this signature in database
GPG Key ID: 62C543548433D43E
16 changed files with 313 additions and 74 deletions

View File

@ -26,7 +26,7 @@ proc initializeApp(app: Application, config: Config): Future[?!void] {.async.} =
nodesFound: newAsyncDataEvent[seq[Nid]](), nodesFound: newAsyncDataEvent[seq[Nid]](),
newNodesDiscovered: newAsyncDataEvent[seq[Nid]](), newNodesDiscovered: newAsyncDataEvent[seq[Nid]](),
dhtNodeCheck: newAsyncDataEvent[DhtNodeCheckEventData](), dhtNodeCheck: newAsyncDataEvent[DhtNodeCheckEventData](),
nodesExpired: newAsyncDataEvent[seq[Nid]](), nodesToRevisit: newAsyncDataEvent[seq[Nid]](),
), ),
) )

View File

@ -24,20 +24,24 @@ proc raiseCheckEvent(
): Future[?!void] {.async: (raises: []).} = ): Future[?!void] {.async: (raises: []).} =
let event = DhtNodeCheckEventData(id: nid, isOk: success) let event = DhtNodeCheckEventData(id: nid, isOk: success)
if err =? (await c.state.events.dhtNodeCheck.fire(event)).errorOption: if err =? (await c.state.events.dhtNodeCheck.fire(event)).errorOption:
error "failed to raise check event", err = err.msg
return failure(err) return failure(err)
return success() return success()
proc step(c: Crawler): Future[?!void] {.async: (raises: []).} = proc step(c: Crawler): Future[?!void] {.async: (raises: []).} =
without nid =? (await c.todo.pop()), err: without nid =? (await c.todo.pop()), err:
error "failed to pop todolist", err = err.msg
return failure(err) return failure(err)
without response =? await c.dht.getNeighbors(nid), err: without response =? await c.dht.getNeighbors(nid), err:
error "failed to get neighbors", err = err.msg
return failure(err) return failure(err)
if err =? (await c.raiseCheckEvent(nid, response.isResponsive)).errorOption: if err =? (await c.raiseCheckEvent(nid, response.isResponsive)).errorOption:
return failure(err) return failure(err)
if err =? (await c.state.events.nodesFound.fire(response.nodeIds)).errorOption: if err =? (await c.state.events.nodesFound.fire(response.nodeIds)).errorOption:
error "failed to raise nodesFound event", err = err.msg
return failure(err) return failure(err)
return success() return success()

View File

@ -22,6 +22,7 @@ type
NodeEntry* = object NodeEntry* = object
id*: Nid id*: Nid
lastVisit*: uint64 lastVisit*: uint64
firstInactive*: uint64
OnNodeEntry* = proc(item: NodeEntry): Future[?!void] {.async: (raises: []), gcsafe.} OnNodeEntry* = proc(item: NodeEntry): Future[?!void] {.async: (raises: []), gcsafe.}
@ -33,12 +34,13 @@ type
subCheck: AsyncDataEventSubscription subCheck: AsyncDataEventSubscription
proc `$`*(entry: NodeEntry): string = proc `$`*(entry: NodeEntry): string =
$entry.id & ":" & $entry.lastVisit $entry.id & ":" & $entry.lastVisit & " " & $entry.firstInactive
proc toBytes*(entry: NodeEntry): seq[byte] = proc toBytes*(entry: NodeEntry): seq[byte] =
var buffer = initProtoBuffer() var buffer = initProtoBuffer()
buffer.write(1, $entry.id) buffer.write(1, $entry.id)
buffer.write(2, entry.lastVisit) buffer.write(2, entry.lastVisit)
buffer.write(3, entry.firstInactive)
buffer.finish() buffer.finish()
return buffer.buffer return buffer.buffer
@ -47,6 +49,7 @@ proc fromBytes*(_: type NodeEntry, data: openArray[byte]): ?!NodeEntry =
buffer = initProtoBuffer(data) buffer = initProtoBuffer(data)
idStr: string idStr: string
lastVisit: uint64 lastVisit: uint64
firstInactive: uint64
if buffer.getField(1, idStr).isErr: if buffer.getField(1, idStr).isErr:
return failure("Unable to decode `idStr`") return failure("Unable to decode `idStr`")
@ -54,26 +57,36 @@ proc fromBytes*(_: type NodeEntry, data: openArray[byte]): ?!NodeEntry =
if buffer.getField(2, lastVisit).isErr: if buffer.getField(2, lastVisit).isErr:
return failure("Unable to decode `lastVisit`") return failure("Unable to decode `lastVisit`")
return success(NodeEntry(id: Nid.fromStr(idStr), lastVisit: lastVisit)) if buffer.getField(3, firstInactive).isErr:
return failure("Unable to decode `firstInactive`")
return success(
NodeEntry(
id: Nid.fromStr(idStr), lastVisit: lastVisit, firstInactive: firstInactive
)
)
proc encode*(e: NodeEntry): seq[byte] = proc encode*(e: NodeEntry): seq[byte] =
e.toBytes() e.toBytes()
proc decode*(T: type NodeEntry, bytes: seq[byte]): ?!T = proc decode*(T: type NodeEntry, bytes: seq[byte]): ?!T =
if bytes.len < 1: if bytes.len < 1:
return success(NodeEntry(id: Nid.fromStr("0"), lastVisit: 0.uint64)) return success(
NodeEntry(id: Nid.fromStr("0"), lastVisit: 0.uint64, firstInactive: 0.uint64)
)
return NodeEntry.fromBytes(bytes) return NodeEntry.fromBytes(bytes)
proc storeNodeIsNew(s: NodeStore, nid: Nid): Future[?!bool] {.async.} = proc storeNodeIsNew(s: NodeStore, nid: Nid): Future[?!bool] {.async.} =
without key =? Key.init(nodestoreName / $nid), err: without key =? Key.init(nodestoreName / $nid), err:
error "failed to format key", err = err.msg
return failure(err) return failure(err)
without exists =? (await s.store.has(key)), err: without exists =? (await s.store.has(key)), err:
error "failed to check store for key", err = err.msg
return failure(err) return failure(err)
if not exists: if not exists:
let entry = NodeEntry(id: nid, lastVisit: 0) let entry = NodeEntry(id: nid, lastVisit: 0, firstInactive: 0)
?await s.store.put(key, entry) ?await s.store.put(key, entry)
info "New node", nodeId = $nid info "New node", nodeId = $nid
return success(not exists) return success(not exists)
@ -94,39 +107,79 @@ proc processFoundNodes(s: NodeStore, nids: seq[Nid]): Future[?!void] {.async.} =
?await s.fireNewNodesDiscovered(newNodes) ?await s.fireNewNodesDiscovered(newNodes)
return success() return success()
proc updateLastVisit(s: NodeStore, nid: Nid): Future[?!void] {.async.} = proc processNodeCheck(
without key =? Key.init(nodestoreName / $nid), err: s: NodeStore, event: DhtNodeCheckEventData
): Future[?!void] {.async.} =
without key =? Key.init(nodestoreName / $(event.id)), err:
error "failed to format key", err = err.msg
return failure(err) return failure(err)
without var entry =? (await get[NodeEntry](s.store, key)), err: without var entry =? (await get[NodeEntry](s.store, key)), err:
error "failed to get entry for key", err = err.msg, key = $key
return failure(err) return failure(err)
entry.lastVisit = s.clock.now() entry.lastVisit = s.clock.now()
if event.isOk and entry.firstInactive > 0:
entry.firstInactive = 0
elif not event.isOk and entry.firstInactive == 0:
entry.firstInactive = s.clock.now()
?await s.store.put(key, entry) ?await s.store.put(key, entry)
return success() return success()
proc deleteEntry(s: NodeStore, nid: Nid): Future[?!void] {.async.} =
without key =? Key.init(nodestoreName / $nid), err:
error "failed to format key", err = err.msg
return failure(err)
without exists =? (await s.store.has(key)), err:
error "failed to check store for key", err = err.msg, key = $key
return failure(err)
if exists:
?await s.store.delete(key)
return success()
method iterateAll*( method iterateAll*(
s: NodeStore, onNode: OnNodeEntry s: NodeStore, onNode: OnNodeEntry
): Future[?!void] {.async: (raises: []), base.} = ): Future[?!void] {.async: (raises: []), base.} =
without queryKey =? Key.init(nodestoreName), err: without queryKey =? Key.init(nodestoreName), err:
error "failed to format key", err = err.msg
return failure(err) return failure(err)
try: try:
without iter =? (await query[NodeEntry](s.store, Query.init(queryKey))), err: without iter =? (await query[NodeEntry](s.store, Query.init(queryKey))), err:
error "failed to create query", err = err.msg
return failure(err) return failure(err)
while not iter.finished: while not iter.finished:
without item =? (await iter.next()), err: without item =? (await iter.next()), err:
error "failure during query iteration", err = err.msg
return failure(err) return failure(err)
without value =? item.value, err: without value =? item.value, err:
error "failed to get value from iterator", err = err.msg
return failure(err) return failure(err)
?await onNode(value) if $(value.id) == "0" and value.lastVisit == 0 and value.firstInactive == 0:
# iterator stop entry
discard
else:
?await onNode(value)
await sleepAsync(1.millis)
except CatchableError as exc: except CatchableError as exc:
return failure(exc.msg) return failure(exc.msg)
return success() return success()
method deleteEntries*(
s: NodeStore, nids: seq[Nid]
): Future[?!void] {.async: (raises: []), base.} =
for nid in nids:
try:
?await s.deleteEntry(nid)
except CatchableError as exc:
return failure(exc.msg)
return success()
method start*(s: NodeStore): Future[?!void] {.async.} = method start*(s: NodeStore): Future[?!void] {.async.} =
info "Starting..." info "Starting..."
@ -134,7 +187,7 @@ method start*(s: NodeStore): Future[?!void] {.async.} =
return await s.processFoundNodes(nids) return await s.processFoundNodes(nids)
proc onCheck(event: DhtNodeCheckEventData): Future[?!void] {.async.} = proc onCheck(event: DhtNodeCheckEventData): Future[?!void] {.async.} =
return await s.updateLastVisit(event.id) return await s.processNodeCheck(event)
s.subFound = s.state.events.nodesFound.subscribe(onNodesFound) s.subFound = s.state.events.nodesFound.subscribe(onNodesFound)
s.subCheck = s.state.events.dhtNodeCheck.subscribe(onCheck) s.subCheck = s.state.events.dhtNodeCheck.subscribe(onCheck)

View File

@ -19,20 +19,31 @@ type TimeTracker* = ref object of Component
dht: Dht dht: Dht
clock: Clock clock: Clock
proc checkForExpiredNodes(t: TimeTracker): Future[?!void] {.async: (raises: []).} = proc checkRevisitsAndExpiry(t: TimeTracker): Future[?!void] {.async: (raises: []).} =
let expiry = t.clock.now() - (t.state.config.expiryDelayMins * 60).uint64 let
revisitThreshold = t.clock.now() - (t.state.config.revisitDelayMins * 60).uint64
expiryThreshold = t.clock.now() - (t.state.config.expiryDelayMins * 60).uint64
var
toRevisit = newSeq[Nid]()
toDelete = newSeq[Nid]()
var expired = newSeq[Nid]()
proc checkNode(item: NodeEntry): Future[?!void] {.async: (raises: []), gcsafe.} = proc checkNode(item: NodeEntry): Future[?!void] {.async: (raises: []), gcsafe.} =
if item.lastVisit < expiry: if item.lastVisit < revisitThreshold:
expired.add(item.id) toRevisit.add(item.id)
if item.firstInactive > 0 and item.firstInactive < expiryThreshold:
toDelete.add(item.id)
return success() return success()
?await t.nodestore.iterateAll(checkNode) ?await t.nodestore.iterateAll(checkNode)
if expired.len > 0: if toRevisit.len > 0:
trace "Found expired nodes", expired = expired.len trace "Found nodes to revisit", toRevisit = toRevisit.len
?await t.state.events.nodesExpired.fire(expired) ?await t.state.events.nodesToRevisit.fire(toRevisit)
if toDelete.len > 0:
trace "Found expired node records to delete", toDelete = toDelete.len
?await t.nodestore.deleteEntries(toDelete)
return success() return success()
@ -41,19 +52,22 @@ proc raiseRoutingTableNodes(t: TimeTracker): Future[?!void] {.async: (raises: []
trace "Raising routing table nodes", nodes = nids.len trace "Raising routing table nodes", nodes = nids.len
if err =? (await t.state.events.nodesFound.fire(nids)).errorOption: if err =? (await t.state.events.nodesFound.fire(nids)).errorOption:
error "failed to raise nodesFound event", err = err.msg
return failure(err) return failure(err)
return success() return success()
method start*(t: TimeTracker): Future[?!void] {.async.} = method start*(t: TimeTracker): Future[?!void] {.async.} =
info "Starting..." info "Starting..."
proc onCheckExpiry(): Future[?!void] {.async: (raises: []), gcsafe.} = proc onCheckRevisitAndExpiry(): Future[?!void] {.async: (raises: []), gcsafe.} =
await t.checkForExpiredNodes() await t.checkRevisitsAndExpiry()
proc onRoutingTable(): Future[?!void] {.async: (raises: []), gcsafe.} = proc onRoutingTable(): Future[?!void] {.async: (raises: []), gcsafe.} =
await t.raiseRoutingTableNodes() await t.raiseRoutingTableNodes()
await t.state.whileRunning(onCheckExpiry, t.state.config.checkDelayMins.minutes) await t.state.whileRunning(
onCheckRevisitAndExpiry, t.state.config.checkDelayMins.minutes
)
await t.state.whileRunning(onRoutingTable, 30.minutes) await t.state.whileRunning(onRoutingTable, 30.minutes)
return success() return success()

View File

@ -20,7 +20,7 @@ type TodoList* = ref object of Component
nids: seq[Nid] nids: seq[Nid]
state: State state: State
subNew: AsyncDataEventSubscription subNew: AsyncDataEventSubscription
subExp: AsyncDataEventSubscription subRev: AsyncDataEventSubscription
emptySignal: ?Future[void] emptySignal: ?Future[void]
metrics: Metrics metrics: Metrics
@ -62,12 +62,12 @@ method start*(t: TodoList): Future[?!void] {.async.} =
return success() return success()
t.subNew = t.state.events.newNodesDiscovered.subscribe(onNewNodes) t.subNew = t.state.events.newNodesDiscovered.subscribe(onNewNodes)
t.subExp = t.state.events.nodesExpired.subscribe(onNewNodes) t.subRev = t.state.events.nodesToRevisit.subscribe(onNewNodes)
return success() return success()
method stop*(t: TodoList): Future[?!void] {.async.} = method stop*(t: TodoList): Future[?!void] {.async.} =
await t.state.events.newNodesDiscovered.unsubscribe(t.subNew) await t.state.events.newNodesDiscovered.unsubscribe(t.subNew)
await t.state.events.nodesExpired.unsubscribe(t.subExp) await t.state.events.nodesToRevisit.unsubscribe(t.subRev)
return success() return success()
proc new*(_: type TodoList, state: State, metrics: Metrics): TodoList = proc new*(_: type TodoList, state: State, metrics: Metrics): TodoList =

View File

@ -10,7 +10,7 @@ let doc =
Codex Network Crawler. Generates network metrics. Codex Network Crawler. Generates network metrics.
Usage: Usage:
codexcrawler [--logLevel=<l>] [--publicIp=<a>] [--metricsAddress=<ip>] [--metricsPort=<p>] [--dataDir=<dir>] [--discoveryPort=<p>] [--bootNodes=<n>] [--stepDelay=<ms>] [--revisitDelay=<m>] codexcrawler [--logLevel=<l>] [--publicIp=<a>] [--metricsAddress=<ip>] [--metricsPort=<p>] [--dataDir=<dir>] [--discoveryPort=<p>] [--bootNodes=<n>] [--stepDelay=<ms>] [--revisitDelay=<m>] [--checkDelay=<m>] [--expiryDelay=<m>]
Options: Options:
--logLevel=<l> Sets log level [default: INFO] --logLevel=<l> Sets log level [default: INFO]
@ -20,9 +20,10 @@ Options:
--dataDir=<dir> Directory for storing data [default: crawler_data] --dataDir=<dir> Directory for storing data [default: crawler_data]
--discoveryPort=<p> Port used for DHT [default: 8090] --discoveryPort=<p> Port used for DHT [default: 8090]
--bootNodes=<n> Semi-colon-separated list of Codex bootstrap SPRs [default: testnet_sprs] --bootNodes=<n> Semi-colon-separated list of Codex bootstrap SPRs [default: testnet_sprs]
--stepDelay=<ms> Delay in milliseconds per node visit [default: 100] --stepDelay=<ms> Delay in milliseconds per node visit [default: 1000]
--expiryDelay=<m> Delay in minutes after which a node can be revisited [default: 60] --revisitDelay=<m> Delay in minutes after which a node can be revisited [default: 60]
--checkDelay=<m> Delay with which the 'expiryDelay' is checked for all known nodes [default: 10] --checkDelay=<m> Delay with which the 'revisitDelay' is checked for all known nodes [default: 10]
--expiryDelay=<m> Delay in minutes after which unresponsive nodes are discarded [default: 1440] (24h)
""" """
import strutils import strutils
@ -37,16 +38,17 @@ type Config* = ref object
discPort*: Port discPort*: Port
bootNodes*: seq[SignedPeerRecord] bootNodes*: seq[SignedPeerRecord]
stepDelayMs*: int stepDelayMs*: int
expiryDelayMins*: int revisitDelayMins*: int
checkDelayMins*: int checkDelayMins*: int
expiryDelayMins*: int
proc `$`*(config: Config): string = proc `$`*(config: Config): string =
"Crawler:" & " logLevel=" & config.logLevel & " publicIp=" & config.publicIp & "Crawler:" & " logLevel=" & config.logLevel & " publicIp=" & config.publicIp &
" metricsAddress=" & $config.metricsAddress & " metricsPort=" & $config.metricsPort & " metricsAddress=" & $config.metricsAddress & " metricsPort=" & $config.metricsPort &
" dataDir=" & config.dataDir & " discPort=" & $config.discPort & " bootNodes=" & " dataDir=" & config.dataDir & " discPort=" & $config.discPort & " bootNodes=" &
config.bootNodes.mapIt($it).join(";") & " stepDelay=" & $config.stepDelayMs & config.bootNodes.mapIt($it).join(";") & " stepDelay=" & $config.stepDelayMs &
" expiryDelayMins=" & $config.expiryDelayMins & " checkDelayMins=" & " revisitDelayMins=" & $config.revisitDelayMins & " expiryDelayMins=" &
$config.checkDelayMins $config.expiryDelayMins & " checkDelayMins=" & $config.checkDelayMins
proc getDefaultTestnetBootNodes(): seq[string] = proc getDefaultTestnetBootNodes(): seq[string] =
@[ @[
@ -99,6 +101,7 @@ proc parseConfig*(): Config =
discPort: Port(parseInt(get("--discoveryPort"))), discPort: Port(parseInt(get("--discoveryPort"))),
bootNodes: getBootNodes(get("--bootNodes")), bootNodes: getBootNodes(get("--bootNodes")),
stepDelayMs: parseInt(get("--stepDelay")), stepDelayMs: parseInt(get("--stepDelay")),
expiryDelayMins: parseInt(get("--expiryDelay")), revisitDelayMins: parseInt(get("--revisitDelay")),
checkDelayMins: parseInt(get("--checkDelay")), checkDelayMins: parseInt(get("--checkDelay")),
expiryDelayMins: parseInt(get("--expiryDelay")),
) )

View File

@ -20,7 +20,7 @@ type
nodesFound*: AsyncDataEvent[seq[Nid]] nodesFound*: AsyncDataEvent[seq[Nid]]
newNodesDiscovered*: AsyncDataEvent[seq[Nid]] newNodesDiscovered*: AsyncDataEvent[seq[Nid]]
dhtNodeCheck*: AsyncDataEvent[DhtNodeCheckEventData] dhtNodeCheck*: AsyncDataEvent[DhtNodeCheckEventData]
nodesExpired*: AsyncDataEvent[seq[Nid]] nodesToRevisit*: AsyncDataEvent[seq[Nid]]
ApplicationStatus* {.pure.} = enum ApplicationStatus* {.pure.} = enum
Stopped Stopped

View File

@ -63,6 +63,7 @@ proc fire*[T](
for sub in event.subscriptions: for sub in event.subscriptions:
try: try:
await sub.fireEvent.wait() await sub.fireEvent.wait()
sub.fireEvent.clear()
except CancelledError: except CancelledError:
discard discard
if err =? sub.lastResult.errorOption: if err =? sub.lastResult.errorOption:

View File

@ -10,11 +10,12 @@ DATADIR=${CRAWLER_DATADIR:-crawler_data}
DISCPORT=${CRAWLER_DISCPORT:-8090} DISCPORT=${CRAWLER_DISCPORT:-8090}
BOOTNODES=${CRAWLER_BOOTNODES:-testnet_sprs} BOOTNODES=${CRAWLER_BOOTNODES:-testnet_sprs}
STEPDELAY=${CRAWLER_STEPDELAY:-1000} STEPDELAY=${CRAWLER_STEPDELAY:-1000}
REVISITDELAY=${CRAWLER_REVISITDELAY:-60}
CHECKDELAY=${CRAWLER_CHECKDELAY:-10} CHECKDELAY=${CRAWLER_CHECKDELAY:-10}
EXPIRYDELAY=${CRAWLER_EXPIRYDELAY:-60} EXPIRYDELAY=${CRAWLER_EXPIRYDELAY:-1440}
# Update CLI arguments # Update CLI arguments
set -- "$@" --logLevel="${LOGLEVEL}" --publicIp="${PUBLICIP}" --metricsAddress="${METRICSADDRESS}" --metricsPort="${METRICSPORT}" --dataDir="${DATADIR}" --discoveryPort="${DISCPORT}" --bootNodes="${BOOTNODES}" --stepDelay="${STEPDELAY}" --expiryDelay="${EXPIRYDELAY}" --checkDelay="${CHECKDELAY}" set -- "$@" --logLevel="${LOGLEVEL}" --publicIp="${PUBLICIP}" --metricsAddress="${METRICSADDRESS}" --metricsPort="${METRICSPORT}" --dataDir="${DATADIR}" --discoveryPort="${DISCPORT}" --bootNodes="${BOOTNODES}" --stepDelay="${STEPDELAY}" --revisitDelay="${REVISITDELAY}" --expiryDelay="${EXPIRYDELAY}" --checkDelay="${CHECKDELAY}"
# Run # Run
echo "Run Codex Crawler" echo "Run Codex Crawler"

View File

@ -39,8 +39,15 @@ suite "Nodestore":
state.checkAllUnsubscribed() state.checkAllUnsubscribed()
removeDir(dsPath) removeDir(dsPath)
proc fireNodeFoundEvent(nids: seq[Nid]) {.async.} =
(await state.events.nodesFound.fire(nids)).tryGet()
proc fireCheckEvent(nid: Nid, isOk: bool) {.async.} =
(await state.events.dhtNodeCheck.fire(DhtNodeCheckEventData(id: nid, isOk: isOk))).tryGet()
test "nodeEntry encoding": test "nodeEntry encoding":
let entry = NodeEntry(id: genNid(), lastVisit: 123.uint64) let entry =
NodeEntry(id: genNid(), lastVisit: 123.uint64, firstInactive: 234.uint64)
let let
bytes = entry.encode() bytes = entry.encode()
@ -49,13 +56,14 @@ suite "Nodestore":
check: check:
entry.id == decoded.id entry.id == decoded.id
entry.lastVisit == decoded.lastVisit entry.lastVisit == decoded.lastVisit
entry.firstInactive == decoded.firstInactive
test "nodesFound event should store nodes": test "nodesFound event should store nodes":
let let
nid = genNid() nid = genNid()
expectedKey = Key.init(nodestoreName / $nid).tryGet() expectedKey = Key.init(nodestoreName / $nid).tryGet()
(await state.events.nodesFound.fire(@[nid])).tryGet() await fireNodeFoundEvent(@[nid])
check: check:
(await ds.has(expectedKey)).tryGet() (await ds.has(expectedKey)).tryGet()
@ -74,7 +82,7 @@ suite "Nodestore":
sub = state.events.newNodesDiscovered.subscribe(onNewNodes) sub = state.events.newNodesDiscovered.subscribe(onNewNodes)
nid = genNid() nid = genNid()
(await state.events.nodesFound.fire(@[nid])).tryGet() await fireNodeFoundEvent(@[nid])
check: check:
newNodes == @[nid] newNodes == @[nid]
@ -85,7 +93,7 @@ suite "Nodestore":
let nid = genNid() let nid = genNid()
# Make nid known first. Then subscribe. # Make nid known first. Then subscribe.
(await state.events.nodesFound.fire(@[nid])).tryGet() await fireNodeFoundEvent(@[nid])
var var
newNodes = newSeq[Nid]() newNodes = newSeq[Nid]()
@ -98,7 +106,7 @@ suite "Nodestore":
let sub = state.events.newNodesDiscovered.subscribe(onNewNodes) let sub = state.events.newNodesDiscovered.subscribe(onNewNodes)
# Firing the event again should not trigger newNodesDiscovered for nid # Firing the event again should not trigger newNodesDiscovered for nid
(await state.events.nodesFound.fire(@[nid])).tryGet() await fireNodeFoundEvent(@[nid])
check: check:
newNodes.len == 0 newNodes.len == 0
@ -112,7 +120,7 @@ suite "Nodestore":
nid2 = genNid() nid2 = genNid()
nid3 = genNid() nid3 = genNid()
(await state.events.nodesFound.fire(@[nid1, nid2, nid3])).tryGet() await fireNodeFoundEvent(@[nid1, nid2, nid3])
var iterNodes = newSeq[Nid]() var iterNodes = newSeq[Nid]()
proc onNode(entry: NodeEntry): Future[?!void] {.async: (raises: []), gcsafe.} = proc onNode(entry: NodeEntry): Future[?!void] {.async: (raises: []), gcsafe.} =
@ -126,6 +134,46 @@ suite "Nodestore":
nid2 in iterNodes nid2 in iterNodes
nid3 in iterNodes nid3 in iterNodes
test "iterateAll yields no uninitialized entries":
let
nid1 = genNid()
nid2 = genNid()
nid3 = genNid()
await fireNodeFoundEvent(@[nid1, nid2, nid3])
var iterNodes = newSeq[Nid]()
proc onNode(entry: NodeEntry): Future[?!void] {.async: (raises: []), gcsafe.} =
iterNodes.add(entry.id)
return success()
(await store.iterateAll(onNode)).tryGet()
for nid in iterNodes:
check:
$nid != "0"
test "deleteEntries deletes entries":
let
nid1 = genNid()
nid2 = genNid()
nid3 = genNid()
await fireNodeFoundEvent(@[nid1, nid2, nid3])
(await store.deleteEntries(@[nid1, nid2])).tryGet()
var iterNodes = newSeq[Nid]()
proc onNode(entry: NodeEntry): Future[?!void] {.async: (raises: []), gcsafe.} =
iterNodes.add(entry.id)
return success()
(await store.iterateAll(onNode)).tryGet()
check:
nid1 notin iterNodes
nid2 notin iterNodes
nid3 in iterNodes
test "dhtNodeCheck event should update lastVisit": test "dhtNodeCheck event should update lastVisit":
let let
nid = genNid() nid = genNid()
@ -133,14 +181,43 @@ suite "Nodestore":
clock.setNow = 123456789.uint64 clock.setNow = 123456789.uint64
(await state.events.nodesFound.fire(@[nid])).tryGet() await fireNodeFoundEvent(@[nid])
let originalEntry = (await get[NodeEntry](ds, expectedKey)).tryGet() let originalEntry = (await get[NodeEntry](ds, expectedKey)).tryGet()
check: check:
originalEntry.lastVisit == 0 originalEntry.lastVisit == 0
(await state.events.dhtNodeCheck.fire(DhtNodeCheckEventData(id: nid, isOk: true))).tryGet() await fireCheckEvent(nid, true)
let updatedEntry = (await get[NodeEntry](ds, expectedKey)).tryGet() let updatedEntry = (await get[NodeEntry](ds, expectedKey)).tryGet()
check: check:
clock.setNow == updatedEntry.lastVisit clock.setNow == updatedEntry.lastVisit
test "failed dhtNodeCheck event should set firstInactive":
let
nid = genNid()
expectedKey = Key.init(nodestoreName / $nid).tryGet()
clock.setNow = 345345.uint64
await fireNodeFoundEvent(@[nid])
await fireCheckEvent(nid, false)
let updatedEntry = (await get[NodeEntry](ds, expectedKey)).tryGet()
check:
clock.setNow == updatedEntry.firstInactive
test "successful dhtNodeCheck event should clear firstInactive":
let
nid = genNid()
expectedKey = Key.init(nodestoreName / $nid).tryGet()
clock.setNow = 456456.uint64
await fireNodeFoundEvent(@[nid])
await fireCheckEvent(nid, false)
await fireCheckEvent(nid, true)
let updatedEntry = (await get[NodeEntry](ds, expectedKey)).tryGet()
check:
updatedEntry.firstInactive == 0

View File

@ -23,7 +23,7 @@ suite "TimeTracker":
clock: MockClock clock: MockClock
dht: MockDht dht: MockDht
time: TimeTracker time: TimeTracker
expiredNodesReceived: seq[Nid] nodesToRevisitReceived: seq[Nid]
sub: AsyncDataEventSubscription sub: AsyncDataEventSubscription
setup: setup:
@ -35,13 +35,13 @@ suite "TimeTracker":
clock.setNow = now clock.setNow = now
# Subscribe to nodesExpired event # Subscribe to nodesToRevisit event
expiredNodesReceived = newSeq[Nid]() nodesToRevisitReceived = newSeq[Nid]()
proc onExpired(nids: seq[Nid]): Future[?!void] {.async.} = proc onToRevisit(nids: seq[Nid]): Future[?!void] {.async.} =
expiredNodesReceived = nids nodesToRevisitReceived = nids
return success() return success()
sub = state.events.nodesExpired.subscribe(onExpired) sub = state.events.nodesToRevisit.subscribe(onToRevisit)
state.config.checkDelayMins = 11 state.config.checkDelayMins = 11
state.config.expiryDelayMins = 22 state.config.expiryDelayMins = 22
@ -52,44 +52,73 @@ suite "TimeTracker":
teardown: teardown:
(await time.stop()).tryGet() (await time.stop()).tryGet()
await state.events.nodesExpired.unsubscribe(sub) await state.events.nodesToRevisit.unsubscribe(sub)
state.checkAllUnsubscribed() state.checkAllUnsubscribed()
proc onStepExpiry() {.async.} = proc onStepCheck() {.async.} =
(await state.steppers[0]()).tryGet() (await state.steppers[0]()).tryGet()
proc onStepRt() {.async.} = proc onStepRt() {.async.} =
(await state.steppers[1]()).tryGet() (await state.steppers[1]()).tryGet()
proc createNodeInStore(lastVisit: uint64): Nid = proc createNodeInStore(lastVisit: uint64, firstInactive = 0.uint64): Nid =
let entry = NodeEntry(id: genNid(), lastVisit: lastVisit) let entry =
NodeEntry(id: genNid(), lastVisit: lastVisit, firstInactive: firstInactive)
store.nodesToIterate.add(entry) store.nodesToIterate.add(entry)
return entry.id return entry.id
test "start sets steppers for expiry and routingtable load": test "start sets steppers for check and routingtable load":
check: check:
state.delays[0] == state.config.checkDelayMins.minutes state.delays[0] == state.config.checkDelayMins.minutes
state.delays[1] == 30.minutes state.delays[1] == 30.minutes
test "onStep fires nodesExpired event for expired nodes": test "onStep fires nodesToRevisit event for nodes past revisit timestamp":
let let
expiredTimestamp = now - ((1 + state.config.expiryDelayMins) * 60).uint64 revisitTimestamp = now - ((state.config.revisitDelayMins + 1) * 60).uint64
expiredNodeId = createNodeInStore(expiredTimestamp) revisitNodeId = createNodeInStore(revisitTimestamp)
await onStepExpiry() await onStepCheck()
check: check:
expiredNodeId in expiredNodesReceived revisitNodeId in nodesToRevisitReceived
test "onStep does not fire nodesExpired event for nodes that are recent": test "onStep does not fire nodesToRevisit event for nodes that are recent":
let let
recentTimestamp = now - ((state.config.expiryDelayMins - 1) * 60).uint64 recentTimestamp = now - ((state.config.revisitDelayMins - 1) * 60).uint64
recentNodeId = createNodeInStore(recentTimestamp) recentNodeId = createNodeInStore(recentTimestamp)
await onStepExpiry() await onStepCheck()
check: check:
recentNodeId notin expiredNodesReceived recentNodeId notin nodesToRevisitReceived
test "onStep deletes nodes with past expired inactivity timestamp":
let
expiredTimestamp = now - ((state.config.expiryDelayMins + 1) * 60).uint64
expiredNodeId = createNodeInStore(now, expiredTimestamp)
await onStepCheck()
check:
expiredNodeId in store.nodesToDelete
test "onStep does not delete nodes with recent inactivity timestamp":
let
recentTimestamp = now - ((state.config.expiryDelayMins - 1) * 60).uint64
recentNodeId = createNodeInStore(now, recentTimestamp)
await onStepCheck()
check:
recentNodeId notin store.nodesToDelete
test "onStep does not delete nodes with zero inactivity timestamp":
let activeNodeId = createNodeInStore(now, 0.uint64)
await onStepCheck()
check:
activeNodeId notin store.nodesToDelete
test "onStep raises routingTable nodes as nodesFound": test "onStep raises routingTable nodes as nodesFound":
var nodesFound = newSeq[Nid]() var nodesFound = newSeq[Nid]()

View File

@ -33,8 +33,8 @@ suite "TodoList":
proc fireNewNodesDiscoveredEvent(nids: seq[Nid]) {.async.} = proc fireNewNodesDiscoveredEvent(nids: seq[Nid]) {.async.} =
(await state.events.newNodesDiscovered.fire(nids)).tryGet() (await state.events.newNodesDiscovered.fire(nids)).tryGet()
proc fireNodesExpiredEvent(nids: seq[Nid]) {.async.} = proc fireNodesToRevisitEvent(nids: seq[Nid]) {.async.} =
(await state.events.nodesExpired.fire(nids)).tryGet() (await state.events.nodesToRevisit.fire(nids)).tryGet()
test "discovered nodes are added to todo list": test "discovered nodes are added to todo list":
await fireNewNodesDiscoveredEvent(@[nid]) await fireNewNodesDiscoveredEvent(@[nid])
@ -43,8 +43,8 @@ suite "TodoList":
check: check:
item == nid item == nid
test "expired nodes are added to todo list": test "revisit nodes are added to todo list":
await fireNodesExpiredEvent(@[nid]) await fireNodesToRevisitEvent(@[nid])
let item = (await todo.pop).tryGet() let item = (await todo.pop).tryGet()
check: check:
@ -56,15 +56,15 @@ suite "TodoList":
check: check:
metrics.todo == 1 metrics.todo == 1
test "nodesExpired event updates todo metric": test "nodesToRevisit event updates todo metric":
await fireNodesExpiredEvent(@[nid]) await fireNodesToRevisitEvent(@[nid])
check: check:
metrics.todo == 1 metrics.todo == 1
test "does not store duplicates": test "does not store duplicates":
await fireNewNodesDiscoveredEvent(@[nid]) await fireNewNodesDiscoveredEvent(@[nid])
await fireNodesExpiredEvent(@[nid]) await fireNodesToRevisitEvent(@[nid])
check: check:
metrics.todo == 1 metrics.todo == 1

View File

@ -3,9 +3,11 @@ import pkg/questionable/results
import pkg/chronos import pkg/chronos
import ../../../codexcrawler/components/nodestore import ../../../codexcrawler/components/nodestore
import ../../../codexcrawler/types
type MockNodeStore* = ref object of NodeStore type MockNodeStore* = ref object of NodeStore
nodesToIterate*: seq[NodeEntry] nodesToIterate*: seq[NodeEntry]
nodesToDelete*: seq[Nid]
method iterateAll*( method iterateAll*(
s: MockNodeStore, onNode: OnNodeEntry s: MockNodeStore, onNode: OnNodeEntry
@ -14,6 +16,12 @@ method iterateAll*(
?await onNode(node) ?await onNode(node)
return success() return success()
method deleteEntries*(
s: MockNodeStore, nids: seq[Nid]
): Future[?!void] {.async: (raises: []).} =
s.nodesToDelete = nids
return success()
method start*(s: MockNodeStore): Future[?!void] {.async.} = method start*(s: MockNodeStore): Future[?!void] {.async.} =
return success() return success()
@ -21,4 +29,4 @@ method stop*(s: MockNodeStore): Future[?!void] {.async.} =
return success() return success()
proc createMockNodeStore*(): MockNodeStore = proc createMockNodeStore*(): MockNodeStore =
MockNodeStore(nodesToIterate: newSeq[NodeEntry]()) MockNodeStore(nodesToIterate: newSeq[NodeEntry](), nodesToDelete: newSeq[Nid]())

View File

@ -13,7 +13,7 @@ proc checkAllUnsubscribed*(s: MockState) =
s.events.nodesFound.listeners == 0 s.events.nodesFound.listeners == 0
s.events.newNodesDiscovered.listeners == 0 s.events.newNodesDiscovered.listeners == 0
s.events.dhtNodeCheck.listeners == 0 s.events.dhtNodeCheck.listeners == 0
s.events.nodesExpired.listeners == 0 s.events.nodesToRevisit.listeners == 0
method whileRunning*(s: MockState, step: OnStep, delay: Duration) {.async.} = method whileRunning*(s: MockState, step: OnStep, delay: Duration) {.async.} =
s.steppers.add(step) s.steppers.add(step)
@ -27,7 +27,7 @@ proc createMockState*(): MockState =
nodesFound: newAsyncDataEvent[seq[Nid]](), nodesFound: newAsyncDataEvent[seq[Nid]](),
newNodesDiscovered: newAsyncDataEvent[seq[Nid]](), newNodesDiscovered: newAsyncDataEvent[seq[Nid]](),
dhtNodeCheck: newAsyncDataEvent[DhtNodeCheckEventData](), dhtNodeCheck: newAsyncDataEvent[DhtNodeCheckEventData](),
nodesExpired: newAsyncDataEvent[seq[Nid]](), nodesToRevisit: newAsyncDataEvent[seq[Nid]](),
), ),
steppers: newSeq[OnStep](), steppers: newSeq[OnStep](),
delays: newSeq[Duration](), delays: newSeq[Duration](),

View File

@ -18,7 +18,7 @@ suite "State":
nodesFound: newAsyncDataEvent[seq[Nid]](), nodesFound: newAsyncDataEvent[seq[Nid]](),
newNodesDiscovered: newAsyncDataEvent[seq[Nid]](), newNodesDiscovered: newAsyncDataEvent[seq[Nid]](),
dhtNodeCheck: newAsyncDataEvent[DhtNodeCheckEventData](), dhtNodeCheck: newAsyncDataEvent[DhtNodeCheckEventData](),
nodesExpired: newAsyncDataEvent[seq[Nid]](), nodesToRevisit: newAsyncDataEvent[seq[Nid]](),
), ),
) )

View File

@ -32,6 +32,55 @@ suite "AsyncDataEvent":
await event.unsubscribe(s) await event.unsubscribe(s)
test "Multiple events":
var counter = 0
proc eventHandler(e: ExampleData): Future[?!void] {.async.} =
inc counter
success()
let s = event.subscribe(eventHandler)
check:
isOK(await event.fire(ExampleData(s: msg)))
isOK(await event.fire(ExampleData(s: msg)))
isOK(await event.fire(ExampleData(s: msg)))
counter == 3
await event.unsubscribe(s)
test "Multiple subscribers":
var
data1 = ""
data2 = ""
data3 = ""
proc eventHandler1(e: ExampleData): Future[?!void] {.async.} =
data1 = e.s
success()
proc eventHandler2(e: ExampleData): Future[?!void] {.async.} =
data2 = e.s
success()
proc eventHandler3(e: ExampleData): Future[?!void] {.async.} =
data3 = e.s
success()
let
sub1 = event.subscribe(eventHandler1)
sub2 = event.subscribe(eventHandler2)
sub3 = event.subscribe(eventHandler3)
check:
isOK(await event.fire(ExampleData(s: msg)))
data1 == msg
data2 == msg
data3 == msg
await event.unsubscribe(sub1)
await event.unsubscribe(sub2)
await event.unsubscribe(sub3)
test "Failed event preserves error message": test "Failed event preserves error message":
proc eventHandler(e: ExampleData): Future[?!void] {.async.} = proc eventHandler(e: ExampleData): Future[?!void] {.async.} =
failure(msg) failure(msg)