implements chainmetrics component

This commit is contained in:
ThatBen 2025-03-21 09:41:05 +01:00
parent fe4f5c2991
commit ec3c7c0be3
No known key found for this signature in database
GPG Key ID: E020A7DDCD52E1AB
10 changed files with 87 additions and 68 deletions

View File

@ -1,4 +1,3 @@
import pkg/chronicles
import pkg/chronos
import pkg/questionable/results

View File

@ -8,28 +8,49 @@ import ../services/metrics
import ../services/marketplace
import ../components/requeststore
import ../component
import ../types
logScope:
topics = "chainmetrics"
type ChainMetrics* = ref object of Component
state: State
metrics: Metrics
store: RequestStore
marketplace: MarketplaceService
type
ChainMetrics* = ref object of Component
state: State
metrics: Metrics
store: RequestStore
marketplace: MarketplaceService
Update = ref object
numRequests: int
numSlots: int
totalSize: int64
proc collectUpdate(c: ChainMetrics): Future[?!Update] {.async: (raises: []).} =
var update = Update(numRequests: 0, numSlots: 0, totalSize: 0)
proc onRequest(entry: RequestEntry): Future[?!void] {.async: (raises: []).} =
let response = await c.marketplace.getRequestInfo(entry.id)
if info =? response:
inc update.numRequests
update.numSlots += info.slots.int
update.totalSize += (info.slots * info.slotSize).int64
else:
?await c.store.remove(entry.id)
return success()
?await c.store.iterateAll(onRequest)
return success(update)
proc updateMetrics(c: ChainMetrics, update: Update) =
c.metrics.setRequests(update.numRequests)
c.metrics.setRequestSlots(update.numSlots)
c.metrics.setTotalSize(update.totalSize)
proc step(c: ChainMetrics): Future[?!void] {.async: (raises: []).} =
# iterate all requests in requestStore:
# get state of request on chain
# if failed/canceled/error:
# if last-seen is old (1month?3months?)
# delete entry
# else (request is running):
# count:
# total running
# total num slots
# total size of request
# iter finished: update metrics!
without update =? (await c.collectUpdate()), err:
return failure(err)
c.updateMetrics(update)
return success()
method start*(c: ChainMetrics): Future[?!void] {.async.} =
@ -51,6 +72,6 @@ proc new*(
state: State,
metrics: Metrics,
store: RequestStore,
marketplace: MarketplaceService
marketplace: MarketplaceService,
): ChainMetrics =
ChainMetrics(state: state, metrics: metrics, store: store, marketplace: marketplace)

View File

@ -10,7 +10,6 @@ import ../types
import ../component
import ../state
import ../utils/datastoreutils
import ../utils/asyncdataevent
import ../services/clock
const requeststoreName = "requeststore"

View File

@ -17,6 +17,7 @@ type
state: State
market: ?OnChainMarket
clock: Clock
OnNewRequest* = proc(id: Rid): Future[?!void] {.async: (raises: []), gcsafe.}
RequestInfo* = ref object
slots*: uint64
@ -25,25 +26,28 @@ type
proc notStarted() =
raiseAssert("MarketplaceService was called before it was started.")
proc fetchRequestInfo(market: OnChainMarket, rid: Rid): Future[?RequestInfo] {.async: (raises: []).} =
proc fetchRequestInfo(
market: OnChainMarket, rid: Rid
): Future[?RequestInfo] {.async: (raises: []).} =
try:
let request = await market.getRequest(rid)
if r =? request:
return some(RequestInfo(
slots: r.ask.slots,
slotSize: r.ask.slotSize
))
return some(RequestInfo(slots: r.ask.slots, slotSize: r.ask.slotSize))
except CatchableError as exc:
trace "Failed to get request info", err = exc.msg
return none(RequestInfo)
method subscribeToNewRequests*(m: MarketplaceService, onNewRequest: OnNewRequest): Future[?!void] {.async: (raises: []), base.} =
method subscribeToNewRequests*(
m: MarketplaceService, onNewRequest: OnNewRequest
): Future[?!void] {.async: (raises: []), base.} =
proc resultWrapper(rid: Rid): Future[void] {.async.} =
let response = await onNewRequest(rid)
if error =? response.errorOption:
raiseAssert("Error result in handling of onNewRequest callback: " & error.msg)
proc onRequest(id: RequestId, ask: StorageAsk, expiry: uint64) {.gcsafe, upraises: [].} =
proc onRequest(
id: RequestId, ask: StorageAsk, expiry: uint64
) {.gcsafe, upraises: [].} =
asyncSpawn resultWrapper(Rid(id))
if market =? m.market:
@ -54,8 +58,10 @@ method subscribeToNewRequests*(m: MarketplaceService, onNewRequest: OnNewRequest
else:
notStarted()
return success()
method iteratePastNewRequestEvents*(m: MarketplaceService, onNewRequest: OnNewRequest): Future[?!void] {.async: (raises: []), base.} =
method iteratePastNewRequestEvents*(
m: MarketplaceService, onNewRequest: OnNewRequest
): Future[?!void] {.async: (raises: []), base.} =
let
oneDay = 60 * 60 * 24
timespan = oneDay * 30
@ -72,7 +78,9 @@ method iteratePastNewRequestEvents*(m: MarketplaceService, onNewRequest: OnNewRe
else:
notStarted()
method getRequestInfo*(m: MarketplaceService, rid: Rid): Future[?RequestInfo] {.async: (raises: []), base.} =
method getRequestInfo*(
m: MarketplaceService, rid: Rid
): Future[?RequestInfo] {.async: (raises: []), base.} =
# If the request id exists and is running, fetch the request object and return the info object.
# otherwise, return none.
if market =? m.market:

View File

@ -8,7 +8,9 @@ declareGauge(nokNodesGauge, "DHT nodes failed to contact")
declareGauge(requestsGauge, "Marketplace active storage requests")
declareGauge(requestSlotsGauge, "Marketplace active storage request slots")
declareGauge(totalStorageSizeGauge, "Marketplace total bytes stored in active storage requests")
declareGauge(
totalStorageSizeGauge, "Marketplace total bytes stored in active storage requests"
)
type
OnUpdateMetric = proc(value: int64): void {.gcsafe, raises: [].}
@ -48,7 +50,7 @@ method setRequests*(m: Metrics, value: int) {.base, gcsafe, raises: [].} =
method setRequestSlots*(m: Metrics, value: int) {.base, gcsafe, raises: [].} =
m.onRequestSlots(value.int64)
method setTotalSize*(m: Metrics, value: int) {.base, gcsafe, raises: [].} =
method setTotalSize*(m: Metrics, value: int64) {.base, gcsafe, raises: [].} =
m.onTotalSize(value.int64)
proc createMetrics*(metricsAddress: IpAddress, metricsPort: Port): Metrics =
@ -74,5 +76,11 @@ proc createMetrics*(metricsAddress: IpAddress, metricsPort: Port): Metrics =
proc onTotalSize(value: int64) =
totalStorageSizeGauge.set(value)
return
Metrics(todoNodes: onTodo, okNodes: onOk, nokNodes: onNok, onRequests: onRequests, onRequestSlots: onRequestSlots, onTotalSize: onTotalSize)
return Metrics(
todoNodes: onTodo,
okNodes: onOk,
nokNodes: onNok,
onRequests: onRequests,
onRequestSlots: onRequestSlots,
onTotalSize: onTotalSize,
)

View File

@ -25,7 +25,7 @@ suite "ChainCrawler":
state = createMockState()
store = createMockRequestStore()
marketplace = createMockMarketplaceService()
crawler = ChainCrawler.new(state, store, marketplace)
(await crawler.start()).tryGet()

View File

@ -43,18 +43,6 @@ suite "ChainMetrics":
state.delays.len == 1
state.delays[0] == 10.minutes
# iterate all requests in requestStore:
# get state of request on chain
# if failed/canceled/error:
# if last-seen is old (1month?3months?)
# delete entry
# else (request is running):
# count:
# total running
# total num slots
# total size of request
# iter finished: update metrics!
test "onStep should remove non-running requests from request store":
let rid = genRid()
store.iterateEntries.add(RequestEntry(id: rid))
@ -66,7 +54,7 @@ suite "ChainMetrics":
check:
marketplace.requestInfoRid == rid
store.removeRid == rid
test "onStep should count the number of active requests":
let rid1 = genRid()
let rid2 = genRid()
@ -84,9 +72,7 @@ suite "ChainMetrics":
let rid = genRid()
store.iterateEntries.add(RequestEntry(id: rid))
let info = RequestInfo(
slots: 123
)
let info = RequestInfo(slots: 123)
marketplace.requestInfoReturns = some(info)
await onStep()
@ -98,10 +84,7 @@ suite "ChainMetrics":
let rid = genRid()
store.iterateEntries.add(RequestEntry(id: rid))
let info = RequestInfo(
slots: 12,
slotSize: 23
)
let info = RequestInfo(slots: 12, slotSize: 23)
marketplace.requestInfoReturns = some(info)
await onStep()

View File

@ -13,20 +13,25 @@ type MockMarketplaceService* = ref object of MarketplaceService
requestInfoReturns*: ?RequestInfo
requestInfoRid*: Rid
method subscribeToNewRequests*(m: MockMarketplaceService, onNewRequest: OnNewRequest): Future[?!void] {.async: (raises: []).} =
method subscribeToNewRequests*(
m: MockMarketplaceService, onNewRequest: OnNewRequest
): Future[?!void] {.async: (raises: []).} =
m.subNewRequestsCallback = some(onNewRequest)
return success()
method iteratePastNewRequestEvents*(m: MockMarketplaceService, onNewRequest: OnNewRequest): Future[?!void] {.async: (raises: []).} =
method iteratePastNewRequestEvents*(
m: MockMarketplaceService, onNewRequest: OnNewRequest
): Future[?!void] {.async: (raises: []).} =
m.iterRequestsCallback = some(onNewRequest)
return success()
method getRequestInfo*(m: MockMarketplaceService, rid: Rid): Future[?RequestInfo] {.async: (raises: []).} =
method getRequestInfo*(
m: MockMarketplaceService, rid: Rid
): Future[?RequestInfo] {.async: (raises: []).} =
m.requestInfoRid = rid
return m.requestInfoReturns
proc createMockMarketplaceService*(): MockMarketplaceService =
MockMarketplaceService(
subNewRequestsCallback: none(OnNewRequest),
iterRequestsCallback: none(OnNewRequest)
subNewRequestsCallback: none(OnNewRequest), iterRequestsCallback: none(OnNewRequest)
)

View File

@ -6,7 +6,7 @@ type MockMetrics* = ref object of Metrics
nok*: int
requests*: int
slots*: int
totalSize*: int
totalSize*: int64
method setTodoNodes*(m: MockMetrics, value: int) =
m.todo = value
@ -23,7 +23,7 @@ method setRequests*(m: MockMetrics, value: int) =
method setRequestSlots*(m: MockMetrics, value: int) =
m.slots = value
method setTotalSize*(m: MockMetrics, value: int) =
method setTotalSize*(m: MockMetrics, value: int64) =
m.totalSize = value
proc createMockMetrics*(): MockMetrics =

View File

@ -10,15 +10,11 @@ type MockRequestStore* = ref object of RequestStore
removeRid*: Rid
iterateEntries*: seq[RequestEntry]
method update*(
s: MockRequestStore, rid: Rid
): Future[?!void] {.async: (raises: []).} =
method update*(s: MockRequestStore, rid: Rid): Future[?!void] {.async: (raises: []).} =
s.updateRid = rid
return success()
method remove*(
s: MockRequestStore, rid: Rid
): Future[?!void] {.async: (raises: []).} =
method remove*(s: MockRequestStore, rid: Rid): Future[?!void] {.async: (raises: []).} =
s.removeRid = rid
return success()