diff --git a/codexcrawler/components/crawler.nim b/codexcrawler/components/crawler.nim index 0f984a0..b4db6f0 100644 --- a/codexcrawler/components/crawler.nim +++ b/codexcrawler/components/crawler.nim @@ -19,11 +19,10 @@ type Crawler* = ref object of Component dht: Dht todo: TodoList -proc raiseCheckEvent(c: Crawler, nid: Nid, success: bool): Future[?!void] {.async: (raises: []).} = - let event = DhtNodeCheckEventData( - id: nid, - isOk: success - ) +proc raiseCheckEvent( + c: Crawler, nid: Nid, success: bool +): Future[?!void] {.async: (raises: []).} = + let event = DhtNodeCheckEventData(id: nid, isOk: success) if err =? (await c.state.events.dhtNodeCheck.fire(event)).errorOption: return failure(err) return success() @@ -48,6 +47,7 @@ method start*(c: Crawler): Future[?!void] {.async.} = proc onStep(): Future[?!void] {.async: (raises: []), gcsafe.} = await c.step() + await c.state.whileRunning(onStep, c.state.config.stepDelayMs.milliseconds) return success() @@ -55,14 +55,5 @@ method start*(c: Crawler): Future[?!void] {.async.} = method stop*(c: Crawler): Future[?!void] {.async.} = return success() -proc new*( - T: type Crawler, - state: State, - dht: Dht, - todo: TodoList -): Crawler = - Crawler( - state: state, - dht: dht, - todo: todo - ) +proc new*(T: type Crawler, state: State, dht: Dht, todo: TodoList): Crawler = + Crawler(state: state, dht: dht, todo: todo) diff --git a/codexcrawler/installer.nim b/codexcrawler/installer.nim index e8a7fd5..38cf0d6 100644 --- a/codexcrawler/installer.nim +++ b/codexcrawler/installer.nim @@ -9,6 +9,7 @@ import ./components/crawler import ./components/timetracker import ./components/nodestore import ./components/dhtmetrics +import ./components/todolist proc createComponents*(state: State): Future[?!seq[Component]] {.async.} = var components: seq[Component] = newSeq[Component]() @@ -32,5 +33,5 @@ proc createComponents*(state: State): Future[?!seq[Component]] {.async.} = components.add(Crawler.new(state, dht, todoList)) components.add(TimeTracker.new(state, nodeStore)) components.add(dhtMetrics) - + return success(components) diff --git a/codexcrawler/services/dht.nim b/codexcrawler/services/dht.nim index c1c777c..345dd31 100644 --- a/codexcrawler/services/dht.nim +++ b/codexcrawler/services/dht.nim @@ -21,7 +21,7 @@ export discv5 logScope: topics = "dht" -type +type GetNeighborsResponse* = ref object isResponsive*: bool nodeIds*: seq[Nid] @@ -35,6 +35,12 @@ type providerRecord*: ?SignedPeerRecord dhtRecord*: ?SignedPeerRecord +proc responsive(nodeIds: seq[Nid]): GetNeighborsResponse = + GetNeighborsResponse(isResponsive: true, nodeIds: nodeIds) + +proc unresponsive(): GetNeighborsResponse = + GetNeighborsResponse(isResponsive: false, nodeIds: newSeq[Nid]()) + proc getNode*(d: Dht, nodeId: NodeId): ?!Node = let node = d.protocol.getNode(nodeId) if node.isSome(): @@ -48,9 +54,11 @@ method getRoutingTableNodeIds*(d: Dht): seq[Nid] {.base.} = ids.add(node.id) return ids -method getNeighbors*(d: Dht, target: Nid): Future[?!GetNeighborsResponse] {.async: (raises: []), base.} = +method getNeighbors*( + d: Dht, target: Nid +): Future[?!GetNeighborsResponse] {.async: (raises: []), base.} = without node =? d.getNode(target), err: - return failure(err) + return success(unresponsive()) let distances = @[256.uint16] try: @@ -58,11 +66,12 @@ method getNeighbors*(d: Dht, target: Nid): Future[?!GetNeighborsResponse] {.asyn if response.isOk(): let nodes = response.get() - return success(GetNeighborsResponse( - isResponsive: true, - nodeIds: nodes.mapIt(it.id)) - ) - return failure($response.error()) + return success(responsive(nodes.mapIt(it.id))) + else: + let errmsg = $(response.error()) + if errmsg == "Nodes message not received in time": + return success(unresponsive()) + return failure(errmsg) except CatchableError as exc: return failure(exc.msg) diff --git a/codexcrawler/state.nim b/codexcrawler/state.nim index 5394ef1..f4ca2e0 100644 --- a/codexcrawler/state.nim +++ b/codexcrawler/state.nim @@ -32,7 +32,9 @@ type config*: Config events*: Events -method whileRunning*(s: State, step: OnStep, delay: Duration) {.async, base.} = +proc delayedWorkerStart(s: State, step: OnStep, delay: Duration) {.async.} = + await sleepAsync(3.seconds) + proc worker(): Future[void] {.async.} = while s.status == ApplicationStatus.Running: if err =? (await step()).errorOption: @@ -40,5 +42,10 @@ method whileRunning*(s: State, step: OnStep, delay: Duration) {.async, base.} = s.status = ApplicationStatus.Stopping await sleepAsync(delay) - # todo this needs a delay because starts are still being called. asyncSpawn worker() + +method whileRunning*(s: State, step: OnStep, delay: Duration) {.async, base.} = + # We use a small delay before starting the workers because 'whileRunning' is likely called from + # component 'start' methods, which are executed sequentially in arbitrary order (to prevent temporal coupling). + # Worker steps might start raising events that other components haven't had time to subscribe to yet. + asyncSpawn s.delayedWorkerStart(step, delay) diff --git a/tests/codexcrawler/components/testcrawler.nim b/tests/codexcrawler/components/testcrawler.nim index 2ec2948..c2bbe8f 100644 --- a/tests/codexcrawler/components/testcrawler.nim +++ b/tests/codexcrawler/components/testcrawler.nim @@ -37,20 +37,14 @@ suite "Crawler": (await crawler.stop()).tryGet() state.checkAllUnsubscribed() - proc onStep() {.async.} = + proc onStep() {.async.} = (await state.stepper()).tryGet() proc responsive(nid: Nid): GetNeighborsResponse = - GetNeighborsResponse( - isResponsive: true, - nodeIds: @[nid] - ) + GetNeighborsResponse(isResponsive: true, nodeIds: @[nid]) proc unresponsive(nid: Nid): GetNeighborsResponse = - GetNeighborsResponse( - isResponsive: false, - nodeIds: @[nid] - ) + GetNeighborsResponse(isResponsive: false, nodeIds: @[nid]) test "onStep should pop a node from the todoList and getNeighbors for it": todo.popReturn = success(nid1) @@ -66,6 +60,7 @@ suite "Crawler": proc onNodesFound(nids: seq[Nid]): Future[?!void] {.async.} = nodesFound = nids return success() + let sub = state.events.nodesFound.subscribe(onNodesFound) todo.popReturn = success(nid1) @@ -83,6 +78,7 @@ suite "Crawler": proc onCheck(event: DhtNodeCheckEventData): Future[?!void] {.async.} = checkEvent = event return success() + let sub = state.events.dhtNodeCheck.subscribe(onCheck) todo.popReturn = success(nid1) @@ -101,6 +97,7 @@ suite "Crawler": proc onCheck(event: DhtNodeCheckEventData): Future[?!void] {.async.} = checkEvent = event return success() + let sub = state.events.dhtNodeCheck.subscribe(onCheck) todo.popReturn = success(nid1) diff --git a/tests/codexcrawler/components/testtimetracker.nim b/tests/codexcrawler/components/testtimetracker.nim index 72666ff..587d1cb 100644 --- a/tests/codexcrawler/components/testtimetracker.nim +++ b/tests/codexcrawler/components/testtimetracker.nim @@ -44,7 +44,7 @@ suite "TimeTracker": await state.events.nodesExpired.unsubscribe(sub) state.checkAllUnsubscribed() - proc onStep() {.async.} = + proc onStep() {.async.} = (await state.stepper()).tryGet() proc createNodeInStore(lastVisit: uint64): Nid = diff --git a/tests/codexcrawler/mockdht.nim b/tests/codexcrawler/mockdht.nim index 98232ad..5aa821c 100644 --- a/tests/codexcrawler/mockdht.nim +++ b/tests/codexcrawler/mockdht.nim @@ -12,7 +12,9 @@ type MockDht* = ref object of Dht method getRoutingTableNodeIds*(d: MockDht): seq[Nid] = return d.routingTable -method getNeighbors*(d: MockDht, target: Nid): Future[?!GetNeighborsResponse] {.async: (raises: []).} = +method getNeighbors*( + d: MockDht, target: Nid +): Future[?!GetNeighborsResponse] {.async: (raises: []).} = d.getNeighborsArg = some(target) return d.getNeighborsReturn diff --git a/tests/codexcrawler/mocktodolist.nim b/tests/codexcrawler/mocktodolist.nim index 3ff9988..7623c89 100644 --- a/tests/codexcrawler/mocktodolist.nim +++ b/tests/codexcrawler/mocktodolist.nim @@ -6,7 +6,7 @@ import ../../codexcrawler/types type MockTodoList* = ref object of TodoList popReturn*: ?!Nid - + method pop*(t: MockTodoList): Future[?!Nid] {.async: (raises: []).} = return t.popReturn