mirror of
https://github.com/logos-storage/logos-storage-network-crawler.git
synced 2026-01-05 15:03:07 +00:00
makes timetracker periodically raise routingtable nodes
This commit is contained in:
parent
6574d53d5f
commit
82a1cd0715
@ -3,6 +3,7 @@ import pkg/chronos
|
||||
import pkg/questionable/results
|
||||
|
||||
import ./nodestore
|
||||
import ../services/dht
|
||||
import ../component
|
||||
import ../state
|
||||
import ../types
|
||||
@ -14,8 +15,9 @@ logScope:
|
||||
type TimeTracker* = ref object of Component
|
||||
state: State
|
||||
nodestore: NodeStore
|
||||
dht: Dht
|
||||
|
||||
proc step(t: TimeTracker): Future[?!void] {.async: (raises: []).} =
|
||||
proc checkForExpiredNodes(t: TimeTracker): Future[?!void] {.async: (raises: []).} =
|
||||
trace "Checking for expired nodes..."
|
||||
let expiry =
|
||||
(Moment.now().epochSeconds - (t.state.config.revisitDelayMins * 60)).uint64
|
||||
@ -30,6 +32,17 @@ proc step(t: TimeTracker): Future[?!void] {.async: (raises: []).} =
|
||||
?await t.state.events.nodesExpired.fire(expired)
|
||||
return success()
|
||||
|
||||
proc raiseRoutingTableNodes(t: TimeTracker): Future[?!void] {.async: (raises: []).} =
|
||||
let nids = t.dht.getRoutingTableNodeIds()
|
||||
if err =? (await t.state.events.nodesFound.fire(nids)).errorOption:
|
||||
return failure(err)
|
||||
return success()
|
||||
|
||||
proc step(t: TimeTracker): Future[?!void] {.async: (raises: []).} =
|
||||
?await t.checkForExpiredNodes()
|
||||
?await t.raiseRoutingTableNodes()
|
||||
return success()
|
||||
|
||||
method start*(t: TimeTracker): Future[?!void] {.async.} =
|
||||
info "Starting..."
|
||||
|
||||
@ -46,5 +59,7 @@ method start*(t: TimeTracker): Future[?!void] {.async.} =
|
||||
method stop*(t: TimeTracker): Future[?!void] {.async.} =
|
||||
return success()
|
||||
|
||||
proc new*(T: type TimeTracker, state: State, nodestore: NodeStore): TimeTracker =
|
||||
TimeTracker(state: state, nodestore: nodestore)
|
||||
proc new*(
|
||||
T: type TimeTracker, state: State, nodestore: NodeStore, dht: Dht
|
||||
): TimeTracker =
|
||||
TimeTracker(state: state, nodestore: nodestore, dht: dht)
|
||||
|
||||
@ -47,7 +47,7 @@ proc getNode*(d: Dht, nodeId: NodeId): ?!Node =
|
||||
return success(node.get())
|
||||
return failure("Node not found for id: " & nodeId.toHex())
|
||||
|
||||
method getRoutingTableNodeIds*(d: Dht): seq[Nid] {.base.} =
|
||||
method getRoutingTableNodeIds*(d: Dht): seq[Nid] {.base, gcsafe, raises: [].} =
|
||||
var ids = newSeq[Nid]()
|
||||
for bucket in d.protocol.routingTable.buckets:
|
||||
for node in bucket.nodes:
|
||||
@ -109,19 +109,9 @@ proc updateDhtRecord(d: Dht, addrs: openArray[MultiAddress]) =
|
||||
if not d.protocol.isNil:
|
||||
d.protocol.updateRecord(d.dhtRecord).expect("Should update SPR")
|
||||
|
||||
# proc findRoutingTableNodes(d: Dht) {.async.} =
|
||||
# await sleepAsync(5.seconds)
|
||||
# let nodes = d.getRoutingTableNodeIds()
|
||||
|
||||
# if err =? (await d.state.events.nodesFound.fire(nodes)).errorOption:
|
||||
# error "Failed to raise routing-table nodes as found nodes", err = err.msg
|
||||
# else:
|
||||
# trace "Routing table nodes raised as found nodes", num = nodes.len
|
||||
|
||||
method start*(d: Dht): Future[?!void] {.async.} =
|
||||
d.protocol.open()
|
||||
await d.protocol.start()
|
||||
# asyncSpawn d.findRoutingTableNodes()
|
||||
return success()
|
||||
|
||||
method stop*(d: Dht): Future[?!void] {.async.} =
|
||||
|
||||
@ -33,7 +33,7 @@ type
|
||||
events*: Events
|
||||
|
||||
proc delayedWorkerStart(s: State, step: OnStep, delay: Duration) {.async.} =
|
||||
await sleepAsync(3.seconds)
|
||||
await sleepAsync(1.seconds)
|
||||
|
||||
proc worker(): Future[void] {.async.} =
|
||||
while s.status == ApplicationStatus.Running:
|
||||
|
||||
@ -9,6 +9,7 @@ import ../../../codexcrawler/types
|
||||
import ../../../codexcrawler/state
|
||||
import ../mockstate
|
||||
import ../mocknodestore
|
||||
import ../mockdht
|
||||
import ../helpers
|
||||
|
||||
suite "TimeTracker":
|
||||
@ -16,6 +17,7 @@ suite "TimeTracker":
|
||||
nid: Nid
|
||||
state: MockState
|
||||
store: MockNodeStore
|
||||
dht: MockDht
|
||||
time: TimeTracker
|
||||
expiredNodesReceived: seq[Nid]
|
||||
sub: AsyncDataEventSubscription
|
||||
@ -24,6 +26,7 @@ suite "TimeTracker":
|
||||
nid = genNid()
|
||||
state = createMockState()
|
||||
store = createMockNodeStore()
|
||||
dht = createMockDht()
|
||||
|
||||
# Subscribe to nodesExpired event
|
||||
expiredNodesReceived = newSeq[Nid]()
|
||||
@ -35,7 +38,7 @@ suite "TimeTracker":
|
||||
|
||||
state.config.revisitDelayMins = 22
|
||||
|
||||
time = TimeTracker.new(state, store)
|
||||
time = TimeTracker.new(state, store, dht)
|
||||
|
||||
(await time.start()).tryGet()
|
||||
|
||||
@ -73,3 +76,20 @@ suite "TimeTracker":
|
||||
|
||||
check:
|
||||
recentNodeId notin expiredNodesReceived
|
||||
|
||||
test "onStep raises routingTable nodes as nodesFound":
|
||||
var nodesFound = newSeq[Nid]()
|
||||
proc onNodesFound(nids: seq[Nid]): Future[?!void] {.async.} =
|
||||
nodesFound = nids
|
||||
return success()
|
||||
|
||||
let sub = state.events.nodesFound.subscribe(onNodesFound)
|
||||
|
||||
dht.routingTable.add(nid)
|
||||
|
||||
await onStep()
|
||||
|
||||
check:
|
||||
nid in nodesFound
|
||||
|
||||
await state.events.nodesFound.unsubscribe(sub)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user