mirror of
https://github.com/logos-storage/logos-storage-network-crawler.git
synced 2026-01-07 16:03:08 +00:00
basic crawling
This commit is contained in:
parent
c9ba44a307
commit
a2d36d5192
@ -7,6 +7,8 @@ import ./dht
|
|||||||
import ./list
|
import ./list
|
||||||
import ./nodeentry
|
import ./nodeentry
|
||||||
|
|
||||||
|
import std/sequtils
|
||||||
|
|
||||||
logScope:
|
logScope:
|
||||||
topics = "crawler"
|
topics = "crawler"
|
||||||
|
|
||||||
@ -16,6 +18,11 @@ type Crawler* = ref object
|
|||||||
okNodes: List
|
okNodes: List
|
||||||
nokNodes: List
|
nokNodes: List
|
||||||
|
|
||||||
|
# This is not going to stay this way.
|
||||||
|
proc isNew(c: Crawler, node: Node): bool =
|
||||||
|
not c.todoNodes.contains(node.id) and not c.okNodes.contains(node.id) and
|
||||||
|
not c.nokNodes.contains(node.id)
|
||||||
|
|
||||||
proc handleNodeNotOk(c: Crawler, target: NodeEntry) {.async.} =
|
proc handleNodeNotOk(c: Crawler, target: NodeEntry) {.async.} =
|
||||||
if err =? (await c.nokNodes.add(target)).errorOption:
|
if err =? (await c.nokNodes.add(target)).errorOption:
|
||||||
error "Failed to add not-OK-node to list", err = err.msg
|
error "Failed to add not-OK-node to list", err = err.msg
|
||||||
@ -34,16 +41,24 @@ proc addNewTodoNodes(c: Crawler, newNodes: seq[Node]) {.async.} =
|
|||||||
error "Failed to add todo-node to list", err = err.msg
|
error "Failed to add todo-node to list", err = err.msg
|
||||||
|
|
||||||
proc step(c: Crawler) {.async.} =
|
proc step(c: Crawler) {.async.} =
|
||||||
|
logScope:
|
||||||
|
todo = $c.todoNodes.len
|
||||||
|
ok = $c.okNodes.len
|
||||||
|
nok = $c.nokNodes.len
|
||||||
|
|
||||||
without target =? (await c.todoNodes.pop()), err:
|
without target =? (await c.todoNodes.pop()), err:
|
||||||
error "Failed to get todo node", err = err.msg
|
error "Failed to get todo node", err = err.msg
|
||||||
|
|
||||||
# todo: update target timestamp
|
# todo: update target timestamp
|
||||||
|
|
||||||
without newNodes =? (await c.dht.getNeighbors(target.id)), err:
|
without receivedNodes =? (await c.dht.getNeighbors(target.id)), err:
|
||||||
trace "getNeighbors call failed", node = $target.id, err = err.msg
|
trace "Call failed", node = $target.id, err = err.msg
|
||||||
await c.handleNodeNotOk(target)
|
await c.handleNodeNotOk(target)
|
||||||
return
|
return
|
||||||
|
|
||||||
|
let newNodes = receivedNodes.filterIt(isNew(c, it))
|
||||||
|
|
||||||
|
trace "Received nodes", receivedNodes = receivedNodes.len, newNodes = newNodes.len
|
||||||
await c.handleNodeOk(target)
|
await c.handleNodeOk(target)
|
||||||
await c.addNewTodoNodes(newNodes)
|
await c.addNewTodoNodes(newNodes)
|
||||||
|
|
||||||
|
|||||||
@ -12,6 +12,7 @@ import pkg/codexdht
|
|||||||
|
|
||||||
import std/sets
|
import std/sets
|
||||||
import std/strutils
|
import std/strutils
|
||||||
|
import std/sequtils
|
||||||
import std/os
|
import std/os
|
||||||
|
|
||||||
import ./nodeentry
|
import ./nodeentry
|
||||||
@ -25,7 +26,7 @@ type
|
|||||||
List* = ref object
|
List* = ref object
|
||||||
name: string
|
name: string
|
||||||
store: TypedDatastore
|
store: TypedDatastore
|
||||||
items: HashSet[NodeEntry]
|
items: seq[NodeEntry]
|
||||||
onMetric: OnUpdateMetric
|
onMetric: OnUpdateMetric
|
||||||
|
|
||||||
proc encode(s: NodeEntry): seq[byte] =
|
proc encode(s: NodeEntry): seq[byte] =
|
||||||
@ -40,7 +41,8 @@ proc decode(T: type NodeEntry, bytes: seq[byte]): ?!T =
|
|||||||
if tokens.len != 2:
|
if tokens.len != 2:
|
||||||
return failure("expected 2 tokens")
|
return failure("expected 2 tokens")
|
||||||
|
|
||||||
success(NodeEntry(id: NodeId(tokens[0].u256), value: tokens[1]))
|
let id = UInt256.fromHex(tokens[0])
|
||||||
|
success(NodeEntry(id: id, value: tokens[1]))
|
||||||
|
|
||||||
proc saveItem(this: List, item: NodeEntry): Future[?!void] {.async.} =
|
proc saveItem(this: List, item: NodeEntry): Future[?!void] {.async.} =
|
||||||
without itemKey =? Key.init(this.name / $item.id), err:
|
without itemKey =? Key.init(this.name / $item.id), err:
|
||||||
@ -66,7 +68,7 @@ proc load*(this: List): Future[?!void] {.async.} =
|
|||||||
without value =? item.value, err:
|
without value =? item.value, err:
|
||||||
return failure(err)
|
return failure(err)
|
||||||
if value.value.len > 0:
|
if value.value.len > 0:
|
||||||
this.items.incl(value)
|
this.items.add(value)
|
||||||
|
|
||||||
this.onMetric(this.items.len.int64)
|
this.onMetric(this.items.len.int64)
|
||||||
info "Loaded list", name = this.name, items = this.items.len
|
info "Loaded list", name = this.name, items = this.items.len
|
||||||
@ -77,8 +79,17 @@ proc new*(
|
|||||||
): List =
|
): List =
|
||||||
List(name: name, store: store, onMetric: onMetric)
|
List(name: name, store: store, onMetric: onMetric)
|
||||||
|
|
||||||
|
proc contains*(this: List, nodeId: NodeId): bool =
|
||||||
|
this.items.anyIt(it.id == nodeId)
|
||||||
|
|
||||||
|
proc contains*(this: List, item: NodeEntry): bool =
|
||||||
|
this.contains(item.id)
|
||||||
|
|
||||||
proc add*(this: List, item: NodeEntry): Future[?!void] {.async.} =
|
proc add*(this: List, item: NodeEntry): Future[?!void] {.async.} =
|
||||||
this.items.incl(item)
|
if this.contains(item):
|
||||||
|
return success()
|
||||||
|
|
||||||
|
this.items.add(item)
|
||||||
this.onMetric(this.items.len.int64)
|
this.onMetric(this.items.len.int64)
|
||||||
|
|
||||||
if err =? (await this.saveItem(item)).errorOption:
|
if err =? (await this.saveItem(item)).errorOption:
|
||||||
@ -89,7 +100,10 @@ proc pop*(this: List): Future[?!NodeEntry] {.async.} =
|
|||||||
if this.items.len < 1:
|
if this.items.len < 1:
|
||||||
return failure(this.name & "List is empty.")
|
return failure(this.name & "List is empty.")
|
||||||
|
|
||||||
let item = this.items.pop()
|
let item = this.items[0]
|
||||||
|
this.items.delete(0)
|
||||||
|
this.onMetric(this.items.len.int64)
|
||||||
|
|
||||||
if err =? (await this.removeItem(item)).errorOption:
|
if err =? (await this.removeItem(item)).errorOption:
|
||||||
return failure(err)
|
return failure(err)
|
||||||
return success(item)
|
return success(item)
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user