mirror of
https://github.com/logos-storage/logos-storage-network-crawler.git
synced 2026-01-02 13:33:08 +00:00
Reduces log spam. Adds wait signal for empty lists.
This commit is contained in:
parent
5f573cf91a
commit
e7caa08fae
@ -54,16 +54,20 @@ proc step(c: Crawler) {.async.} =
|
|||||||
target.lastVisit = Moment.now().epochSeconds.uint64
|
target.lastVisit = Moment.now().epochSeconds.uint64
|
||||||
|
|
||||||
without receivedNodes =? (await c.dht.getNeighbors(target.id)), err:
|
without receivedNodes =? (await c.dht.getNeighbors(target.id)), err:
|
||||||
trace "Call failed", node = $target.id, err = err.msg
|
|
||||||
await c.handleNodeNotOk(target)
|
await c.handleNodeNotOk(target)
|
||||||
return
|
return
|
||||||
|
|
||||||
let newNodes = receivedNodes.filterIt(isNew(c, it))
|
let newNodes = receivedNodes.filterIt(isNew(c, it))
|
||||||
|
if newNodes.len > 0:
|
||||||
|
trace "Discovered new nodes", newNodes = newNodes.len
|
||||||
|
|
||||||
trace "Received nodes", receivedNodes = receivedNodes.len, newNodes = newNodes.len
|
|
||||||
await c.handleNodeOk(target)
|
await c.handleNodeOk(target)
|
||||||
await c.addNewTodoNodes(newNodes)
|
await c.addNewTodoNodes(newNodes)
|
||||||
|
|
||||||
|
# Don't log the status every loop:
|
||||||
|
if (c.todoNodes.len mod 10) == 0:
|
||||||
|
trace "Status"
|
||||||
|
|
||||||
proc worker(c: Crawler) {.async.} =
|
proc worker(c: Crawler) {.async.} =
|
||||||
try:
|
try:
|
||||||
while true:
|
while true:
|
||||||
@ -82,7 +86,7 @@ proc start*(c: Crawler): Future[?!void] {.async.} =
|
|||||||
error "Failed to add routing-table node to todo-list", err = err.msg
|
error "Failed to add routing-table node to todo-list", err = err.msg
|
||||||
return failure(err)
|
return failure(err)
|
||||||
|
|
||||||
info "Starting crawler..."
|
info "Starting crawler...", stepDelayMs = $c.config.stepDelayMs
|
||||||
asyncSpawn c.worker()
|
asyncSpawn c.worker()
|
||||||
return success()
|
return success()
|
||||||
|
|
||||||
|
|||||||
@ -29,6 +29,7 @@ type
|
|||||||
store: TypedDatastore
|
store: TypedDatastore
|
||||||
items: seq[NodeEntry]
|
items: seq[NodeEntry]
|
||||||
onMetric: OnUpdateMetric
|
onMetric: OnUpdateMetric
|
||||||
|
emptySignal: ?Future[void]
|
||||||
|
|
||||||
proc encode(s: NodeEntry): seq[byte] =
|
proc encode(s: NodeEntry): seq[byte] =
|
||||||
s.toBytes()
|
s.toBytes()
|
||||||
@ -80,6 +81,11 @@ proc add*(this: List, item: NodeEntry): Future[?!void] {.async.} =
|
|||||||
this.items.add(item)
|
this.items.add(item)
|
||||||
this.onMetric(this.items.len.int64)
|
this.onMetric(this.items.len.int64)
|
||||||
|
|
||||||
|
if isSome(this.emptySignal):
|
||||||
|
trace "List no longer empty.", name = this.name
|
||||||
|
this.emptySignal.get().complete()
|
||||||
|
this.emptySignal = Future[void].none
|
||||||
|
|
||||||
if err =? (await this.saveItem(item)).errorOption:
|
if err =? (await this.saveItem(item)).errorOption:
|
||||||
return failure(err)
|
return failure(err)
|
||||||
return success()
|
return success()
|
||||||
@ -97,7 +103,11 @@ proc remove*(this: List, item: NodeEntry): Future[?!void] {.async.} =
|
|||||||
|
|
||||||
proc pop*(this: List): Future[?!NodeEntry] {.async.} =
|
proc pop*(this: List): Future[?!NodeEntry] {.async.} =
|
||||||
if this.items.len < 1:
|
if this.items.len < 1:
|
||||||
return failure(this.name & "List is empty.")
|
trace "List is empty. Waiting for new items...", name = this.name
|
||||||
|
this.emptySignal = some(newFuture[void]("list.emptySignal"))
|
||||||
|
await this.emptySignal.get().wait(1.hours)
|
||||||
|
if this.items.len < 1:
|
||||||
|
return failure(this.name & "List is empty.")
|
||||||
|
|
||||||
let item = this.items[0]
|
let item = this.items[0]
|
||||||
|
|
||||||
|
|||||||
@ -8,8 +8,6 @@ import ./list
|
|||||||
import ./nodeentry
|
import ./nodeentry
|
||||||
import ./config
|
import ./config
|
||||||
|
|
||||||
import std/sequtils
|
|
||||||
|
|
||||||
logScope:
|
logScope:
|
||||||
topics = "timetracker"
|
topics = "timetracker"
|
||||||
|
|
||||||
@ -24,11 +22,13 @@ proc processList(t: TimeTracker, list: List, expiry: uint64) {.async.} =
|
|||||||
var toMove = newSeq[NodeEntry]()
|
var toMove = newSeq[NodeEntry]()
|
||||||
proc onItem(item: NodeEntry) =
|
proc onItem(item: NodeEntry) =
|
||||||
if item.lastVisit < expiry:
|
if item.lastVisit < expiry:
|
||||||
trace "expired node, moving to todo"
|
|
||||||
toMove.add(item)
|
toMove.add(item)
|
||||||
|
|
||||||
await list.iterateAll(onItem)
|
await list.iterateAll(onItem)
|
||||||
|
|
||||||
|
if toMove.len > 0:
|
||||||
|
trace "expired node, moving to todo", nodes = $toMove.len
|
||||||
|
|
||||||
for item in toMove:
|
for item in toMove:
|
||||||
if err =? (await t.todoNodes.add(item)).errorOption:
|
if err =? (await t.todoNodes.add(item)).errorOption:
|
||||||
error "Failed to add expired node to todo list", err = err.msg
|
error "Failed to add expired node to todo list", err = err.msg
|
||||||
@ -51,7 +51,7 @@ proc worker(t: TimeTracker) {.async.} =
|
|||||||
quit QuitFailure
|
quit QuitFailure
|
||||||
|
|
||||||
proc start*(t: TimeTracker): Future[?!void] {.async.} =
|
proc start*(t: TimeTracker): Future[?!void] {.async.} =
|
||||||
info "Starting timetracker..."
|
info "Starting timetracker...", revisitDelayMins = $t.workerDelay
|
||||||
asyncSpawn t.worker()
|
asyncSpawn t.worker()
|
||||||
return success()
|
return success()
|
||||||
|
|
||||||
|
|||||||
@ -7,6 +7,7 @@ switch("define", "libp2p_pki_schemes=secp256k1")
|
|||||||
# Sets TRACE logging for everything except DHT
|
# Sets TRACE logging for everything except DHT
|
||||||
switch("define", "chronicles_log_level=TRACE")
|
switch("define", "chronicles_log_level=TRACE")
|
||||||
switch("define", "chronicles_disabled_topics:discv5")
|
switch("define", "chronicles_disabled_topics:discv5")
|
||||||
|
switch("define", "chronicles_disable_thread_id")
|
||||||
|
|
||||||
when (NimMajor, NimMinor) >= (2, 0):
|
when (NimMajor, NimMinor) >= (2, 0):
|
||||||
--mm:
|
--mm:
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user