diff --git a/codexcrawler/components/nodestore.nim b/codexcrawler/components/nodestore.nim
index f977507..51ae946 100644
--- a/codexcrawler/components/nodestore.nim
+++ b/codexcrawler/components/nodestore.nim
@@ -19,7 +19,7 @@ type
     id*: Nid
     lastVisit*: uint64
 
-  OnNodeEntry = proc(item: NodeEntry): Future[?!void] {.async: (raises: []), gcsafe.}
+  OnNodeEntry* = proc(item: NodeEntry): Future[?!void] {.async: (raises: []), gcsafe.}
 
   NodeStore* = ref object of Component
     state: State
@@ -87,19 +87,26 @@ proc processFoundNodes(s: NodeStore, nids: seq[Nid]): Future[?!void] {.async.} =
   ?await s.fireNewNodesDiscovered(newNodes)
   return success()
 
-proc iterateAll*(s: NodeStore, onNode: OnNodeEntry): Future[?!void] {.async.} =
+method iterateAll*(
+    s: NodeStore, onNode: OnNodeEntry
+): Future[?!void] {.async: (raises: []), base.} =
+  ## Calls onNode for each NodeEntry the store query yields; stops at the first failure.
   without queryKey =? Key.init(nodestoreName), err:
     return failure(err)
-  without iter =? (await query[NodeEntry](s.store, Query.init(queryKey))), err:
-    return failure(err)
-
-  while not iter.finished:
-    without item =? (await iter.next()), err:
-      return failure(err)
-    without value =? item.value, err:
+  try:
+    without iter =? (await query[NodeEntry](s.store, Query.init(queryKey))), err:
       return failure(err)
 
-    ?await onNode(value)
+    while not iter.finished:
+      without item =? (await iter.next()), err:
+        return failure(err)
+      without value =? item.value, err:
+        return failure(err)
+
+      ?await onNode(value)
+  except CatchableError as exc:
+    return failure(exc.msg)
+
   return success()
 
 method start*(s: NodeStore): Future[?!void] {.async.} =
diff --git a/codexcrawler/components/timetracker.nim b/codexcrawler/components/timetracker.nim
index e811481..a89821a 100644
--- a/codexcrawler/components/timetracker.nim
+++ b/codexcrawler/components/timetracker.nim
@@ -1,79 +1,51 @@
 import pkg/chronicles
 import pkg/chronos
-import pkg/questionable
 import pkg/questionable/results
 
-import ./dht
-import ../list
-import ../config
+import ./nodestore
 import ../component
 import ../state
+import ../types
+import ../utils/asyncdataevent
 
 logScope:
   topics = "timetracker"
 
 type TimeTracker* = ref object of Component
-  config: Config
-  todoNodes: List
-  okNodes: List
-  nokNodes: List
-  workerDelay: int
+  state: State
+  nodestore: NodeStore
 
-# # proc processList(t: TimeTracker, list: List, expiry: uint64) {.async.} =
-# #   var toMove = newSeq[NodeEntry]()
-# #   proc onItem(item: NodeEntry) =
-# #     if item.lastVisit < expiry:
-# #       toMove.add(item)
+proc step(t: TimeTracker): Future[?!void] {.async: (raises: []).} =
+  ## Fires nodesExpired with ids of nodes last visited before the cutoff, if any.
+  let expiry =
+    (Moment.now().epochSeconds - (t.state.config.revisitDelayMins * 60)).uint64
 
-# #   await list.iterateAll(onItem)
+  var expired = newSeq[Nid]()
+  proc checkNode(item: NodeEntry): Future[?!void] {.async: (raises: []), gcsafe.} =
+    if item.lastVisit < expiry:
+      expired.add(item.id)
+    return success()
 
-# #   if toMove.len > 0:
-# #     trace "expired node, moving to todo", nodes = $toMove.len
-
-# #   for item in toMove:
-# #     if err =? (await t.todoNodes.add(item)).errorOption:
-# #       error "Failed to add expired node to todo list", err = err.msg
-# #       return
-# #     if err =? (await list.remove(item)).errorOption:
-# #       error "Failed to remove expired node to source list", err = err.msg
-
-# proc step(t: TimeTracker) {.async.} =
-#   let expiry = (Moment.now().epochSeconds - (t.config.revisitDelayMins * 60)).uint64
-#   await t.processList(t.okNodes, expiry)
-#   await t.processList(t.nokNodes, expiry)
-
-proc worker(t: TimeTracker) {.async.} =
-  try:
-    while true:
-      # await t.step()
-      await sleepAsync(t.workerDelay.minutes)
-  except Exception as exc:
-    error "Exception in timetracker worker", msg = exc.msg
-    quit QuitFailure
+  ?await t.nodestore.iterateAll(checkNode)
+  if expired.len > 0:
+    ?await t.state.events.nodesExpired.fire(expired)
+  return success()
 
 method start*(t: TimeTracker): Future[?!void] {.async.} =
-  info "Starting timetracker...", revisitDelayMins = $t.workerDelay
-  asyncSpawn t.worker()
+  info "Starting timetracker..."
+
+  proc onStep(): Future[?!void] {.async: (raises: []), gcsafe.} =
+    await t.step()
+
+  var delay = t.state.config.revisitDelayMins div 100
+  if delay < 1:
+    delay = 1
+
+  await t.state.whileRunning(onStep, delay.minutes)
   return success()
 
 method stop*(t: TimeTracker): Future[?!void] {.async.} =
   return success()
 
-proc new*(
-    T: type TimeTracker,
-    # todoNodes: List,
-    # okNodes: List,
-    # nokNodes: List,
-    config: Config,
-): TimeTracker =
-  var delay = config.revisitDelayMins div 10
-  if delay < 1:
-    delay = 1
-
-  TimeTracker(
-    # todoNodes: todoNodes,
-    # okNodes: okNodes,
-    # nokNodes: nokNodes,
-    config: config,
-    workerDelay: delay,
-  )
+proc new*(T: type TimeTracker, state: State, nodestore: NodeStore): TimeTracker =
+  TimeTracker(state: state, nodestore: nodestore)
diff --git a/codexcrawler/installer.nim b/codexcrawler/installer.nim
index 48c2e9e..dfd4575 100644
--- a/codexcrawler/installer.nim
+++ b/codexcrawler/installer.nim
@@ -8,6 +8,7 @@ import ./components/dht
 import ./components/crawler
 import ./components/timetracker
 import ./components/nodestore
+import ./components/dhtmetrics
 
 proc createComponents*(state: State): Future[?!seq[Component]] {.async.} =
   var components: seq[Component] = newSeq[Component]()
@@ -20,8 +21,12 @@ proc createComponents*(state: State): Future[?!seq[Component]] {.async.} =
 
   let metrics = createMetrics(state.config.metricsAddress, state.config.metricsPort)
 
+  without dhtMetrics =? createDhtMetrics(state, metrics), err:
+    return failure(err)
+
   components.add(nodeStore)
   components.add(dht)
   components.add(Crawler.new(dht, state.config))
-  components.add(TimeTracker.new(state.config))
+  components.add(TimeTracker.new(state, nodeStore))
+  components.add(dhtMetrics)
   return success(components)
diff --git a/codexcrawler/state.nim b/codexcrawler/state.nim
index e32e856..5670ce9 100644
--- a/codexcrawler/state.nim
+++ b/codexcrawler/state.nim
@@ -10,7 +10,7 @@ logScope:
   topics = "state"
 
 type
-  OnStep = proc(): Future[?!void] {.async: (raises: []), gcsafe.}
+  OnStep* = proc(): Future[?!void] {.async: (raises: []), gcsafe.}
 
   DhtNodeCheckEventData* = object
     id*: Nid
@@ -32,7 +32,7 @@ type
     config*: Config
     events*: Events
 
-proc whileRunning*(s: State, step: OnStep, delay: Duration) {.async.} =
+method whileRunning*(s: State, step: OnStep, delay: Duration) {.async, base.} =
   proc worker(): Future[void] {.async.} =
     while s.status == ApplicationStatus.Running:
       if err =? (await step()).errorOption:
diff --git a/codexcrawler/utils/asyncdataevent.nim b/codexcrawler/utils/asyncdataevent.nim
index 52a0ea5..81946d7 100644
--- a/codexcrawler/utils/asyncdataevent.nim
+++ b/codexcrawler/utils/asyncdataevent.nim
@@ -55,18 +55,27 @@ proc subscribe*[T](
   event.subscriptions.add(subscription)
   subscription
 
-proc fire*[T](event: AsyncDataEvent[T], data: T): Future[?!void] {.async.} =
+proc fire*[T](
+    event: AsyncDataEvent[T], data: T
+): Future[?!void] {.async: (raises: []).} =
   event.queue.emit(data.some)
 
   var toUnsubscribe = newSeq[AsyncDataEventSubscription]()
   for sub in event.subscriptions:
-    await sub.fireEvent.wait()
+    try:
+      await sub.fireEvent.wait()
+    except CancelledError as exc:
+      # Do not read lastResult after a cancelled wait; surface the cancellation.
+      return failure(exc.msg)
     if err =? sub.lastResult.errorOption:
       return failure(err)
     if sub.delayedUnsubscribe:
       toUnsubscribe.add(sub)
 
   for sub in toUnsubscribe:
-    await event.unsubscribe(sub)
+    try:
+      await event.unsubscribe(sub)
+    except CatchableError as exc:
+      return failure(exc.msg)
 
   success()
diff --git a/tests/codexcrawler/components/testtimetracker.nim b/tests/codexcrawler/components/testtimetracker.nim
new file mode 100644
index 0000000..83f1fb2
--- /dev/null
+++ b/tests/codexcrawler/components/testtimetracker.nim
@@ -0,0 +1,72 @@
+import pkg/chronos
+import pkg/questionable/results
+import pkg/asynctest/chronos/unittest
+
+import ../../../codexcrawler/components/timetracker
+import ../../../codexcrawler/components/nodestore
+import ../../../codexcrawler/utils/asyncdataevent
+import ../../../codexcrawler/types
+import ../../../codexcrawler/state
+import ../mockstate
+import ../mocknodestore
+import ../helpers
+
+suite "TimeTracker":
+  var
+    nid: Nid
+    state: MockState
+    store: MockNodeStore
+    time: TimeTracker
+    expiredNodesReceived: seq[Nid]
+    sub: AsyncDataEventSubscription
+
+  setup:
+    nid = genNid()
+    state = createMockState()
+    store = createMockNodeStore()
+
+    # Subscribe to nodesExpired event
+    expiredNodesReceived = newSeq[Nid]()
+    proc onExpired(nids: seq[Nid]): Future[?!void] {.async.} =
+      expiredNodesReceived = nids
+      return success()
+
+    sub = state.events.nodesExpired.subscribe(onExpired)
+
+    state.config.revisitDelayMins = 22
+
+    time = TimeTracker.new(state, store)
+
+    (await time.start()).tryGet()
+
+  teardown:
+    (await time.stop()).tryGet()
+    await state.events.nodesExpired.unsubscribe(sub)
+    state.checkAllUnsubscribed()
+
+  proc createNodeInStore(lastVisit: uint64): Nid =
+    let entry = NodeEntry(id: genNid(), lastVisit: lastVisit)
+    store.nodesToIterate.add(entry)
+    return entry.id
+
+  test "onStep fires nodesExpired event for expired nodes":
+    let
+      expiredTimestamp =
+        (Moment.now().epochSeconds - ((1 + state.config.revisitDelayMins) * 60)).uint64
+      expiredNodeId = createNodeInStore(expiredTimestamp)
+
+    (await state.stepper()).tryGet()
+
+    check:
+      expiredNodeId in expiredNodesReceived
+
+  test "onStep does not fire nodesExpired event for nodes that are recent":
+    let
+      recentTimestamp =
+        (Moment.now().epochSeconds - ((state.config.revisitDelayMins - 1) * 60)).uint64
+      recentNodeId = createNodeInStore(recentTimestamp)
+
+    (await state.stepper()).tryGet()
+
+    check:
+      recentNodeId notin expiredNodesReceived
diff --git a/tests/codexcrawler/mocknodestore.nim b/tests/codexcrawler/mocknodestore.nim
new file mode 100644
index 0000000..eb3ec19
--- /dev/null
+++ b/tests/codexcrawler/mocknodestore.nim
@@ -0,0 +1,24 @@
+import std/sequtils
+import pkg/questionable/results
+import pkg/chronos
+
+import ../../codexcrawler/components/nodestore
+
+type MockNodeStore* = ref object of NodeStore
+  nodesToIterate*: seq[NodeEntry]
+
+method iterateAll*(
+    s: MockNodeStore, onNode: OnNodeEntry
+): Future[?!void] {.async: (raises: []).} =
+  for node in s.nodesToIterate:
+    ?await onNode(node)
+  return success()
+
+method start*(s: MockNodeStore): Future[?!void] {.async.} =
+  return success()
+
+method stop*(s: MockNodeStore): Future[?!void] {.async.} =
+  return success()
+
+proc createMockNodeStore*(): MockNodeStore =
+  MockNodeStore(nodesToIterate: newSeq[NodeEntry]())
diff --git a/tests/codexcrawler/mockstate.nim b/tests/codexcrawler/mockstate.nim
index 7721b08..bfa7e6f 100644
--- a/tests/codexcrawler/mockstate.nim
+++ b/tests/codexcrawler/mockstate.nim
@@ -5,6 +5,7 @@ import ../../codexcrawler/types
 import ../../codexcrawler/config
 
 type MockState* = ref object of State
+  stepper*: OnStep
 
 proc createMockState*(): MockState =
   MockState(
@@ -18,9 +19,12 @@ proc createMockState*(): MockState =
     ),
   )
 
-proc checkAllUnsubscribed*(this: MockState) =
+proc checkAllUnsubscribed*(s: MockState) =
   check:
-    this.events.nodesFound.listeners == 0
-    this.events.newNodesDiscovered.listeners == 0
-    this.events.dhtNodeCheck.listeners == 0
-    this.events.nodesExpired.listeners == 0
+    s.events.nodesFound.listeners == 0
+    s.events.newNodesDiscovered.listeners == 0
+    s.events.dhtNodeCheck.listeners == 0
+    s.events.nodesExpired.listeners == 0
+
+method whileRunning*(s: MockState, step: OnStep, delay: Duration) {.async.} =
+  s.stepper = step
diff --git a/tests/codexcrawler/testcomponents.nim b/tests/codexcrawler/testcomponents.nim
index 4226e95..1b9a6b7 100644
--- a/tests/codexcrawler/testcomponents.nim
+++ b/tests/codexcrawler/testcomponents.nim
@@ -1,5 +1,6 @@
 import ./components/testnodestore
 import ./components/testdhtmetrics
 import ./components/testtodolist
+import ./components/testtimetracker
 
 {.warning[UnusedImport]: off.}
diff --git a/tests/codexcrawler/teststate.nim b/tests/codexcrawler/teststate.nim
index 3a1493e..4ea6bb5 100644
--- a/tests/codexcrawler/teststate.nim
+++ b/tests/codexcrawler/teststate.nim
@@ -3,14 +3,24 @@ import pkg/questionable/results
 import pkg/asynctest/chronos/unittest
 
 import ../../codexcrawler/state
-import ./mockstate
+import ../../codexcrawler/config
+import ../../codexcrawler/types
+import ../../codexcrawler/utils/asyncdataevent
 
 suite "State":
   var state: State
 
   setup:
-    # The behavior we're testing is the same for the mock
-    state = createMockState()
+    state = State(
+      status: ApplicationStatus.Running,
+      config: Config(),
+      events: Events(
+        nodesFound: newAsyncDataEvent[seq[Nid]](),
+        newNodesDiscovered: newAsyncDataEvent[seq[Nid]](),
+        dhtNodeCheck: newAsyncDataEvent[DhtNodeCheckEventData](),
+        nodesExpired: newAsyncDataEvent[seq[Nid]](),
+      ),
+    )
 
   test "whileRunning":
     var counter = 0