diff --git a/codexcrawler/components/crawler.nim b/codexcrawler/components/crawler.nim index e86de85..0f984a0 100644 --- a/codexcrawler/components/crawler.nim +++ b/codexcrawler/components/crawler.nim @@ -3,86 +3,53 @@ import pkg/chronos import pkg/questionable import pkg/questionable/results -import ./dht -import ../list +import ../services/dht +import ./todolist import ../config -import ../component import ../types +import ../component import ../state import ../utils/asyncdataevent -import std/sequtils - logScope: topics = "crawler" type Crawler* = ref object of Component + state: State dht: Dht - config: Config - todoNodes: List - okNodes: List - nokNodes: List + todo: TodoList -# This is not going to stay this way. -proc isNew(c: Crawler, node: Node): bool = - not c.todoNodes.contains(node.id) and not c.okNodes.contains(node.id) and - not c.nokNodes.contains(node.id) +proc raiseCheckEvent(c: Crawler, nid: Nid, success: bool): Future[?!void] {.async: (raises: []).} = + let event = DhtNodeCheckEventData( + id: nid, + isOk: success + ) + if err =? (await c.state.events.dhtNodeCheck.fire(event)).errorOption: + return failure(err) + return success() -# proc handleNodeNotOk(c: Crawler, target: NodeEntry) {.async.} = -# if err =? (await c.nokNodes.add(target)).errorOption: -# error "Failed to add not-OK-node to list", err = err.msg +proc step(c: Crawler): Future[?!void] {.async: (raises: []).} = + without nid =? (await c.todo.pop()), err: + return failure(err) -# proc handleNodeOk(c: Crawler, target: NodeEntry) {.async.} = -# if err =? (await c.okNodes.add(target)).errorOption: -# error "Failed to add OK-node to list", err = err.msg + without response =? await c.dht.getNeighbors(nid), err: + return failure(err) -# proc addNewTodoNode(c: Crawler, nodeId: NodeId): Future[?!void] {.async.} = -# let entry = NodeEntry(id: nodeId, lastVisit: 0) -# return await c.todoNodes.add(entry) + if err =? (await c.raiseCheckEvent(nid, response.isResponsive)).errorOption: + return failure(err) -# proc addNewTodoNodes(c: Crawler, newNodes: seq[Node]) {.async.} = -# for node in newNodes: -# if err =? (await c.addNewTodoNode(node.id)).errorOption: -# error "Failed to add todo-node to list", err = err.msg + if err =? (await c.state.events.nodesFound.fire(response.nodeIds)).errorOption: + return failure(err) -# proc step(c: Crawler) {.async.} = -# logScope: -# todo = $c.todoNodes.len -# ok = $c.okNodes.len -# nok = $c.nokNodes.len - -# without var target =? (await c.todoNodes.pop()), err: -# error "Failed to get todo node", err = err.msg - -# target.lastVisit = Moment.now().epochSeconds.uint64 - -# without receivedNodes =? (await c.dht.getNeighbors(target.id)), err: -# await c.handleNodeNotOk(target) -# return - -# let newNodes = receivedNodes.filterIt(isNew(c, it)) -# if newNodes.len > 0: -# trace "Discovered new nodes", newNodes = newNodes.len - -# await c.handleNodeOk(target) -# await c.addNewTodoNodes(newNodes) - -# # Don't log the status every loop: -# if (c.todoNodes.len mod 10) == 0: -# trace "Status" - -proc worker(c: Crawler) {.async.} = - try: - while true: - # await c.step() - await sleepAsync(c.config.stepDelayMs.millis) - except Exception as exc: - error "Exception in crawler worker", msg = exc.msg - quit QuitFailure + return success() method start*(c: Crawler): Future[?!void] {.async.} = - info "Starting crawler...", stepDelayMs = $c.config.stepDelayMs - asyncSpawn c.worker() + info "Starting crawler..." + + proc onStep(): Future[?!void] {.async: (raises: []), gcsafe.} = + await c.step() + await c.state.whileRunning(onStep, c.state.config.stepDelayMs.milliseconds) + return success() method stop*(c: Crawler): Future[?!void] {.async.} = @@ -90,13 +57,12 @@ method stop*(c: Crawler): Future[?!void] {.async.} = proc new*( T: type Crawler, + state: State, dht: Dht, - # todoNodes: List, - # okNodes: List, - # nokNodes: List, - config: Config, + todo: TodoList ): Crawler = Crawler( + state: state, dht: dht, - config: config, # todoNodes: todoNodes, okNodes: okNodes, nokNodes: nokNodes, + todo: todo ) diff --git a/codexcrawler/components/dhtmetrics.nim b/codexcrawler/components/dhtmetrics.nim index 0ba6ca8..092abde 100644 --- a/codexcrawler/components/dhtmetrics.nim +++ b/codexcrawler/components/dhtmetrics.nim @@ -3,10 +3,9 @@ import pkg/chronos import pkg/questionable import pkg/questionable/results -import ./dht import ../list import ../state -import ../metrics +import ../services/metrics import ../component import ../utils/asyncdataevent diff --git a/codexcrawler/components/todolist.nim b/codexcrawler/components/todolist.nim index 7eac856..0d55ccb 100644 --- a/codexcrawler/components/todolist.nim +++ b/codexcrawler/components/todolist.nim @@ -30,12 +30,15 @@ proc addNodes(t: TodoList, nids: seq[Nid]) = s.complete() t.emptySignal = Future[void].none -proc pop*(t: TodoList): Future[?!Nid] {.async.} = +method pop*(t: TodoList): Future[?!Nid] {.async: (raises: []), base.} = if t.nids.len < 1: trace "List is empty. Waiting for new items..." let signal = newFuture[void]("list.emptySignal") t.emptySignal = some(signal) - await signal.wait(1.hours) + try: + await signal.wait(1.hours) + except CatchableError as exc: + return failure(exc.msg) if t.nids.len < 1: return failure("TodoList is empty.") @@ -63,5 +66,5 @@ method stop*(t: TodoList): Future[?!void] {.async.} = proc new*(_: type TodoList, state: State): TodoList = TodoList(nids: newSeq[Nid](), state: state, emptySignal: Future[void].none) -proc createTodoList*(state: State): ?!TodoList = - success(TodoList.new(state)) +proc createTodoList*(state: State): TodoList = + TodoList.new(state) diff --git a/codexcrawler/installer.nim b/codexcrawler/installer.nim index dfd4575..e8a7fd5 100644 --- a/codexcrawler/installer.nim +++ b/codexcrawler/installer.nim @@ -2,9 +2,9 @@ import pkg/chronos import pkg/questionable/results import ./state -import ./metrics +import ./services/metrics +import ./services/dht import ./component -import ./components/dht import ./components/crawler import ./components/timetracker import ./components/nodestore @@ -19,14 +19,18 @@ proc createComponents*(state: State): Future[?!seq[Component]] {.async.} = without nodeStore =? createNodeStore(state), err: return failure(err) - let metrics = createMetrics(state.config.metricsAddress, state.config.metricsPort) + let + metrics = createMetrics(state.config.metricsAddress, state.config.metricsPort) + todoList = createTodoList(state) without dhtMetrics =? createDhtMetrics(state, metrics), err: return failure(err) + components.add(todoList) components.add(nodeStore) components.add(dht) - components.add(Crawler.new(dht, state.config)) + components.add(Crawler.new(state, dht, todoList)) components.add(TimeTracker.new(state, nodeStore)) components.add(dhtMetrics) + return success(components) diff --git a/codexcrawler/components/dht.nim b/codexcrawler/services/dht.nim similarity index 74% rename from codexcrawler/components/dht.nim rename to codexcrawler/services/dht.nim index 565af1e..c1c777c 100644 --- a/codexcrawler/components/dht.nim +++ b/codexcrawler/services/dht.nim @@ -1,5 +1,6 @@ import std/os import std/net +import std/sequtils import pkg/chronicles import pkg/chronos import pkg/libp2p @@ -11,63 +12,59 @@ from pkg/nimcrypto import keccak256 import ../utils/keyutils import ../utils/datastoreutils import ../utils/rng -import ../utils/asyncdataevent import ../component import ../state +import ../types export discv5 logScope: topics = "dht" -type Dht* = ref object of Component - state: State - protocol*: discv5.Protocol - key: PrivateKey - peerId: PeerId - announceAddrs*: seq[MultiAddress] - providerRecord*: ?SignedPeerRecord - dhtRecord*: ?SignedPeerRecord +type + GetNeighborsResponse* = ref object + isResponsive*: bool + nodeIds*: seq[Nid] -# proc toNodeId*(cid: Cid): NodeId = -# ## Cid to discovery id -# ## - -# readUintBE[256](keccak256.digest(cid.data.buffer).data) - -# proc toNodeId*(host: ca.Address): NodeId = -# ## Eth address to discovery id -# ## - -# readUintBE[256](keccak256.digest(host.toArray).data) + Dht* = ref object of Component + state: State + protocol*: discv5.Protocol + key: PrivateKey + peerId: PeerId + announceAddrs*: seq[MultiAddress] + providerRecord*: ?SignedPeerRecord + dhtRecord*: ?SignedPeerRecord proc getNode*(d: Dht, nodeId: NodeId): ?!Node = let node = d.protocol.getNode(nodeId) if node.isSome(): return success(node.get()) - return failure("Node not found for id: " & $(NodeId(nodeId))) + return failure("Node not found for id: " & nodeId.toHex()) -proc getRoutingTableNodeIds(d: Dht): seq[NodeId] = - var ids = newSeq[NodeId]() +method getRoutingTableNodeIds*(d: Dht): seq[Nid] {.base.} = + var ids = newSeq[Nid]() for bucket in d.protocol.routingTable.buckets: for node in bucket.nodes: ids.add(node.id) return ids -proc getNeighbors*(d: Dht, target: NodeId): Future[?!seq[Node]] {.async.} = +method getNeighbors*(d: Dht, target: Nid): Future[?!GetNeighborsResponse] {.async: (raises: []), base.} = without node =? d.getNode(target), err: return failure(err) let distances = @[256.uint16] - let response = await d.protocol.findNode(node, distances) + try: + let response = await d.protocol.findNode(node, distances) - if response.isOk(): - let nodes = response.get() - if nodes.len > 0: - return success(nodes) - - # Both returning 0 nodes and a failure result are treated as failure of getNeighbors - return failure("No nodes returned") + if response.isOk(): + let nodes = response.get() + return success(GetNeighborsResponse( + isResponsive: true, + nodeIds: nodes.mapIt(it.id)) + ) + return failure($response.error()) + except CatchableError as exc: + return failure(exc.msg) proc findPeer*(d: Dht, peerId: PeerId): Future[?PeerRecord] {.async.} = trace "protocol.resolve..." @@ -103,19 +100,19 @@ proc updateDhtRecord(d: Dht, addrs: openArray[MultiAddress]) = if not d.protocol.isNil: d.protocol.updateRecord(d.dhtRecord).expect("Should update SPR") -proc findRoutingTableNodes(d: Dht) {.async.} = - await sleepAsync(5.seconds) - let nodes = d.getRoutingTableNodeIds() +# proc findRoutingTableNodes(d: Dht) {.async.} = +# await sleepAsync(5.seconds) +# let nodes = d.getRoutingTableNodeIds() - if err =? (await d.state.events.nodesFound.fire(nodes)).errorOption: - error "Failed to raise routing-table nodes as found nodes", err = err.msg - else: - trace "Routing table nodes raise as found nodes", num = nodes.len +# if err =? (await d.state.events.nodesFound.fire(nodes)).errorOption: +# error "Failed to raise routing-table nodes as found nodes", err = err.msg +# else: +# trace "Routing table nodes raised as found nodes", num = nodes.len method start*(d: Dht): Future[?!void] {.async.} = d.protocol.open() await d.protocol.start() - asyncSpawn d.findRoutingTableNodes() + # asyncSpawn d.findRoutingTableNodes() return success() method stop*(d: Dht): Future[?!void] {.async.} = diff --git a/codexcrawler/metrics.nim b/codexcrawler/services/metrics.nim similarity index 100% rename from codexcrawler/metrics.nim rename to codexcrawler/services/metrics.nim diff --git a/codexcrawler/state.nim b/codexcrawler/state.nim index 5670ce9..5394ef1 100644 --- a/codexcrawler/state.nim +++ b/codexcrawler/state.nim @@ -40,4 +40,5 @@ method whileRunning*(s: State, step: OnStep, delay: Duration) {.async, base.} = s.status = ApplicationStatus.Stopping await sleepAsync(delay) + # todo this needs a delay because starts are still being called. asyncSpawn worker() diff --git a/tests/codexcrawler/components/testcrawler.nim b/tests/codexcrawler/components/testcrawler.nim new file mode 100644 index 0000000..2ec2948 --- /dev/null +++ b/tests/codexcrawler/components/testcrawler.nim @@ -0,0 +1,115 @@ +import pkg/chronos +import pkg/questionable +import pkg/questionable/results +import pkg/asynctest/chronos/unittest + +import ../../../codexcrawler/components/crawler +import ../../../codexcrawler/services/dht +import ../../../codexcrawler/utils/asyncdataevent +import ../../../codexcrawler/types +import ../../../codexcrawler/state +import ../mockstate +import ../mockdht +import ../mocktodolist +import ../helpers + +suite "Crawler": + var + nid1: Nid + nid2: Nid + state: MockState + todo: MockTodoList + dht: MockDht + crawler: Crawler + + setup: + nid1 = genNid() + nid2 = genNid() + state = createMockState() + todo = createMockTodoList() + dht = createMockDht() + + crawler = Crawler.new(state, dht, todo) + + (await crawler.start()).tryGet() + + teardown: + (await crawler.stop()).tryGet() + state.checkAllUnsubscribed() + + proc onStep() {.async.} = + (await state.stepper()).tryGet() + + proc responsive(nid: Nid): GetNeighborsResponse = + GetNeighborsResponse( + isResponsive: true, + nodeIds: @[nid] + ) + + proc unresponsive(nid: Nid): GetNeighborsResponse = + GetNeighborsResponse( + isResponsive: false, + nodeIds: @[nid] + ) + + test "onStep should pop a node from the todoList and getNeighbors for it": + todo.popReturn = success(nid1) + dht.getNeighborsReturn = success(responsive(nid1)) + + await onStep() + + check: + !(dht.getNeighborsArg) == nid1 + + test "nodes returned by getNeighbors are raised as nodesFound": + var nodesFound = newSeq[Nid]() + proc onNodesFound(nids: seq[Nid]): Future[?!void] {.async.} = + nodesFound = nids + return success() + let sub = state.events.nodesFound.subscribe(onNodesFound) + + todo.popReturn = success(nid1) + dht.getNeighborsReturn = success(responsive(nid2)) + + await onStep() + + check: + nid2 in nodesFound + + await state.events.nodesFound.unsubscribe(sub) + + test "responsive result from getNeighbors raises the node as successful dhtNodeCheck": + var checkEvent = DhtNodeCheckEventData() + proc onCheck(event: DhtNodeCheckEventData): Future[?!void] {.async.} = + checkEvent = event + return success() + let sub = state.events.dhtNodeCheck.subscribe(onCheck) + + todo.popReturn = success(nid1) + dht.getNeighborsReturn = success(responsive(nid2)) + + await onStep() + + check: + checkEvent.id == nid1 + checkEvent.isOk == true + + await state.events.dhtNodeCheck.unsubscribe(sub) + + test "unresponsive result from getNeighbors raises the node as unsuccessful dhtNodeCheck": + var checkEvent = DhtNodeCheckEventData() + proc onCheck(event: DhtNodeCheckEventData): Future[?!void] {.async.} = + checkEvent = event + return success() + let sub = state.events.dhtNodeCheck.subscribe(onCheck) + + todo.popReturn = success(nid1) + dht.getNeighborsReturn = success(unresponsive(nid2)) + + await onStep() + + check: + checkEvent.id == nid1 + checkEvent.isOk == false + + await state.events.dhtNodeCheck.unsubscribe(sub) diff --git a/tests/codexcrawler/components/testtimetracker.nim b/tests/codexcrawler/components/testtimetracker.nim index 83f1fb2..72666ff 100644 --- a/tests/codexcrawler/components/testtimetracker.nim +++ b/tests/codexcrawler/components/testtimetracker.nim @@ -44,6 +44,9 @@ suite "TimeTracker": await state.events.nodesExpired.unsubscribe(sub) state.checkAllUnsubscribed() + proc onStep() {.async.} = + (await state.stepper()).tryGet() + proc createNodeInStore(lastVisit: uint64): Nid = let entry = NodeEntry(id: genNid(), lastVisit: lastVisit) store.nodesToIterate.add(entry) @@ -55,7 +58,7 @@ suite "TimeTracker": (Moment.now().epochSeconds - ((1 + state.config.revisitDelayMins) * 60)).uint64 expiredNodeId = createNodeInStore(expiredTimestamp) - (await state.stepper()).tryGet() + await onStep() check: expiredNodeId in expiredNodesReceived @@ -66,7 +69,7 @@ suite "TimeTracker": (Moment.now().epochSeconds - ((state.config.revisitDelayMins - 1) * 60)).uint64 recentNodeId = createNodeInStore(recentTimestamp) - (await state.stepper()).tryGet() + await onStep() check: recentNodeId notin expiredNodesReceived diff --git a/tests/codexcrawler/mockdht.nim b/tests/codexcrawler/mockdht.nim new file mode 100644 index 0000000..98232ad --- /dev/null +++ b/tests/codexcrawler/mockdht.nim @@ -0,0 +1,26 @@ +import pkg/chronos +import pkg/questionable +import pkg/questionable/results +import ../../codexcrawler/services/dht +import ../../codexcrawler/types + +type MockDht* = ref object of Dht + routingTable*: seq[Nid] + getNeighborsArg*: ?Nid + getNeighborsReturn*: ?!GetNeighborsResponse + +method getRoutingTableNodeIds*(d: MockDht): seq[Nid] = + return d.routingTable + +method getNeighbors*(d: MockDht, target: Nid): Future[?!GetNeighborsResponse] {.async: (raises: []).} = + d.getNeighborsArg = some(target) + return d.getNeighborsReturn + +method start*(d: MockDht): Future[?!void] {.async.} = + return success() + +method stop*(d: MockDht): Future[?!void] {.async.} = + return success() + +proc createMockDht*(): MockDht = + MockDht() diff --git a/tests/codexcrawler/mockmetrics.nim b/tests/codexcrawler/mockmetrics.nim index 021a8b0..16837b3 100644 --- a/tests/codexcrawler/mockmetrics.nim +++ b/tests/codexcrawler/mockmetrics.nim @@ -1,4 +1,4 @@ -import ../../codexcrawler/metrics +import ../../codexcrawler/services/metrics type MockMetrics* = ref object of Metrics todo*: int diff --git a/tests/codexcrawler/mockstate.nim b/tests/codexcrawler/mockstate.nim index bfa7e6f..abc9594 100644 --- a/tests/codexcrawler/mockstate.nim +++ b/tests/codexcrawler/mockstate.nim @@ -7,6 +7,16 @@ import ../../codexcrawler/config type MockState* = ref object of State stepper*: OnStep +proc checkAllUnsubscribed*(s: MockState) = + check: + s.events.nodesFound.listeners == 0 + s.events.newNodesDiscovered.listeners == 0 + s.events.dhtNodeCheck.listeners == 0 + s.events.nodesExpired.listeners == 0 + +method whileRunning*(s: MockState, step: OnStep, delay: Duration) {.async.} = + s.stepper = step + proc createMockState*(): MockState = MockState( status: ApplicationStatus.Running, @@ -18,13 +28,3 @@ proc createMockState*(): MockState = nodesExpired: newAsyncDataEvent[seq[Nid]](), ), ) - -proc checkAllUnsubscribed*(s: MockState) = - check: - s.events.nodesFound.listeners == 0 - s.events.newNodesDiscovered.listeners == 0 - s.events.dhtNodeCheck.listeners == 0 - s.events.nodesExpired.listeners == 0 - -method whileRunning*(s: MockState, step: OnStep, delay: Duration) {.async.} = - s.stepper = step diff --git a/tests/codexcrawler/mocktodolist.nim b/tests/codexcrawler/mocktodolist.nim new file mode 100644 index 0000000..3ff9988 --- /dev/null +++ b/tests/codexcrawler/mocktodolist.nim @@ -0,0 +1,20 @@ +import pkg/chronos +import pkg/questionable/results + +import ../../codexcrawler/components/todolist +import ../../codexcrawler/types + +type MockTodoList* = ref object of TodoList + popReturn*: ?!Nid + +method pop*(t: MockTodoList): Future[?!Nid] {.async: (raises: []).} = + return t.popReturn + +method start*(t: MockTodoList): Future[?!void] {.async.} = + return success() + +method stop*(t: MockTodoList): Future[?!void] {.async.} = + return success() + +proc createMockTodoList*(): MockTodoList = + MockTodoList() diff --git a/tests/codexcrawler/testcomponents.nim b/tests/codexcrawler/testcomponents.nim index 1b9a6b7..0773ec0 100644 --- a/tests/codexcrawler/testcomponents.nim +++ b/tests/codexcrawler/testcomponents.nim @@ -2,5 +2,6 @@ import ./components/testnodestore import ./components/testdhtmetrics import ./components/testtodolist import ./components/testtimetracker +import ./components/testcrawler {.warning[UnusedImport]: off.}