Implementing and testing crawler

This commit is contained in:
Ben 2025-02-12 13:25:37 +01:00
parent da1d82a4cd
commit 25291f7625
No known key found for this signature in database
GPG Key ID: 0F16E812E736C24B
14 changed files with 265 additions and 130 deletions

View File

@ -3,86 +3,53 @@ import pkg/chronos
import pkg/questionable import pkg/questionable
import pkg/questionable/results import pkg/questionable/results
import ./dht import ../services/dht
import ../list import ./todolist
import ../config import ../config
import ../component
import ../types import ../types
import ../component
import ../state import ../state
import ../utils/asyncdataevent import ../utils/asyncdataevent
import std/sequtils
logScope: logScope:
topics = "crawler" topics = "crawler"
type Crawler* = ref object of Component type Crawler* = ref object of Component
state: State
dht: Dht dht: Dht
config: Config todo: TodoList
todoNodes: List
okNodes: List
nokNodes: List
# This is not going to stay this way. proc raiseCheckEvent(c: Crawler, nid: Nid, success: bool): Future[?!void] {.async: (raises: []).} =
proc isNew(c: Crawler, node: Node): bool = let event = DhtNodeCheckEventData(
not c.todoNodes.contains(node.id) and not c.okNodes.contains(node.id) and id: nid,
not c.nokNodes.contains(node.id) isOk: success
)
if err =? (await c.state.events.dhtNodeCheck.fire(event)).errorOption:
return failure(err)
return success()
# proc handleNodeNotOk(c: Crawler, target: NodeEntry) {.async.} = proc step(c: Crawler): Future[?!void] {.async: (raises: []).} =
# if err =? (await c.nokNodes.add(target)).errorOption: without nid =? (await c.todo.pop()), err:
# error "Failed to add not-OK-node to list", err = err.msg return failure(err)
# proc handleNodeOk(c: Crawler, target: NodeEntry) {.async.} = without response =? await c.dht.getNeighbors(nid), err:
# if err =? (await c.okNodes.add(target)).errorOption: return failure(err)
# error "Failed to add OK-node to list", err = err.msg
# proc addNewTodoNode(c: Crawler, nodeId: NodeId): Future[?!void] {.async.} = if err =? (await c.raiseCheckEvent(nid, response.isResponsive)).errorOption:
# let entry = NodeEntry(id: nodeId, lastVisit: 0) return failure(err)
# return await c.todoNodes.add(entry)
# proc addNewTodoNodes(c: Crawler, newNodes: seq[Node]) {.async.} = if err =? (await c.state.events.nodesFound.fire(response.nodeIds)).errorOption:
# for node in newNodes: return failure(err)
# if err =? (await c.addNewTodoNode(node.id)).errorOption:
# error "Failed to add todo-node to list", err = err.msg
# proc step(c: Crawler) {.async.} = return success()
# logScope:
# todo = $c.todoNodes.len
# ok = $c.okNodes.len
# nok = $c.nokNodes.len
# without var target =? (await c.todoNodes.pop()), err:
# error "Failed to get todo node", err = err.msg
# target.lastVisit = Moment.now().epochSeconds.uint64
# without receivedNodes =? (await c.dht.getNeighbors(target.id)), err:
# await c.handleNodeNotOk(target)
# return
# let newNodes = receivedNodes.filterIt(isNew(c, it))
# if newNodes.len > 0:
# trace "Discovered new nodes", newNodes = newNodes.len
# await c.handleNodeOk(target)
# await c.addNewTodoNodes(newNodes)
# # Don't log the status every loop:
# if (c.todoNodes.len mod 10) == 0:
# trace "Status"
proc worker(c: Crawler) {.async.} =
try:
while true:
# await c.step()
await sleepAsync(c.config.stepDelayMs.millis)
except Exception as exc:
error "Exception in crawler worker", msg = exc.msg
quit QuitFailure
method start*(c: Crawler): Future[?!void] {.async.} = method start*(c: Crawler): Future[?!void] {.async.} =
info "Starting crawler...", stepDelayMs = $c.config.stepDelayMs info "Starting crawler..."
asyncSpawn c.worker()
proc onStep(): Future[?!void] {.async: (raises: []), gcsafe.} =
await c.step()
await c.state.whileRunning(onStep, c.state.config.stepDelayMs.milliseconds)
return success() return success()
method stop*(c: Crawler): Future[?!void] {.async.} = method stop*(c: Crawler): Future[?!void] {.async.} =
@ -90,13 +57,12 @@ method stop*(c: Crawler): Future[?!void] {.async.} =
proc new*( proc new*(
T: type Crawler, T: type Crawler,
state: State,
dht: Dht, dht: Dht,
# todoNodes: List, todo: TodoList
# okNodes: List,
# nokNodes: List,
config: Config,
): Crawler = ): Crawler =
Crawler( Crawler(
state: state,
dht: dht, dht: dht,
config: config, # todoNodes: todoNodes, okNodes: okNodes, nokNodes: nokNodes, todo: todo
) )

View File

@ -3,10 +3,9 @@ import pkg/chronos
import pkg/questionable import pkg/questionable
import pkg/questionable/results import pkg/questionable/results
import ./dht
import ../list import ../list
import ../state import ../state
import ../metrics import ../services/metrics
import ../component import ../component
import ../utils/asyncdataevent import ../utils/asyncdataevent

View File

@ -30,12 +30,15 @@ proc addNodes(t: TodoList, nids: seq[Nid]) =
s.complete() s.complete()
t.emptySignal = Future[void].none t.emptySignal = Future[void].none
proc pop*(t: TodoList): Future[?!Nid] {.async.} = method pop*(t: TodoList): Future[?!Nid] {.async: (raises: []), base.} =
if t.nids.len < 1: if t.nids.len < 1:
trace "List is empty. Waiting for new items..." trace "List is empty. Waiting for new items..."
let signal = newFuture[void]("list.emptySignal") let signal = newFuture[void]("list.emptySignal")
t.emptySignal = some(signal) t.emptySignal = some(signal)
await signal.wait(1.hours) try:
await signal.wait(1.hours)
except CatchableError as exc:
return failure(exc.msg)
if t.nids.len < 1: if t.nids.len < 1:
return failure("TodoList is empty.") return failure("TodoList is empty.")
@ -63,5 +66,5 @@ method stop*(t: TodoList): Future[?!void] {.async.} =
proc new*(_: type TodoList, state: State): TodoList = proc new*(_: type TodoList, state: State): TodoList =
TodoList(nids: newSeq[Nid](), state: state, emptySignal: Future[void].none) TodoList(nids: newSeq[Nid](), state: state, emptySignal: Future[void].none)
proc createTodoList*(state: State): ?!TodoList = proc createTodoList*(state: State): TodoList =
success(TodoList.new(state)) TodoList.new(state)

View File

@ -2,9 +2,9 @@ import pkg/chronos
import pkg/questionable/results import pkg/questionable/results
import ./state import ./state
import ./metrics import ./services/metrics
import ./services/dht
import ./component import ./component
import ./components/dht
import ./components/crawler import ./components/crawler
import ./components/timetracker import ./components/timetracker
import ./components/nodestore import ./components/nodestore
@ -19,14 +19,18 @@ proc createComponents*(state: State): Future[?!seq[Component]] {.async.} =
without nodeStore =? createNodeStore(state), err: without nodeStore =? createNodeStore(state), err:
return failure(err) return failure(err)
let metrics = createMetrics(state.config.metricsAddress, state.config.metricsPort) let
metrics = createMetrics(state.config.metricsAddress, state.config.metricsPort)
todoList = createTodoList(state)
without dhtMetrics =? createDhtMetrics(state, metrics), err: without dhtMetrics =? createDhtMetrics(state, metrics), err:
return failure(err) return failure(err)
components.add(todoList)
components.add(nodeStore) components.add(nodeStore)
components.add(dht) components.add(dht)
components.add(Crawler.new(dht, state.config)) components.add(Crawler.new(state, dht, todoList))
components.add(TimeTracker.new(state, nodeStore)) components.add(TimeTracker.new(state, nodeStore))
components.add(dhtMetrics) components.add(dhtMetrics)
return success(components) return success(components)

View File

@ -1,5 +1,6 @@
import std/os import std/os
import std/net import std/net
import std/sequtils
import pkg/chronicles import pkg/chronicles
import pkg/chronos import pkg/chronos
import pkg/libp2p import pkg/libp2p
@ -11,63 +12,59 @@ from pkg/nimcrypto import keccak256
import ../utils/keyutils import ../utils/keyutils
import ../utils/datastoreutils import ../utils/datastoreutils
import ../utils/rng import ../utils/rng
import ../utils/asyncdataevent
import ../component import ../component
import ../state import ../state
import ../types
export discv5 export discv5
logScope: logScope:
topics = "dht" topics = "dht"
type Dht* = ref object of Component type
state: State GetNeighborsResponse* = ref object
protocol*: discv5.Protocol isResponsive*: bool
key: PrivateKey nodeIds*: seq[Nid]
peerId: PeerId
announceAddrs*: seq[MultiAddress]
providerRecord*: ?SignedPeerRecord
dhtRecord*: ?SignedPeerRecord
# proc toNodeId*(cid: Cid): NodeId = Dht* = ref object of Component
# ## Cid to discovery id state: State
# ## protocol*: discv5.Protocol
key: PrivateKey
# readUintBE[256](keccak256.digest(cid.data.buffer).data) peerId: PeerId
announceAddrs*: seq[MultiAddress]
# proc toNodeId*(host: ca.Address): NodeId = providerRecord*: ?SignedPeerRecord
# ## Eth address to discovery id dhtRecord*: ?SignedPeerRecord
# ##
# readUintBE[256](keccak256.digest(host.toArray).data)
proc getNode*(d: Dht, nodeId: NodeId): ?!Node = proc getNode*(d: Dht, nodeId: NodeId): ?!Node =
let node = d.protocol.getNode(nodeId) let node = d.protocol.getNode(nodeId)
if node.isSome(): if node.isSome():
return success(node.get()) return success(node.get())
return failure("Node not found for id: " & $(NodeId(nodeId))) return failure("Node not found for id: " & nodeId.toHex())
proc getRoutingTableNodeIds(d: Dht): seq[NodeId] = method getRoutingTableNodeIds*(d: Dht): seq[Nid] {.base.} =
var ids = newSeq[NodeId]() var ids = newSeq[Nid]()
for bucket in d.protocol.routingTable.buckets: for bucket in d.protocol.routingTable.buckets:
for node in bucket.nodes: for node in bucket.nodes:
ids.add(node.id) ids.add(node.id)
return ids return ids
proc getNeighbors*(d: Dht, target: NodeId): Future[?!seq[Node]] {.async.} = method getNeighbors*(d: Dht, target: Nid): Future[?!GetNeighborsResponse] {.async: (raises: []), base.} =
without node =? d.getNode(target), err: without node =? d.getNode(target), err:
return failure(err) return failure(err)
let distances = @[256.uint16] let distances = @[256.uint16]
let response = await d.protocol.findNode(node, distances) try:
let response = await d.protocol.findNode(node, distances)
if response.isOk(): if response.isOk():
let nodes = response.get() let nodes = response.get()
if nodes.len > 0: return success(GetNeighborsResponse(
return success(nodes) isResponsive: true,
nodeIds: nodes.mapIt(it.id))
# Both returning 0 nodes and a failure result are treated as failure of getNeighbors )
return failure("No nodes returned") return failure($response.error())
except CatchableError as exc:
return failure(exc.msg)
proc findPeer*(d: Dht, peerId: PeerId): Future[?PeerRecord] {.async.} = proc findPeer*(d: Dht, peerId: PeerId): Future[?PeerRecord] {.async.} =
trace "protocol.resolve..." trace "protocol.resolve..."
@ -103,19 +100,19 @@ proc updateDhtRecord(d: Dht, addrs: openArray[MultiAddress]) =
if not d.protocol.isNil: if not d.protocol.isNil:
d.protocol.updateRecord(d.dhtRecord).expect("Should update SPR") d.protocol.updateRecord(d.dhtRecord).expect("Should update SPR")
proc findRoutingTableNodes(d: Dht) {.async.} = # proc findRoutingTableNodes(d: Dht) {.async.} =
await sleepAsync(5.seconds) # await sleepAsync(5.seconds)
let nodes = d.getRoutingTableNodeIds() # let nodes = d.getRoutingTableNodeIds()
if err =? (await d.state.events.nodesFound.fire(nodes)).errorOption: # if err =? (await d.state.events.nodesFound.fire(nodes)).errorOption:
error "Failed to raise routing-table nodes as found nodes", err = err.msg # error "Failed to raise routing-table nodes as found nodes", err = err.msg
else: # else:
trace "Routing table nodes raise as found nodes", num = nodes.len # trace "Routing table nodes raised as found nodes", num = nodes.len
method start*(d: Dht): Future[?!void] {.async.} = method start*(d: Dht): Future[?!void] {.async.} =
d.protocol.open() d.protocol.open()
await d.protocol.start() await d.protocol.start()
asyncSpawn d.findRoutingTableNodes() # asyncSpawn d.findRoutingTableNodes()
return success() return success()
method stop*(d: Dht): Future[?!void] {.async.} = method stop*(d: Dht): Future[?!void] {.async.} =

View File

@ -40,4 +40,5 @@ method whileRunning*(s: State, step: OnStep, delay: Duration) {.async, base.} =
s.status = ApplicationStatus.Stopping s.status = ApplicationStatus.Stopping
await sleepAsync(delay) await sleepAsync(delay)
# todo this needs a delay because starts are still being called.
asyncSpawn worker() asyncSpawn worker()

View File

@ -0,0 +1,115 @@
import pkg/chronos
import pkg/questionable
import pkg/questionable/results
import pkg/asynctest/chronos/unittest
import ../../../codexcrawler/components/crawler
import ../../../codexcrawler/services/dht
import ../../../codexcrawler/utils/asyncdataevent
import ../../../codexcrawler/types
import ../../../codexcrawler/state
import ../mockstate
import ../mockdht
import ../mocktodolist
import ../helpers
suite "Crawler":
var
nid1: Nid
nid2: Nid
state: MockState
todo: MockTodoList
dht: MockDht
crawler: Crawler
setup:
nid1 = genNid()
nid2 = genNid()
state = createMockState()
todo = createMockTodoList()
dht = createMockDht()
crawler = Crawler.new(state, dht, todo)
(await crawler.start()).tryGet()
teardown:
(await crawler.stop()).tryGet()
state.checkAllUnsubscribed()
proc onStep() {.async.} =
(await state.stepper()).tryGet()
proc responsive(nid: Nid): GetNeighborsResponse =
GetNeighborsResponse(
isResponsive: true,
nodeIds: @[nid]
)
proc unresponsive(nid: Nid): GetNeighborsResponse =
GetNeighborsResponse(
isResponsive: false,
nodeIds: @[nid]
)
test "onStep should pop a node from the todoList and getNeighbors for it":
todo.popReturn = success(nid1)
dht.getNeighborsReturn = success(responsive(nid1))
await onStep()
check:
!(dht.getNeighborsArg) == nid1
test "nodes returned by getNeighbors are raised as nodesFound":
var nodesFound = newSeq[Nid]()
proc onNodesFound(nids: seq[Nid]): Future[?!void] {.async.} =
nodesFound = nids
return success()
let sub = state.events.nodesFound.subscribe(onNodesFound)
todo.popReturn = success(nid1)
dht.getNeighborsReturn = success(responsive(nid2))
await onStep()
check:
nid2 in nodesFound
await state.events.nodesFound.unsubscribe(sub)
test "responsive result from getNeighbors raises the node as successful dhtNodeCheck":
var checkEvent = DhtNodeCheckEventData()
proc onCheck(event: DhtNodeCheckEventData): Future[?!void] {.async.} =
checkEvent = event
return success()
let sub = state.events.dhtNodeCheck.subscribe(onCheck)
todo.popReturn = success(nid1)
dht.getNeighborsReturn = success(responsive(nid2))
await onStep()
check:
checkEvent.id == nid1
checkEvent.isOk == true
await state.events.dhtNodeCheck.unsubscribe(sub)
test "unresponsive result from getNeighbors raises the node as unsuccessful dhtNodeCheck":
var checkEvent = DhtNodeCheckEventData()
proc onCheck(event: DhtNodeCheckEventData): Future[?!void] {.async.} =
checkEvent = event
return success()
let sub = state.events.dhtNodeCheck.subscribe(onCheck)
todo.popReturn = success(nid1)
dht.getNeighborsReturn = success(unresponsive(nid2))
await onStep()
check:
checkEvent.id == nid1
checkEvent.isOk == false
await state.events.dhtNodeCheck.unsubscribe(sub)

View File

@ -44,6 +44,9 @@ suite "TimeTracker":
await state.events.nodesExpired.unsubscribe(sub) await state.events.nodesExpired.unsubscribe(sub)
state.checkAllUnsubscribed() state.checkAllUnsubscribed()
proc onStep() {.async.} =
(await state.stepper()).tryGet()
proc createNodeInStore(lastVisit: uint64): Nid = proc createNodeInStore(lastVisit: uint64): Nid =
let entry = NodeEntry(id: genNid(), lastVisit: lastVisit) let entry = NodeEntry(id: genNid(), lastVisit: lastVisit)
store.nodesToIterate.add(entry) store.nodesToIterate.add(entry)
@ -55,7 +58,7 @@ suite "TimeTracker":
(Moment.now().epochSeconds - ((1 + state.config.revisitDelayMins) * 60)).uint64 (Moment.now().epochSeconds - ((1 + state.config.revisitDelayMins) * 60)).uint64
expiredNodeId = createNodeInStore(expiredTimestamp) expiredNodeId = createNodeInStore(expiredTimestamp)
(await state.stepper()).tryGet() await onStep()
check: check:
expiredNodeId in expiredNodesReceived expiredNodeId in expiredNodesReceived
@ -66,7 +69,7 @@ suite "TimeTracker":
(Moment.now().epochSeconds - ((state.config.revisitDelayMins - 1) * 60)).uint64 (Moment.now().epochSeconds - ((state.config.revisitDelayMins - 1) * 60)).uint64
recentNodeId = createNodeInStore(recentTimestamp) recentNodeId = createNodeInStore(recentTimestamp)
(await state.stepper()).tryGet() await onStep()
check: check:
recentNodeId notin expiredNodesReceived recentNodeId notin expiredNodesReceived

View File

@ -0,0 +1,26 @@
import pkg/chronos
import pkg/questionable
import pkg/questionable/results
import ../../codexcrawler/services/dht
import ../../codexcrawler/types
type MockDht* = ref object of Dht
routingTable*: seq[Nid]
getNeighborsArg*: ?Nid
getNeighborsReturn*: ?!GetNeighborsResponse
method getRoutingTableNodeIds*(d: MockDht): seq[Nid] =
return d.routingTable
method getNeighbors*(d: MockDht, target: Nid): Future[?!GetNeighborsResponse] {.async: (raises: []).} =
d.getNeighborsArg = some(target)
return d.getNeighborsReturn
method start*(d: MockDht): Future[?!void] {.async.} =
return success()
method stop*(d: MockDht): Future[?!void] {.async.} =
return success()
proc createMockDht*(): MockDht =
MockDht()

View File

@ -1,4 +1,4 @@
import ../../codexcrawler/metrics import ../../codexcrawler/services/metrics
type MockMetrics* = ref object of Metrics type MockMetrics* = ref object of Metrics
todo*: int todo*: int

View File

@ -7,6 +7,16 @@ import ../../codexcrawler/config
type MockState* = ref object of State type MockState* = ref object of State
stepper*: OnStep stepper*: OnStep
proc checkAllUnsubscribed*(s: MockState) =
check:
s.events.nodesFound.listeners == 0
s.events.newNodesDiscovered.listeners == 0
s.events.dhtNodeCheck.listeners == 0
s.events.nodesExpired.listeners == 0
method whileRunning*(s: MockState, step: OnStep, delay: Duration) {.async.} =
s.stepper = step
proc createMockState*(): MockState = proc createMockState*(): MockState =
MockState( MockState(
status: ApplicationStatus.Running, status: ApplicationStatus.Running,
@ -18,13 +28,3 @@ proc createMockState*(): MockState =
nodesExpired: newAsyncDataEvent[seq[Nid]](), nodesExpired: newAsyncDataEvent[seq[Nid]](),
), ),
) )
proc checkAllUnsubscribed*(s: MockState) =
check:
s.events.nodesFound.listeners == 0
s.events.newNodesDiscovered.listeners == 0
s.events.dhtNodeCheck.listeners == 0
s.events.nodesExpired.listeners == 0
method whileRunning*(s: MockState, step: OnStep, delay: Duration) {.async.} =
s.stepper = step

View File

@ -0,0 +1,20 @@
import pkg/chronos
import pkg/questionable/results
import ../../codexcrawler/components/todolist
import ../../codexcrawler/types
type MockTodoList* = ref object of TodoList
popReturn*: ?!Nid
method pop*(t: MockTodoList): Future[?!Nid] {.async: (raises: []).} =
return t.popReturn
method start*(t: MockTodoList): Future[?!void] {.async.} =
return success()
method stop*(t: MockTodoList): Future[?!void] {.async.} =
return success()
proc createMockTodoList*(): MockTodoList =
MockTodoList()

View File

@ -2,5 +2,6 @@ import ./components/testnodestore
import ./components/testdhtmetrics import ./components/testdhtmetrics
import ./components/testtodolist import ./components/testtodolist
import ./components/testtimetracker import ./components/testtimetracker
import ./components/testcrawler
{.warning[UnusedImport]: off.} {.warning[UnusedImport]: off.}