mirror of
https://github.com/logos-storage/logos-storage-network-crawler.git
synced 2026-01-08 00:13:12 +00:00
wip: time tracking
This commit is contained in:
parent
87e4f04f0b
commit
088c160267
@ -16,6 +16,7 @@ import ./list
|
||||
import ./dht
|
||||
import ./keyutils
|
||||
import ./crawler
|
||||
import ./timetracker
|
||||
|
||||
declareGauge(todoNodesGauge, "DHT nodes to be visited")
|
||||
declareGauge(okNodesGauge, "DHT nodes successfully contacted")
|
||||
@ -35,6 +36,7 @@ type
|
||||
nokNodes*: List
|
||||
dht*: Dht
|
||||
crawler*: Crawler
|
||||
timeTracker*: TimeTracker
|
||||
|
||||
proc createDatastore(app: Application, path: string): ?!Datastore =
|
||||
without store =? LevelDbDatastore.new(path), err:
|
||||
@ -113,6 +115,11 @@ proc initializeCrawler(app: Application): Future[?!void] {.async.} =
|
||||
Crawler.new(app.dht, app.todoNodes, app.okNodes, app.nokNodes, app.config)
|
||||
return await app.crawler.start()
|
||||
|
||||
proc initializeTimeTracker(app: Application): Future[?!void] {.async.} =
|
||||
app.timeTracker =
|
||||
TimeTracker.new(app.todoNodes, app.okNodes, app.nokNodes, app.config)
|
||||
return await app.timeTracker.start()
|
||||
|
||||
proc initializeApp(app: Application): Future[?!void] {.async.} =
|
||||
if err =? (await app.initializeLists()).errorOption:
|
||||
error "Failed to initialize lists", err = err.msg
|
||||
@ -126,6 +133,10 @@ proc initializeApp(app: Application): Future[?!void] {.async.} =
|
||||
error "Failed to initialize crawler", err = err.msg
|
||||
return failure(err)
|
||||
|
||||
if err =? (await app.initializeTimeTracker()).errorOption:
|
||||
error "Failed to initialize timetracker", err = err.msg
|
||||
return failure(err)
|
||||
|
||||
return success()
|
||||
|
||||
proc stop*(app: Application) =
|
||||
|
||||
@ -42,10 +42,8 @@ proc getNode*(d: Dht, nodeId: NodeId): ?!Node =
|
||||
|
||||
proc getRoutingTableNodeIds*(d: Dht): seq[NodeId] =
|
||||
var ids = newSeq[NodeId]()
|
||||
info "routing table", len = $d.protocol.routingTable.len
|
||||
for bucket in d.protocol.routingTable.buckets:
|
||||
for node in bucket.nodes:
|
||||
warn "node seen", node = $node.id, seen = $node.seen
|
||||
ids.add(node.id)
|
||||
return ids
|
||||
|
||||
|
||||
@ -22,6 +22,7 @@ logScope:
|
||||
|
||||
type
|
||||
OnUpdateMetric = proc(value: int64): void {.gcsafe, raises: [].}
|
||||
OnItem = proc(item: NodeEntry): void {.gcsafe, raises: [].}
|
||||
|
||||
List* = ref object
|
||||
name: string
|
||||
@ -43,12 +44,6 @@ proc saveItem(this: List, item: NodeEntry): Future[?!void] {.async.} =
|
||||
?await this.store.put(itemKey, item)
|
||||
return success()
|
||||
|
||||
proc removeItem(this: List, item: NodeEntry): Future[?!void] {.async.} =
|
||||
without itemKey =? Key.init(this.name / $item.id), err:
|
||||
return failure(err)
|
||||
?await this.store.delete(itemKey)
|
||||
return success()
|
||||
|
||||
proc load*(this: List): Future[?!void] {.async.} =
|
||||
without queryKey =? Key.init(this.name), err:
|
||||
return failure(err)
|
||||
@ -89,17 +84,31 @@ proc add*(this: List, item: NodeEntry): Future[?!void] {.async.} =
|
||||
return failure(err)
|
||||
return success()
|
||||
|
||||
proc remove*(this: List, item: NodeEntry): Future[?!void] {.async.} =
|
||||
if this.items.len < 1:
|
||||
return failure(this.name & "List is empty.")
|
||||
|
||||
this.items.keepItIf(item.id != it.id)
|
||||
without itemKey =? Key.init(this.name / $item.id), err:
|
||||
return failure(err)
|
||||
?await this.store.delete(itemKey)
|
||||
this.onMetric(this.items.len.int64)
|
||||
return success()
|
||||
|
||||
proc pop*(this: List): Future[?!NodeEntry] {.async.} =
|
||||
if this.items.len < 1:
|
||||
return failure(this.name & "List is empty.")
|
||||
|
||||
let item = this.items[0]
|
||||
this.items.delete(0)
|
||||
this.onMetric(this.items.len.int64)
|
||||
|
||||
if err =? (await this.removeItem(item)).errorOption:
|
||||
if err =? (await this.remove(item)).errorOption:
|
||||
return failure(err)
|
||||
return success(item)
|
||||
|
||||
proc len*(this: List): int =
|
||||
this.items.len
|
||||
|
||||
proc iterateAll*(this: List, onItem: OnItem) {.async.} =
|
||||
for item in this.items:
|
||||
onItem(item)
|
||||
await sleepAsync(1.millis)
|
||||
|
||||
75
codexcrawler/timetracker.nim
Normal file
75
codexcrawler/timetracker.nim
Normal file
@ -0,0 +1,75 @@
|
||||
import pkg/chronicles
|
||||
import pkg/chronos
|
||||
import pkg/questionable
|
||||
import pkg/questionable/results
|
||||
|
||||
import ./dht
|
||||
import ./list
|
||||
import ./nodeentry
|
||||
import ./config
|
||||
|
||||
import std/sequtils
|
||||
|
||||
logScope:
|
||||
topics = "timetracker"
|
||||
|
||||
type TimeTracker* = ref object
|
||||
config: CrawlerConfig
|
||||
todoNodes: List
|
||||
okNodes: List
|
||||
nokNodes: List
|
||||
workerDelay: int
|
||||
|
||||
proc processList(t: TimeTracker, list: List, expiry: uint64) {.async.} =
|
||||
var toMove = newSeq[NodeEntry]()
|
||||
proc onItem(item: NodeEntry) =
|
||||
if item.lastVisit < expiry:
|
||||
trace "expired node, moving to todo"
|
||||
toMove.add(item)
|
||||
|
||||
await list.iterateAll(onItem)
|
||||
|
||||
for item in toMove:
|
||||
if err =? (await t.todoNodes.add(item)).errorOption:
|
||||
error "Failed to add expired node to todo list", err = err.msg
|
||||
return
|
||||
if err =? (await list.remove(item)).errorOption:
|
||||
error "Failed to remove expired node to source list", err = err.msg
|
||||
|
||||
proc step(t: TimeTracker) {.async.} =
|
||||
let expiry = (Moment.now().epochSeconds - (t.config.revisitDelayMins * 60)).uint64
|
||||
await t.processList(t.okNodes, expiry)
|
||||
await t.processList(t.nokNodes, expiry)
|
||||
|
||||
proc worker(t: TimeTracker) {.async.} =
|
||||
try:
|
||||
while true:
|
||||
await t.step()
|
||||
await sleepAsync(t.workerDelay.minutes)
|
||||
except Exception as exc:
|
||||
error "Exception in timetracker worker", msg = exc.msg
|
||||
quit QuitFailure
|
||||
|
||||
proc start*(t: TimeTracker): Future[?!void] {.async.} =
|
||||
info "Starting timetracker..."
|
||||
asyncSpawn t.worker()
|
||||
return success()
|
||||
|
||||
proc new*(
|
||||
T: type TimeTracker,
|
||||
todoNodes: List,
|
||||
okNodes: List,
|
||||
nokNodes: List,
|
||||
config: CrawlerConfig,
|
||||
): TimeTracker =
|
||||
var delay = config.revisitDelayMins div 10
|
||||
if delay < 1:
|
||||
delay = 1
|
||||
|
||||
TimeTracker(
|
||||
todoNodes: todoNodes,
|
||||
okNodes: okNodes,
|
||||
nokNodes: nokNodes,
|
||||
config: config,
|
||||
workerDelay: delay,
|
||||
)
|
||||
Loading…
x
Reference in New Issue
Block a user