diff --git a/codexcrawler/application.nim b/codexcrawler/application.nim index c906b03..8f34113 100644 --- a/codexcrawler/application.nim +++ b/codexcrawler/application.nim @@ -109,7 +109,8 @@ proc initializeDht(app: Application): Future[?!void] {.async.} = return success() proc initializeCrawler(app: Application): Future[?!void] {.async.} = - app.crawler = Crawler.new(app.dht, app.todoNodes, app.okNodes, app.nokNodes) + app.crawler = + Crawler.new(app.dht, app.todoNodes, app.okNodes, app.nokNodes, app.config) return await app.crawler.start() proc initializeApp(app: Application): Future[?!void] {.async.} = diff --git a/codexcrawler/config.nim b/codexcrawler/config.nim index 20b6554..1c1f041 100644 --- a/codexcrawler/config.nim +++ b/codexcrawler/config.nim @@ -10,7 +10,7 @@ let doc = Codex Network Crawler. Generates network metrics. Usage: - codexcrawler [--logLevel=] [--publicIp=] [--metricsAddress=] [--metricsPort=

] [--dataDir=

] [--discoveryPort=

] [--bootNodes=] + codexcrawler [--logLevel=] [--publicIp=] [--metricsAddress=] [--metricsPort=

] [--dataDir=

] [--discoveryPort=

] [--bootNodes=] [--stepDelay=] [--revisitDelay=] Options: --logLevel= Sets log level [default: TRACE] @@ -20,6 +20,8 @@ Options: --dataDir=

Directory for storing data [default: crawler_data] --discoveryPort=

Port used for DHT [default: 8090] --bootNodes= Semi-colon-separated list of Codex bootstrap SPRs [default: testnet_sprs] + --stepDelay= Delay in milliseconds per crawl step [default: 1000] + --revisitDelay= Delay in minutes after which a node can be revisited [default: 1440] (24h) """ import strutils @@ -33,12 +35,15 @@ type CrawlerConfig* = ref object dataDir*: string discPort*: Port bootNodes*: seq[SignedPeerRecord] + stepDelayMs*: int + revisitDelayMins*: int proc `$`*(config: CrawlerConfig): string = "CrawlerConfig:" & " logLevel=" & config.logLevel & " publicIp=" & config.publicIp & " metricsAddress=" & $config.metricsAddress & " metricsPort=" & $config.metricsPort & " dataDir=" & config.dataDir & " discPort=" & $config.discPort & " bootNodes=" & - config.bootNodes.mapIt($it).join(";") + config.bootNodes.mapIt($it).join(";") & " stepDelay=" & $config.stepDelayMs & + " revisitDelay=" & $config.revisitDelayMins proc getDefaultTestnetBootNodes(): seq[string] = @[ @@ -51,7 +56,7 @@ proc getDefaultTestnetBootNodes(): seq[string] = "spr:CiUIAhIhAntGLadpfuBCD9XXfiN_43-V3L5VWgFCXxg4a8uhDdnYEgIDARo8CicAJQgCEiECe0Ytp2l-4EIP1dd-I3_jf5XcvlVaAUJfGDhry6EN2dgQsIufsAYaCwoJBNEmoCiRAnV2KkYwRAIgXO3bzd5VF8jLZG8r7dcLJ_FnQBYp1BcxrOvovEa40acCIDhQ14eJRoPwJ6GKgqOkXdaFAsoszl-HIRzYcXKeb7D9", "spr:CiUIAhIhA2AEPzVj1Z_pshWAwvTp0xvRZTigIkYphXGZdiYGmYRwEgIDARo8CicAJQgCEiEDYAQ_NWPVn-myFYDC9OnTG9FlOKAiRimFcZl2JgaZhHAQvKCXugYaCwoJBES3CuORAnd-KkYwRAIgNwrc7n8A107pYUoWfJxL8X0f-flfUKeA6bFrjVKzEo0CID_0q-KO5ZAGf65VsK-d9rV3S0PbFg7Hj3Cv4aVX2Lnn", "spr:CiUIAhIhAuhggJhkjeRoR7MHjZ_L_naZKnjF541X0GXTI7LEwXi_EgIDARo8CicAJQgCEiEC6GCAmGSN5GhHsweNn8v-dpkqeMXnjVfQZdMjssTBeL8Qop2quwYaCwoJBJK-4V-RAncuKkYwRAIgaXWoxvKkzrjUZ5K_ayQHKNlYhUEzBXhGviujxfJiGXkCICbsYFivi6Ny1FT6tbofVBRj7lnaR3K9_3j5pUT4862k", - "spr:CiUIAhIhA-pnA5sLGDVbqEXsRxDUjQEpiSAximHNbyqr2DwLmTq8EgIDARo8CicAJQgCEiED6mcDmwsYNVuoRexHENSNASmJIDGKYc1vKqvYPAuZOrwQyrekvAYaCwoJBIDHOw-RAnc4KkcwRQIhAJtKNeTykcE5bkKwe-vhSmqyBwc2AnexqFX1tAQGLQJ4AiBJOPseqvI3PyEM8l3hY3zvelZU9lT03O7MA_8cUfF4Uw" + "spr:CiUIAhIhA-pnA5sLGDVbqEXsRxDUjQEpiSAximHNbyqr2DwLmTq8EgIDARo8CicAJQgCEiED6mcDmwsYNVuoRexHENSNASmJIDGKYc1vKqvYPAuZOrwQyrekvAYaCwoJBIDHOw-RAnc4KkcwRQIhAJtKNeTykcE5bkKwe-vhSmqyBwc2AnexqFX1tAQGLQJ4AiBJOPseqvI3PyEM8l3hY3zvelZU9lT03O7MA_8cUfF4Uw", ] proc getBootNodeStrings(input: string): seq[string] = @@ -90,4 +95,6 @@ proc parseConfig*(): CrawlerConfig = dataDir: get("--dataDir"), discPort: Port(parseInt(get("--discoveryPort"))), bootNodes: getBootNodes(get("--bootNodes")), + stepDelayMs: parseInt(get("--stepDelay")), + revisitDelayMins: parseInt(get("--revisitDelay")), ) diff --git a/codexcrawler/crawler.nim b/codexcrawler/crawler.nim index c3b8124..97f98f2 100644 --- a/codexcrawler/crawler.nim +++ b/codexcrawler/crawler.nim @@ -6,6 +6,7 @@ import pkg/questionable/results import ./dht import ./list import ./nodeentry +import ./config import std/sequtils @@ -14,6 +15,7 @@ logScope: type Crawler* = ref object dht: Dht + config: CrawlerConfig todoNodes: List okNodes: List nokNodes: List @@ -32,7 +34,7 @@ proc handleNodeOk(c: Crawler, target: NodeEntry) {.async.} = error "Failed to add OK-node to list", err = err.msg proc addNewTodoNode(c: Crawler, nodeId: NodeId): Future[?!void] {.async.} = - let entry = NodeEntry(id: nodeId, value: "todo") + let entry = NodeEntry(id: nodeId, lastVisit: 0) return await c.todoNodes.add(entry) proc addNewTodoNodes(c: Crawler, newNodes: seq[Node]) {.async.} = @@ -46,10 +48,10 @@ proc step(c: Crawler) {.async.} = ok = $c.okNodes.len nok = $c.nokNodes.len - without target =? (await c.todoNodes.pop()), err: + without var target =? (await c.todoNodes.pop()), err: error "Failed to get todo node", err = err.msg - # todo: update target timestamp + target.lastVisit = Moment.now().epochSeconds.uint64 without receivedNodes =? (await c.dht.getNeighbors(target.id)), err: trace "Call failed", node = $target.id, err = err.msg @@ -66,7 +68,7 @@ proc worker(c: Crawler) {.async.} = try: while true: await c.step() - await sleepAsync(3.secs) + await sleepAsync(c.config.stepDelayMs.millis) except Exception as exc: error "Exception in crawler worker", msg = exc.msg quit QuitFailure @@ -85,6 +87,13 @@ proc start*(c: Crawler): Future[?!void] {.async.} = return success() proc new*( - T: type Crawler, dht: Dht, todoNodes: List, okNodes: List, nokNodes: List + T: type Crawler, + dht: Dht, + todoNodes: List, + okNodes: List, + nokNodes: List, + config: CrawlerConfig, ): Crawler = - Crawler(dht: dht, todoNodes: todoNodes, okNodes: okNodes, nokNodes: nokNodes) + Crawler( + dht: dht, todoNodes: todoNodes, okNodes: okNodes, nokNodes: nokNodes, config: config + ) diff --git a/codexcrawler/list.nim b/codexcrawler/list.nim index e083e1a..2ce16d3 100644 --- a/codexcrawler/list.nim +++ b/codexcrawler/list.nim @@ -30,19 +30,12 @@ type onMetric: OnUpdateMetric proc encode(s: NodeEntry): seq[byte] = - ($s.id & ";" & s.value).toBytes() + s.toBytes() proc decode(T: type NodeEntry, bytes: seq[byte]): ?!T = - let s = string.fromBytes(bytes) - if s.len == 0: - return success(NodeEntry(id: NodeId("0".u256), value: "")) - - let tokens = s.split(";") - if tokens.len != 2: - return failure("expected 2 tokens") - - let id = UInt256.fromHex(tokens[0]) - success(NodeEntry(id: id, value: tokens[1])) + if bytes.len < 1: + return success(NodeEntry(id: UInt256.fromHex("0"), lastVisit: 0.uint64)) + return NodeEntry.fromBytes(bytes) proc saveItem(this: List, item: NodeEntry): Future[?!void] {.async.} = without itemKey =? Key.init(this.name / $item.id), err: @@ -67,7 +60,7 @@ proc load*(this: List): Future[?!void] {.async.} = return failure(err) without value =? item.value, err: return failure(err) - if value.value.len > 0: + if value.id > 0 or value.lastVisit > 0: this.items.add(value) this.onMetric(this.items.len.int64) diff --git a/codexcrawler/nodeentry.nim b/codexcrawler/nodeentry.nim index a6cc76b..e5b6721 100644 --- a/codexcrawler/nodeentry.nim +++ b/codexcrawler/nodeentry.nim @@ -3,10 +3,33 @@ import pkg/stew/endians2 import pkg/questionable import pkg/questionable/results import pkg/codexdht +import pkg/chronos +import pkg/libp2p type NodeEntry* = object id*: NodeId - value*: string # todo: will be last-checked timestamp + lastVisit*: uint64 proc `$`*(entry: NodeEntry): string = - $entry.id & ":" & entry.value + $entry.id & ":" & $entry.lastVisit + +proc toBytes*(entry: NodeEntry): seq[byte] = + var buffer = initProtoBuffer() + buffer.write(1, $entry.id) + buffer.write(2, entry.lastVisit) + buffer.finish() + return buffer.buffer + +proc fromBytes*(_: type NodeEntry, data: openArray[byte]): ?!NodeEntry = + var + buffer = initProtoBuffer(data) + idStr: string + lastVisit: uint64 + + if buffer.getField(1, idStr).isErr: + return failure("Unable to decode `idStr`") + + if buffer.getField(2, lastVisit).isErr: + return failure("Unable to decode `lastVisit`") + + return success(NodeEntry(id: UInt256.fromHex(idStr), lastVisit: lastVisit))