mirror of
https://github.com/logos-storage/logos-storage-network-crawler.git
synced 2026-01-03 22:13:09 +00:00
implements dht-metrics
This commit is contained in:
parent
a2e9d4fac8
commit
3f697acefc
@ -6,10 +6,9 @@ import pkg/questionable/results
|
|||||||
import ./dht
|
import ./dht
|
||||||
import ../list
|
import ../list
|
||||||
import ../state
|
import ../state
|
||||||
import ../component
|
|
||||||
import ../types
|
|
||||||
import ../utils/asyncdataevent
|
|
||||||
import ../metrics
|
import ../metrics
|
||||||
|
import ../component
|
||||||
|
import ../utils/asyncdataevent
|
||||||
|
|
||||||
logScope:
|
logScope:
|
||||||
topics = "dhtmetrics"
|
topics = "dhtmetrics"
|
||||||
@ -18,34 +17,48 @@ type DhtMetrics* = ref object of Component
|
|||||||
state: State
|
state: State
|
||||||
ok: List
|
ok: List
|
||||||
nok: List
|
nok: List
|
||||||
|
sub: AsyncDataEventSubscription
|
||||||
|
metrics: Metrics
|
||||||
|
|
||||||
|
proc handleCheckEvent(
|
||||||
|
d: DhtMetrics, event: DhtNodeCheckEventData
|
||||||
|
): Future[?!void] {.async.} =
|
||||||
|
if event.isOk:
|
||||||
|
?await d.ok.add(event.id)
|
||||||
|
?await d.nok.remove(event.id)
|
||||||
|
else:
|
||||||
|
?await d.ok.remove(event.id)
|
||||||
|
?await d.nok.add(event.id)
|
||||||
|
|
||||||
|
d.metrics.setOkNodes(d.ok.len)
|
||||||
|
d.metrics.setNokNodes(d.nok.len)
|
||||||
|
|
||||||
|
return success()
|
||||||
|
|
||||||
method start*(d: DhtMetrics): Future[?!void] {.async.} =
|
method start*(d: DhtMetrics): Future[?!void] {.async.} =
|
||||||
info "Starting DhtMetrics..."
|
info "Starting DhtMetrics..."
|
||||||
|
?await d.ok.load()
|
||||||
|
?await d.nok.load()
|
||||||
|
|
||||||
|
proc onCheck(event: DhtNodeCheckEventData): Future[?!void] {.async.} =
|
||||||
|
await d.handleCheckEvent(event)
|
||||||
|
|
||||||
|
d.sub = d.state.events.dhtNodeCheck.subscribe(onCheck)
|
||||||
return success()
|
return success()
|
||||||
|
|
||||||
method stop*(d: DhtMetrics): Future[?!void] {.async.} =
|
method stop*(d: DhtMetrics): Future[?!void] {.async.} =
|
||||||
|
await d.state.events.dhtNodeCheck.unsubscribe(d.sub)
|
||||||
return success()
|
return success()
|
||||||
|
|
||||||
proc new*(
|
proc new*(
|
||||||
T: type DhtMetrics,
|
T: type DhtMetrics, state: State, okList: List, nokList: List, metrics: Metrics
|
||||||
state: State,
|
|
||||||
okList: List,
|
|
||||||
nokList: List
|
|
||||||
): DhtMetrics =
|
): DhtMetrics =
|
||||||
DhtMetrics(
|
DhtMetrics(state: state, ok: okList, nok: nokList, metrics: metrics)
|
||||||
state: state,
|
|
||||||
ok: okList,
|
|
||||||
nok: nokList
|
|
||||||
)
|
|
||||||
|
|
||||||
proc createDhtMetrics*(state: State): ?!DhtMetrics =
|
proc createDhtMetrics*(state: State, metrics: Metrics): ?!DhtMetrics =
|
||||||
without okList =? createList(state.config.dataDir, "dhtok"), err:
|
without okList =? createList(state.config.dataDir, "dhtok"), err:
|
||||||
return failure(err)
|
return failure(err)
|
||||||
without nokList =? createList(state.config.dataDir, "dhtnok"), err:
|
without nokList =? createList(state.config.dataDir, "dhtnok"), err:
|
||||||
return failure(err)
|
return failure(err)
|
||||||
|
|
||||||
success(DhtMetrics.new(
|
success(DhtMetrics.new(state, okList, nokList, metrics))
|
||||||
state,
|
|
||||||
okList,
|
|
||||||
nokList
|
|
||||||
))
|
|
||||||
|
|||||||
@ -19,12 +19,11 @@ import ./utils/datastoreutils
|
|||||||
logScope:
|
logScope:
|
||||||
topics = "list"
|
topics = "list"
|
||||||
|
|
||||||
type
|
type List* = ref object of RootObj
|
||||||
List* = ref object of RootObj
|
name: string
|
||||||
name: string
|
store: TypedDatastore
|
||||||
store: TypedDatastore
|
items: HashSet[Nid]
|
||||||
items: HashSet[Nid]
|
emptySignal: ?Future[void]
|
||||||
emptySignal: ?Future[void]
|
|
||||||
|
|
||||||
proc encode(s: Nid): seq[byte] =
|
proc encode(s: Nid): seq[byte] =
|
||||||
s.toBytes()
|
s.toBytes()
|
||||||
@ -86,12 +85,10 @@ method remove*(this: List, nid: Nid): Future[?!void] {.async, base.} =
|
|||||||
?await this.store.delete(itemKey)
|
?await this.store.delete(itemKey)
|
||||||
return success()
|
return success()
|
||||||
|
|
||||||
proc len*(this: List): int =
|
method len*(this: List): int {.base, gcsafe, raises: [].} =
|
||||||
this.items.len
|
this.items.len
|
||||||
|
|
||||||
proc new*(
|
proc new*(_: type List, name: string, store: TypedDatastore): List =
|
||||||
_: type List, name: string, store: TypedDatastore
|
|
||||||
): List =
|
|
||||||
List(name: name, store: store)
|
List(name: name, store: store)
|
||||||
|
|
||||||
proc createList*(dataDir: string, name: string): ?!List =
|
proc createList*(dataDir: string, name: string): ?!List =
|
||||||
|
|||||||
@ -9,7 +9,7 @@ declareGauge(nokNodesGauge, "DHT nodes failed to contact")
|
|||||||
type
|
type
|
||||||
OnUpdateMetric = proc(value: int64): void {.gcsafe, raises: [].}
|
OnUpdateMetric = proc(value: int64): void {.gcsafe, raises: [].}
|
||||||
|
|
||||||
Metrics* = ref object
|
Metrics* = ref object of RootObj
|
||||||
todoNodes: OnUpdateMetric
|
todoNodes: OnUpdateMetric
|
||||||
okNodes: OnUpdateMetric
|
okNodes: OnUpdateMetric
|
||||||
nokNodes: OnUpdateMetric
|
nokNodes: OnUpdateMetric
|
||||||
@ -25,13 +25,13 @@ proc startServer(metricsAddress: IpAddress, metricsPort: Port) =
|
|||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
raiseAssert exc.msg # TODO fix metrics
|
raiseAssert exc.msg # TODO fix metrics
|
||||||
|
|
||||||
method setTodoNodes*(m: Metrics, value: int) {.base.} =
|
method setTodoNodes*(m: Metrics, value: int) {.base, gcsafe, raises: [].} =
|
||||||
m.todoNodes(value.int64)
|
m.todoNodes(value.int64)
|
||||||
|
|
||||||
method setOkNodes*(m: Metrics, value: int) {.base.} =
|
method setOkNodes*(m: Metrics, value: int) {.base, gcsafe, raises: [].} =
|
||||||
m.okNodes(value.int64)
|
m.okNodes(value.int64)
|
||||||
|
|
||||||
method setNokNodes*(m: Metrics, value: int) {.base.} =
|
method setNokNodes*(m: Metrics, value: int) {.base, gcsafe, raises: [].} =
|
||||||
m.nokNodes(value.int64)
|
m.nokNodes(value.int64)
|
||||||
|
|
||||||
proc createMetrics*(metricsAddress: IpAddress, metricsPort: Port): Metrics =
|
proc createMetrics*(metricsAddress: IpAddress, metricsPort: Port): Metrics =
|
||||||
@ -47,9 +47,5 @@ proc createMetrics*(metricsAddress: IpAddress, metricsPort: Port): Metrics =
|
|||||||
|
|
||||||
proc onNok(value: int64) =
|
proc onNok(value: int64) =
|
||||||
nokNodesGauge.set(value)
|
nokNodesGauge.set(value)
|
||||||
|
|
||||||
return Metrics(
|
return Metrics(todoNodes: onTodo, okNodes: onOk, nokNodes: onNok)
|
||||||
todoNodes: onTodo,
|
|
||||||
okNodes: onOk,
|
|
||||||
nokNodes: onNok
|
|
||||||
)
|
|
||||||
|
|||||||
@ -1,8 +1,6 @@
|
|||||||
import std/os
|
|
||||||
import pkg/chronos
|
import pkg/chronos
|
||||||
import pkg/questionable/results
|
import pkg/questionable/results
|
||||||
import pkg/asynctest/chronos/unittest
|
import pkg/asynctest/chronos/unittest
|
||||||
import pkg/datastore/typedds
|
|
||||||
|
|
||||||
import ../../../codexcrawler/components/dhtmetrics
|
import ../../../codexcrawler/components/dhtmetrics
|
||||||
import ../../../codexcrawler/utils/asyncdataevent
|
import ../../../codexcrawler/utils/asyncdataevent
|
||||||
@ -10,6 +8,7 @@ import ../../../codexcrawler/types
|
|||||||
import ../../../codexcrawler/state
|
import ../../../codexcrawler/state
|
||||||
import ../mockstate
|
import ../mockstate
|
||||||
import ../mocklist
|
import ../mocklist
|
||||||
|
import ../mockmetrics
|
||||||
import ../helpers
|
import ../helpers
|
||||||
|
|
||||||
suite "DhtMetrics":
|
suite "DhtMetrics":
|
||||||
@ -18,6 +17,7 @@ suite "DhtMetrics":
|
|||||||
state: MockState
|
state: MockState
|
||||||
okList: MockList
|
okList: MockList
|
||||||
nokList: MockList
|
nokList: MockList
|
||||||
|
metrics: MockMetrics
|
||||||
dhtmetrics: DhtMetrics
|
dhtmetrics: DhtMetrics
|
||||||
|
|
||||||
setup:
|
setup:
|
||||||
@ -25,12 +25,9 @@ suite "DhtMetrics":
|
|||||||
state = createMockState()
|
state = createMockState()
|
||||||
okList = createMockList()
|
okList = createMockList()
|
||||||
nokList = createMockList()
|
nokList = createMockList()
|
||||||
|
metrics = createMockMetrics()
|
||||||
|
|
||||||
dhtmetrics = DhtMetrics.new(
|
dhtmetrics = DhtMetrics.new(state, okList, nokList, metrics)
|
||||||
state,
|
|
||||||
okList,
|
|
||||||
nokList
|
|
||||||
)
|
|
||||||
|
|
||||||
(await dhtmetrics.start()).tryGet()
|
(await dhtmetrics.start()).tryGet()
|
||||||
|
|
||||||
@ -39,17 +36,11 @@ suite "DhtMetrics":
|
|||||||
state.checkAllUnsubscribed()
|
state.checkAllUnsubscribed()
|
||||||
|
|
||||||
proc fireDhtNodeCheckEvent(isOk: bool) {.async.} =
|
proc fireDhtNodeCheckEvent(isOk: bool) {.async.} =
|
||||||
let
|
let event = DhtNodeCheckEventData(id: nid, isOk: isOk)
|
||||||
event = DhtNodeCheckEventData(
|
|
||||||
id: nid,
|
|
||||||
isOk: isOk
|
|
||||||
)
|
|
||||||
|
|
||||||
(await state.events.dhtNodeCheck.fire(event)).tryGet()
|
(await state.events.dhtNodeCheck.fire(event)).tryGet()
|
||||||
|
|
||||||
test "dhtmetrics start should load both lists":
|
test "dhtmetrics start should load both lists":
|
||||||
(await dhtmetrics.start()).tryGet()
|
|
||||||
|
|
||||||
check:
|
check:
|
||||||
okList.loadCalled
|
okList.loadCalled
|
||||||
nokList.loadCalled
|
nokList.loadCalled
|
||||||
@ -58,13 +49,13 @@ suite "DhtMetrics":
|
|||||||
await fireDhtNodeCheckEvent(true)
|
await fireDhtNodeCheckEvent(true)
|
||||||
|
|
||||||
check:
|
check:
|
||||||
nid in okList.added
|
nid in okList.added
|
||||||
|
|
||||||
test "dhtNodeCheck event should add node to nokList if check has failed":
|
test "dhtNodeCheck event should add node to nokList if check has failed":
|
||||||
await fireDhtNodeCheckEvent(false)
|
await fireDhtNodeCheckEvent(false)
|
||||||
|
|
||||||
check:
|
check:
|
||||||
nid in nokList.added
|
nid in nokList.added
|
||||||
|
|
||||||
test "dhtNodeCheck event should remove node from nokList if check is successful":
|
test "dhtNodeCheck event should remove node from nokList if check is successful":
|
||||||
await fireDhtNodeCheckEvent(true)
|
await fireDhtNodeCheckEvent(true)
|
||||||
@ -77,3 +68,23 @@ suite "DhtMetrics":
|
|||||||
|
|
||||||
check:
|
check:
|
||||||
nid in okList.removed
|
nid in okList.removed
|
||||||
|
|
||||||
|
test "dhtNodeCheck event should set okList length as dht-ok metric":
|
||||||
|
let length = 123
|
||||||
|
|
||||||
|
okList.length = length
|
||||||
|
|
||||||
|
await fireDhtNodeCheckEvent(true)
|
||||||
|
|
||||||
|
check:
|
||||||
|
metrics.ok == length
|
||||||
|
|
||||||
|
test "dhtNodeCheck event should set nokList length as dht-nok metric":
|
||||||
|
let length = 234
|
||||||
|
|
||||||
|
nokList.length = length
|
||||||
|
|
||||||
|
await fireDhtNodeCheckEvent(true)
|
||||||
|
|
||||||
|
check:
|
||||||
|
metrics.nok == length
|
||||||
|
|||||||
@ -4,16 +4,17 @@ import pkg/questionable/results
|
|||||||
import ../../codexcrawler/types
|
import ../../codexcrawler/types
|
||||||
import ../../codexcrawler/list
|
import ../../codexcrawler/list
|
||||||
|
|
||||||
type
|
type MockList* = ref object of List
|
||||||
MockList* = ref object of List
|
loadCalled*: bool
|
||||||
loadCalled*: bool
|
added*: seq[Nid]
|
||||||
added*: seq[Nid]
|
addSuccess*: bool
|
||||||
addSuccess*: bool
|
removed*: seq[Nid]
|
||||||
removed*: seq[Nid]
|
removeSuccess*: bool
|
||||||
removeSuccess*: bool
|
length*: int
|
||||||
|
|
||||||
method load*(this: MockList): Future[?!void] {.async.} =
|
method load*(this: MockList): Future[?!void] {.async.} =
|
||||||
this.loadCalled = true
|
this.loadCalled = true
|
||||||
|
return success()
|
||||||
|
|
||||||
method add*(this: MockList, nid: Nid): Future[?!void] {.async.} =
|
method add*(this: MockList, nid: Nid): Future[?!void] {.async.} =
|
||||||
this.added.add(nid)
|
this.added.add(nid)
|
||||||
@ -27,11 +28,15 @@ method remove*(this: MockList, nid: Nid): Future[?!void] {.async.} =
|
|||||||
return success()
|
return success()
|
||||||
return failure("test failure")
|
return failure("test failure")
|
||||||
|
|
||||||
|
method len*(this: MockList): int =
|
||||||
|
return this.length
|
||||||
|
|
||||||
proc createMockList*(): MockList =
|
proc createMockList*(): MockList =
|
||||||
MockList(
|
MockList(
|
||||||
loadCalled: false,
|
loadCalled: false,
|
||||||
added: newSeq[Nid](),
|
added: newSeq[Nid](),
|
||||||
addSuccess: true,
|
addSuccess: true,
|
||||||
removed: newSeq[Nid](),
|
removed: newSeq[Nid](),
|
||||||
removeSuccess: true
|
removeSuccess: true,
|
||||||
|
length: 0,
|
||||||
)
|
)
|
||||||
|
|||||||
18
tests/codexcrawler/mockmetrics.nim
Normal file
18
tests/codexcrawler/mockmetrics.nim
Normal file
@ -0,0 +1,18 @@
|
|||||||
|
import ../../codexcrawler/metrics
|
||||||
|
|
||||||
|
type MockMetrics* = ref object of Metrics
|
||||||
|
todo*: int
|
||||||
|
ok*: int
|
||||||
|
nok*: int
|
||||||
|
|
||||||
|
method setTodoNodes*(m: MockMetrics, value: int) =
|
||||||
|
m.todo = value
|
||||||
|
|
||||||
|
method setOkNodes*(m: MockMetrics, value: int) =
|
||||||
|
m.ok = value
|
||||||
|
|
||||||
|
method setNokNodes*(m: MockMetrics, value: int) =
|
||||||
|
m.nok = value
|
||||||
|
|
||||||
|
proc createMockMetrics*(): MockMetrics =
|
||||||
|
MockMetrics()
|
||||||
Loading…
x
Reference in New Issue
Block a user