setting up timing controls

This commit is contained in:
thatben 2025-02-07 16:19:26 +01:00
parent a2d36d5192
commit 87e4f04f0b
No known key found for this signature in database
GPG Key ID: 62C543548433D43E
5 changed files with 57 additions and 24 deletions

View File

@ -109,7 +109,8 @@ proc initializeDht(app: Application): Future[?!void] {.async.} =
return success()
proc initializeCrawler(app: Application): Future[?!void] {.async.} =
app.crawler = Crawler.new(app.dht, app.todoNodes, app.okNodes, app.nokNodes)
app.crawler =
Crawler.new(app.dht, app.todoNodes, app.okNodes, app.nokNodes, app.config)
return await app.crawler.start()
proc initializeApp(app: Application): Future[?!void] {.async.} =

View File

@ -10,7 +10,7 @@ let doc =
Codex Network Crawler. Generates network metrics.
Usage:
codexcrawler [--logLevel=<l>] [--publicIp=<a>] [--metricsAddress=<ip>] [--metricsPort=<p>] [--dataDir=<dir>] [--discoveryPort=<p>] [--bootNodes=<n>]
codexcrawler [--logLevel=<l>] [--publicIp=<a>] [--metricsAddress=<ip>] [--metricsPort=<p>] [--dataDir=<dir>] [--discoveryPort=<p>] [--bootNodes=<n>] [--stepDelay=<ms>] [--revisitDelay=<m>]
Options:
--logLevel=<l> Sets log level [default: TRACE]
@ -20,6 +20,8 @@ Options:
--dataDir=<dir> Directory for storing data [default: crawler_data]
--discoveryPort=<p> Port used for DHT [default: 8090]
--bootNodes=<n> Semi-colon-separated list of Codex bootstrap SPRs [default: testnet_sprs]
--stepDelay=<ms> Delay in milliseconds per crawl step [default: 1000]
--revisitDelay=<m> Delay in minutes after which a node can be revisited [default: 1440] (24h)
"""
import strutils
@ -33,12 +35,15 @@ type CrawlerConfig* = ref object
dataDir*: string
discPort*: Port
bootNodes*: seq[SignedPeerRecord]
stepDelayMs*: int
revisitDelayMins*: int
proc `$`*(config: CrawlerConfig): string =
"CrawlerConfig:" & " logLevel=" & config.logLevel & " publicIp=" & config.publicIp &
" metricsAddress=" & $config.metricsAddress & " metricsPort=" & $config.metricsPort &
" dataDir=" & config.dataDir & " discPort=" & $config.discPort & " bootNodes=" &
config.bootNodes.mapIt($it).join(";")
config.bootNodes.mapIt($it).join(";") & " stepDelay=" & $config.stepDelayMs &
" revisitDelay=" & $config.revisitDelayMins
proc getDefaultTestnetBootNodes(): seq[string] =
@[
@ -51,7 +56,7 @@ proc getDefaultTestnetBootNodes(): seq[string] =
"spr:CiUIAhIhAntGLadpfuBCD9XXfiN_43-V3L5VWgFCXxg4a8uhDdnYEgIDARo8CicAJQgCEiECe0Ytp2l-4EIP1dd-I3_jf5XcvlVaAUJfGDhry6EN2dgQsIufsAYaCwoJBNEmoCiRAnV2KkYwRAIgXO3bzd5VF8jLZG8r7dcLJ_FnQBYp1BcxrOvovEa40acCIDhQ14eJRoPwJ6GKgqOkXdaFAsoszl-HIRzYcXKeb7D9",
"spr:CiUIAhIhA2AEPzVj1Z_pshWAwvTp0xvRZTigIkYphXGZdiYGmYRwEgIDARo8CicAJQgCEiEDYAQ_NWPVn-myFYDC9OnTG9FlOKAiRimFcZl2JgaZhHAQvKCXugYaCwoJBES3CuORAnd-KkYwRAIgNwrc7n8A107pYUoWfJxL8X0f-flfUKeA6bFrjVKzEo0CID_0q-KO5ZAGf65VsK-d9rV3S0PbFg7Hj3Cv4aVX2Lnn",
"spr:CiUIAhIhAuhggJhkjeRoR7MHjZ_L_naZKnjF541X0GXTI7LEwXi_EgIDARo8CicAJQgCEiEC6GCAmGSN5GhHsweNn8v-dpkqeMXnjVfQZdMjssTBeL8Qop2quwYaCwoJBJK-4V-RAncuKkYwRAIgaXWoxvKkzrjUZ5K_ayQHKNlYhUEzBXhGviujxfJiGXkCICbsYFivi6Ny1FT6tbofVBRj7lnaR3K9_3j5pUT4862k",
"spr:CiUIAhIhA-pnA5sLGDVbqEXsRxDUjQEpiSAximHNbyqr2DwLmTq8EgIDARo8CicAJQgCEiED6mcDmwsYNVuoRexHENSNASmJIDGKYc1vKqvYPAuZOrwQyrekvAYaCwoJBIDHOw-RAnc4KkcwRQIhAJtKNeTykcE5bkKwe-vhSmqyBwc2AnexqFX1tAQGLQJ4AiBJOPseqvI3PyEM8l3hY3zvelZU9lT03O7MA_8cUfF4Uw"
"spr:CiUIAhIhA-pnA5sLGDVbqEXsRxDUjQEpiSAximHNbyqr2DwLmTq8EgIDARo8CicAJQgCEiED6mcDmwsYNVuoRexHENSNASmJIDGKYc1vKqvYPAuZOrwQyrekvAYaCwoJBIDHOw-RAnc4KkcwRQIhAJtKNeTykcE5bkKwe-vhSmqyBwc2AnexqFX1tAQGLQJ4AiBJOPseqvI3PyEM8l3hY3zvelZU9lT03O7MA_8cUfF4Uw",
]
proc getBootNodeStrings(input: string): seq[string] =
@ -90,4 +95,6 @@ proc parseConfig*(): CrawlerConfig =
dataDir: get("--dataDir"),
discPort: Port(parseInt(get("--discoveryPort"))),
bootNodes: getBootNodes(get("--bootNodes")),
stepDelayMs: parseInt(get("--stepDelay")),
revisitDelayMins: parseInt(get("--revisitDelay")),
)

View File

@ -6,6 +6,7 @@ import pkg/questionable/results
import ./dht
import ./list
import ./nodeentry
import ./config
import std/sequtils
@ -14,6 +15,7 @@ logScope:
type Crawler* = ref object
dht: Dht
config: CrawlerConfig
todoNodes: List
okNodes: List
nokNodes: List
@ -32,7 +34,7 @@ proc handleNodeOk(c: Crawler, target: NodeEntry) {.async.} =
error "Failed to add OK-node to list", err = err.msg
proc addNewTodoNode(c: Crawler, nodeId: NodeId): Future[?!void] {.async.} =
let entry = NodeEntry(id: nodeId, value: "todo")
let entry = NodeEntry(id: nodeId, lastVisit: 0)
return await c.todoNodes.add(entry)
proc addNewTodoNodes(c: Crawler, newNodes: seq[Node]) {.async.} =
@ -46,10 +48,10 @@ proc step(c: Crawler) {.async.} =
ok = $c.okNodes.len
nok = $c.nokNodes.len
without target =? (await c.todoNodes.pop()), err:
without var target =? (await c.todoNodes.pop()), err:
error "Failed to get todo node", err = err.msg
# todo: update target timestamp
target.lastVisit = Moment.now().epochSeconds.uint64
without receivedNodes =? (await c.dht.getNeighbors(target.id)), err:
trace "Call failed", node = $target.id, err = err.msg
@ -66,7 +68,7 @@ proc worker(c: Crawler) {.async.} =
try:
while true:
await c.step()
await sleepAsync(3.secs)
await sleepAsync(c.config.stepDelayMs.millis)
except Exception as exc:
error "Exception in crawler worker", msg = exc.msg
quit QuitFailure
@ -85,6 +87,13 @@ proc start*(c: Crawler): Future[?!void] {.async.} =
return success()
proc new*(
T: type Crawler, dht: Dht, todoNodes: List, okNodes: List, nokNodes: List
T: type Crawler,
dht: Dht,
todoNodes: List,
okNodes: List,
nokNodes: List,
config: CrawlerConfig,
): Crawler =
Crawler(dht: dht, todoNodes: todoNodes, okNodes: okNodes, nokNodes: nokNodes)
Crawler(
dht: dht, todoNodes: todoNodes, okNodes: okNodes, nokNodes: nokNodes, config: config
)

View File

@ -30,19 +30,12 @@ type
onMetric: OnUpdateMetric
proc encode(s: NodeEntry): seq[byte] =
($s.id & ";" & s.value).toBytes()
s.toBytes()
proc decode(T: type NodeEntry, bytes: seq[byte]): ?!T =
let s = string.fromBytes(bytes)
if s.len == 0:
return success(NodeEntry(id: NodeId("0".u256), value: ""))
let tokens = s.split(";")
if tokens.len != 2:
return failure("expected 2 tokens")
let id = UInt256.fromHex(tokens[0])
success(NodeEntry(id: id, value: tokens[1]))
if bytes.len < 1:
return success(NodeEntry(id: UInt256.fromHex("0"), lastVisit: 0.uint64))
return NodeEntry.fromBytes(bytes)
proc saveItem(this: List, item: NodeEntry): Future[?!void] {.async.} =
without itemKey =? Key.init(this.name / $item.id), err:
@ -67,7 +60,7 @@ proc load*(this: List): Future[?!void] {.async.} =
return failure(err)
without value =? item.value, err:
return failure(err)
if value.value.len > 0:
if value.id > 0 or value.lastVisit > 0:
this.items.add(value)
this.onMetric(this.items.len.int64)

View File

@ -3,10 +3,33 @@ import pkg/stew/endians2
import pkg/questionable
import pkg/questionable/results
import pkg/codexdht
import pkg/chronos
import pkg/libp2p
type NodeEntry* = object
id*: NodeId
value*: string # todo: will be last-checked timestamp
lastVisit*: uint64
proc `$`*(entry: NodeEntry): string =
$entry.id & ":" & entry.value
$entry.id & ":" & $entry.lastVisit
proc toBytes*(entry: NodeEntry): seq[byte] =
  ## Serializes a NodeEntry to protobuf wire format:
  ## field 1 = node id rendered as a string, field 2 = last-visit timestamp.
  var pb = initProtoBuffer()
  pb.write(1, $entry.id)
  pb.write(2, entry.lastVisit)
  pb.finish()
  pb.buffer
proc fromBytes*(_: type NodeEntry, data: openArray[byte]): ?!NodeEntry =
  ## Deserializes a NodeEntry from the protobuf wire format produced by
  ## `toBytes` (field 1 = node id string, field 2 = last-visit timestamp).
  ## Returns a failure instead of raising when the data is malformed.
  var
    buffer = initProtoBuffer(data)
    idStr: string
    lastVisit: uint64
  # getField returns Ok(false) when the field is simply absent; treating that
  # the same as a decode error prevents parsing an empty id string below.
  let idRes = buffer.getField(1, idStr)
  if idRes.isErr or not idRes.get():
    return failure("Unable to decode `idStr`")
  if buffer.getField(2, lastVisit).isErr:
    return failure("Unable to decode `lastVisit`")
  # NOTE(review): an absent field 2 leaves lastVisit at 0 ("never visited"),
  # mirroring the empty-input default used by the list decoder.
  try:
    return success(NodeEntry(id: UInt256.fromHex(idStr), lastVisit: lastVisit))
  except ValueError as e:
    # fromHex raises on malformed hex; convert to the proc's failure contract.
    return failure("Unable to parse node id: " & e.msg)