sets up dht-metrics component

This commit is contained in:
Ben 2025-02-11 14:02:30 +01:00
parent c1f25f10cc
commit a2e9d4fac8
No known key found for this signature in database
GPG Key ID: 0F16E812E736C24B
8 changed files with 229 additions and 37 deletions

View File

@ -17,10 +17,6 @@ import ./state
import ./component
import ./types
declareGauge(todoNodesGauge, "DHT nodes to be visited")
declareGauge(okNodesGauge, "DHT nodes successfully contacted")
declareGauge(nokNodesGauge, "DHT nodes failed to contact")
type
ApplicationStatus* {.pure.} = enum
Stopped
@ -99,7 +95,6 @@ proc run*(app: Application) =
if not existsDir(app.config.dataDir):
createDir(app.config.dataDir)
setupMetrics(app.config.metricsAddress, app.config.metricsPort)
info "Metrics endpoint initialized"
info "Starting application"

View File

@ -0,0 +1,51 @@
import pkg/chronicles
import pkg/chronos
import pkg/questionable
import pkg/questionable/results
import ./dht
import ../list
import ../state
import ../component
import ../types
import ../utils/asyncdataevent
import ../metrics
logScope:
topics = "dhtmetrics"
type DhtMetrics* = ref object of Component
state: State
ok: List
nok: List
method start*(d: DhtMetrics): Future[?!void] {.async.} =
info "Starting DhtMetrics..."
return success()
method stop*(d: DhtMetrics): Future[?!void] {.async.} =
return success()
proc new*(
T: type DhtMetrics,
state: State,
okList: List,
nokList: List
): DhtMetrics =
DhtMetrics(
state: state,
ok: okList,
nok: nokList
)
proc createDhtMetrics*(state: State): ?!DhtMetrics =
without okList =? createList(state.config.dataDir, "dhtok"), err:
return failure(err)
without nokList =? createList(state.config.dataDir, "dhtnok"), err:
return failure(err)
success(DhtMetrics.new(
state,
okList,
nokList
))

View File

@ -2,6 +2,7 @@ import pkg/chronos
import pkg/questionable/results
import ./state
import ./metrics
import ./component
import ./components/dht
import ./components/crawler
@ -17,6 +18,8 @@ proc createComponents*(state: State): Future[?!seq[Component]] {.async.} =
without nodeStore =? createNodeStore(state), err:
return failure(err)
let metrics = createMetrics(state.config.metricsAddress, state.config.metricsPort)
components.add(nodeStore)
components.add(dht)
components.add(Crawler.new(dht, state.config))

View File

@ -1,6 +1,6 @@
import std/os
import pkg/chronos
import pkg/chronicles
import pkg/metrics
import pkg/datastore
import pkg/datastore/typedds
import pkg/stew/byteutils
@ -14,18 +14,16 @@ import std/sequtils
import std/os
import ./types
import ./utils/datastoreutils
logScope:
topics = "list"
type
OnUpdateMetric = proc(value: int64): void {.gcsafe, raises: [].}
List* = ref object
List* = ref object of RootObj
name: string
store: TypedDatastore
items: HashSet[Nid]
onMetric: OnUpdateMetric
emptySignal: ?Future[void]
proc encode(s: Nid): seq[byte] =
@ -42,7 +40,7 @@ proc saveItem(this: List, item: Nid): Future[?!void] {.async.} =
?await this.store.put(itemKey, item)
return success()
proc load*(this: List): Future[?!void] {.async.} =
method load*(this: List): Future[?!void] {.async, base.} =
without queryKey =? Key.init(this.name), err:
return failure(err)
without iter =? (await query[Nid](this.store, Query.init(queryKey))), err:
@ -56,24 +54,17 @@ proc load*(this: List): Future[?!void] {.async.} =
if value > 0:
this.items.incl(value)
this.onMetric(this.items.len.int64)
info "Loaded list", name = this.name, items = this.items.len
return success()
proc new*(
_: type List, name: string, store: TypedDatastore, onMetric: OnUpdateMetric
): List =
List(name: name, store: store, onMetric: onMetric)
proc contains*(this: List, nid: Nid): bool =
this.items.anyIt(it == nid)
proc add*(this: List, nid: Nid): Future[?!void] {.async.} =
method add*(this: List, nid: Nid): Future[?!void] {.async, base.} =
if this.contains(nid):
return success()
this.items.incl(nid)
this.onMetric(this.items.len.int64)
if err =? (await this.saveItem(nid)).errorOption:
return failure(err)
@ -85,7 +76,7 @@ proc add*(this: List, nid: Nid): Future[?!void] {.async.} =
return success()
proc remove*(this: List, nid: Nid): Future[?!void] {.async.} =
method remove*(this: List, nid: Nid): Future[?!void] {.async, base.} =
if this.items.len < 1:
return failure(this.name & "List is empty.")
@ -93,23 +84,17 @@ proc remove*(this: List, nid: Nid): Future[?!void] {.async.} =
without itemKey =? Key.init(this.name / $nid), err:
return failure(err)
?await this.store.delete(itemKey)
this.onMetric(this.items.len.int64)
return success()
proc pop*(this: List): Future[?!Nid] {.async.} =
if this.items.len < 1:
trace "List is empty. Waiting for new items...", name = this.name
let signal = newFuture[void]("list.emptySignal")
this.emptySignal = some(signal)
await signal.wait(1.hours)
if this.items.len < 1:
return failure(this.name & "List is empty.")
let item = this.items.pop()
if err =? (await this.remove(item)).errorOption:
return failure(err)
return success(item)
proc len*(this: List): int =
this.items.len
proc new*(
_: type List, name: string, store: TypedDatastore
): List =
List(name: name, store: store)
proc createList*(dataDir: string, name: string): ?!List =
without store =? createTypedDatastore(dataDir / name), err:
return failure(err)
success(List.new(name, store))

View File

@ -2,7 +2,19 @@ import pkg/chronicles
import pkg/metrics
import pkg/metrics/chronos_httpserver
proc setupMetrics*(metricsAddress: IpAddress, metricsPort: Port) =
declareGauge(todoNodesGauge, "DHT nodes to be visited")
declareGauge(okNodesGauge, "DHT nodes successfully contacted")
declareGauge(nokNodesGauge, "DHT nodes failed to contact")
type
OnUpdateMetric = proc(value: int64): void {.gcsafe, raises: [].}
Metrics* = ref object
todoNodes: OnUpdateMetric
okNodes: OnUpdateMetric
nokNodes: OnUpdateMetric
proc startServer(metricsAddress: IpAddress, metricsPort: Port) =
let metricsAddress = metricsAddress
notice "Starting metrics HTTP server",
url = "http://" & $metricsAddress & ":" & $metricsPort & "/metrics"
@ -12,3 +24,32 @@ proc setupMetrics*(metricsAddress: IpAddress, metricsPort: Port) =
raiseAssert exc.msg
except Exception as exc:
raiseAssert exc.msg # TODO fix metrics
method setTodoNodes*(m: Metrics, value: int) {.base.} =
m.todoNodes(value.int64)
method setOkNodes*(m: Metrics, value: int) {.base.} =
m.okNodes(value.int64)
method setNokNodes*(m: Metrics, value: int) {.base.} =
m.nokNodes(value.int64)
proc createMetrics*(metricsAddress: IpAddress, metricsPort: Port): Metrics =
startServer(metricsAddress, metricsPort)
# We can't extract this into a function because gauges cannot be passed as argument.
# The use of global state in nim-metrics is not pleasant.
proc onTodo(value: int64) =
todoNodesGauge.set(value)
proc onOk(value: int64) =
okNodesGauge.set(value)
proc onNok(value: int64) =
nokNodesGauge.set(value)
return Metrics(
todoNodes: onTodo,
okNodes: onOk,
nokNodes: onNok
)

View File

@ -0,0 +1,79 @@
import std/os
import pkg/chronos
import pkg/questionable/results
import pkg/asynctest/chronos/unittest
import pkg/datastore/typedds
import ../../../codexcrawler/components/dhtmetrics
import ../../../codexcrawler/utils/asyncdataevent
import ../../../codexcrawler/types
import ../../../codexcrawler/state
import ../mockstate
import ../mocklist
import ../helpers
suite "DhtMetrics":
var
nid: Nid
state: MockState
okList: MockList
nokList: MockList
dhtmetrics: DhtMetrics
setup:
nid = genNid()
state = createMockState()
okList = createMockList()
nokList = createMockList()
dhtmetrics = DhtMetrics.new(
state,
okList,
nokList
)
(await dhtmetrics.start()).tryGet()
teardown:
(await dhtmetrics.stop()).tryGet()
state.checkAllUnsubscribed()
proc fireDhtNodeCheckEvent(isOk: bool) {.async.} =
let
event = DhtNodeCheckEventData(
id: nid,
isOk: isOk
)
(await state.events.dhtNodeCheck.fire(event)).tryGet()
test "dhtmetrics start should load both lists":
(await dhtmetrics.start()).tryGet()
check:
okList.loadCalled
nokList.loadCalled
test "dhtNodeCheck event should add node to okList if check is successful":
await fireDhtNodeCheckEvent(true)
check:
nid in okList.added
test "dhtNodeCheck event should add node to nokList if check has failed":
await fireDhtNodeCheckEvent(false)
check:
nid in nokList.added
test "dhtNodeCheck event should remove node from nokList if check is successful":
await fireDhtNodeCheckEvent(true)
check:
nid in nokList.removed
test "dhtNodeCheck event should remove node from okList if check has failed":
await fireDhtNodeCheckEvent(false)
check:
nid in okList.removed

View File

@ -0,0 +1,37 @@
import pkg/chronos
import pkg/questionable/results
import ../../codexcrawler/types
import ../../codexcrawler/list
type
MockList* = ref object of List
loadCalled*: bool
added*: seq[Nid]
addSuccess*: bool
removed*: seq[Nid]
removeSuccess*: bool
method load*(this: MockList): Future[?!void] {.async.} =
this.loadCalled = true
method add*(this: MockList, nid: Nid): Future[?!void] {.async.} =
this.added.add(nid)
if this.addSuccess:
return success()
return failure("test failure")
method remove*(this: MockList, nid: Nid): Future[?!void] {.async.} =
this.removed.add(nid)
if this.removeSuccess:
return success()
return failure("test failure")
proc createMockList*(): MockList =
MockList(
loadCalled: false,
added: newSeq[Nid](),
addSuccess: true,
removed: newSeq[Nid](),
removeSuccess: true
)

View File

@ -1,3 +1,4 @@
import ./components/testnodestore
import ./components/testdhtmetrics
{.warning[UnusedImport]: off.}