diff --git a/codexcrawler/application.nim b/codexcrawler/application.nim index bbc0734..eb5027c 100644 --- a/codexcrawler/application.nim +++ b/codexcrawler/application.nim @@ -26,7 +26,7 @@ proc initializeApp(app: Application, config: Config): Future[?!void] {.async.} = nodesFound: newAsyncDataEvent[seq[Nid]](), newNodesDiscovered: newAsyncDataEvent[seq[Nid]](), dhtNodeCheck: newAsyncDataEvent[DhtNodeCheckEventData](), - nodesExpired: newAsyncDataEvent[seq[Nid]](), + nodesToRevisit: newAsyncDataEvent[seq[Nid]](), ), ) diff --git a/codexcrawler/components/crawler.nim b/codexcrawler/components/crawler.nim index b4db6f0..c32f5ce 100644 --- a/codexcrawler/components/crawler.nim +++ b/codexcrawler/components/crawler.nim @@ -24,20 +24,24 @@ proc raiseCheckEvent( ): Future[?!void] {.async: (raises: []).} = let event = DhtNodeCheckEventData(id: nid, isOk: success) if err =? (await c.state.events.dhtNodeCheck.fire(event)).errorOption: + error "failed to raise check event", err = err.msg return failure(err) return success() proc step(c: Crawler): Future[?!void] {.async: (raises: []).} = without nid =? (await c.todo.pop()), err: + error "failed to pop todolist", err = err.msg return failure(err) without response =? await c.dht.getNeighbors(nid), err: + error "failed to get neighbors", err = err.msg return failure(err) if err =? (await c.raiseCheckEvent(nid, response.isResponsive)).errorOption: return failure(err) if err =? (await c.state.events.nodesFound.fire(response.nodeIds)).errorOption: + error "failed to raise nodesFound event", err = err.msg return failure(err) return success() diff --git a/codexcrawler/components/nodestore.nim b/codexcrawler/components/nodestore.nim index 99ac8da..e60fcad 100644 --- a/codexcrawler/components/nodestore.nim +++ b/codexcrawler/components/nodestore.nim @@ -22,6 +22,7 @@ type NodeEntry* = object id*: Nid lastVisit*: uint64 + firstInactive*: uint64 OnNodeEntry* = proc(item: NodeEntry): Future[?!void] {.async: (raises: []), gcsafe.} @@ -33,12 +34,13 @@ type subCheck: AsyncDataEventSubscription proc `$`*(entry: NodeEntry): string = - $entry.id & ":" & $entry.lastVisit + $entry.id & ":" & $entry.lastVisit & " " & $entry.firstInactive proc toBytes*(entry: NodeEntry): seq[byte] = var buffer = initProtoBuffer() buffer.write(1, $entry.id) buffer.write(2, entry.lastVisit) + buffer.write(3, entry.firstInactive) buffer.finish() return buffer.buffer @@ -47,6 +49,7 @@ proc fromBytes*(_: type NodeEntry, data: openArray[byte]): ?!NodeEntry = buffer = initProtoBuffer(data) idStr: string lastVisit: uint64 + firstInactive: uint64 if buffer.getField(1, idStr).isErr: return failure("Unable to decode `idStr`") @@ -54,26 +57,36 @@ proc fromBytes*(_: type NodeEntry, data: openArray[byte]): ?!NodeEntry = if buffer.getField(2, lastVisit).isErr: return failure("Unable to decode `lastVisit`") - return success(NodeEntry(id: Nid.fromStr(idStr), lastVisit: lastVisit)) + if buffer.getField(3, firstInactive).isErr: + return failure("Unable to decode `firstInactive`") + + return success( + NodeEntry( + id: Nid.fromStr(idStr), lastVisit: lastVisit, firstInactive: firstInactive + ) + ) proc encode*(e: NodeEntry): seq[byte] = e.toBytes() proc decode*(T: type NodeEntry, bytes: seq[byte]): ?!T = if bytes.len < 1: - return success(NodeEntry(id: Nid.fromStr("0"), lastVisit: 0.uint64)) + return success( + NodeEntry(id: Nid.fromStr("0"), lastVisit: 0.uint64, firstInactive: 0.uint64) + ) return NodeEntry.fromBytes(bytes) proc storeNodeIsNew(s: NodeStore, nid: Nid): Future[?!bool] {.async.} = without key =? Key.init(nodestoreName / $nid), err: + error "failed to format key", err = err.msg return failure(err) without exists =? (await s.store.has(key)), err: + error "failed to check store for key", err = err.msg return failure(err) if not exists: - let entry = NodeEntry(id: nid, lastVisit: 0) + let entry = NodeEntry(id: nid, lastVisit: 0, firstInactive: 0) ?await s.store.put(key, entry) - info "New node", nodeId = $nid return success(not exists) @@ -94,39 +107,79 @@ proc processFoundNodes(s: NodeStore, nids: seq[Nid]): Future[?!void] {.async.} = ?await s.fireNewNodesDiscovered(newNodes) return success() -proc updateLastVisit(s: NodeStore, nid: Nid): Future[?!void] {.async.} = - without key =? Key.init(nodestoreName / $nid), err: +proc processNodeCheck( + s: NodeStore, event: DhtNodeCheckEventData +): Future[?!void] {.async.} = + without key =? Key.init(nodestoreName / $(event.id)), err: + error "failed to format key", err = err.msg return failure(err) without var entry =? (await get[NodeEntry](s.store, key)), err: + error "failed to get entry for key", err = err.msg, key = $key return failure(err) entry.lastVisit = s.clock.now() + if event.isOk and entry.firstInactive > 0: + entry.firstInactive = 0 + elif not event.isOk and entry.firstInactive == 0: + entry.firstInactive = s.clock.now() ?await s.store.put(key, entry) return success() +proc deleteEntry(s: NodeStore, nid: Nid): Future[?!void] {.async.} = + without key =? Key.init(nodestoreName / $nid), err: + error "failed to format key", err = err.msg + return failure(err) + without exists =? (await s.store.has(key)), err: + error "failed to check store for key", err = err.msg, key = $key + return failure(err) + + if exists: + ?await s.store.delete(key) + return success() + method iterateAll*( s: NodeStore, onNode: OnNodeEntry ): Future[?!void] {.async: (raises: []), base.} = without queryKey =? Key.init(nodestoreName), err: + error "failed to format key", err = err.msg return failure(err) try: without iter =? (await query[NodeEntry](s.store, Query.init(queryKey))), err: + error "failed to create query", err = err.msg return failure(err) while not iter.finished: without item =? (await iter.next()), err: + error "failure during query iteration", err = err.msg return failure(err) without value =? item.value, err: + error "failed to get value from iterator", err = err.msg return failure(err) - ?await onNode(value) + if $(value.id) == "0" and value.lastVisit == 0 and value.firstInactive == 0: + # iterator stop entry + discard + else: + ?await onNode(value) + + await sleepAsync(1.millis) except CatchableError as exc: return failure(exc.msg) return success() +method deleteEntries*( + s: NodeStore, nids: seq[Nid] +): Future[?!void] {.async: (raises: []), base.} = + for nid in nids: + try: + ?await s.deleteEntry(nid) + except CatchableError as exc: + return failure(exc.msg) + return success() + method start*(s: NodeStore): Future[?!void] {.async.} = info "Starting..." @@ -134,7 +187,7 @@ method start*(s: NodeStore): Future[?!void] {.async.} = return await s.processFoundNodes(nids) proc onCheck(event: DhtNodeCheckEventData): Future[?!void] {.async.} = - return await s.updateLastVisit(event.id) + return await s.processNodeCheck(event) s.subFound = s.state.events.nodesFound.subscribe(onNodesFound) s.subCheck = s.state.events.dhtNodeCheck.subscribe(onCheck) diff --git a/codexcrawler/components/timetracker.nim b/codexcrawler/components/timetracker.nim index 40ecf85..b1af510 100644 --- a/codexcrawler/components/timetracker.nim +++ b/codexcrawler/components/timetracker.nim @@ -19,20 +19,31 @@ type TimeTracker* = ref object of Component dht: Dht clock: Clock -proc checkForExpiredNodes(t: TimeTracker): Future[?!void] {.async: (raises: []).} = - let expiry = t.clock.now() - (t.state.config.expiryDelayMins * 60).uint64 +proc checkRevisitsAndExpiry(t: TimeTracker): Future[?!void] {.async: (raises: []).} = + let + revisitThreshold = t.clock.now() - (t.state.config.revisitDelayMins * 60).uint64 + expiryThreshold = t.clock.now() - (t.state.config.expiryDelayMins * 60).uint64 + + var + toRevisit = newSeq[Nid]() + toDelete = newSeq[Nid]() - var expired = newSeq[Nid]() proc checkNode(item: NodeEntry): Future[?!void] {.async: (raises: []), gcsafe.} = - if item.lastVisit < expiry: - expired.add(item.id) + if item.lastVisit < revisitThreshold: + toRevisit.add(item.id) + if item.firstInactive > 0 and item.firstInactive < expiryThreshold: + toDelete.add(item.id) return success() ?await t.nodestore.iterateAll(checkNode) - if expired.len > 0: - trace "Found expired nodes", expired = expired.len - ?await t.state.events.nodesExpired.fire(expired) + if toRevisit.len > 0: + trace "Found nodes to revisit", toRevisit = toRevisit.len + ?await t.state.events.nodesToRevisit.fire(toRevisit) + + if toDelete.len > 0: + trace "Found expired node records to delete", toDelete = toDelete.len + ?await t.nodestore.deleteEntries(toDelete) return success() @@ -41,19 +52,22 @@ proc raiseRoutingTableNodes(t: TimeTracker): Future[?!void] {.async: (raises: [] trace "Raising routing table nodes", nodes = nids.len if err =? (await t.state.events.nodesFound.fire(nids)).errorOption: + error "failed to raise nodesFound event", err = err.msg return failure(err) return success() method start*(t: TimeTracker): Future[?!void] {.async.} = info "Starting..." - proc onCheckExpiry(): Future[?!void] {.async: (raises: []), gcsafe.} = - await t.checkForExpiredNodes() + proc onCheckRevisitAndExpiry(): Future[?!void] {.async: (raises: []), gcsafe.} = + await t.checkRevisitsAndExpiry() proc onRoutingTable(): Future[?!void] {.async: (raises: []), gcsafe.} = await t.raiseRoutingTableNodes() - await t.state.whileRunning(onCheckExpiry, t.state.config.checkDelayMins.minutes) + await t.state.whileRunning( + onCheckRevisitAndExpiry, t.state.config.checkDelayMins.minutes + ) await t.state.whileRunning(onRoutingTable, 30.minutes) return success() diff --git a/codexcrawler/components/todolist.nim b/codexcrawler/components/todolist.nim index 00ebfe8..e271f1e 100644 --- a/codexcrawler/components/todolist.nim +++ b/codexcrawler/components/todolist.nim @@ -20,7 +20,7 @@ type TodoList* = ref object of Component nids: seq[Nid] state: State subNew: AsyncDataEventSubscription - subExp: AsyncDataEventSubscription + subRev: AsyncDataEventSubscription emptySignal: ?Future[void] metrics: Metrics @@ -62,12 +62,12 @@ method start*(t: TodoList): Future[?!void] {.async.} = return success() t.subNew = t.state.events.newNodesDiscovered.subscribe(onNewNodes) - t.subExp = t.state.events.nodesExpired.subscribe(onNewNodes) + t.subRev = t.state.events.nodesToRevisit.subscribe(onNewNodes) return success() method stop*(t: TodoList): Future[?!void] {.async.} = await t.state.events.newNodesDiscovered.unsubscribe(t.subNew) - await t.state.events.nodesExpired.unsubscribe(t.subExp) + await t.state.events.nodesToRevisit.unsubscribe(t.subRev) return success() proc new*(_: type TodoList, state: State, metrics: Metrics): TodoList = diff --git a/codexcrawler/config.nim b/codexcrawler/config.nim index 707f743..84e1ef6 100644 --- a/codexcrawler/config.nim +++ b/codexcrawler/config.nim @@ -10,7 +10,7 @@ let doc = Codex Network Crawler. Generates network metrics. Usage: - codexcrawler [--logLevel=] [--publicIp=] [--metricsAddress=] [--metricsPort=

] [--dataDir=

] [--discoveryPort=

] [--bootNodes=] [--stepDelay=] [--revisitDelay=] + codexcrawler [--logLevel=] [--publicIp=] [--metricsAddress=] [--metricsPort=

] [--dataDir=

] [--discoveryPort=

] [--bootNodes=] [--stepDelay=] [--revisitDelay=] [--checkDelay=] [--expiryDelay=] Options: --logLevel= Sets log level [default: INFO] @@ -20,9 +20,10 @@ Options: --dataDir=

Directory for storing data [default: crawler_data] --discoveryPort=

Port used for DHT [default: 8090] --bootNodes= Semi-colon-separated list of Codex bootstrap SPRs [default: testnet_sprs] - --stepDelay= Delay in milliseconds per node visit [default: 100] - --expiryDelay= Delay in minutes after which a node can be revisited [default: 60] - --checkDelay= Delay with which the 'expiryDelay' is checked for all known nodes [default: 10] + --stepDelay= Delay in milliseconds per node visit [default: 1000] + --revisitDelay= Delay in minutes after which a node can be revisited [default: 60] + --checkDelay= Delay with which the 'revisitDelay' is checked for all known nodes [default: 10] + --expiryDelay= Delay in minutes after which unresponsive nodes are discarded [default: 1440] (24h) """ import strutils @@ -37,16 +38,17 @@ type Config* = ref object discPort*: Port bootNodes*: seq[SignedPeerRecord] stepDelayMs*: int - expiryDelayMins*: int + revisitDelayMins*: int checkDelayMins*: int + expiryDelayMins*: int proc `$`*(config: Config): string = "Crawler:" & " logLevel=" & config.logLevel & " publicIp=" & config.publicIp & " metricsAddress=" & $config.metricsAddress & " metricsPort=" & $config.metricsPort & " dataDir=" & config.dataDir & " discPort=" & $config.discPort & " bootNodes=" & config.bootNodes.mapIt($it).join(";") & " stepDelay=" & $config.stepDelayMs & - " expiryDelayMins=" & $config.expiryDelayMins & " checkDelayMins=" & - $config.checkDelayMins + " revisitDelayMins=" & $config.revisitDelayMins & " expiryDelayMins=" & + $config.expiryDelayMins & " checkDelayMins=" & $config.checkDelayMins proc getDefaultTestnetBootNodes(): seq[string] = @[ @@ -99,6 +101,7 @@ proc parseConfig*(): Config = discPort: Port(parseInt(get("--discoveryPort"))), bootNodes: getBootNodes(get("--bootNodes")), stepDelayMs: parseInt(get("--stepDelay")), - expiryDelayMins: parseInt(get("--expiryDelay")), + revisitDelayMins: parseInt(get("--revisitDelay")), checkDelayMins: parseInt(get("--checkDelay")), + expiryDelayMins: parseInt(get("--expiryDelay")), ) diff --git a/codexcrawler/state.nim b/codexcrawler/state.nim index d70dfb5..66b7cd7 100644 --- a/codexcrawler/state.nim +++ b/codexcrawler/state.nim @@ -20,7 +20,7 @@ type nodesFound*: AsyncDataEvent[seq[Nid]] newNodesDiscovered*: AsyncDataEvent[seq[Nid]] dhtNodeCheck*: AsyncDataEvent[DhtNodeCheckEventData] - nodesExpired*: AsyncDataEvent[seq[Nid]] + nodesToRevisit*: AsyncDataEvent[seq[Nid]] ApplicationStatus* {.pure.} = enum Stopped diff --git a/codexcrawler/utils/asyncdataevent.nim b/codexcrawler/utils/asyncdataevent.nim index 81946d7..e847c7d 100644 --- a/codexcrawler/utils/asyncdataevent.nim +++ b/codexcrawler/utils/asyncdataevent.nim @@ -63,6 +63,7 @@ proc fire*[T]( for sub in event.subscriptions: try: await sub.fireEvent.wait() + sub.fireEvent.clear() except CancelledError: discard if err =? sub.lastResult.errorOption: diff --git a/docker/docker-entrypoint.sh b/docker/docker-entrypoint.sh index 210ea9e..9cad0bd 100644 --- a/docker/docker-entrypoint.sh +++ b/docker/docker-entrypoint.sh @@ -10,11 +10,12 @@ DATADIR=${CRAWLER_DATADIR:-crawler_data} DISCPORT=${CRAWLER_DISCPORT:-8090} BOOTNODES=${CRAWLER_BOOTNODES:-testnet_sprs} STEPDELAY=${CRAWLER_STEPDELAY:-1000} +REVISITDELAY=${CRAWLER_REVISITDELAY:-60} CHECKDELAY=${CRAWLER_CHECKDELAY:-10} -EXPIRYDELAY=${CRAWLER_EXPIRYDELAY:-60} +EXPIRYDELAY=${CRAWLER_EXPIRYDELAY:-1440} # Update CLI arguments -set -- "$@" --logLevel="${LOGLEVEL}" --publicIp="${PUBLICIP}" --metricsAddress="${METRICSADDRESS}" --metricsPort="${METRICSPORT}" --dataDir="${DATADIR}" --discoveryPort="${DISCPORT}" --bootNodes="${BOOTNODES}" --stepDelay="${STEPDELAY}" --expiryDelay="${EXPIRYDELAY}" --checkDelay="${CHECKDELAY}" +set -- "$@" --logLevel="${LOGLEVEL}" --publicIp="${PUBLICIP}" --metricsAddress="${METRICSADDRESS}" --metricsPort="${METRICSPORT}" --dataDir="${DATADIR}" --discoveryPort="${DISCPORT}" --bootNodes="${BOOTNODES}" --stepDelay="${STEPDELAY}" --revisitDelay="${REVISITDELAY}" --expiryDelay="${EXPIRYDELAY}" --checkDelay="${CHECKDELAY}" # Run echo "Run Codex Crawler" diff --git a/tests/codexcrawler/components/testnodestore.nim b/tests/codexcrawler/components/testnodestore.nim index b9687be..282eab9 100644 --- a/tests/codexcrawler/components/testnodestore.nim +++ b/tests/codexcrawler/components/testnodestore.nim @@ -39,8 +39,15 @@ suite "Nodestore": state.checkAllUnsubscribed() removeDir(dsPath) + proc fireNodeFoundEvent(nids: seq[Nid]) {.async.} = + (await state.events.nodesFound.fire(nids)).tryGet() + + proc fireCheckEvent(nid: Nid, isOk: bool) {.async.} = + (await state.events.dhtNodeCheck.fire(DhtNodeCheckEventData(id: nid, isOk: isOk))).tryGet() + test "nodeEntry encoding": - let entry = NodeEntry(id: genNid(), lastVisit: 123.uint64) + let entry = + NodeEntry(id: genNid(), lastVisit: 123.uint64, firstInactive: 234.uint64) let bytes = entry.encode() @@ -49,13 +56,14 @@ suite "Nodestore": check: entry.id == decoded.id entry.lastVisit == decoded.lastVisit + entry.firstInactive == decoded.firstInactive test "nodesFound event should store nodes": let nid = genNid() expectedKey = Key.init(nodestoreName / $nid).tryGet() - (await state.events.nodesFound.fire(@[nid])).tryGet() + await fireNodeFoundEvent(@[nid]) check: (await ds.has(expectedKey)).tryGet() @@ -74,7 +82,7 @@ suite "Nodestore": sub = state.events.newNodesDiscovered.subscribe(onNewNodes) nid = genNid() - (await state.events.nodesFound.fire(@[nid])).tryGet() + await fireNodeFoundEvent(@[nid]) check: newNodes == @[nid] @@ -85,7 +93,7 @@ suite "Nodestore": let nid = genNid() # Make nid known first. Then subscribe. - (await state.events.nodesFound.fire(@[nid])).tryGet() + await fireNodeFoundEvent(@[nid]) var newNodes = newSeq[Nid]() @@ -98,7 +106,7 @@ suite "Nodestore": let sub = state.events.newNodesDiscovered.subscribe(onNewNodes) # Firing the event again should not trigger newNodesDiscovered for nid - (await state.events.nodesFound.fire(@[nid])).tryGet() + await fireNodeFoundEvent(@[nid]) check: newNodes.len == 0 @@ -112,7 +120,7 @@ suite "Nodestore": nid2 = genNid() nid3 = genNid() - (await state.events.nodesFound.fire(@[nid1, nid2, nid3])).tryGet() + await fireNodeFoundEvent(@[nid1, nid2, nid3]) var iterNodes = newSeq[Nid]() proc onNode(entry: NodeEntry): Future[?!void] {.async: (raises: []), gcsafe.} = @@ -126,6 +134,46 @@ suite "Nodestore": nid2 in iterNodes nid3 in iterNodes + test "iterateAll yields no uninitialized entries": + let + nid1 = genNid() + nid2 = genNid() + nid3 = genNid() + + await fireNodeFoundEvent(@[nid1, nid2, nid3]) + + var iterNodes = newSeq[Nid]() + proc onNode(entry: NodeEntry): Future[?!void] {.async: (raises: []), gcsafe.} = + iterNodes.add(entry.id) + return success() + + (await store.iterateAll(onNode)).tryGet() + + for nid in iterNodes: + check: + $nid != "0" + + test "deleteEntries deletes entries": + let + nid1 = genNid() + nid2 = genNid() + nid3 = genNid() + + await fireNodeFoundEvent(@[nid1, nid2, nid3]) + (await store.deleteEntries(@[nid1, nid2])).tryGet() + + var iterNodes = newSeq[Nid]() + proc onNode(entry: NodeEntry): Future[?!void] {.async: (raises: []), gcsafe.} = + iterNodes.add(entry.id) + return success() + + (await store.iterateAll(onNode)).tryGet() + + check: + nid1 notin iterNodes + nid2 notin iterNodes + nid3 in iterNodes + test "dhtNodeCheck event should update lastVisit": let nid = genNid() @@ -133,14 +181,43 @@ suite "Nodestore": clock.setNow = 123456789.uint64 - (await state.events.nodesFound.fire(@[nid])).tryGet() + await fireNodeFoundEvent(@[nid]) let originalEntry = (await get[NodeEntry](ds, expectedKey)).tryGet() check: originalEntry.lastVisit == 0 - (await state.events.dhtNodeCheck.fire(DhtNodeCheckEventData(id: nid, isOk: true))).tryGet() + await fireCheckEvent(nid, true) let updatedEntry = (await get[NodeEntry](ds, expectedKey)).tryGet() check: clock.setNow == updatedEntry.lastVisit + + test "failed dhtNodeCheck event should set firstInactive": + let + nid = genNid() + expectedKey = Key.init(nodestoreName / $nid).tryGet() + + clock.setNow = 345345.uint64 + + await fireNodeFoundEvent(@[nid]) + await fireCheckEvent(nid, false) + + let updatedEntry = (await get[NodeEntry](ds, expectedKey)).tryGet() + check: + clock.setNow == updatedEntry.firstInactive + + test "successful dhtNodeCheck event should clear firstInactive": + let + nid = genNid() + expectedKey = Key.init(nodestoreName / $nid).tryGet() + + clock.setNow = 456456.uint64 + + await fireNodeFoundEvent(@[nid]) + await fireCheckEvent(nid, false) + await fireCheckEvent(nid, true) + + let updatedEntry = (await get[NodeEntry](ds, expectedKey)).tryGet() + check: + updatedEntry.firstInactive == 0 diff --git a/tests/codexcrawler/components/testtimetracker.nim b/tests/codexcrawler/components/testtimetracker.nim index 02edb8b..0c9b4cd 100644 --- a/tests/codexcrawler/components/testtimetracker.nim +++ b/tests/codexcrawler/components/testtimetracker.nim @@ -23,7 +23,7 @@ suite "TimeTracker": clock: MockClock dht: MockDht time: TimeTracker - expiredNodesReceived: seq[Nid] + nodesToRevisitReceived: seq[Nid] sub: AsyncDataEventSubscription setup: @@ -35,13 +35,13 @@ suite "TimeTracker": clock.setNow = now - # Subscribe to nodesExpired event - expiredNodesReceived = newSeq[Nid]() - proc onExpired(nids: seq[Nid]): Future[?!void] {.async.} = - expiredNodesReceived = nids + # Subscribe to nodesToRevisit event + nodesToRevisitReceived = newSeq[Nid]() + proc onToRevisit(nids: seq[Nid]): Future[?!void] {.async.} = + nodesToRevisitReceived = nids return success() - sub = state.events.nodesExpired.subscribe(onExpired) + sub = state.events.nodesToRevisit.subscribe(onToRevisit) state.config.checkDelayMins = 11 state.config.expiryDelayMins = 22 @@ -52,44 +52,73 @@ suite "TimeTracker": teardown: (await time.stop()).tryGet() - await state.events.nodesExpired.unsubscribe(sub) + await state.events.nodesToRevisit.unsubscribe(sub) state.checkAllUnsubscribed() - proc onStepExpiry() {.async.} = + proc onStepCheck() {.async.} = (await state.steppers[0]()).tryGet() proc onStepRt() {.async.} = (await state.steppers[1]()).tryGet() - proc createNodeInStore(lastVisit: uint64): Nid = - let entry = NodeEntry(id: genNid(), lastVisit: lastVisit) + proc createNodeInStore(lastVisit: uint64, firstInactive = 0.uint64): Nid = + let entry = + NodeEntry(id: genNid(), lastVisit: lastVisit, firstInactive: firstInactive) store.nodesToIterate.add(entry) return entry.id - test "start sets steppers for expiry and routingtable load": + test "start sets steppers for check and routingtable load": check: state.delays[0] == state.config.checkDelayMins.minutes state.delays[1] == 30.minutes - test "onStep fires nodesExpired event for expired nodes": + test "onStep fires nodesToRevisit event for nodes past revisit timestamp": let - expiredTimestamp = now - ((1 + state.config.expiryDelayMins) * 60).uint64 - expiredNodeId = createNodeInStore(expiredTimestamp) + revisitTimestamp = now - ((state.config.revisitDelayMins + 1) * 60).uint64 + revisitNodeId = createNodeInStore(revisitTimestamp) - await onStepExpiry() + await onStepCheck() check: - expiredNodeId in expiredNodesReceived + revisitNodeId in nodesToRevisitReceived - test "onStep does not fire nodesExpired event for nodes that are recent": + test "onStep does not fire nodesToRevisit event for nodes that are recent": let - recentTimestamp = now - ((state.config.expiryDelayMins - 1) * 60).uint64 + recentTimestamp = now - ((state.config.revisitDelayMins - 1) * 60).uint64 recentNodeId = createNodeInStore(recentTimestamp) - await onStepExpiry() + await onStepCheck() check: - recentNodeId notin expiredNodesReceived + recentNodeId notin nodesToRevisitReceived + + test "onStep deletes nodes with past expired inactivity timestamp": + let + expiredTimestamp = now - ((state.config.expiryDelayMins + 1) * 60).uint64 + expiredNodeId = createNodeInStore(now, expiredTimestamp) + + await onStepCheck() + + check: + expiredNodeId in store.nodesToDelete + + test "onStep does not delete nodes with recent inactivity timestamp": + let + recentTimestamp = now - ((state.config.expiryDelayMins - 1) * 60).uint64 + recentNodeId = createNodeInStore(now, recentTimestamp) + + await onStepCheck() + + check: + recentNodeId notin store.nodesToDelete + + test "onStep does not delete nodes with zero inactivity timestamp": + let activeNodeId = createNodeInStore(now, 0.uint64) + + await onStepCheck() + + check: + activeNodeId notin store.nodesToDelete test "onStep raises routingTable nodes as nodesFound": var nodesFound = newSeq[Nid]() diff --git a/tests/codexcrawler/components/testtodolist.nim b/tests/codexcrawler/components/testtodolist.nim index 3ddc3bf..cca8512 100644 --- a/tests/codexcrawler/components/testtodolist.nim +++ b/tests/codexcrawler/components/testtodolist.nim @@ -33,8 +33,8 @@ suite "TodoList": proc fireNewNodesDiscoveredEvent(nids: seq[Nid]) {.async.} = (await state.events.newNodesDiscovered.fire(nids)).tryGet() - proc fireNodesExpiredEvent(nids: seq[Nid]) {.async.} = - (await state.events.nodesExpired.fire(nids)).tryGet() + proc fireNodesToRevisitEvent(nids: seq[Nid]) {.async.} = + (await state.events.nodesToRevisit.fire(nids)).tryGet() test "discovered nodes are added to todo list": await fireNewNodesDiscoveredEvent(@[nid]) @@ -43,8 +43,8 @@ suite "TodoList": check: item == nid - test "expired nodes are added to todo list": - await fireNodesExpiredEvent(@[nid]) + test "revisit nodes are added to todo list": + await fireNodesToRevisitEvent(@[nid]) let item = (await todo.pop).tryGet() check: @@ -56,15 +56,15 @@ suite "TodoList": check: metrics.todo == 1 - test "nodesExpired event updates todo metric": - await fireNodesExpiredEvent(@[nid]) + test "nodesToRevisit event updates todo metric": + await fireNodesToRevisitEvent(@[nid]) check: metrics.todo == 1 test "does not store duplicates": await fireNewNodesDiscoveredEvent(@[nid]) - await fireNodesExpiredEvent(@[nid]) + await fireNodesToRevisitEvent(@[nid]) check: metrics.todo == 1 diff --git a/tests/codexcrawler/mocks/mocknodestore.nim b/tests/codexcrawler/mocks/mocknodestore.nim index d640f38..ed78f31 100644 --- a/tests/codexcrawler/mocks/mocknodestore.nim +++ b/tests/codexcrawler/mocks/mocknodestore.nim @@ -3,9 +3,11 @@ import pkg/questionable/results import pkg/chronos import ../../../codexcrawler/components/nodestore +import ../../../codexcrawler/types type MockNodeStore* = ref object of NodeStore nodesToIterate*: seq[NodeEntry] + nodesToDelete*: seq[Nid] method iterateAll*( s: MockNodeStore, onNode: OnNodeEntry @@ -14,6 +16,12 @@ method iterateAll*( ?await onNode(node) return success() +method deleteEntries*( + s: MockNodeStore, nids: seq[Nid] +): Future[?!void] {.async: (raises: []).} = + s.nodesToDelete = nids + return success() + method start*(s: MockNodeStore): Future[?!void] {.async.} = return success() @@ -21,4 +29,4 @@ method stop*(s: MockNodeStore): Future[?!void] {.async.} = return success() proc createMockNodeStore*(): MockNodeStore = - MockNodeStore(nodesToIterate: newSeq[NodeEntry]()) + MockNodeStore(nodesToIterate: newSeq[NodeEntry](), nodesToDelete: newSeq[Nid]()) diff --git a/tests/codexcrawler/mocks/mockstate.nim b/tests/codexcrawler/mocks/mockstate.nim index 0263144..bb15f11 100644 --- a/tests/codexcrawler/mocks/mockstate.nim +++ b/tests/codexcrawler/mocks/mockstate.nim @@ -13,7 +13,7 @@ proc checkAllUnsubscribed*(s: MockState) = s.events.nodesFound.listeners == 0 s.events.newNodesDiscovered.listeners == 0 s.events.dhtNodeCheck.listeners == 0 - s.events.nodesExpired.listeners == 0 + s.events.nodesToRevisit.listeners == 0 method whileRunning*(s: MockState, step: OnStep, delay: Duration) {.async.} = s.steppers.add(step) @@ -27,7 +27,7 @@ proc createMockState*(): MockState = nodesFound: newAsyncDataEvent[seq[Nid]](), newNodesDiscovered: newAsyncDataEvent[seq[Nid]](), dhtNodeCheck: newAsyncDataEvent[DhtNodeCheckEventData](), - nodesExpired: newAsyncDataEvent[seq[Nid]](), + nodesToRevisit: newAsyncDataEvent[seq[Nid]](), ), steppers: newSeq[OnStep](), delays: newSeq[Duration](), diff --git a/tests/codexcrawler/teststate.nim b/tests/codexcrawler/teststate.nim index 4ea6bb5..abce2b6 100644 --- a/tests/codexcrawler/teststate.nim +++ b/tests/codexcrawler/teststate.nim @@ -18,7 +18,7 @@ suite "State": nodesFound: newAsyncDataEvent[seq[Nid]](), newNodesDiscovered: newAsyncDataEvent[seq[Nid]](), dhtNodeCheck: newAsyncDataEvent[DhtNodeCheckEventData](), - nodesExpired: newAsyncDataEvent[seq[Nid]](), + nodesToRevisit: newAsyncDataEvent[seq[Nid]](), ), ) diff --git a/tests/codexcrawler/utils/testasyncdataevent.nim b/tests/codexcrawler/utils/testasyncdataevent.nim index c5a47c5..34cf1b2 100644 --- a/tests/codexcrawler/utils/testasyncdataevent.nim +++ b/tests/codexcrawler/utils/testasyncdataevent.nim @@ -32,6 +32,55 @@ suite "AsyncDataEvent": await event.unsubscribe(s) + test "Multiple events": + var counter = 0 + proc eventHandler(e: ExampleData): Future[?!void] {.async.} = + inc counter + success() + + let s = event.subscribe(eventHandler) + + check: + isOK(await event.fire(ExampleData(s: msg))) + isOK(await event.fire(ExampleData(s: msg))) + isOK(await event.fire(ExampleData(s: msg))) + + counter == 3 + + await event.unsubscribe(s) + + test "Multiple subscribers": + var + data1 = "" + data2 = "" + data3 = "" + proc eventHandler1(e: ExampleData): Future[?!void] {.async.} = + data1 = e.s + success() + + proc eventHandler2(e: ExampleData): Future[?!void] {.async.} = + data2 = e.s + success() + + proc eventHandler3(e: ExampleData): Future[?!void] {.async.} = + data3 = e.s + success() + + let + sub1 = event.subscribe(eventHandler1) + sub2 = event.subscribe(eventHandler2) + sub3 = event.subscribe(eventHandler3) + + check: + isOK(await event.fire(ExampleData(s: msg))) + data1 == msg + data2 == msg + data3 == msg + + await event.unsubscribe(sub1) + await event.unsubscribe(sub2) + await event.unsubscribe(sub3) + test "Failed event preserves error message": proc eventHandler(e: ExampleData): Future[?!void] {.async.} = failure(msg)