From a2e9d4fac888ecec1df064db24ff377a3758861a Mon Sep 17 00:00:00 2001 From: Ben Date: Tue, 11 Feb 2025 14:02:30 +0100 Subject: [PATCH] sets up dht-metrics component --- codexcrawler/application.nim | 5 -- codexcrawler/components/dhtmetrics.nim | 51 ++++++++++++ codexcrawler/installer.nim | 3 + codexcrawler/list.nim | 47 ++++------- codexcrawler/metrics.nim | 43 +++++++++- .../components/testdhtmetrics.nim | 79 +++++++++++++++++++ tests/codexcrawler/mocklist.nim | 37 +++++++++ tests/codexcrawler/testcomponents.nim | 1 + 8 files changed, 229 insertions(+), 37 deletions(-) create mode 100644 codexcrawler/components/dhtmetrics.nim create mode 100644 tests/codexcrawler/components/testdhtmetrics.nim create mode 100644 tests/codexcrawler/mocklist.nim diff --git a/codexcrawler/application.nim b/codexcrawler/application.nim index d894aa6..e207544 100644 --- a/codexcrawler/application.nim +++ b/codexcrawler/application.nim @@ -17,10 +17,6 @@ import ./state import ./component import ./types -declareGauge(todoNodesGauge, "DHT nodes to be visited") -declareGauge(okNodesGauge, "DHT nodes successfully contacted") -declareGauge(nokNodesGauge, "DHT nodes failed to contact") - type ApplicationStatus* {.pure.} = enum Stopped @@ -99,7 +95,6 @@ proc run*(app: Application) = if not existsDir(app.config.dataDir): createDir(app.config.dataDir) - setupMetrics(app.config.metricsAddress, app.config.metricsPort) info "Metrics endpoint initialized" info "Starting application" diff --git a/codexcrawler/components/dhtmetrics.nim b/codexcrawler/components/dhtmetrics.nim new file mode 100644 index 0000000..8bffa25 --- /dev/null +++ b/codexcrawler/components/dhtmetrics.nim @@ -0,0 +1,51 @@ +import pkg/chronicles +import pkg/chronos +import pkg/questionable +import pkg/questionable/results + +import ./dht +import ../list +import ../state +import ../component +import ../types +import ../utils/asyncdataevent +import ../metrics + +logScope: + topics = "dhtmetrics" + +type DhtMetrics* = ref object of Component + state: State + ok: List + nok: List + +method start*(d: DhtMetrics): Future[?!void] {.async.} = + info "Starting DhtMetrics..." + return success() + +method stop*(d: DhtMetrics): Future[?!void] {.async.} = + return success() + +proc new*( + T: type DhtMetrics, + state: State, + okList: List, + nokList: List +): DhtMetrics = + DhtMetrics( + state: state, + ok: okList, + nok: nokList + ) + +proc createDhtMetrics*(state: State): ?!DhtMetrics = + without okList =? createList(state.config.dataDir, "dhtok"), err: + return failure(err) + without nokList =? createList(state.config.dataDir, "dhtnok"), err: + return failure(err) + + success(DhtMetrics.new( + state, + okList, + nokList + )) diff --git a/codexcrawler/installer.nim b/codexcrawler/installer.nim index 3642b8f..48c2e9e 100644 --- a/codexcrawler/installer.nim +++ b/codexcrawler/installer.nim @@ -2,6 +2,7 @@ import pkg/chronos import pkg/questionable/results import ./state +import ./metrics import ./component import ./components/dht import ./components/crawler @@ -17,6 +18,8 @@ proc createComponents*(state: State): Future[?!seq[Component]] {.async.} = without nodeStore =? createNodeStore(state), err: return failure(err) + let metrics = createMetrics(state.config.metricsAddress, state.config.metricsPort) + components.add(nodeStore) components.add(dht) components.add(Crawler.new(dht, state.config)) diff --git a/codexcrawler/list.nim b/codexcrawler/list.nim index f9b4a7e..fdd1f5e 100644 --- a/codexcrawler/list.nim +++ b/codexcrawler/list.nim @@ -1,6 +1,6 @@ +import std/os import pkg/chronos import pkg/chronicles -import pkg/metrics import pkg/datastore import pkg/datastore/typedds import pkg/stew/byteutils @@ -14,18 +14,16 @@ import std/sequtils import std/os import ./types +import ./utils/datastoreutils logScope: topics = "list" type - OnUpdateMetric = proc(value: int64): void {.gcsafe, raises: [].} - - List* = ref object + List* = ref object of RootObj name: string store: TypedDatastore items: HashSet[Nid] - onMetric: OnUpdateMetric emptySignal: ?Future[void] proc encode(s: Nid): seq[byte] = @@ -42,7 +40,7 @@ proc saveItem(this: List, item: Nid): Future[?!void] {.async.} = ?await this.store.put(itemKey, item) return success() -proc load*(this: List): Future[?!void] {.async.} = +method load*(this: List): Future[?!void] {.async, base.} = without queryKey =? Key.init(this.name), err: return failure(err) without iter =? (await query[Nid](this.store, Query.init(queryKey))), err: @@ -56,24 +54,17 @@ proc load*(this: List): Future[?!void] {.async.} = if value > 0: this.items.incl(value) - this.onMetric(this.items.len.int64) info "Loaded list", name = this.name, items = this.items.len return success() -proc new*( - _: type List, name: string, store: TypedDatastore, onMetric: OnUpdateMetric -): List = - List(name: name, store: store, onMetric: onMetric) - proc contains*(this: List, nid: Nid): bool = this.items.anyIt(it == nid) -proc add*(this: List, nid: Nid): Future[?!void] {.async.} = +method add*(this: List, nid: Nid): Future[?!void] {.async, base.} = if this.contains(nid): return success() this.items.incl(nid) - this.onMetric(this.items.len.int64) if err =? (await this.saveItem(nid)).errorOption: return failure(err) @@ -85,7 +76,7 @@ proc add*(this: List, nid: Nid): Future[?!void] {.async.} = return success() -proc remove*(this: List, nid: Nid): Future[?!void] {.async.} = +method remove*(this: List, nid: Nid): Future[?!void] {.async, base.} = if this.items.len < 1: return failure(this.name & "List is empty.") @@ -93,23 +84,17 @@ proc remove*(this: List, nid: Nid): Future[?!void] {.async.} = without itemKey =? Key.init(this.name / $nid), err: return failure(err) ?await this.store.delete(itemKey) - this.onMetric(this.items.len.int64) return success() -proc pop*(this: List): Future[?!Nid] {.async.} = - if this.items.len < 1: - trace "List is empty. Waiting for new items...", name = this.name - let signal = newFuture[void]("list.emptySignal") - this.emptySignal = some(signal) - await signal.wait(1.hours) - if this.items.len < 1: - return failure(this.name & "List is empty.") - - let item = this.items.pop() - - if err =? (await this.remove(item)).errorOption: - return failure(err) - return success(item) - proc len*(this: List): int = this.items.len + +proc new*( + _: type List, name: string, store: TypedDatastore +): List = + List(name: name, store: store) + +proc createList*(dataDir: string, name: string): ?!List = + without store =? createTypedDatastore(dataDir / name), err: + return failure(err) + success(List.new(name, store)) diff --git a/codexcrawler/metrics.nim b/codexcrawler/metrics.nim index 7f6b54e..ea713c8 100644 --- a/codexcrawler/metrics.nim +++ b/codexcrawler/metrics.nim @@ -2,7 +2,19 @@ import pkg/chronicles import pkg/metrics import pkg/metrics/chronos_httpserver -proc setupMetrics*(metricsAddress: IpAddress, metricsPort: Port) = +declareGauge(todoNodesGauge, "DHT nodes to be visited") +declareGauge(okNodesGauge, "DHT nodes successfully contacted") +declareGauge(nokNodesGauge, "DHT nodes failed to contact") + +type + OnUpdateMetric = proc(value: int64): void {.gcsafe, raises: [].} + + Metrics* = ref object + todoNodes: OnUpdateMetric + okNodes: OnUpdateMetric + nokNodes: OnUpdateMetric + +proc startServer(metricsAddress: IpAddress, metricsPort: Port) = let metricsAddress = metricsAddress notice "Starting metrics HTTP server", url = "http://" & $metricsAddress & ":" & $metricsPort & "/metrics" @@ -12,3 +24,32 @@ proc setupMetrics*(metricsAddress: IpAddress, metricsPort: Port) = raiseAssert exc.msg except Exception as exc: raiseAssert exc.msg # TODO fix metrics + +method setTodoNodes*(m: Metrics, value: int) {.base.} = + m.todoNodes(value.int64) + +method setOkNodes*(m: Metrics, value: int) {.base.} = + m.okNodes(value.int64) + +method setNokNodes*(m: Metrics, value: int) {.base.} = + m.nokNodes(value.int64) + +proc createMetrics*(metricsAddress: IpAddress, metricsPort: Port): Metrics = + startServer(metricsAddress, metricsPort) + + # We can't extract this into a function because gauges cannot be passed as argument. + # The use of global state in nim-metrics is not pleasant. + proc onTodo(value: int64) = + todoNodesGauge.set(value) + + proc onOk(value: int64) = + okNodesGauge.set(value) + + proc onNok(value: int64) = + nokNodesGauge.set(value) + + return Metrics( + todoNodes: onTodo, + okNodes: onOk, + nokNodes: onNok + ) diff --git a/tests/codexcrawler/components/testdhtmetrics.nim b/tests/codexcrawler/components/testdhtmetrics.nim new file mode 100644 index 0000000..0c0d613 --- /dev/null +++ b/tests/codexcrawler/components/testdhtmetrics.nim @@ -0,0 +1,79 @@ +import std/os +import pkg/chronos +import pkg/questionable/results +import pkg/asynctest/chronos/unittest +import pkg/datastore/typedds + +import ../../../codexcrawler/components/dhtmetrics +import ../../../codexcrawler/utils/asyncdataevent +import ../../../codexcrawler/types +import ../../../codexcrawler/state +import ../mockstate +import ../mocklist +import ../helpers + +suite "DhtMetrics": + var + nid: Nid + state: MockState + okList: MockList + nokList: MockList + dhtmetrics: DhtMetrics + + setup: + nid = genNid() + state = createMockState() + okList = createMockList() + nokList = createMockList() + + dhtmetrics = DhtMetrics.new( + state, + okList, + nokList + ) + + (await dhtmetrics.start()).tryGet() + + teardown: + (await dhtmetrics.stop()).tryGet() + state.checkAllUnsubscribed() + + proc fireDhtNodeCheckEvent(isOk: bool) {.async.} = + let + event = DhtNodeCheckEventData( + id: nid, + isOk: isOk + ) + + (await state.events.dhtNodeCheck.fire(event)).tryGet() + + test "dhtmetrics start should load both lists": + (await dhtmetrics.start()).tryGet() + + check: + okList.loadCalled + nokList.loadCalled + + test "dhtNodeCheck event should add node to okList if check is successful": + await fireDhtNodeCheckEvent(true) + + check: + nid in okList.added + + test "dhtNodeCheck event should add node to nokList if check has failed": + await fireDhtNodeCheckEvent(false) + + check: + nid in nokList.added + + test "dhtNodeCheck event should remove node from nokList if check is successful": + await fireDhtNodeCheckEvent(true) + + check: + nid in nokList.removed + + test "dhtNodeCheck event should remove node from okList if check has failed": + await fireDhtNodeCheckEvent(false) + + check: + nid in okList.removed diff --git a/tests/codexcrawler/mocklist.nim b/tests/codexcrawler/mocklist.nim new file mode 100644 index 0000000..001e9f1 --- /dev/null +++ b/tests/codexcrawler/mocklist.nim @@ -0,0 +1,37 @@ +import pkg/chronos +import pkg/questionable/results + +import ../../codexcrawler/types +import ../../codexcrawler/list + +type + MockList* = ref object of List + loadCalled*: bool + added*: seq[Nid] + addSuccess*: bool + removed*: seq[Nid] + removeSuccess*: bool + +method load*(this: MockList): Future[?!void] {.async.} = + this.loadCalled = true + +method add*(this: MockList, nid: Nid): Future[?!void] {.async.} = + this.added.add(nid) + if this.addSuccess: + return success() + return failure("test failure") + +method remove*(this: MockList, nid: Nid): Future[?!void] {.async.} = + this.removed.add(nid) + if this.removeSuccess: + return success() + return failure("test failure") + +proc createMockList*(): MockList = + MockList( + loadCalled: false, + added: newSeq[Nid](), + addSuccess: true, + removed: newSeq[Nid](), + removeSuccess: true + ) diff --git a/tests/codexcrawler/testcomponents.nim b/tests/codexcrawler/testcomponents.nim index 1363c5a..f013640 100644 --- a/tests/codexcrawler/testcomponents.nim +++ b/tests/codexcrawler/testcomponents.nim @@ -1,3 +1,4 @@ import ./components/testnodestore +import ./components/testdhtmetrics {.warning[UnusedImport]: off.}