From 82a1cd07155b1c6de4229cab246081cc28154780 Mon Sep 17 00:00:00 2001 From: Ben Date: Wed, 12 Feb 2025 14:25:54 +0100 Subject: [PATCH] makes timetracker periodically raise routingtable nodes --- codexcrawler/components/timetracker.nim | 21 +++++++++++++++--- codexcrawler/services/dht.nim | 12 +--------- codexcrawler/state.nim | 2 +- .../components/testtimetracker.nim | 22 ++++++++++++++++++- 4 files changed, 41 insertions(+), 16 deletions(-) diff --git a/codexcrawler/components/timetracker.nim b/codexcrawler/components/timetracker.nim index d61a585..afbfca0 100644 --- a/codexcrawler/components/timetracker.nim +++ b/codexcrawler/components/timetracker.nim @@ -3,6 +3,7 @@ import pkg/chronos import pkg/questionable/results import ./nodestore +import ../services/dht import ../component import ../state import ../types @@ -14,8 +15,9 @@ logScope: type TimeTracker* = ref object of Component state: State nodestore: NodeStore + dht: Dht -proc step(t: TimeTracker): Future[?!void] {.async: (raises: []).} = +proc checkForExpiredNodes(t: TimeTracker): Future[?!void] {.async: (raises: []).} = trace "Checking for expired nodes..." let expiry = (Moment.now().epochSeconds - (t.state.config.revisitDelayMins * 60)).uint64 @@ -30,6 +32,17 @@ proc step(t: TimeTracker): Future[?!void] {.async: (raises: []).} = ?await t.state.events.nodesExpired.fire(expired) return success() +proc raiseRoutingTableNodes(t: TimeTracker): Future[?!void] {.async: (raises: []).} = + let nids = t.dht.getRoutingTableNodeIds() + if err =? (await t.state.events.nodesFound.fire(nids)).errorOption: + return failure(err) + return success() + +proc step(t: TimeTracker): Future[?!void] {.async: (raises: []).} = + ?await t.checkForExpiredNodes() + ?await t.raiseRoutingTableNodes() + return success() + method start*(t: TimeTracker): Future[?!void] {.async.} = info "Starting..." @@ -46,5 +59,7 @@ method start*(t: TimeTracker): Future[?!void] {.async.} = method stop*(t: TimeTracker): Future[?!void] {.async.} = return success() -proc new*(T: type TimeTracker, state: State, nodestore: NodeStore): TimeTracker = - TimeTracker(state: state, nodestore: nodestore) +proc new*( + T: type TimeTracker, state: State, nodestore: NodeStore, dht: Dht +): TimeTracker = + TimeTracker(state: state, nodestore: nodestore, dht: dht) diff --git a/codexcrawler/services/dht.nim b/codexcrawler/services/dht.nim index 345dd31..e4726fc 100644 --- a/codexcrawler/services/dht.nim +++ b/codexcrawler/services/dht.nim @@ -47,7 +47,7 @@ proc getNode*(d: Dht, nodeId: NodeId): ?!Node = return success(node.get()) return failure("Node not found for id: " & nodeId.toHex()) -method getRoutingTableNodeIds*(d: Dht): seq[Nid] {.base.} = +method getRoutingTableNodeIds*(d: Dht): seq[Nid] {.base, gcsafe, raises: [].} = var ids = newSeq[Nid]() for bucket in d.protocol.routingTable.buckets: for node in bucket.nodes: @@ -109,19 +109,9 @@ proc updateDhtRecord(d: Dht, addrs: openArray[MultiAddress]) = if not d.protocol.isNil: d.protocol.updateRecord(d.dhtRecord).expect("Should update SPR") -# proc findRoutingTableNodes(d: Dht) {.async.} = -# await sleepAsync(5.seconds) -# let nodes = d.getRoutingTableNodeIds() - -# if err =? (await d.state.events.nodesFound.fire(nodes)).errorOption: -# error "Failed to raise routing-table nodes as found nodes", err = err.msg -# else: -# trace "Routing table nodes raised as found nodes", num = nodes.len - method start*(d: Dht): Future[?!void] {.async.} = d.protocol.open() await d.protocol.start() - # asyncSpawn d.findRoutingTableNodes() return success() method stop*(d: Dht): Future[?!void] {.async.} = diff --git a/codexcrawler/state.nim b/codexcrawler/state.nim index f4ca2e0..d70dfb5 100644 --- a/codexcrawler/state.nim +++ b/codexcrawler/state.nim @@ -33,7 +33,7 @@ type events*: Events proc delayedWorkerStart(s: State, step: OnStep, delay: Duration) {.async.} = - await sleepAsync(3.seconds) + await sleepAsync(1.seconds) proc worker(): Future[void] {.async.} = while s.status == ApplicationStatus.Running: diff --git a/tests/codexcrawler/components/testtimetracker.nim b/tests/codexcrawler/components/testtimetracker.nim index 587d1cb..7e29282 100644 --- a/tests/codexcrawler/components/testtimetracker.nim +++ b/tests/codexcrawler/components/testtimetracker.nim @@ -9,6 +9,7 @@ import ../../../codexcrawler/types import ../../../codexcrawler/state import ../mockstate import ../mocknodestore +import ../mockdht import ../helpers suite "TimeTracker": @@ -16,6 +17,7 @@ suite "TimeTracker": nid: Nid state: MockState store: MockNodeStore + dht: MockDht time: TimeTracker expiredNodesReceived: seq[Nid] sub: AsyncDataEventSubscription @@ -24,6 +26,7 @@ suite "TimeTracker": nid = genNid() state = createMockState() store = createMockNodeStore() + dht = createMockDht() # Subscribe to nodesExpired event expiredNodesReceived = newSeq[Nid]() @@ -35,7 +38,7 @@ suite "TimeTracker": state.config.revisitDelayMins = 22 - time = TimeTracker.new(state, store) + time = TimeTracker.new(state, store, dht) (await time.start()).tryGet() @@ -73,3 +76,20 @@ suite "TimeTracker": check: recentNodeId notin expiredNodesReceived + + test "onStep raises routingTable nodes as nodesFound": + var nodesFound = newSeq[Nid]() + proc onNodesFound(nids: seq[Nid]): Future[?!void] {.async.} = + nodesFound = nids + return success() + + let sub = state.events.nodesFound.subscribe(onNodesFound) + + dht.routingTable.add(nid) + + await onStep() + + check: + nid in nodesFound + + await state.events.nodesFound.unsubscribe(sub)