From 088c160267ccc65f39203d91b10f669f472fa125 Mon Sep 17 00:00:00 2001
From: thatben
Date: Fri, 7 Feb 2025 16:48:47 +0100
Subject: [PATCH] wip: time tracking

---
Note: line structure was restored from a whitespace-collapsed copy of this
patch. Blank context lines and indentation of context lines were inferred
from the hunk line counts; verify against the target tree before applying.

 codexcrawler/application.nim | 12 ++++++
 codexcrawler/dht.nim         |  2 -
 codexcrawler/list.nim        | 37 ++++++++++-----
 codexcrawler/timetracker.nim | 83 ++++++++++++++++++++++++++++++++++++
 4 files changed, 123 insertions(+), 11 deletions(-)
 create mode 100644 codexcrawler/timetracker.nim

diff --git a/codexcrawler/application.nim b/codexcrawler/application.nim
index 8f34113..8909816 100644
--- a/codexcrawler/application.nim
+++ b/codexcrawler/application.nim
@@ -16,6 +16,7 @@ import ./list
 import ./dht
 import ./keyutils
 import ./crawler
+import ./timetracker
 
 declareGauge(todoNodesGauge, "DHT nodes to be visited")
 declareGauge(okNodesGauge, "DHT nodes successfully contacted")
@@ -35,6 +36,7 @@ type
     nokNodes*: List
     dht*: Dht
     crawler*: Crawler
+    timeTracker*: TimeTracker
 
 proc createDatastore(app: Application, path: string): ?!Datastore =
   without store =? LevelDbDatastore.new(path), err:
@@ -113,6 +115,12 @@ proc initializeCrawler(app: Application): Future[?!void] {.async.} =
     Crawler.new(app.dht, app.todoNodes, app.okNodes, app.nokNodes, app.config)
   return await app.crawler.start()
 
+proc initializeTimeTracker(app: Application): Future[?!void] {.async.} =
+  ## Creates the time tracker over the three node lists and starts it.
+  app.timeTracker =
+    TimeTracker.new(app.todoNodes, app.okNodes, app.nokNodes, app.config)
+  return await app.timeTracker.start()
+
 proc initializeApp(app: Application): Future[?!void] {.async.} =
   if err =? (await app.initializeLists()).errorOption:
     error "Failed to initialize lists", err = err.msg
@@ -126,6 +134,10 @@ proc initializeApp(app: Application): Future[?!void] {.async.} =
     error "Failed to initialize crawler", err = err.msg
     return failure(err)
 
+  if err =? (await app.initializeTimeTracker()).errorOption:
+    error "Failed to initialize timetracker", err = err.msg
+    return failure(err)
+
   return success()
 
 proc stop*(app: Application) =
diff --git a/codexcrawler/dht.nim b/codexcrawler/dht.nim
index 9da00b9..a649ffe 100644
--- a/codexcrawler/dht.nim
+++ b/codexcrawler/dht.nim
@@ -42,10 +42,8 @@ proc getNode*(d: Dht, nodeId: NodeId): ?!Node =
 
 proc getRoutingTableNodeIds*(d: Dht): seq[NodeId] =
   var ids = newSeq[NodeId]()
-  info "routing table", len = $d.protocol.routingTable.len
   for bucket in d.protocol.routingTable.buckets:
     for node in bucket.nodes:
-      warn "node seen", node = $node.id, seen = $node.seen
       ids.add(node.id)
   return ids
 
diff --git a/codexcrawler/list.nim b/codexcrawler/list.nim
index 2ce16d3..3596f8a 100644
--- a/codexcrawler/list.nim
+++ b/codexcrawler/list.nim
@@ -22,6 +22,7 @@ logScope:
 
 type
   OnUpdateMetric = proc(value: int64): void {.gcsafe, raises: [].}
+  OnItem = proc(item: NodeEntry): void {.gcsafe, raises: [].}
 
   List* = ref object
     name: string
@@ -43,12 +44,6 @@ proc saveItem(this: List, item: NodeEntry): Future[?!void] {.async.} =
   ?await this.store.put(itemKey, item)
   return success()
 
-proc removeItem(this: List, item: NodeEntry): Future[?!void] {.async.} =
-  without itemKey =? Key.init(this.name / $item.id), err:
-    return failure(err)
-  ?await this.store.delete(itemKey)
-  return success()
-
 proc load*(this: List): Future[?!void] {.async.} =
   without queryKey =? Key.init(this.name), err:
     return failure(err)
@@ -89,17 +84,41 @@ proc add*(this: List, item: NodeEntry): Future[?!void] {.async.} =
     return failure(err)
   return success()
 
+proc remove*(this: List, item: NodeEntry): Future[?!void] {.async.} =
+  ## Removes `item` (matched by id) from the in-memory list and deletes its
+  ## entry from the store. Fails when the list is empty, when the key cannot
+  ## be built, or when the store delete fails.
+  if this.items.len < 1:
+    return failure(this.name & "List is empty.")
+
+  # Build the key first so a key failure leaves the list untouched.
+  without itemKey =? Key.init(this.name / $item.id), err:
+    return failure(err)
+
+  # Update memory and the metric together, before awaiting the store.
+  this.items.keepItIf(item.id != it.id)
+  this.onMetric(this.items.len.int64)
+  ?await this.store.delete(itemKey)
+  return success()
+
 proc pop*(this: List): Future[?!NodeEntry] {.async.} =
   if this.items.len < 1:
     return failure(this.name & "List is empty.")
 
   let item = this.items[0]
-  this.items.delete(0)
-  this.onMetric(this.items.len.int64)
 
-  if err =? (await this.removeItem(item)).errorOption:
+  if err =? (await this.remove(item)).errorOption:
     return failure(err)
   return success(item)
 
 proc len*(this: List): int =
   this.items.len
+
+proc iterateAll*(this: List, onItem: OnItem) {.async.} =
+  ## Calls `onItem` for each entry, sleeping 1 ms after every call.
+  # Iterate over a copy: the list can change through `add`, `pop` or
+  # `remove` while this proc is suspended in `sleepAsync`.
+  let snapshot = this.items
+  for item in snapshot:
+    onItem(item)
+    await sleepAsync(1.millis)
diff --git a/codexcrawler/timetracker.nim b/codexcrawler/timetracker.nim
new file mode 100644
index 0000000..e839490
--- /dev/null
+++ b/codexcrawler/timetracker.nim
@@ -0,0 +1,83 @@
+import pkg/chronicles
+import pkg/chronos
+import pkg/questionable
+import pkg/questionable/results
+
+import ./dht
+import ./list
+import ./nodeentry
+import ./config
+
+import std/sequtils
+
+logScope:
+  topics = "timetracker"
+
+type TimeTracker* = ref object
+  config: CrawlerConfig
+  todoNodes: List
+  okNodes: List
+  nokNodes: List
+  workerDelay: int
+
+proc processList(t: TimeTracker, list: List, expiry: uint64) {.async.} =
+  ## Collects every entry of `list` whose `lastVisit` is below `expiry`, then
+  ## adds each one to the todo list and removes it from `list`.
+  var toMove = newSeq[NodeEntry]()
+  proc onItem(item: NodeEntry) =
+    if item.lastVisit < expiry:
+      trace "expired node, moving to todo"
+      toMove.add(item)
+
+  await list.iterateAll(onItem)
+
+  for item in toMove:
+    if err =? (await t.todoNodes.add(item)).errorOption:
+      error "Failed to add expired node to todo list", err = err.msg
+      return
+    if err =? (await list.remove(item)).errorOption:
+      error "Failed to remove expired node from source list", err = err.msg
+
+proc step(t: TimeTracker) {.async.} =
+  ## Runs one expiry pass over the ok and nok lists.
+  # Clamp at zero so a revisit delay larger than the current clock value
+  # cannot hand a negative number to the uint64 conversion.
+  let
+    cutoff = Moment.now().epochSeconds - (t.config.revisitDelayMins * 60)
+    expiry = max(cutoff, 0).uint64
+  await t.processList(t.okNodes, expiry)
+  await t.processList(t.nokNodes, expiry)
+
+proc worker(t: TimeTracker) {.async.} =
+  ## Runs `step` in an endless loop, sleeping `workerDelay` minutes between
+  ## passes. Any exception is logged and terminates the process.
+  try:
+    while true:
+      await t.step()
+      await sleepAsync(t.workerDelay.minutes)
+  except Exception as exc:
+    error "Exception in timetracker worker", msg = exc.msg
+    quit QuitFailure
+
+proc start*(t: TimeTracker): Future[?!void] {.async.} =
+  ## Spawns the background worker and returns success.
+  info "Starting timetracker..."
+  asyncSpawn t.worker()
+  return success()
+
+proc new*(
+    T: type TimeTracker,
+    todoNodes: List,
+    okNodes: List,
+    nokNodes: List,
+    config: CrawlerConfig,
+): TimeTracker =
+  ## Creates a tracker whose worker delay is a tenth of
+  ## `config.revisitDelayMins`, with a minimum of one minute.
+  TimeTracker(
+    todoNodes: todoNodes,
+    okNodes: okNodes,
+    nokNodes: nokNodes,
+    config: config,
+    workerDelay: max(config.revisitDelayMins div 10, 1),
+  )