sets up tests for new chainmetrics

This commit is contained in:
ThatBen 2025-03-20 16:20:37 +01:00
parent 86643e7647
commit fe4f5c2991
No known key found for this signature in database
GPG Key ID: E020A7DDCD52E1AB
7 changed files with 165 additions and 86 deletions

View File

@ -6,6 +6,7 @@ import pkg/questionable/results
import ../state import ../state
import ../services/metrics import ../services/metrics
import ../services/marketplace import ../services/marketplace
import ../components/requeststore
import ../component import ../component
logScope: logScope:
@ -14,10 +15,10 @@ logScope:
type ChainMetrics* = ref object of Component type ChainMetrics* = ref object of Component
state: State state: State
metrics: Metrics metrics: Metrics
store: RequestStore
marketplace: MarketplaceService marketplace: MarketplaceService
proc step(c: ChainMetrics): Future[?!void] {.async: (raises: []).} = proc step(c: ChainMetrics): Future[?!void] {.async: (raises: []).} =
# replace slotFills entirely:
# iterate all requests in requestStore: # iterate all requests in requestStore:
# get state of request on chain # get state of request on chain
# if failed/canceled/error: # if failed/canceled/error:
@ -29,14 +30,6 @@ proc step(c: ChainMetrics): Future[?!void] {.async: (raises: []).} =
# total num slots # total num slots
# total size of request # total size of request
# iter finished: update metrics! # iter finished: update metrics!
# without slotFills =? (await c.marketplace.getRecentSlotFillEvents()), err:
# trace "Unable to get recent slotFill events from chain", err = err.msg
# return success() # We don't propagate this error.
# # The call is allowed to fail and the app should continue as normal.
# c.metrics.setSlotFill(slotFills.len)
return success() return success()
method start*(c: ChainMetrics): Future[?!void] {.async.} = method start*(c: ChainMetrics): Future[?!void] {.async.} =
@ -57,6 +50,7 @@ proc new*(
T: type ChainMetrics, T: type ChainMetrics,
state: State, state: State,
metrics: Metrics, metrics: Metrics,
marketplace: MarketplaceService, store: RequestStore,
marketplace: MarketplaceService
): ChainMetrics = ): ChainMetrics =
ChainMetrics(state: state, metrics: metrics, marketplace: marketplace) ChainMetrics(state: state, metrics: metrics, store: store, marketplace: marketplace)

View File

@ -18,10 +18,25 @@ type
market: ?OnChainMarket market: ?OnChainMarket
clock: Clock clock: Clock
OnNewRequest* = proc(id: Rid): Future[?!void] {.async: (raises: []), gcsafe.} OnNewRequest* = proc(id: Rid): Future[?!void] {.async: (raises: []), gcsafe.}
RequestInfo* = ref object
slots*: uint64
slotSize*: uint64
proc notStarted() = proc notStarted() =
raiseAssert("MarketplaceService was called before it was started.") raiseAssert("MarketplaceService was called before it was started.")
proc fetchRequestInfo(market: OnChainMarket, rid: Rid): Future[?RequestInfo] {.async: (raises: []).} =
try:
let request = await market.getRequest(rid)
if r =? request:
return some(RequestInfo(
slots: r.ask.slots,
slotSize: r.ask.slotSize
))
except CatchableError as exc:
trace "Failed to get request info", err = exc.msg
return none(RequestInfo)
method subscribeToNewRequests*(m: MarketplaceService, onNewRequest: OnNewRequest): Future[?!void] {.async: (raises: []), base.} = method subscribeToNewRequests*(m: MarketplaceService, onNewRequest: OnNewRequest): Future[?!void] {.async: (raises: []), base.} =
proc resultWrapper(rid: Rid): Future[void] {.async.} = proc resultWrapper(rid: Rid): Future[void] {.async.} =
let response = await onNewRequest(rid) let response = await onNewRequest(rid)
@ -57,6 +72,21 @@ method iteratePastNewRequestEvents*(m: MarketplaceService, onNewRequest: OnNewRe
else: else:
notStarted() notStarted()
method getRequestInfo*(m: MarketplaceService, rid: Rid): Future[?RequestInfo] {.async: (raises: []), base.} =
# If the request id exists and is running, fetch the request object and return the info object.
# otherwise, return none.
if market =? m.market:
try:
let state = await market.requestState(rid)
if s =? state:
if s == RequestState.Started:
return await market.fetchRequestInfo(rid)
except CatchableError as exc:
trace "Failed to get request state", err = exc.msg
return none(RequestInfo)
else:
notStarted()
method start*(m: MarketplaceService): Future[?!void] {.async.} = method start*(m: MarketplaceService): Future[?!void] {.async.} =
let provider = JsonRpcProvider.new(m.state.config.ethProvider) let provider = JsonRpcProvider.new(m.state.config.ethProvider)
without marketplaceAddress =? Address.init(m.state.config.marketplaceAddress): without marketplaceAddress =? Address.init(m.state.config.marketplaceAddress):

View File

@ -6,7 +6,9 @@ declareGauge(todoNodesGauge, "DHT nodes to be visited")
declareGauge(okNodesGauge, "DHT nodes successfully contacted") declareGauge(okNodesGauge, "DHT nodes successfully contacted")
declareGauge(nokNodesGauge, "DHT nodes failed to contact") declareGauge(nokNodesGauge, "DHT nodes failed to contact")
declareGauge(slotFillGauge, "Marketplace recent slots filled") declareGauge(requestsGauge, "Marketplace active storage requests")
declareGauge(requestSlotsGauge, "Marketplace active storage request slots")
declareGauge(totalStorageSizeGauge, "Marketplace total bytes stored in active storage requests")
type type
OnUpdateMetric = proc(value: int64): void {.gcsafe, raises: [].} OnUpdateMetric = proc(value: int64): void {.gcsafe, raises: [].}
@ -15,7 +17,10 @@ type
todoNodes: OnUpdateMetric todoNodes: OnUpdateMetric
okNodes: OnUpdateMetric okNodes: OnUpdateMetric
nokNodes: OnUpdateMetric nokNodes: OnUpdateMetric
onSlotFill: OnUpdateMetric
onRequests: OnUpdateMetric
onRequestSlots: OnUpdateMetric
onTotalSize: OnUpdateMetric
proc startServer(metricsAddress: IpAddress, metricsPort: Port) = proc startServer(metricsAddress: IpAddress, metricsPort: Port) =
let metricsAddress = metricsAddress let metricsAddress = metricsAddress
@ -37,8 +42,14 @@ method setOkNodes*(m: Metrics, value: int) {.base, gcsafe, raises: [].} =
method setNokNodes*(m: Metrics, value: int) {.base, gcsafe, raises: [].} = method setNokNodes*(m: Metrics, value: int) {.base, gcsafe, raises: [].} =
m.nokNodes(value.int64) m.nokNodes(value.int64)
method setSlotFill*(m: Metrics, value: int) {.base, gcsafe, raises: [].} = method setRequests*(m: Metrics, value: int) {.base, gcsafe, raises: [].} =
m.onSlotFill(value.int64) m.onRequests(value.int64)
method setRequestSlots*(m: Metrics, value: int) {.base, gcsafe, raises: [].} =
m.onRequestSlots(value.int64)
method setTotalSize*(m: Metrics, value: int) {.base, gcsafe, raises: [].} =
m.onTotalSize(value.int64)
proc createMetrics*(metricsAddress: IpAddress, metricsPort: Port): Metrics = proc createMetrics*(metricsAddress: IpAddress, metricsPort: Port): Metrics =
startServer(metricsAddress, metricsPort) startServer(metricsAddress, metricsPort)
@ -54,8 +65,14 @@ proc createMetrics*(metricsAddress: IpAddress, metricsPort: Port): Metrics =
proc onNok(value: int64) = proc onNok(value: int64) =
nokNodesGauge.set(value) nokNodesGauge.set(value)
proc onSlotFill(value: int64) = proc onRequests(value: int64) =
slotFillGauge.set(value) requestsGauge.set(value)
proc onRequestSlots(value: int64) =
requestSlotsGauge.set(value)
proc onTotalSize(value: int64) =
totalStorageSizeGauge.set(value)
return return
Metrics(todoNodes: onTodo, okNodes: onOk, nokNodes: onNok, onSlotFill: onSlotFill) Metrics(todoNodes: onTodo, okNodes: onOk, nokNodes: onNok, onRequests: onRequests, onRequestSlots: onRequestSlots, onTotalSize: onTotalSize)

View File

@ -1,85 +1,110 @@
# import pkg/chronos import pkg/chronos
# import pkg/questionable import pkg/questionable
# import pkg/questionable/results import pkg/questionable/results
# import pkg/asynctest/chronos/unittest import pkg/asynctest/chronos/unittest
# import std/sequtils
# import ../../../codexcrawler/components/chainmetrics import ../../../codexcrawler/components/chainmetrics
# import ../../../codexcrawler/services/marketplace/market import ../../../codexcrawler/components/requeststore
# import ../../../codexcrawler/types import ../../../codexcrawler/services/marketplace
# import ../../../codexcrawler/state import ../../../codexcrawler/types
# import ../mocks/mockstate import ../mocks/mockstate
# import ../mocks/mockmetrics import ../mocks/mockmetrics
# import ../mocks/mockmarketplace import ../mocks/mockrequeststore
# import ../helpers import ../mocks/mockmarketplace
import ../helpers
# suite "ChainMetrics": suite "ChainMetrics":
# var var
# state: MockState state: MockState
# metrics: MockMetrics metrics: MockMetrics
# marketplace: MockMarketplaceService store: MockRequestStore
# chain: ChainMetrics marketplace: MockMarketplaceService
chain: ChainMetrics
# setup: setup:
# state = createMockState() state = createMockState()
# metrics = createMockMetrics() metrics = createMockMetrics()
# marketplace = createMockMarketplaceService() store = createMockRequestStore()
marketplace = createMockMarketplaceService()
# metrics.slotFill = -1 chain = ChainMetrics.new(state, metrics, store, marketplace)
# chain = ChainMetrics.new(state, metrics, marketplace)
# (await chain.start()).tryGet() (await chain.start()).tryGet()
# teardown: teardown:
# (await chain.stop()).tryGet() (await chain.stop()).tryGet()
# state.checkAllUnsubscribed() state.checkAllUnsubscribed()
# proc onStep() {.async.} = proc onStep() {.async.} =
# (await state.steppers[0]()).tryGet() (await state.steppers[0]()).tryGet()
# test "start should start stepper for 10 minutes": test "start should start stepper for 10 minutes":
# check: check:
# state.delays.len == 1 state.delays.len == 1
# state.delays[0] == 10.minutes state.delays[0] == 10.minutes
# test "onStep is not activated when config.marketplaceEnable is false": # iterate all requests in requestStore:
# # Recreate chainMetrics, reset mockstate: # get state of request on chain
# (await chain.stop()).tryGet() # if failed/canceled/error:
# state.steppers = @[] # if last-seen is old (1month?3months?)
# # disable marketplace: # delete entry
# state.config.marketplaceEnable = false # else (request is running):
# (await chain.start()).tryGet() # count:
# total running
# total num slots
# total size of request
# iter finished: update metrics!
# check: test "onStep should remove non-running requests from request store":
# state.steppers.len == 0 let rid = genRid()
store.iterateEntries.add(RequestEntry(id: rid))
# test "step should not call setSlotFill when getRecentSlotFillEvents fails": marketplace.requestInfoReturns = none(RequestInfo)
# let testValue = -123
# metrics.slotFill = testValue
# marketplace.recentSlotFillEventsReturn = seq[SlotFilled].failure("testfailure") await onStep()
# await onStep() check:
marketplace.requestInfoRid == rid
store.removeRid == rid
test "onStep should count the number of active requests":
let rid1 = genRid()
let rid2 = genRid()
store.iterateEntries.add(RequestEntry(id: rid1))
store.iterateEntries.add(RequestEntry(id: rid2))
# check: marketplace.requestInfoReturns = some(RequestInfo())
# metrics.slotFill == testValue
# test "step should setSlotFill to zero when getRecentSlotFillEvents returns empty seq": await onStep()
# metrics.slotFill = -123
# marketplace.recentSlotFillEventsReturn = success(newSeq[SlotFilled]()) check:
metrics.requests == 2
# await onStep() test "onStep should count the number of active slots":
let rid = genRid()
store.iterateEntries.add(RequestEntry(id: rid))
# check: let info = RequestInfo(
# metrics.slotFill == 0 slots: 123
)
marketplace.requestInfoReturns = some(info)
# test "step should setSlotFill to the length of seq returned from getRecentSlotFillEvents": await onStep()
# let fills = @[SlotFilled(), SlotFilled(), SlotFilled(), SlotFilled()]
# marketplace.recentSlotFillEventsReturn = success(fills) check:
metrics.slots == info.slots.int
# await onStep() test "onStep should count the total size of active slots":
let rid = genRid()
store.iterateEntries.add(RequestEntry(id: rid))
# check: let info = RequestInfo(
# metrics.slotFill == fills.len slots: 12,
slotSize: 23
)
marketplace.requestInfoReturns = some(info)
await onStep()
check:
metrics.totalSize == (info.slots * info.slotSize).int

View File

@ -1,5 +1,4 @@
import std/random import std/random
import std/sequtils
import std/typetraits import std/typetraits
import pkg/stint import pkg/stint
import pkg/stew/byteutils import pkg/stew/byteutils

View File

@ -2,7 +2,7 @@ import pkg/ethers
import pkg/questionable import pkg/questionable
import ../../../codexcrawler/services/marketplace import ../../../codexcrawler/services/marketplace
import ../../../codexcrawler/services/marketplace/market import ../../../codexcrawler/types
logScope: logScope:
topics = "marketplace" topics = "marketplace"
@ -10,6 +10,8 @@ logScope:
type MockMarketplaceService* = ref object of MarketplaceService type MockMarketplaceService* = ref object of MarketplaceService
subNewRequestsCallback*: ?OnNewRequest subNewRequestsCallback*: ?OnNewRequest
iterRequestsCallback*: ?OnNewRequest iterRequestsCallback*: ?OnNewRequest
requestInfoReturns*: ?RequestInfo
requestInfoRid*: Rid
method subscribeToNewRequests*(m: MockMarketplaceService, onNewRequest: OnNewRequest): Future[?!void] {.async: (raises: []).} = method subscribeToNewRequests*(m: MockMarketplaceService, onNewRequest: OnNewRequest): Future[?!void] {.async: (raises: []).} =
m.subNewRequestsCallback = some(onNewRequest) m.subNewRequestsCallback = some(onNewRequest)
@ -19,6 +21,10 @@ method iteratePastNewRequestEvents*(m: MockMarketplaceService, onNewRequest: OnN
m.iterRequestsCallback = some(onNewRequest) m.iterRequestsCallback = some(onNewRequest)
return success() return success()
method getRequestInfo*(m: MockMarketplaceService, rid: Rid): Future[?RequestInfo] {.async: (raises: []).} =
m.requestInfoRid = rid
return m.requestInfoReturns
proc createMockMarketplaceService*(): MockMarketplaceService = proc createMockMarketplaceService*(): MockMarketplaceService =
MockMarketplaceService( MockMarketplaceService(
subNewRequestsCallback: none(OnNewRequest), subNewRequestsCallback: none(OnNewRequest),

View File

@ -4,7 +4,9 @@ type MockMetrics* = ref object of Metrics
todo*: int todo*: int
ok*: int ok*: int
nok*: int nok*: int
slotFill*: int requests*: int
slots*: int
totalSize*: int
method setTodoNodes*(m: MockMetrics, value: int) = method setTodoNodes*(m: MockMetrics, value: int) =
m.todo = value m.todo = value
@ -15,8 +17,14 @@ method setOkNodes*(m: MockMetrics, value: int) =
method setNokNodes*(m: MockMetrics, value: int) = method setNokNodes*(m: MockMetrics, value: int) =
m.nok = value m.nok = value
method setSlotFill*(m: MockMetrics, value: int) = method setRequests*(m: MockMetrics, value: int) =
m.slotFill = value m.requests = value
method setRequestSlots*(m: MockMetrics, value: int) =
m.slots = value
method setTotalSize*(m: MockMetrics, value: int) =
m.totalSize = value
proc createMockMetrics*(): MockMetrics = proc createMockMetrics*(): MockMetrics =
MockMetrics() MockMetrics()