mirror of
https://github.com/logos-storage/logos-storage-network-crawler.git
synced 2026-01-02 13:33:08 +00:00
Implements chainmetrics that publishes slotFill events count
This commit is contained in:
parent
b801e68f1d
commit
002bdc7314
45
codexcrawler/components/chainmetrics.nim
Normal file
45
codexcrawler/components/chainmetrics.nim
Normal file
@ -0,0 +1,45 @@
|
||||
import pkg/chronicles
|
||||
import pkg/chronos
|
||||
import pkg/questionable
|
||||
import pkg/questionable/results
|
||||
|
||||
import ../state
|
||||
import ../services/metrics
|
||||
import ../services/marketplace
|
||||
import ../component
|
||||
|
||||
logScope:
|
||||
topics = "ChainMetrics"
|
||||
|
||||
type ChainMetrics* = ref object of Component
|
||||
state: State
|
||||
metrics: Metrics
|
||||
marketplace: MarketplaceService
|
||||
|
||||
proc step(c: ChainMetrics): Future[?!void] {.async: (raises: []).} =
|
||||
without slotFills =? (await c.marketplace.getRecentSlotFillEvents()), err:
|
||||
trace "Unable to get recent slotFill events from chain", err = err.msg
|
||||
return success() # We don't propagate this error.
|
||||
# The call is allowed to fail and the app should continue as normal.
|
||||
|
||||
c.metrics.setSlotFill(slotFills.len)
|
||||
return success()
|
||||
|
||||
method start*(c: ChainMetrics): Future[?!void] {.async.} =
|
||||
info "Starting..."
|
||||
|
||||
proc onStep(): Future[?!void] {.async: (raises: []), gcsafe.} =
|
||||
return await c.step()
|
||||
|
||||
if c.state.config.marketplaceEnable:
|
||||
await c.state.whileRunning(onStep, 10.minutes)
|
||||
|
||||
return success()
|
||||
|
||||
method stop*(c: ChainMetrics): Future[?!void] {.async.} =
|
||||
return success()
|
||||
|
||||
proc new*(
|
||||
T: type ChainMetrics, state: State, metrics: Metrics, marketplace: MarketplaceService
|
||||
): ChainMetrics =
|
||||
ChainMetrics(state: state, metrics: metrics, marketplace: marketplace)
|
||||
@ -6,6 +6,8 @@ declareGauge(todoNodesGauge, "DHT nodes to be visited")
|
||||
declareGauge(okNodesGauge, "DHT nodes successfully contacted")
|
||||
declareGauge(nokNodesGauge, "DHT nodes failed to contact")
|
||||
|
||||
declareGauge(slotFillGauge, "Marketplace recent slots filled")
|
||||
|
||||
type
|
||||
OnUpdateMetric = proc(value: int64): void {.gcsafe, raises: [].}
|
||||
|
||||
@ -13,6 +15,7 @@ type
|
||||
todoNodes: OnUpdateMetric
|
||||
okNodes: OnUpdateMetric
|
||||
nokNodes: OnUpdateMetric
|
||||
onSlotFill: OnUpdateMetric
|
||||
|
||||
proc startServer(metricsAddress: IpAddress, metricsPort: Port) =
|
||||
let metricsAddress = metricsAddress
|
||||
@ -34,6 +37,9 @@ method setOkNodes*(m: Metrics, value: int) {.base, gcsafe, raises: [].} =
|
||||
method setNokNodes*(m: Metrics, value: int) {.base, gcsafe, raises: [].} =
|
||||
m.nokNodes(value.int64)
|
||||
|
||||
method setSlotFill*(m: Metrics, value: int) {.base, gcsafe, raises: [].} =
|
||||
m.onSlotFill(value.int64)
|
||||
|
||||
proc createMetrics*(metricsAddress: IpAddress, metricsPort: Port): Metrics =
|
||||
startServer(metricsAddress, metricsPort)
|
||||
|
||||
@ -47,5 +53,8 @@ proc createMetrics*(metricsAddress: IpAddress, metricsPort: Port): Metrics =
|
||||
|
||||
proc onNok(value: int64) =
|
||||
nokNodesGauge.set(value)
|
||||
|
||||
proc onSlotFill(value: int64) =
|
||||
slotFillGauge.set(value)
|
||||
|
||||
return Metrics(todoNodes: onTodo, okNodes: onOk, nokNodes: onNok)
|
||||
return Metrics(todoNodes: onTodo, okNodes: onOk, nokNodes: onNok, onSlotFill: onSlotFill)
|
||||
|
||||
90
tests/codexcrawler/components/testchainmetrics.nim
Normal file
90
tests/codexcrawler/components/testchainmetrics.nim
Normal file
@ -0,0 +1,90 @@
|
||||
import pkg/chronos
|
||||
import pkg/questionable
|
||||
import pkg/questionable/results
|
||||
import pkg/asynctest/chronos/unittest
|
||||
import std/sequtils
|
||||
|
||||
import ../../../codexcrawler/components/chainmetrics
|
||||
import ../../../codexcrawler/services/marketplace/market
|
||||
import ../../../codexcrawler/types
|
||||
import ../../../codexcrawler/state
|
||||
import ../mocks/mockstate
|
||||
import ../mocks/mockmetrics
|
||||
import ../mocks/mockmarketplace
|
||||
import ../helpers
|
||||
|
||||
suite "ChainMetrics":
|
||||
var
|
||||
state: MockState
|
||||
metrics: MockMetrics
|
||||
marketplace: MockMarketplaceService
|
||||
chain: ChainMetrics
|
||||
|
||||
setup:
|
||||
state = createMockState()
|
||||
metrics = createMockMetrics()
|
||||
marketplace = createMockMarketplaceService()
|
||||
|
||||
metrics.slotFill = -1
|
||||
chain = ChainMetrics.new(state, metrics, marketplace)
|
||||
|
||||
(await chain.start()).tryGet()
|
||||
|
||||
teardown:
|
||||
(await chain.stop()).tryGet()
|
||||
state.checkAllUnsubscribed()
|
||||
|
||||
proc onStep() {.async.} =
|
||||
(await state.steppers[0]()).tryGet()
|
||||
|
||||
test "start should start stepper for 10 minutes":
|
||||
check:
|
||||
state.delays.len == 1
|
||||
state.delays[0] == 10.minutes
|
||||
|
||||
test "onStep is not activated when config.marketplaceEnable is false":
|
||||
# Recreate chainMetrics, reset mockstate:
|
||||
(await chain.stop()).tryGet()
|
||||
state.steppers = @[]
|
||||
# disable marketplace:
|
||||
state.config.marketplaceEnable = false
|
||||
(await chain.start()).tryGet()
|
||||
|
||||
check:
|
||||
state.steppers.len == 0
|
||||
|
||||
test "step should not call setSlotFill when getRecentSlotFillEvents fails":
|
||||
let testValue = -123
|
||||
metrics.slotFill = testValue
|
||||
|
||||
marketplace.recentSlotFillEventsReturn = seq[SlotFilled].failure("testfailure")
|
||||
|
||||
await onStep()
|
||||
|
||||
check:
|
||||
metrics.slotFill == testValue
|
||||
|
||||
test "step should setSlotFill to zero when getRecentSlotFillEvents returns empty seq":
|
||||
metrics.slotFill = -123
|
||||
|
||||
marketplace.recentSlotFillEventsReturn = success(newSeq[SlotFilled]())
|
||||
|
||||
await onStep()
|
||||
|
||||
check:
|
||||
metrics.slotFill == 0
|
||||
|
||||
test "step should setSlotFill to the length of seq returned from getRecentSlotFillEvents":
|
||||
let fills = @[
|
||||
SlotFilled(),
|
||||
SlotFilled(),
|
||||
SlotFilled(),
|
||||
SlotFilled()
|
||||
]
|
||||
|
||||
marketplace.recentSlotFillEventsReturn = success(fills)
|
||||
|
||||
await onStep()
|
||||
|
||||
check:
|
||||
metrics.slotFill == fills.len
|
||||
@ -1,11 +1,8 @@
|
||||
import pkg/ethers
|
||||
import pkg/questionable
|
||||
|
||||
import ./marketplace/market
|
||||
import ./marketplace/marketplace
|
||||
import ../config
|
||||
import ../component
|
||||
import ../state
|
||||
import ../../../codexcrawler/services/marketplace
|
||||
import ../../../codexcrawler/services/marketplace/market
|
||||
|
||||
logScope:
|
||||
topics = "marketplace"
|
||||
@ -14,7 +11,7 @@ type
|
||||
MockMarketplaceService* = ref object of MarketplaceService
|
||||
recentSlotFillEventsReturn*: ?!seq[SlotFilled]
|
||||
|
||||
method getRecentSlotFillEvents*(m: MarketplaceService): Future[?!seq[SlotFilled]] {.async: (raises: []).} =
|
||||
method getRecentSlotFillEvents*(m: MockMarketplaceService): Future[?!seq[SlotFilled]] {.async: (raises: []).} =
|
||||
return m.recentSlotFillEventsReturn
|
||||
|
||||
proc createMockMarketplaceService*(): MockMarketplaceService =
|
||||
|
||||
@ -4,6 +4,7 @@ type MockMetrics* = ref object of Metrics
|
||||
todo*: int
|
||||
ok*: int
|
||||
nok*: int
|
||||
slotFill*: int
|
||||
|
||||
method setTodoNodes*(m: MockMetrics, value: int) =
|
||||
m.todo = value
|
||||
@ -14,5 +15,8 @@ method setOkNodes*(m: MockMetrics, value: int) =
|
||||
method setNokNodes*(m: MockMetrics, value: int) =
|
||||
m.nok = value
|
||||
|
||||
method setSlotFill*(m: MockMetrics, value: int) =
|
||||
m.slotFill = value
|
||||
|
||||
proc createMockMetrics*(): MockMetrics =
|
||||
MockMetrics()
|
||||
|
||||
@ -22,7 +22,7 @@ method whileRunning*(s: MockState, step: OnStep, delay: Duration) {.async.} =
|
||||
proc createMockState*(): MockState =
|
||||
MockState(
|
||||
status: ApplicationStatus.Running,
|
||||
config: Config(dhtEnable: true),
|
||||
config: Config(dhtEnable: true, marketplaceEnable: true),
|
||||
events: Events(
|
||||
nodesFound: newAsyncDataEvent[seq[Nid]](),
|
||||
newNodesDiscovered: newAsyncDataEvent[seq[Nid]](),
|
||||
|
||||
@ -1,7 +1,8 @@
|
||||
import ./components/testnodestore
|
||||
import ./components/testdhtmetrics
|
||||
import ./components/testtodolist
|
||||
import ./components/testtimetracker
|
||||
import ./components/testchainmetrics
|
||||
import ./components/testcrawler
|
||||
import ./components/testdhtmetrics
|
||||
import ./components/testnodestore
|
||||
import ./components/testtimetracker
|
||||
import ./components/testtodolist
|
||||
|
||||
{.warning[UnusedImport]: off.}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user