mirror of
https://github.com/logos-storage/logos-storage-network-crawler.git
synced 2026-01-21 06:43:12 +00:00
applies delay at application start
This commit is contained in:
parent
25291f7625
commit
605b561e30
@ -19,11 +19,10 @@ type Crawler* = ref object of Component
|
||||
dht: Dht
|
||||
todo: TodoList
|
||||
|
||||
proc raiseCheckEvent(c: Crawler, nid: Nid, success: bool): Future[?!void] {.async: (raises: []).} =
|
||||
let event = DhtNodeCheckEventData(
|
||||
id: nid,
|
||||
isOk: success
|
||||
)
|
||||
proc raiseCheckEvent(
|
||||
c: Crawler, nid: Nid, success: bool
|
||||
): Future[?!void] {.async: (raises: []).} =
|
||||
let event = DhtNodeCheckEventData(id: nid, isOk: success)
|
||||
if err =? (await c.state.events.dhtNodeCheck.fire(event)).errorOption:
|
||||
return failure(err)
|
||||
return success()
|
||||
@ -48,6 +47,7 @@ method start*(c: Crawler): Future[?!void] {.async.} =
|
||||
|
||||
proc onStep(): Future[?!void] {.async: (raises: []), gcsafe.} =
|
||||
await c.step()
|
||||
|
||||
await c.state.whileRunning(onStep, c.state.config.stepDelayMs.milliseconds)
|
||||
|
||||
return success()
|
||||
@ -55,14 +55,5 @@ method start*(c: Crawler): Future[?!void] {.async.} =
|
||||
method stop*(c: Crawler): Future[?!void] {.async.} =
|
||||
return success()
|
||||
|
||||
proc new*(
|
||||
T: type Crawler,
|
||||
state: State,
|
||||
dht: Dht,
|
||||
todo: TodoList
|
||||
): Crawler =
|
||||
Crawler(
|
||||
state: state,
|
||||
dht: dht,
|
||||
todo: todo
|
||||
)
|
||||
proc new*(T: type Crawler, state: State, dht: Dht, todo: TodoList): Crawler =
|
||||
Crawler(state: state, dht: dht, todo: todo)
|
||||
|
||||
@ -9,6 +9,7 @@ import ./components/crawler
|
||||
import ./components/timetracker
|
||||
import ./components/nodestore
|
||||
import ./components/dhtmetrics
|
||||
import ./components/todolist
|
||||
|
||||
proc createComponents*(state: State): Future[?!seq[Component]] {.async.} =
|
||||
var components: seq[Component] = newSeq[Component]()
|
||||
@ -32,5 +33,5 @@ proc createComponents*(state: State): Future[?!seq[Component]] {.async.} =
|
||||
components.add(Crawler.new(state, dht, todoList))
|
||||
components.add(TimeTracker.new(state, nodeStore))
|
||||
components.add(dhtMetrics)
|
||||
|
||||
|
||||
return success(components)
|
||||
|
||||
@ -21,7 +21,7 @@ export discv5
|
||||
logScope:
|
||||
topics = "dht"
|
||||
|
||||
type
|
||||
type
|
||||
GetNeighborsResponse* = ref object
|
||||
isResponsive*: bool
|
||||
nodeIds*: seq[Nid]
|
||||
@ -35,6 +35,12 @@ type
|
||||
providerRecord*: ?SignedPeerRecord
|
||||
dhtRecord*: ?SignedPeerRecord
|
||||
|
||||
proc responsive(nodeIds: seq[Nid]): GetNeighborsResponse =
|
||||
GetNeighborsResponse(isResponsive: true, nodeIds: nodeIds)
|
||||
|
||||
proc unresponsive(): GetNeighborsResponse =
|
||||
GetNeighborsResponse(isResponsive: false, nodeIds: newSeq[Nid]())
|
||||
|
||||
proc getNode*(d: Dht, nodeId: NodeId): ?!Node =
|
||||
let node = d.protocol.getNode(nodeId)
|
||||
if node.isSome():
|
||||
@ -48,9 +54,11 @@ method getRoutingTableNodeIds*(d: Dht): seq[Nid] {.base.} =
|
||||
ids.add(node.id)
|
||||
return ids
|
||||
|
||||
method getNeighbors*(d: Dht, target: Nid): Future[?!GetNeighborsResponse] {.async: (raises: []), base.} =
|
||||
method getNeighbors*(
|
||||
d: Dht, target: Nid
|
||||
): Future[?!GetNeighborsResponse] {.async: (raises: []), base.} =
|
||||
without node =? d.getNode(target), err:
|
||||
return failure(err)
|
||||
return success(unresponsive())
|
||||
|
||||
let distances = @[256.uint16]
|
||||
try:
|
||||
@ -58,11 +66,12 @@ method getNeighbors*(d: Dht, target: Nid): Future[?!GetNeighborsResponse] {.asyn
|
||||
|
||||
if response.isOk():
|
||||
let nodes = response.get()
|
||||
return success(GetNeighborsResponse(
|
||||
isResponsive: true,
|
||||
nodeIds: nodes.mapIt(it.id))
|
||||
)
|
||||
return failure($response.error())
|
||||
return success(responsive(nodes.mapIt(it.id)))
|
||||
else:
|
||||
let errmsg = $(response.error())
|
||||
if errmsg == "Nodes message not received in time":
|
||||
return success(unresponsive())
|
||||
return failure(errmsg)
|
||||
except CatchableError as exc:
|
||||
return failure(exc.msg)
|
||||
|
||||
|
||||
@ -32,7 +32,9 @@ type
|
||||
config*: Config
|
||||
events*: Events
|
||||
|
||||
method whileRunning*(s: State, step: OnStep, delay: Duration) {.async, base.} =
|
||||
proc delayedWorkerStart(s: State, step: OnStep, delay: Duration) {.async.} =
|
||||
await sleepAsync(3.seconds)
|
||||
|
||||
proc worker(): Future[void] {.async.} =
|
||||
while s.status == ApplicationStatus.Running:
|
||||
if err =? (await step()).errorOption:
|
||||
@ -40,5 +42,10 @@ method whileRunning*(s: State, step: OnStep, delay: Duration) {.async, base.} =
|
||||
s.status = ApplicationStatus.Stopping
|
||||
await sleepAsync(delay)
|
||||
|
||||
# todo this needs a delay because starts are still being called.
|
||||
asyncSpawn worker()
|
||||
|
||||
method whileRunning*(s: State, step: OnStep, delay: Duration) {.async, base.} =
|
||||
# We use a small delay before starting the workers because 'whileRunning' is likely called from
|
||||
# component 'start' methods, which are executed sequentially in arbitrary order (to prevent temporal coupling).
|
||||
# Worker steps might start raising events that other components haven't had time to subscribe to yet.
|
||||
asyncSpawn s.delayedWorkerStart(step, delay)
|
||||
|
||||
@ -37,20 +37,14 @@ suite "Crawler":
|
||||
(await crawler.stop()).tryGet()
|
||||
state.checkAllUnsubscribed()
|
||||
|
||||
proc onStep() {.async.} =
|
||||
proc onStep() {.async.} =
|
||||
(await state.stepper()).tryGet()
|
||||
|
||||
proc responsive(nid: Nid): GetNeighborsResponse =
|
||||
GetNeighborsResponse(
|
||||
isResponsive: true,
|
||||
nodeIds: @[nid]
|
||||
)
|
||||
GetNeighborsResponse(isResponsive: true, nodeIds: @[nid])
|
||||
|
||||
proc unresponsive(nid: Nid): GetNeighborsResponse =
|
||||
GetNeighborsResponse(
|
||||
isResponsive: false,
|
||||
nodeIds: @[nid]
|
||||
)
|
||||
GetNeighborsResponse(isResponsive: false, nodeIds: @[nid])
|
||||
|
||||
test "onStep should pop a node from the todoList and getNeighbors for it":
|
||||
todo.popReturn = success(nid1)
|
||||
@ -66,6 +60,7 @@ suite "Crawler":
|
||||
proc onNodesFound(nids: seq[Nid]): Future[?!void] {.async.} =
|
||||
nodesFound = nids
|
||||
return success()
|
||||
|
||||
let sub = state.events.nodesFound.subscribe(onNodesFound)
|
||||
|
||||
todo.popReturn = success(nid1)
|
||||
@ -83,6 +78,7 @@ suite "Crawler":
|
||||
proc onCheck(event: DhtNodeCheckEventData): Future[?!void] {.async.} =
|
||||
checkEvent = event
|
||||
return success()
|
||||
|
||||
let sub = state.events.dhtNodeCheck.subscribe(onCheck)
|
||||
|
||||
todo.popReturn = success(nid1)
|
||||
@ -101,6 +97,7 @@ suite "Crawler":
|
||||
proc onCheck(event: DhtNodeCheckEventData): Future[?!void] {.async.} =
|
||||
checkEvent = event
|
||||
return success()
|
||||
|
||||
let sub = state.events.dhtNodeCheck.subscribe(onCheck)
|
||||
|
||||
todo.popReturn = success(nid1)
|
||||
|
||||
@ -44,7 +44,7 @@ suite "TimeTracker":
|
||||
await state.events.nodesExpired.unsubscribe(sub)
|
||||
state.checkAllUnsubscribed()
|
||||
|
||||
proc onStep() {.async.} =
|
||||
proc onStep() {.async.} =
|
||||
(await state.stepper()).tryGet()
|
||||
|
||||
proc createNodeInStore(lastVisit: uint64): Nid =
|
||||
|
||||
@ -12,7 +12,9 @@ type MockDht* = ref object of Dht
|
||||
method getRoutingTableNodeIds*(d: MockDht): seq[Nid] =
|
||||
return d.routingTable
|
||||
|
||||
method getNeighbors*(d: MockDht, target: Nid): Future[?!GetNeighborsResponse] {.async: (raises: []).} =
|
||||
method getNeighbors*(
|
||||
d: MockDht, target: Nid
|
||||
): Future[?!GetNeighborsResponse] {.async: (raises: []).} =
|
||||
d.getNeighborsArg = some(target)
|
||||
return d.getNeighborsReturn
|
||||
|
||||
|
||||
@ -6,7 +6,7 @@ import ../../codexcrawler/types
|
||||
|
||||
type MockTodoList* = ref object of TodoList
|
||||
popReturn*: ?!Nid
|
||||
|
||||
|
||||
method pop*(t: MockTodoList): Future[?!Nid] {.async: (raises: []).} =
|
||||
return t.popReturn
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user