Extracts out clock object

This commit is contained in:
thatben 2025-02-13 10:49:50 +01:00
parent 4ffbf1f421
commit a714c19814
No known key found for this signature in database
GPG Key ID: 62C543548433D43E
8 changed files with 98 additions and 24 deletions

View File

@ -11,6 +11,7 @@ import ../component
import ../state
import ../utils/datastoreutils
import ../utils/asyncdataevent
import ../services/clock
const nodestoreName = "nodestore"
@ -27,7 +28,9 @@ type
NodeStore* = ref object of Component
state: State
store: TypedDatastore
sub: AsyncDataEventSubscription
clock: Clock
subFound: AsyncDataEventSubscription
subCheck: AsyncDataEventSubscription
proc `$`*(entry: NodeEntry): string =
$entry.id & ":" & $entry.lastVisit
@ -91,6 +94,18 @@ proc processFoundNodes(s: NodeStore, nids: seq[Nid]): Future[?!void] {.async.} =
?await s.fireNewNodesDiscovered(newNodes)
return success()
proc updateLastVisit(s: NodeStore, nid: Nid): Future[?!void] {.async.} =
without key =? Key.init(nodestoreName / $nid), err:
return failure(err)
without var entry =? (await get[NodeEntry](s.store, key)), err:
return failure(err)
entry.lastVisit = s.clock.now()
?await s.store.put(key, entry)
return success()
method iterateAll*(
s: NodeStore, onNode: OnNodeEntry
): Future[?!void] {.async: (raises: []), base.} =
@ -118,19 +133,26 @@ method start*(s: NodeStore): Future[?!void] {.async.} =
proc onNodesFound(nids: seq[Nid]): Future[?!void] {.async.} =
return await s.processFoundNodes(nids)
s.sub = s.state.events.nodesFound.subscribe(onNodesFound)
proc onCheck(event: DhtNodeCheckEventData): Future[?!void] {.async.} =
return await s.updateLastVisit(event.id)
s.subFound = s.state.events.nodesFound.subscribe(onNodesFound)
s.subCheck = s.state.events.dhtNodeCheck.subscribe(onCheck)
return success()
method stop*(s: NodeStore): Future[?!void] {.async.} =
await s.state.events.nodesFound.unsubscribe(s.sub)
await s.state.events.nodesFound.unsubscribe(s.subFound)
await s.state.events.dhtNodeCheck.unsubscribe(s.subCheck)
return success()
proc new*(T: type NodeStore, state: State, store: TypedDatastore): NodeStore =
NodeStore(state: state, store: store)
proc new*(
T: type NodeStore, state: State, store: TypedDatastore, clock: Clock
): NodeStore =
NodeStore(state: state, store: store, clock: clock)
proc createNodeStore*(state: State): ?!NodeStore =
proc createNodeStore*(state: State, clock: Clock): ?!NodeStore =
without ds =? createTypedDatastore(state.config.dataDir / "nodestore"), err:
error "Failed to create typed datastore for node store", err = err.msg
return failure(err)
return success(NodeStore.new(state, ds))
return success(NodeStore.new(state, ds, clock))

View File

@ -4,6 +4,7 @@ import pkg/questionable/results
import ./nodestore
import ../services/dht
import ../services/clock
import ../component
import ../state
import ../types
@ -16,10 +17,10 @@ type TimeTracker* = ref object of Component
state: State
nodestore: NodeStore
dht: Dht
clock: Clock
proc checkForExpiredNodes(t: TimeTracker): Future[?!void] {.async: (raises: []).} =
let expiry =
(Moment.now().epochSeconds - (t.state.config.revisitDelayMins * 60)).uint64
let expiry = t.clock.now() - (t.state.config.revisitDelayMins * 60).uint64
var expired = newSeq[Nid]()
proc checkNode(item: NodeEntry): Future[?!void] {.async: (raises: []), gcsafe.} =
@ -54,7 +55,7 @@ method start*(t: TimeTracker): Future[?!void] {.async.} =
proc onStep(): Future[?!void] {.async: (raises: []), gcsafe.} =
await t.step()
var delay = t.state.config.revisitDelayMins div 100
var delay = t.state.config.revisitDelayMins
if delay < 1:
delay = 1
@ -65,6 +66,6 @@ method stop*(t: TimeTracker): Future[?!void] {.async.} =
return success()
proc new*(
T: type TimeTracker, state: State, nodestore: NodeStore, dht: Dht
T: type TimeTracker, state: State, nodestore: NodeStore, dht: Dht, clock: Clock
): TimeTracker =
TimeTracker(state: state, nodestore: nodestore, dht: dht)
TimeTracker(state: state, nodestore: nodestore, dht: dht, clock: clock)

View File

@ -13,15 +13,15 @@ Usage:
codexcrawler [--logLevel=<l>] [--publicIp=<a>] [--metricsAddress=<ip>] [--metricsPort=<p>] [--dataDir=<dir>] [--discoveryPort=<p>] [--bootNodes=<n>] [--stepDelay=<ms>] [--revisitDelay=<m>]
Options:
--logLevel=<l> Sets log level [default: INFO]
--publicIp=<a> Public IP address where this instance is reachable.
--logLevel=<l> Sets log level [default: TRACE]
--publicIp=<a> Public IP address where this instance is reachable. [default: 62.45.154.249]
--metricsAddress=<ip> Listen address of the metrics server [default: 0.0.0.0]
--metricsPort=<p> Listen HTTP port of the metrics server [default: 8008]
--dataDir=<dir> Directory for storing data [default: crawler_data]
--discoveryPort=<p> Port used for DHT [default: 8090]
--bootNodes=<n> Semi-colon-separated list of Codex bootstrap SPRs [default: testnet_sprs]
--stepDelay=<ms> Delay in milliseconds per crawl step [default: 1000]
--revisitDelay=<m> Delay in minutes after which a node can be revisited [default: 10] (24h)
--stepDelay=<ms> Delay in milliseconds per crawl step [default: 100]
--revisitDelay=<m> Delay in minutes after which a node can be revisited [default: 1] (24h)
"""
import strutils

View File

@ -2,6 +2,7 @@ import pkg/chronos
import pkg/questionable/results
import ./state
import ./services/clock
import ./services/metrics
import ./services/dht
import ./component
@ -14,10 +15,12 @@ import ./components/todolist
proc createComponents*(state: State): Future[?!seq[Component]] {.async.} =
var components: seq[Component] = newSeq[Component]()
let clock = createClock()
without dht =? (await createDht(state)), err:
return failure(err)
without nodeStore =? createNodeStore(state), err:
without nodeStore =? createNodeStore(state, clock), err:
return failure(err)
let
@ -31,7 +34,7 @@ proc createComponents*(state: State): Future[?!seq[Component]] {.async.} =
components.add(todoList)
components.add(nodeStore)
components.add(Crawler.new(state, dht, todoList))
components.add(TimeTracker.new(state, nodeStore, dht))
components.add(TimeTracker.new(state, nodeStore, dht, clock))
components.add(dhtMetrics)
return success(components)

View File

@ -0,0 +1,10 @@
import std/times
type Clock* = ref object of RootObj
method now*(clock: Clock): uint64 {.base, gcsafe, raises: [].} =
let now = times.now().utc
now.toTime().toUnix().uint64
proc createClock*(): Clock =
Clock()

View File

@ -8,7 +8,9 @@ import ../../../codexcrawler/components/nodestore
import ../../../codexcrawler/utils/datastoreutils
import ../../../codexcrawler/utils/asyncdataevent
import ../../../codexcrawler/types
import ../../../codexcrawler/state
import ../mocks/mockstate
import ../mocks/mockclock
import ../helpers
suite "Nodestore":
@ -19,13 +21,15 @@ suite "Nodestore":
var
ds: TypedDatastore
state: MockState
clock: MockClock
store: NodeStore
setup:
ds = createTypedDatastore(dsPath).tryGet()
state = createMockState()
clock = createMockClock()
store = NodeStore.new(state, ds)
store = NodeStore.new(state, ds, clock)
(await store.start()).tryGet()
@ -121,3 +125,22 @@ suite "Nodestore":
nid1 in iterNodes
nid2 in iterNodes
nid3 in iterNodes
test "dhtNodeCheck event should update lastVisit":
let
nid = genNid()
expectedKey = Key.init(nodestoreName / $nid).tryGet()
clock.setNow = 123456789.uint64
(await state.events.nodesFound.fire(@[nid])).tryGet()
let originalEntry = (await get[NodeEntry](ds, expectedKey)).tryGet()
check:
originalEntry.lastVisit == 0
(await state.events.dhtNodeCheck.fire(DhtNodeCheckEventData(id: nid, isOk: true))).tryGet()
let updatedEntry = (await get[NodeEntry](ds, expectedKey)).tryGet()
check:
clock.setNow == updatedEntry.lastVisit

View File

@ -10,13 +10,17 @@ import ../../../codexcrawler/state
import ../mocks/mockstate
import ../mocks/mocknodestore
import ../mocks/mockdht
import ../mocks/mockclock
import ../helpers
suite "TimeTracker":
let now = 123456789.uint64
var
nid: Nid
state: MockState
store: MockNodeStore
clock: MockClock
dht: MockDht
time: TimeTracker
expiredNodesReceived: seq[Nid]
@ -26,8 +30,11 @@ suite "TimeTracker":
nid = genNid()
state = createMockState()
store = createMockNodeStore()
clock = createMockClock()
dht = createMockDht()
clock.setNow = now
# Subscribe to nodesExpired event
expiredNodesReceived = newSeq[Nid]()
proc onExpired(nids: seq[Nid]): Future[?!void] {.async.} =
@ -38,7 +45,7 @@ suite "TimeTracker":
state.config.revisitDelayMins = 22
time = TimeTracker.new(state, store, dht)
time = TimeTracker.new(state, store, dht, clock)
(await time.start()).tryGet()
@ -57,8 +64,7 @@ suite "TimeTracker":
test "onStep fires nodesExpired event for expired nodes":
let
expiredTimestamp =
(Moment.now().epochSeconds - ((1 + state.config.revisitDelayMins) * 60)).uint64
expiredTimestamp = now - ((1 + state.config.revisitDelayMins) * 60).uint64
expiredNodeId = createNodeInStore(expiredTimestamp)
await onStep()
@ -68,8 +74,7 @@ suite "TimeTracker":
test "onStep does not fire nodesExpired event for nodes that are recent":
let
recentTimestamp =
(Moment.now().epochSeconds - ((state.config.revisitDelayMins - 1) * 60)).uint64
recentTimestamp = now - ((state.config.revisitDelayMins - 1) * 60).uint64
recentNodeId = createNodeInStore(recentTimestamp)
await onStep()

View File

@ -0,0 +1,10 @@
import ../../../codexcrawler/services/clock
type MockClock* = ref object of Clock
setNow*: uint64
method now*(clock: MockClock): uint64 {.raises: [].} =
clock.setNow
proc createMockClock*(): MockClock =
MockClock()