mirror of
https://github.com/logos-storage/logos-storage-network-crawler.git
synced 2026-01-05 23:13:11 +00:00
Implementing crawler
This commit is contained in:
parent
454fbd3474
commit
c9ba44a307
@ -1,5 +1,7 @@
|
||||
# Codex Network Crawler
|
||||
|
||||

|
||||
|
||||
[](https://opensource.org/licenses/Apache-2.0)
|
||||
[](https://opensource.org/licenses/MIT)
|
||||
[](#stability)
|
||||
|
||||
@ -108,15 +108,9 @@ proc initializeDht(app: Application): Future[?!void] {.async.} =
|
||||
|
||||
return success()
|
||||
|
||||
proc initializeCrawler(app: Application) =
|
||||
app.crawler = Crawler.new(
|
||||
app.dht,
|
||||
app.todoNodes,
|
||||
app.okNodes,
|
||||
app.nokNodes
|
||||
)
|
||||
|
||||
app.crawler.start()
|
||||
proc initializeCrawler(app: Application): Future[?!void] {.async.} =
|
||||
app.crawler = Crawler.new(app.dht, app.todoNodes, app.okNodes, app.nokNodes)
|
||||
return await app.crawler.start()
|
||||
|
||||
proc initializeApp(app: Application): Future[?!void] {.async.} =
|
||||
if err =? (await app.initializeLists()).errorOption:
|
||||
@ -127,31 +121,12 @@ proc initializeApp(app: Application): Future[?!void] {.async.} =
|
||||
error "Failed to initialize DHT", err = err.msg
|
||||
return failure(err)
|
||||
|
||||
app.initializeCrawler()
|
||||
if err =? (await app.initializeCrawler()).errorOption:
|
||||
error "Failed to initialize crawler", err = err.msg
|
||||
return failure(err)
|
||||
|
||||
return success()
|
||||
|
||||
# proc hackyCrawl(app: Application) {.async.} =
|
||||
# info "starting hacky crawl..."
|
||||
# await sleepAsync(3000)
|
||||
|
||||
# var nodeIds = app.dht.getRoutingTableNodeIds()
|
||||
# trace "starting with routing table nodes", nodes = nodeIds.len
|
||||
|
||||
# while app.status == ApplicationStatus.Running and nodeIds.len > 0:
|
||||
# let nodeId = nodeIds[0]
|
||||
# nodeIds.delete(0)
|
||||
|
||||
# without newNodes =? (await app.dht.getNeighbors(nodeId)), err:
|
||||
# error "getneighbors failed", err = err.msg
|
||||
|
||||
# for node in newNodes:
|
||||
# nodeIds.add(node.id)
|
||||
# trace "adding new node", id = $node.id, addrs = $node.address
|
||||
# await sleepAsync(1000)
|
||||
|
||||
# info "hacky crawl stopped!"
|
||||
|
||||
proc stop*(app: Application) =
|
||||
app.status = ApplicationStatus.Stopping
|
||||
waitFor app.dht.stop()
|
||||
|
||||
@ -1,8 +1,11 @@
|
||||
import pkg/chronicles
|
||||
import pkg/chronos
|
||||
import pkg/questionable
|
||||
import pkg/questionable/results
|
||||
|
||||
import ./dht
|
||||
import ./list
|
||||
import ./nodeentry
|
||||
|
||||
logScope:
|
||||
topics = "crawler"
|
||||
@ -13,13 +16,60 @@ type Crawler* = ref object
|
||||
okNodes: List
|
||||
nokNodes: List
|
||||
|
||||
proc start*(c: Crawler) =
|
||||
info "Starting crawler..."
|
||||
proc handleNodeNotOk(c: Crawler, target: NodeEntry) {.async.} =
|
||||
if err =? (await c.nokNodes.add(target)).errorOption:
|
||||
error "Failed to add not-OK-node to list", err = err.msg
|
||||
|
||||
proc new*(T: type Crawler, dht: Dht, todoNodes: List, okNodes: List, nokNodes: List): Crawler =
|
||||
Crawler(
|
||||
dht: dht,
|
||||
todoNodes: todoNodes,
|
||||
okNodes: okNodes,
|
||||
nokNodes: nokNodes
|
||||
)
|
||||
proc handleNodeOk(c: Crawler, target: NodeEntry) {.async.} =
|
||||
if err =? (await c.okNodes.add(target)).errorOption:
|
||||
error "Failed to add OK-node to list", err = err.msg
|
||||
|
||||
proc addNewTodoNode(c: Crawler, nodeId: NodeId): Future[?!void] {.async.} =
|
||||
let entry = NodeEntry(id: nodeId, value: "todo")
|
||||
return await c.todoNodes.add(entry)
|
||||
|
||||
proc addNewTodoNodes(c: Crawler, newNodes: seq[Node]) {.async.} =
|
||||
for node in newNodes:
|
||||
if err =? (await c.addNewTodoNode(node.id)).errorOption:
|
||||
error "Failed to add todo-node to list", err = err.msg
|
||||
|
||||
proc step(c: Crawler) {.async.} =
|
||||
without target =? (await c.todoNodes.pop()), err:
|
||||
error "Failed to get todo node", err = err.msg
|
||||
|
||||
# todo: update target timestamp
|
||||
|
||||
without newNodes =? (await c.dht.getNeighbors(target.id)), err:
|
||||
trace "getNeighbors call failed", node = $target.id, err = err.msg
|
||||
await c.handleNodeNotOk(target)
|
||||
return
|
||||
|
||||
await c.handleNodeOk(target)
|
||||
await c.addNewTodoNodes(newNodes)
|
||||
|
||||
proc worker(c: Crawler) {.async.} =
|
||||
try:
|
||||
while true:
|
||||
await c.step()
|
||||
await sleepAsync(3.secs)
|
||||
except Exception as exc:
|
||||
error "Exception in crawler worker", msg = exc.msg
|
||||
quit QuitFailure
|
||||
|
||||
proc start*(c: Crawler): Future[?!void] {.async.} =
|
||||
if c.todoNodes.len < 1:
|
||||
let nodeIds = c.dht.getRoutingTableNodeIds()
|
||||
info "Loading routing-table nodes to todo-list...", nodes = nodeIds.len
|
||||
for id in nodeIds:
|
||||
if err =? (await c.addNewTodoNode(id)).errorOption:
|
||||
error "Failed to add routing-table node to todo-list", err = err.msg
|
||||
return failure(err)
|
||||
|
||||
info "Starting crawler..."
|
||||
asyncSpawn c.worker()
|
||||
return success()
|
||||
|
||||
proc new*(
|
||||
T: type Crawler, dht: Dht, todoNodes: List, okNodes: List, nokNodes: List
|
||||
): Crawler =
|
||||
Crawler(dht: dht, todoNodes: todoNodes, okNodes: okNodes, nokNodes: nokNodes)
|
||||
|
||||
@ -7,6 +7,8 @@ import pkg/stew/byteutils
|
||||
import pkg/stew/endians2
|
||||
import pkg/questionable
|
||||
import pkg/questionable/results
|
||||
import pkg/stint
|
||||
import pkg/codexdht
|
||||
|
||||
import std/sets
|
||||
import std/strutils
|
||||
@ -27,25 +29,31 @@ type
|
||||
onMetric: OnUpdateMetric
|
||||
|
||||
proc encode(s: NodeEntry): seq[byte] =
|
||||
(s.id & ";" & s.value).toBytes()
|
||||
($s.id & ";" & s.value).toBytes()
|
||||
|
||||
proc decode(T: type NodeEntry, bytes: seq[byte]): ?!T =
|
||||
let s = string.fromBytes(bytes)
|
||||
if s.len == 0:
|
||||
return success(NodeEntry(id: "", value: ""))
|
||||
return success(NodeEntry(id: NodeId("0".u256), value: ""))
|
||||
|
||||
let tokens = s.split(";")
|
||||
if tokens.len != 2:
|
||||
return failure("expected 2 tokens")
|
||||
|
||||
success(NodeEntry(id: tokens[0], value: tokens[1]))
|
||||
success(NodeEntry(id: NodeId(tokens[0].u256), value: tokens[1]))
|
||||
|
||||
proc saveItem(this: List, item: NodeEntry): Future[?!void] {.async.} =
|
||||
without itemKey =? Key.init(this.name / item.id), err:
|
||||
without itemKey =? Key.init(this.name / $item.id), err:
|
||||
return failure(err)
|
||||
?await this.store.put(itemKey, item)
|
||||
return success()
|
||||
|
||||
proc removeItem(this: List, item: NodeEntry): Future[?!void] {.async.} =
|
||||
without itemKey =? Key.init(this.name / $item.id), err:
|
||||
return failure(err)
|
||||
?await this.store.delete(itemKey)
|
||||
return success()
|
||||
|
||||
proc load*(this: List): Future[?!void] {.async.} =
|
||||
without queryKey =? Key.init(this.name), err:
|
||||
return failure(err)
|
||||
@ -57,7 +65,7 @@ proc load*(this: List): Future[?!void] {.async.} =
|
||||
return failure(err)
|
||||
without value =? item.value, err:
|
||||
return failure(err)
|
||||
if value.id.len > 0:
|
||||
if value.value.len > 0:
|
||||
this.items.incl(value)
|
||||
|
||||
this.onMetric(this.items.len.int64)
|
||||
@ -76,3 +84,15 @@ proc add*(this: List, item: NodeEntry): Future[?!void] {.async.} =
|
||||
if err =? (await this.saveItem(item)).errorOption:
|
||||
return failure(err)
|
||||
return success()
|
||||
|
||||
proc pop*(this: List): Future[?!NodeEntry] {.async.} =
|
||||
if this.items.len < 1:
|
||||
return failure(this.name & "List is empty.")
|
||||
|
||||
let item = this.items.pop()
|
||||
if err =? (await this.removeItem(item)).errorOption:
|
||||
return failure(err)
|
||||
return success(item)
|
||||
|
||||
proc len*(this: List): int =
|
||||
this.items.len
|
||||
|
||||
@ -2,10 +2,11 @@ import pkg/stew/byteutils
|
||||
import pkg/stew/endians2
|
||||
import pkg/questionable
|
||||
import pkg/questionable/results
|
||||
import pkg/codexdht
|
||||
|
||||
type NodeEntry* = object
|
||||
id*: string # will be node ID
|
||||
value*: string
|
||||
id*: NodeId
|
||||
value*: string # todo: will be last-checked timestamp
|
||||
|
||||
proc `$`*(entry: NodeEntry): string =
|
||||
entry.id & ":" & entry.value
|
||||
$entry.id & ":" & entry.value
|
||||
|
||||
BIN
crawler.png
Normal file
BIN
crawler.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 46 KiB |
Loading…
x
Reference in New Issue
Block a user