From a714c19814f7519e4ddc42776493c5183f0bbbe5 Mon Sep 17 00:00:00 2001 From: thatben Date: Thu, 13 Feb 2025 10:49:50 +0100 Subject: [PATCH] Extracts out clock object --- codexcrawler/components/nodestore.nim | 36 +++++++++++++++---- codexcrawler/components/timetracker.nim | 11 +++--- codexcrawler/config.nim | 8 ++--- codexcrawler/installer.nim | 7 ++-- codexcrawler/services/clock.nim | 10 ++++++ .../codexcrawler/components/testnodestore.nim | 25 ++++++++++++- .../components/testtimetracker.nim | 15 +++++--- tests/codexcrawler/mocks/mockclock.nim | 10 ++++++ 8 files changed, 98 insertions(+), 24 deletions(-) create mode 100644 codexcrawler/services/clock.nim create mode 100644 tests/codexcrawler/mocks/mockclock.nim diff --git a/codexcrawler/components/nodestore.nim b/codexcrawler/components/nodestore.nim index de5b6b9..99ac8da 100644 --- a/codexcrawler/components/nodestore.nim +++ b/codexcrawler/components/nodestore.nim @@ -11,6 +11,7 @@ import ../component import ../state import ../utils/datastoreutils import ../utils/asyncdataevent +import ../services/clock const nodestoreName = "nodestore" @@ -27,7 +28,9 @@ type NodeStore* = ref object of Component state: State store: TypedDatastore - sub: AsyncDataEventSubscription + clock: Clock + subFound: AsyncDataEventSubscription + subCheck: AsyncDataEventSubscription proc `$`*(entry: NodeEntry): string = $entry.id & ":" & $entry.lastVisit @@ -91,6 +94,18 @@ proc processFoundNodes(s: NodeStore, nids: seq[Nid]): Future[?!void] {.async.} = ?await s.fireNewNodesDiscovered(newNodes) return success() +proc updateLastVisit(s: NodeStore, nid: Nid): Future[?!void] {.async.} = + without key =? Key.init(nodestoreName / $nid), err: + return failure(err) + + without var entry =? (await get[NodeEntry](s.store, key)), err: + return failure(err) + + entry.lastVisit = s.clock.now() + + ?await s.store.put(key, entry) + return success() + method iterateAll*( s: NodeStore, onNode: OnNodeEntry ): Future[?!void] {.async: (raises: []), base.} = @@ -118,19 +133,26 @@ method start*(s: NodeStore): Future[?!void] {.async.} = proc onNodesFound(nids: seq[Nid]): Future[?!void] {.async.} = return await s.processFoundNodes(nids) - s.sub = s.state.events.nodesFound.subscribe(onNodesFound) + proc onCheck(event: DhtNodeCheckEventData): Future[?!void] {.async.} = + return await s.updateLastVisit(event.id) + + s.subFound = s.state.events.nodesFound.subscribe(onNodesFound) + s.subCheck = s.state.events.dhtNodeCheck.subscribe(onCheck) return success() method stop*(s: NodeStore): Future[?!void] {.async.} = - await s.state.events.nodesFound.unsubscribe(s.sub) + await s.state.events.nodesFound.unsubscribe(s.subFound) + await s.state.events.dhtNodeCheck.unsubscribe(s.subCheck) return success() -proc new*(T: type NodeStore, state: State, store: TypedDatastore): NodeStore = - NodeStore(state: state, store: store) +proc new*( + T: type NodeStore, state: State, store: TypedDatastore, clock: Clock +): NodeStore = + NodeStore(state: state, store: store, clock: clock) -proc createNodeStore*(state: State): ?!NodeStore = +proc createNodeStore*(state: State, clock: Clock): ?!NodeStore = without ds =? createTypedDatastore(state.config.dataDir / "nodestore"), err: error "Failed to create typed datastore for node store", err = err.msg return failure(err) - return success(NodeStore.new(state, ds)) + return success(NodeStore.new(state, ds, clock)) diff --git a/codexcrawler/components/timetracker.nim b/codexcrawler/components/timetracker.nim index 89448dd..2be8034 100644 --- a/codexcrawler/components/timetracker.nim +++ b/codexcrawler/components/timetracker.nim @@ -4,6 +4,7 @@ import pkg/questionable/results import ./nodestore import ../services/dht +import ../services/clock import ../component import ../state import ../types @@ -16,10 +17,10 @@ type TimeTracker* = ref object of Component state: State nodestore: NodeStore dht: Dht + clock: Clock proc checkForExpiredNodes(t: TimeTracker): Future[?!void] {.async: (raises: []).} = - let expiry = - (Moment.now().epochSeconds - (t.state.config.revisitDelayMins * 60)).uint64 + let expiry = t.clock.now() - (t.state.config.revisitDelayMins * 60).uint64 var expired = newSeq[Nid]() proc checkNode(item: NodeEntry): Future[?!void] {.async: (raises: []), gcsafe.} = @@ -54,7 +55,7 @@ method start*(t: TimeTracker): Future[?!void] {.async.} = proc onStep(): Future[?!void] {.async: (raises: []), gcsafe.} = await t.step() - var delay = t.state.config.revisitDelayMins div 100 + var delay = t.state.config.revisitDelayMins if delay < 1: delay = 1 @@ -65,6 +66,6 @@ method stop*(t: TimeTracker): Future[?!void] {.async.} = return success() proc new*( - T: type TimeTracker, state: State, nodestore: NodeStore, dht: Dht + T: type TimeTracker, state: State, nodestore: NodeStore, dht: Dht, clock: Clock ): TimeTracker = - TimeTracker(state: state, nodestore: nodestore, dht: dht) + TimeTracker(state: state, nodestore: nodestore, dht: dht, clock: clock) diff --git a/codexcrawler/config.nim b/codexcrawler/config.nim index 71e183b..4187938 100644 --- a/codexcrawler/config.nim +++ b/codexcrawler/config.nim @@ -13,15 +13,15 @@ Usage: codexcrawler [--logLevel=] [--publicIp=] [--metricsAddress=] [--metricsPort=

] [--dataDir=

] [--discoveryPort=

] [--bootNodes=] [--stepDelay=] [--revisitDelay=] Options: - --logLevel= Sets log level [default: INFO] - --publicIp= Public IP address where this instance is reachable. + --logLevel= Sets log level [default: TRACE] + --publicIp= Public IP address where this instance is reachable. [default: 62.45.154.249] --metricsAddress= Listen address of the metrics server [default: 0.0.0.0] --metricsPort=

Listen HTTP port of the metrics server [default: 8008] --dataDir=

Directory for storing data [default: crawler_data] --discoveryPort=

Port used for DHT [default: 8090] --bootNodes= Semi-colon-separated list of Codex bootstrap SPRs [default: testnet_sprs] - --stepDelay= Delay in milliseconds per crawl step [default: 1000] - --revisitDelay= Delay in minutes after which a node can be revisited [default: 10] (24h) + --stepDelay= Delay in milliseconds per crawl step [default: 100] + --revisitDelay= Delay in minutes after which a node can be revisited [default: 1] (24h) """ import strutils diff --git a/codexcrawler/installer.nim b/codexcrawler/installer.nim index fb5fa59..8b952aa 100644 --- a/codexcrawler/installer.nim +++ b/codexcrawler/installer.nim @@ -2,6 +2,7 @@ import pkg/chronos import pkg/questionable/results import ./state +import ./services/clock import ./services/metrics import ./services/dht import ./component @@ -14,10 +15,12 @@ import ./components/todolist proc createComponents*(state: State): Future[?!seq[Component]] {.async.} = var components: seq[Component] = newSeq[Component]() + let clock = createClock() + without dht =? (await createDht(state)), err: return failure(err) - without nodeStore =? createNodeStore(state), err: + without nodeStore =? createNodeStore(state, clock), err: return failure(err) let @@ -31,7 +34,7 @@ proc createComponents*(state: State): Future[?!seq[Component]] {.async.} = components.add(todoList) components.add(nodeStore) components.add(Crawler.new(state, dht, todoList)) - components.add(TimeTracker.new(state, nodeStore, dht)) + components.add(TimeTracker.new(state, nodeStore, dht, clock)) components.add(dhtMetrics) return success(components) diff --git a/codexcrawler/services/clock.nim b/codexcrawler/services/clock.nim new file mode 100644 index 0000000..ddd80ea --- /dev/null +++ b/codexcrawler/services/clock.nim @@ -0,0 +1,10 @@ +import std/times + +type Clock* = ref object of RootObj + +method now*(clock: Clock): uint64 {.base, gcsafe, raises: [].} = + let now = times.now().utc + now.toTime().toUnix().uint64 + +proc createClock*(): Clock = + Clock() diff --git a/tests/codexcrawler/components/testnodestore.nim b/tests/codexcrawler/components/testnodestore.nim index 79f9a38..b9687be 100644 --- a/tests/codexcrawler/components/testnodestore.nim +++ b/tests/codexcrawler/components/testnodestore.nim @@ -8,7 +8,9 @@ import ../../../codexcrawler/components/nodestore import ../../../codexcrawler/utils/datastoreutils import ../../../codexcrawler/utils/asyncdataevent import ../../../codexcrawler/types +import ../../../codexcrawler/state import ../mocks/mockstate +import ../mocks/mockclock import ../helpers suite "Nodestore": @@ -19,13 +21,15 @@ suite "Nodestore": var ds: TypedDatastore state: MockState + clock: MockClock store: NodeStore setup: ds = createTypedDatastore(dsPath).tryGet() state = createMockState() + clock = createMockClock() - store = NodeStore.new(state, ds) + store = NodeStore.new(state, ds, clock) (await store.start()).tryGet() @@ -121,3 +125,22 @@ suite "Nodestore": nid1 in iterNodes nid2 in iterNodes nid3 in iterNodes + + test "dhtNodeCheck event should update lastVisit": + let + nid = genNid() + expectedKey = Key.init(nodestoreName / $nid).tryGet() + + clock.setNow = 123456789.uint64 + + (await state.events.nodesFound.fire(@[nid])).tryGet() + + let originalEntry = (await get[NodeEntry](ds, expectedKey)).tryGet() + check: + originalEntry.lastVisit == 0 + + (await state.events.dhtNodeCheck.fire(DhtNodeCheckEventData(id: nid, isOk: true))).tryGet() + + let updatedEntry = (await get[NodeEntry](ds, expectedKey)).tryGet() + check: + clock.setNow == updatedEntry.lastVisit diff --git a/tests/codexcrawler/components/testtimetracker.nim b/tests/codexcrawler/components/testtimetracker.nim index 34873c5..59cf180 100644 --- a/tests/codexcrawler/components/testtimetracker.nim +++ b/tests/codexcrawler/components/testtimetracker.nim @@ -10,13 +10,17 @@ import ../../../codexcrawler/state import ../mocks/mockstate import ../mocks/mocknodestore import ../mocks/mockdht +import ../mocks/mockclock import ../helpers suite "TimeTracker": + let now = 123456789.uint64 + var nid: Nid state: MockState store: MockNodeStore + clock: MockClock dht: MockDht time: TimeTracker expiredNodesReceived: seq[Nid] @@ -26,8 +30,11 @@ suite "TimeTracker": nid = genNid() state = createMockState() store = createMockNodeStore() + clock = createMockClock() dht = createMockDht() + clock.setNow = now + # Subscribe to nodesExpired event expiredNodesReceived = newSeq[Nid]() proc onExpired(nids: seq[Nid]): Future[?!void] {.async.} = @@ -38,7 +45,7 @@ suite "TimeTracker": state.config.revisitDelayMins = 22 - time = TimeTracker.new(state, store, dht) + time = TimeTracker.new(state, store, dht, clock) (await time.start()).tryGet() @@ -57,8 +64,7 @@ suite "TimeTracker": test "onStep fires nodesExpired event for expired nodes": let - expiredTimestamp = - (Moment.now().epochSeconds - ((1 + state.config.revisitDelayMins) * 60)).uint64 + expiredTimestamp = now - ((1 + state.config.revisitDelayMins) * 60).uint64 expiredNodeId = createNodeInStore(expiredTimestamp) await onStep() @@ -68,8 +74,7 @@ suite "TimeTracker": test "onStep does not fire nodesExpired event for nodes that are recent": let - recentTimestamp = - (Moment.now().epochSeconds - ((state.config.revisitDelayMins - 1) * 60)).uint64 + recentTimestamp = now - ((state.config.revisitDelayMins - 1) * 60).uint64 recentNodeId = createNodeInStore(recentTimestamp) await onStep() diff --git a/tests/codexcrawler/mocks/mockclock.nim b/tests/codexcrawler/mocks/mockclock.nim new file mode 100644 index 0000000..5c5cf44 --- /dev/null +++ b/tests/codexcrawler/mocks/mockclock.nim @@ -0,0 +1,10 @@ +import ../../../codexcrawler/services/clock + +type MockClock* = ref object of Clock + setNow*: uint64 + +method now*(clock: MockClock): uint64 {.raises: [].} = + clock.setNow + +proc createMockClock*(): MockClock = + MockClock()