Splits check delay and expiry delay

This commit is contained in:
thatben 2025-02-13 12:35:19 +01:00
parent a714c19814
commit 88197d8748
No known key found for this signature in database
GPG Key ID: 62C543548433D43E
6 changed files with 50 additions and 37 deletions

View File

@ -20,7 +20,7 @@ type TimeTracker* = ref object of Component
clock: Clock
proc checkForExpiredNodes(t: TimeTracker): Future[?!void] {.async: (raises: []).} =
let expiry = t.clock.now() - (t.state.config.revisitDelayMins * 60).uint64
let expiry = t.clock.now() - (t.state.config.expiryDelayMins * 60).uint64
var expired = newSeq[Nid]()
proc checkNode(item: NodeEntry): Future[?!void] {.async: (raises: []), gcsafe.} =
@ -44,22 +44,17 @@ proc raiseRoutingTableNodes(t: TimeTracker): Future[?!void] {.async: (raises: []
return failure(err)
return success()
proc step(t: TimeTracker): Future[?!void] {.async: (raises: []).} =
?await t.checkForExpiredNodes()
?await t.raiseRoutingTableNodes()
return success()
method start*(t: TimeTracker): Future[?!void] {.async.} =
info "Starting..."
proc onStep(): Future[?!void] {.async: (raises: []), gcsafe.} =
await t.step()
proc onCheckExpiry(): Future[?!void] {.async: (raises: []), gcsafe.} =
await t.checkForExpiredNodes()
var delay = t.state.config.revisitDelayMins
if delay < 1:
delay = 1
proc onRoutingTable(): Future[?!void] {.async: (raises: []), gcsafe.} =
await t.raiseRoutingTableNodes()
await t.state.whileRunning(onStep, delay.minutes)
await t.state.whileRunning(onCheckExpiry, t.state.config.checkDelayMins.minutes)
await t.state.whileRunning(onRoutingTable, 30.minutes)
return success()
method stop*(t: TimeTracker): Future[?!void] {.async.} =

View File

@ -13,15 +13,16 @@ Usage:
codexcrawler [--logLevel=<l>] [--publicIp=<a>] [--metricsAddress=<ip>] [--metricsPort=<p>] [--dataDir=<dir>] [--discoveryPort=<p>] [--bootNodes=<n>] [--stepDelay=<ms>] [--revisitDelay=<m>]
Options:
--logLevel=<l> Sets log level [default: TRACE]
--publicIp=<a> Public IP address where this instance is reachable. [default: 62.45.154.249]
--metricsAddress=<ip> Listen address of the metrics server [default: 0.0.0.0]
--metricsPort=<p> Listen HTTP port of the metrics server [default: 8008]
--dataDir=<dir> Directory for storing data [default: crawler_data]
--discoveryPort=<p> Port used for DHT [default: 8090]
--bootNodes=<n> Semi-colon-separated list of Codex bootstrap SPRs [default: testnet_sprs]
--stepDelay=<ms> Delay in milliseconds per crawl step [default: 100]
--revisitDelay=<m> Delay in minutes after which a node can be revisited [default: 1] (24h)
--logLevel=<l> Sets log level [default: INFO]
--publicIp=<a> Public IP address where this instance is reachable.
--metricsAddress=<ip> Listen address of the metrics server [default: 0.0.0.0]
--metricsPort=<p> Listen HTTP port of the metrics server [default: 8008]
--dataDir=<dir> Directory for storing data [default: crawler_data]
--discoveryPort=<p> Port used for DHT [default: 8090]
--bootNodes=<n> Semi-colon-separated list of Codex bootstrap SPRs [default: testnet_sprs]
--stepDelay=<ms> Delay in milliseconds per node visit [default: 100]
--expiryDelay=<m> Delay in minutes after which a node can be revisited [default: 60]
--checkDelay=<m> Delay with which the 'expiryDelay' is checked for all known nodes [default: 10]
"""
import strutils
@ -36,14 +37,16 @@ type Config* = ref object
discPort*: Port
bootNodes*: seq[SignedPeerRecord]
stepDelayMs*: int
revisitDelayMins*: int
expiryDelayMins*: int
checkDelayMins*: int
proc `$`*(config: Config): string =
"Crawler:" & " logLevel=" & config.logLevel & " publicIp=" & config.publicIp &
" metricsAddress=" & $config.metricsAddress & " metricsPort=" & $config.metricsPort &
" dataDir=" & config.dataDir & " discPort=" & $config.discPort & " bootNodes=" &
config.bootNodes.mapIt($it).join(";") & " stepDelay=" & $config.stepDelayMs &
" revisitDelay=" & $config.revisitDelayMins
" expiryDelayMins=" & $config.expiryDelayMins & " checkDelayMins=" &
$config.checkDelayMins
proc getDefaultTestnetBootNodes(): seq[string] =
@[
@ -96,5 +99,6 @@ proc parseConfig*(): Config =
discPort: Port(parseInt(get("--discoveryPort"))),
bootNodes: getBootNodes(get("--bootNodes")),
stepDelayMs: parseInt(get("--stepDelay")),
revisitDelayMins: parseInt(get("--revisitDelay")),
expiryDelayMins: parseInt(get("--expiryDelay")),
checkDelayMins: parseInt(get("--checkDelay")),
)

View File

@ -10,10 +10,11 @@ DATADIR=${CRAWLER_DATADIR:-crawler_data}
DISCPORT=${CRAWLER_DISCPORT:-8090}
BOOTNODES=${CRAWLER_BOOTNODES:-testnet_sprs}
STEPDELAY=${CRAWLER_STEPDELAY:-1000}
REVISITDELAY=${CRAWLER_REVISITDELAY:-1440}
CHECKDELAY=${CRAWLER_CHECKDELAY:-10}
EXPIRYDELAY=${CRAWLER_EXPIRYDELAY:-60}
# Update CLI arguments
set -- "$@" --logLevel="${LOGLEVEL}" --publicIp="${PUBLICIP}" --metricsAddress="${METRICSADDRESS}" --metricsPort="${METRICSPORT}" --dataDir="${DATADIR}" --discoveryPort="${DISCPORT}" --bootNodes="${BOOTNODES}" --stepDelay="${STEPDELAY}" --revisitDelay="${REVISITDELAY}"
set -- "$@" --logLevel="${LOGLEVEL}" --publicIp="${PUBLICIP}" --metricsAddress="${METRICSADDRESS}" --metricsPort="${METRICSPORT}" --dataDir="${DATADIR}" --discoveryPort="${DISCPORT}" --bootNodes="${BOOTNODES}" --stepDelay="${STEPDELAY}" --expiryDelay="${EXPIRYDELAY}" --checkDelay="${CHECKDELAY}"
# Run
echo "Run Codex Crawler"

View File

@ -38,7 +38,7 @@ suite "Crawler":
state.checkAllUnsubscribed()
proc onStep() {.async.} =
(await state.stepper()).tryGet()
(await state.steppers[0]()).tryGet()
proc responsive(nid: Nid): GetNeighborsResponse =
GetNeighborsResponse(isResponsive: true, nodeIds: @[nid])

View File

@ -43,7 +43,8 @@ suite "TimeTracker":
sub = state.events.nodesExpired.subscribe(onExpired)
state.config.revisitDelayMins = 22
state.config.checkDelayMins = 11
state.config.expiryDelayMins = 22
time = TimeTracker.new(state, store, dht, clock)
@ -54,30 +55,38 @@ suite "TimeTracker":
await state.events.nodesExpired.unsubscribe(sub)
state.checkAllUnsubscribed()
proc onStep() {.async.} =
(await state.stepper()).tryGet()
proc onStepExpiry() {.async.} =
(await state.steppers[0]()).tryGet()
proc onStepRt() {.async.} =
(await state.steppers[1]()).tryGet()
proc createNodeInStore(lastVisit: uint64): Nid =
let entry = NodeEntry(id: genNid(), lastVisit: lastVisit)
store.nodesToIterate.add(entry)
return entry.id
test "start sets steppers for expiry and routingtable load":
check:
state.delays[0] == state.config.checkDelayMins.minutes
state.delays[1] == 30.minutes
test "onStep fires nodesExpired event for expired nodes":
let
expiredTimestamp = now - ((1 + state.config.revisitDelayMins) * 60).uint64
expiredTimestamp = now - ((1 + state.config.expiryDelayMins) * 60).uint64
expiredNodeId = createNodeInStore(expiredTimestamp)
await onStep()
await onStepExpiry()
check:
expiredNodeId in expiredNodesReceived
test "onStep does not fire nodesExpired event for nodes that are recent":
let
recentTimestamp = now - ((state.config.revisitDelayMins - 1) * 60).uint64
recentTimestamp = now - ((state.config.expiryDelayMins - 1) * 60).uint64
recentNodeId = createNodeInStore(recentTimestamp)
await onStep()
await onStepExpiry()
check:
recentNodeId notin expiredNodesReceived
@ -92,7 +101,7 @@ suite "TimeTracker":
dht.routingTable.add(nid)
await onStep()
await onStepRt()
check:
nid in nodesFound

View File

@ -5,7 +5,8 @@ import ../../../codexcrawler/types
import ../../../codexcrawler/config
type MockState* = ref object of State
stepper*: OnStep
steppers*: seq[OnStep]
delays*: seq[Duration]
proc checkAllUnsubscribed*(s: MockState) =
check:
@ -15,7 +16,8 @@ proc checkAllUnsubscribed*(s: MockState) =
s.events.nodesExpired.listeners == 0
method whileRunning*(s: MockState, step: OnStep, delay: Duration) {.async.} =
s.stepper = step
s.steppers.add(step)
s.delays.add(delay)
proc createMockState*(): MockState =
MockState(
@ -27,4 +29,6 @@ proc createMockState*(): MockState =
dhtNodeCheck: newAsyncDataEvent[DhtNodeCheckEventData](),
nodesExpired: newAsyncDataEvent[seq[Nid]](),
),
steppers: newSeq[OnStep](),
delays: newSeq[Duration](),
)