Implements timetracker and tests

This commit is contained in:
Ben 2025-02-11 16:31:23 +01:00
parent 4fff78903d
commit da1d82a4cd
No known key found for this signature in database
GPG Key ID: 0F16E812E736C24B
10 changed files with 182 additions and 82 deletions

View File

@ -19,7 +19,7 @@ type
id*: Nid
lastVisit*: uint64
OnNodeEntry = proc(item: NodeEntry): Future[?!void] {.async: (raises: []), gcsafe.}
OnNodeEntry* = proc(item: NodeEntry): Future[?!void] {.async: (raises: []), gcsafe.}
NodeStore* = ref object of Component
state: State
@ -87,19 +87,25 @@ proc processFoundNodes(s: NodeStore, nids: seq[Nid]): Future[?!void] {.async.} =
?await s.fireNewNodesDiscovered(newNodes)
return success()
proc iterateAll*(s: NodeStore, onNode: OnNodeEntry): Future[?!void] {.async.} =
method iterateAll*(
s: NodeStore, onNode: OnNodeEntry
): Future[?!void] {.async: (raises: []), base.} =
without queryKey =? Key.init(nodestoreName), err:
return failure(err)
without iter =? (await query[NodeEntry](s.store, Query.init(queryKey))), err:
return failure(err)
while not iter.finished:
without item =? (await iter.next()), err:
return failure(err)
without value =? item.value, err:
try:
without iter =? (await query[NodeEntry](s.store, Query.init(queryKey))), err:
return failure(err)
?await onNode(value)
while not iter.finished:
without item =? (await iter.next()), err:
return failure(err)
without value =? item.value, err:
return failure(err)
?await onNode(value)
except CatchableError as exc:
return failure(exc.msg)
return success()
method start*(s: NodeStore): Future[?!void] {.async.} =

View File

@ -1,79 +1,49 @@
import pkg/chronicles
import pkg/chronos
import pkg/questionable
import pkg/questionable/results
import ./dht
import ../list
import ../config
import ./nodestore
import ../component
import ../state
import ../types
import ../utils/asyncdataevent
logScope:
topics = "timetracker"
type TimeTracker* = ref object of Component
config: Config
todoNodes: List
okNodes: List
nokNodes: List
workerDelay: int
state: State
nodestore: NodeStore
# # proc processList(t: TimeTracker, list: List, expiry: uint64) {.async.} =
# # var toMove = newSeq[NodeEntry]()
# # proc onItem(item: NodeEntry) =
# # if item.lastVisit < expiry:
# # toMove.add(item)
proc step(t: TimeTracker): Future[?!void] {.async: (raises: []).} =
let expiry =
(Moment.now().epochSeconds - (t.state.config.revisitDelayMins * 60)).uint64
# # await list.iterateAll(onItem)
var expired = newSeq[Nid]()
proc checkNode(item: NodeEntry): Future[?!void] {.async: (raises: []), gcsafe.} =
if item.lastVisit < expiry:
expired.add(item.id)
return success()
# # if toMove.len > 0:
# # trace "expired node, moving to todo", nodes = $toMove.len
# # for item in toMove:
# # if err =? (await t.todoNodes.add(item)).errorOption:
# # error "Failed to add expired node to todo list", err = err.msg
# # return
# # if err =? (await list.remove(item)).errorOption:
# # error "Failed to remove expired node to source list", err = err.msg
# proc step(t: TimeTracker) {.async.} =
# let expiry = (Moment.now().epochSeconds - (t.config.revisitDelayMins * 60)).uint64
# await t.processList(t.okNodes, expiry)
# await t.processList(t.nokNodes, expiry)
proc worker(t: TimeTracker) {.async.} =
try:
while true:
# await t.step()
await sleepAsync(t.workerDelay.minutes)
except Exception as exc:
error "Exception in timetracker worker", msg = exc.msg
quit QuitFailure
?await t.nodestore.iterateAll(checkNode)
?await t.state.events.nodesExpired.fire(expired)
return success()
method start*(t: TimeTracker): Future[?!void] {.async.} =
info "Starting timetracker...", revisitDelayMins = $t.workerDelay
asyncSpawn t.worker()
info "Starting timetracker..."
proc onStep(): Future[?!void] {.async: (raises: []), gcsafe.} =
await t.step()
var delay = t.state.config.revisitDelayMins div 100
if delay < 1:
delay = 1
await t.state.whileRunning(onStep, delay.minutes)
return success()
method stop*(t: TimeTracker): Future[?!void] {.async.} =
return success()
proc new*(
T: type TimeTracker,
# todoNodes: List,
# okNodes: List,
# nokNodes: List,
config: Config,
): TimeTracker =
var delay = config.revisitDelayMins div 10
if delay < 1:
delay = 1
TimeTracker(
# todoNodes: todoNodes,
# okNodes: okNodes,
# nokNodes: nokNodes,
config: config,
workerDelay: delay,
)
proc new*(T: type TimeTracker, state: State, nodestore: NodeStore): TimeTracker =
TimeTracker(state: state, nodestore: nodestore)

View File

@ -8,6 +8,7 @@ import ./components/dht
import ./components/crawler
import ./components/timetracker
import ./components/nodestore
import ./components/dhtmetrics
proc createComponents*(state: State): Future[?!seq[Component]] {.async.} =
var components: seq[Component] = newSeq[Component]()
@ -20,8 +21,12 @@ proc createComponents*(state: State): Future[?!seq[Component]] {.async.} =
let metrics = createMetrics(state.config.metricsAddress, state.config.metricsPort)
without dhtMetrics =? createDhtMetrics(state, metrics), err:
return failure(err)
components.add(nodeStore)
components.add(dht)
components.add(Crawler.new(dht, state.config))
components.add(TimeTracker.new(state.config))
components.add(TimeTracker.new(state, nodeStore))
components.add(dhtMetrics)
return success(components)

View File

@ -10,7 +10,7 @@ logScope:
topics = "state"
type
OnStep = proc(): Future[?!void] {.async: (raises: []), gcsafe.}
OnStep* = proc(): Future[?!void] {.async: (raises: []), gcsafe.}
DhtNodeCheckEventData* = object
id*: Nid
@ -32,7 +32,7 @@ type
config*: Config
events*: Events
proc whileRunning*(s: State, step: OnStep, delay: Duration) {.async.} =
method whileRunning*(s: State, step: OnStep, delay: Duration) {.async, base.} =
proc worker(): Future[void] {.async.} =
while s.status == ApplicationStatus.Running:
if err =? (await step()).errorOption:

View File

@ -55,18 +55,26 @@ proc subscribe*[T](
event.subscriptions.add(subscription)
subscription
proc fire*[T](event: AsyncDataEvent[T], data: T): Future[?!void] {.async.} =
proc fire*[T](
event: AsyncDataEvent[T], data: T
): Future[?!void] {.async: (raises: []).} =
event.queue.emit(data.some)
var toUnsubscribe = newSeq[AsyncDataEventSubscription]()
for sub in event.subscriptions:
await sub.fireEvent.wait()
try:
await sub.fireEvent.wait()
except CancelledError:
discard
if err =? sub.lastResult.errorOption:
return failure(err)
if sub.delayedUnsubscribe:
toUnsubscribe.add(sub)
for sub in toUnsubscribe:
await event.unsubscribe(sub)
try:
await event.unsubscribe(sub)
except CatchableError as exc:
return failure(exc.msg)
success()

View File

@ -0,0 +1,72 @@
import pkg/chronos
import pkg/questionable/results
import pkg/asynctest/chronos/unittest
import ../../../codexcrawler/components/timetracker
import ../../../codexcrawler/components/nodestore
import ../../../codexcrawler/utils/asyncdataevent
import ../../../codexcrawler/types
import ../../../codexcrawler/state
import ../mockstate
import ../mocknodestore
import ../helpers
suite "TimeTracker":
var
nid: Nid
state: MockState
store: MockNodeStore
time: TimeTracker
expiredNodesReceived: seq[Nid]
sub: AsyncDataEventSubscription
setup:
nid = genNid()
state = createMockState()
store = createMockNodeStore()
# Subscribe to nodesExpired event
expiredNodesReceived = newSeq[Nid]()
proc onExpired(nids: seq[Nid]): Future[?!void] {.async.} =
expiredNodesReceived = nids
return success()
sub = state.events.nodesExpired.subscribe(onExpired)
state.config.revisitDelayMins = 22
time = TimeTracker.new(state, store)
(await time.start()).tryGet()
teardown:
(await time.stop()).tryGet()
await state.events.nodesExpired.unsubscribe(sub)
state.checkAllUnsubscribed()
proc createNodeInStore(lastVisit: uint64): Nid =
let entry = NodeEntry(id: genNid(), lastVisit: lastVisit)
store.nodesToIterate.add(entry)
return entry.id
test "onStep fires nodesExpired event for expired nodes":
let
expiredTimestamp =
(Moment.now().epochSeconds - ((1 + state.config.revisitDelayMins) * 60)).uint64
expiredNodeId = createNodeInStore(expiredTimestamp)
(await state.stepper()).tryGet()
check:
expiredNodeId in expiredNodesReceived
test "onStep does not fire nodesExpired event for nodes that are recent":
let
recentTimestamp =
(Moment.now().epochSeconds - ((state.config.revisitDelayMins - 1) * 60)).uint64
recentNodeId = createNodeInStore(recentTimestamp)
(await state.stepper()).tryGet()
check:
recentNodeId notin expiredNodesReceived

View File

@ -0,0 +1,24 @@
import std/sequtils
import pkg/questionable/results
import pkg/chronos
import ../../codexcrawler/components/nodestore
type MockNodeStore* = ref object of NodeStore
nodesToIterate*: seq[NodeEntry]
method iterateAll*(
s: MockNodeStore, onNode: OnNodeEntry
): Future[?!void] {.async: (raises: []).} =
for node in s.nodesToIterate:
?await onNode(node)
return success()
method start*(s: MockNodeStore): Future[?!void] {.async.} =
return success()
method stop*(s: MockNodeStore): Future[?!void] {.async.} =
return success()
proc createMockNodeStore*(): MockNodeStore =
MockNodeStore(nodesToIterate: newSeq[NodeEntry]())

View File

@ -5,6 +5,7 @@ import ../../codexcrawler/types
import ../../codexcrawler/config
type MockState* = ref object of State
stepper*: OnStep
proc createMockState*(): MockState =
MockState(
@ -18,9 +19,12 @@ proc createMockState*(): MockState =
),
)
proc checkAllUnsubscribed*(this: MockState) =
proc checkAllUnsubscribed*(s: MockState) =
check:
this.events.nodesFound.listeners == 0
this.events.newNodesDiscovered.listeners == 0
this.events.dhtNodeCheck.listeners == 0
this.events.nodesExpired.listeners == 0
s.events.nodesFound.listeners == 0
s.events.newNodesDiscovered.listeners == 0
s.events.dhtNodeCheck.listeners == 0
s.events.nodesExpired.listeners == 0
method whileRunning*(s: MockState, step: OnStep, delay: Duration) {.async.} =
s.stepper = step

View File

@ -1,5 +1,6 @@
import ./components/testnodestore
import ./components/testdhtmetrics
import ./components/testtodolist
import ./components/testtimetracker
{.warning[UnusedImport]: off.}

View File

@ -3,14 +3,24 @@ import pkg/questionable/results
import pkg/asynctest/chronos/unittest
import ../../codexcrawler/state
import ./mockstate
import ../../codexcrawler/config
import ../../codexcrawler/types
import ../../codexcrawler/utils/asyncdataevent
suite "State":
var state: State
setup:
# The behavior we're testing is the same for the mock
state = createMockState()
state = State(
status: ApplicationStatus.Running,
config: Config(),
events: Events(
nodesFound: newAsyncDataEvent[seq[Nid]](),
newNodesDiscovered: newAsyncDataEvent[seq[Nid]](),
dhtNodeCheck: newAsyncDataEvent[DhtNodeCheckEventData](),
nodesExpired: newAsyncDataEvent[seq[Nid]](),
),
)
test "whileRunning":
var counter = 0