From ec3c7c0be379744761436cab295ca4a37c9127f6 Mon Sep 17 00:00:00 2001 From: ThatBen Date: Fri, 21 Mar 2025 09:41:05 +0100 Subject: [PATCH] implements chainmetrics component --- codexcrawler/components/chaincrawler.nim | 1 - codexcrawler/components/chainmetrics.nim | 55 +++++++++++++------ codexcrawler/components/requeststore.nim | 1 - codexcrawler/services/marketplace.nim | 28 ++++++---- codexcrawler/services/metrics.nim | 16 ++++-- .../components/testchaincrawler.nim | 2 +- .../components/testchainmetrics.nim | 23 +------- tests/codexcrawler/mocks/mockmarketplace.nim | 17 ++++-- tests/codexcrawler/mocks/mockmetrics.nim | 4 +- tests/codexcrawler/mocks/mockrequeststore.nim | 8 +-- 10 files changed, 87 insertions(+), 68 deletions(-) diff --git a/codexcrawler/components/chaincrawler.nim b/codexcrawler/components/chaincrawler.nim index 8295618..70941fd 100644 --- a/codexcrawler/components/chaincrawler.nim +++ b/codexcrawler/components/chaincrawler.nim @@ -1,4 +1,3 @@ - import pkg/chronicles import pkg/chronos import pkg/questionable/results diff --git a/codexcrawler/components/chainmetrics.nim b/codexcrawler/components/chainmetrics.nim index 62db5e6..8f8f899 100644 --- a/codexcrawler/components/chainmetrics.nim +++ b/codexcrawler/components/chainmetrics.nim @@ -8,28 +8,49 @@ import ../services/metrics import ../services/marketplace import ../components/requeststore import ../component +import ../types logScope: topics = "chainmetrics" -type ChainMetrics* = ref object of Component - state: State - metrics: Metrics - store: RequestStore - marketplace: MarketplaceService +type + ChainMetrics* = ref object of Component + state: State + metrics: Metrics + store: RequestStore + marketplace: MarketplaceService + + Update = ref object + numRequests: int + numSlots: int + totalSize: int64 + +proc collectUpdate(c: ChainMetrics): Future[?!Update] {.async: (raises: []).} = + var update = Update(numRequests: 0, numSlots: 0, totalSize: 0) + + proc onRequest(entry: RequestEntry): Future[?!void] {.async: (raises: []).} = + let response = await c.marketplace.getRequestInfo(entry.id) + if info =? response: + inc update.numRequests + update.numSlots += info.slots.int + update.totalSize += (info.slots * info.slotSize).int64 + else: + ?await c.store.remove(entry.id) + return success() + + ?await c.store.iterateAll(onRequest) + return success(update) + +proc updateMetrics(c: ChainMetrics, update: Update) = + c.metrics.setRequests(update.numRequests) + c.metrics.setRequestSlots(update.numSlots) + c.metrics.setTotalSize(update.totalSize) proc step(c: ChainMetrics): Future[?!void] {.async: (raises: []).} = - # iterate all requests in requestStore: - # get state of request on chain - # if failed/canceled/error: - # if last-seen is old (1month?3months?) - # delete entry - # else (request is running): - # count: - # total running - # total num slots - # total size of request - # iter finished: update metrics! + without update =? (await c.collectUpdate()), err: + return failure(err) + + c.updateMetrics(update) return success() method start*(c: ChainMetrics): Future[?!void] {.async.} = @@ -51,6 +72,6 @@ proc new*( state: State, metrics: Metrics, store: RequestStore, - marketplace: MarketplaceService + marketplace: MarketplaceService, ): ChainMetrics = ChainMetrics(state: state, metrics: metrics, store: store, marketplace: marketplace) diff --git a/codexcrawler/components/requeststore.nim b/codexcrawler/components/requeststore.nim index 566c308..e4f18b6 100644 --- a/codexcrawler/components/requeststore.nim +++ b/codexcrawler/components/requeststore.nim @@ -10,7 +10,6 @@ import ../types import ../component import ../state import ../utils/datastoreutils -import ../utils/asyncdataevent import ../services/clock const requeststoreName = "requeststore" diff --git a/codexcrawler/services/marketplace.nim b/codexcrawler/services/marketplace.nim index eb4c73d..4a84d1a 100644 --- a/codexcrawler/services/marketplace.nim +++ b/codexcrawler/services/marketplace.nim @@ -17,6 +17,7 @@ type state: State market: ?OnChainMarket clock: Clock + OnNewRequest* = proc(id: Rid): Future[?!void] {.async: (raises: []), gcsafe.} RequestInfo* = ref object slots*: uint64 @@ -25,25 +26,28 @@ type proc notStarted() = raiseAssert("MarketplaceService was called before it was started.") -proc fetchRequestInfo(market: OnChainMarket, rid: Rid): Future[?RequestInfo] {.async: (raises: []).} = +proc fetchRequestInfo( + market: OnChainMarket, rid: Rid +): Future[?RequestInfo] {.async: (raises: []).} = try: let request = await market.getRequest(rid) if r =? request: - return some(RequestInfo( - slots: r.ask.slots, - slotSize: r.ask.slotSize - )) + return some(RequestInfo(slots: r.ask.slots, slotSize: r.ask.slotSize)) except CatchableError as exc: trace "Failed to get request info", err = exc.msg return none(RequestInfo) -method subscribeToNewRequests*(m: MarketplaceService, onNewRequest: OnNewRequest): Future[?!void] {.async: (raises: []), base.} = +method subscribeToNewRequests*( + m: MarketplaceService, onNewRequest: OnNewRequest +): Future[?!void] {.async: (raises: []), base.} = proc resultWrapper(rid: Rid): Future[void] {.async.} = let response = await onNewRequest(rid) if error =? response.errorOption: raiseAssert("Error result in handling of onNewRequest callback: " & error.msg) - proc onRequest(id: RequestId, ask: StorageAsk, expiry: uint64) {.gcsafe, upraises: [].} = + proc onRequest( + id: RequestId, ask: StorageAsk, expiry: uint64 + ) {.gcsafe, upraises: [].} = asyncSpawn resultWrapper(Rid(id)) if market =? m.market: @@ -54,8 +58,10 @@ method subscribeToNewRequests*(m: MarketplaceService, onNewRequest: OnNewRequest else: notStarted() return success() - -method iteratePastNewRequestEvents*(m: MarketplaceService, onNewRequest: OnNewRequest): Future[?!void] {.async: (raises: []), base.} = + +method iteratePastNewRequestEvents*( + m: MarketplaceService, onNewRequest: OnNewRequest +): Future[?!void] {.async: (raises: []), base.} = let oneDay = 60 * 60 * 24 timespan = oneDay * 30 @@ -72,7 +78,9 @@ method iteratePastNewRequestEvents*(m: MarketplaceService, onNewRequest: OnNewRe else: notStarted() -method getRequestInfo*(m: MarketplaceService, rid: Rid): Future[?RequestInfo] {.async: (raises: []), base.} = +method getRequestInfo*( + m: MarketplaceService, rid: Rid +): Future[?RequestInfo] {.async: (raises: []), base.} = # If the request id exists and is running, fetch the request object and return the info object. # otherwise, return none. if market =? m.market: diff --git a/codexcrawler/services/metrics.nim b/codexcrawler/services/metrics.nim index d4dc1e1..adc8852 100644 --- a/codexcrawler/services/metrics.nim +++ b/codexcrawler/services/metrics.nim @@ -8,7 +8,9 @@ declareGauge(nokNodesGauge, "DHT nodes failed to contact") declareGauge(requestsGauge, "Marketplace active storage requests") declareGauge(requestSlotsGauge, "Marketplace active storage request slots") -declareGauge(totalStorageSizeGauge, "Marketplace total bytes stored in active storage requests") +declareGauge( + totalStorageSizeGauge, "Marketplace total bytes stored in active storage requests" +) type OnUpdateMetric = proc(value: int64): void {.gcsafe, raises: [].} @@ -48,7 +50,7 @@ method setRequests*(m: Metrics, value: int) {.base, gcsafe, raises: [].} = method setRequestSlots*(m: Metrics, value: int) {.base, gcsafe, raises: [].} = m.onRequestSlots(value.int64) -method setTotalSize*(m: Metrics, value: int) {.base, gcsafe, raises: [].} = +method setTotalSize*(m: Metrics, value: int64) {.base, gcsafe, raises: [].} = m.onTotalSize(value.int64) proc createMetrics*(metricsAddress: IpAddress, metricsPort: Port): Metrics = @@ -74,5 +76,11 @@ proc createMetrics*(metricsAddress: IpAddress, metricsPort: Port): Metrics = proc onTotalSize(value: int64) = totalStorageSizeGauge.set(value) - return - Metrics(todoNodes: onTodo, okNodes: onOk, nokNodes: onNok, onRequests: onRequests, onRequestSlots: onRequestSlots, onTotalSize: onTotalSize) + return Metrics( + todoNodes: onTodo, + okNodes: onOk, + nokNodes: onNok, + onRequests: onRequests, + onRequestSlots: onRequestSlots, + onTotalSize: onTotalSize, + ) diff --git a/tests/codexcrawler/components/testchaincrawler.nim b/tests/codexcrawler/components/testchaincrawler.nim index 8a50a78..cdd74f3 100644 --- a/tests/codexcrawler/components/testchaincrawler.nim +++ b/tests/codexcrawler/components/testchaincrawler.nim @@ -25,7 +25,7 @@ suite "ChainCrawler": state = createMockState() store = createMockRequestStore() marketplace = createMockMarketplaceService() - + crawler = ChainCrawler.new(state, store, marketplace) (await crawler.start()).tryGet() diff --git a/tests/codexcrawler/components/testchainmetrics.nim b/tests/codexcrawler/components/testchainmetrics.nim index 84a079e..81c062b 100644 --- a/tests/codexcrawler/components/testchainmetrics.nim +++ b/tests/codexcrawler/components/testchainmetrics.nim @@ -43,18 +43,6 @@ suite "ChainMetrics": state.delays.len == 1 state.delays[0] == 10.minutes - # iterate all requests in requestStore: - # get state of request on chain - # if failed/canceled/error: - # if last-seen is old (1month?3months?) - # delete entry - # else (request is running): - # count: - # total running - # total num slots - # total size of request - # iter finished: update metrics! - test "onStep should remove non-running requests from request store": let rid = genRid() store.iterateEntries.add(RequestEntry(id: rid)) @@ -66,7 +54,7 @@ suite "ChainMetrics": check: marketplace.requestInfoRid == rid store.removeRid == rid - + test "onStep should count the number of active requests": let rid1 = genRid() let rid2 = genRid() @@ -84,9 +72,7 @@ suite "ChainMetrics": let rid = genRid() store.iterateEntries.add(RequestEntry(id: rid)) - let info = RequestInfo( - slots: 123 - ) + let info = RequestInfo(slots: 123) marketplace.requestInfoReturns = some(info) await onStep() @@ -98,10 +84,7 @@ suite "ChainMetrics": let rid = genRid() store.iterateEntries.add(RequestEntry(id: rid)) - let info = RequestInfo( - slots: 12, - slotSize: 23 - ) + let info = RequestInfo(slots: 12, slotSize: 23) marketplace.requestInfoReturns = some(info) await onStep() diff --git a/tests/codexcrawler/mocks/mockmarketplace.nim b/tests/codexcrawler/mocks/mockmarketplace.nim index a48f24f..97d0ef1 100644 --- a/tests/codexcrawler/mocks/mockmarketplace.nim +++ b/tests/codexcrawler/mocks/mockmarketplace.nim @@ -13,20 +13,25 @@ type MockMarketplaceService* = ref object of MarketplaceService requestInfoReturns*: ?RequestInfo requestInfoRid*: Rid -method subscribeToNewRequests*(m: MockMarketplaceService, onNewRequest: OnNewRequest): Future[?!void] {.async: (raises: []).} = +method subscribeToNewRequests*( + m: MockMarketplaceService, onNewRequest: OnNewRequest +): Future[?!void] {.async: (raises: []).} = m.subNewRequestsCallback = some(onNewRequest) return success() - -method iteratePastNewRequestEvents*(m: MockMarketplaceService, onNewRequest: OnNewRequest): Future[?!void] {.async: (raises: []).} = + +method iteratePastNewRequestEvents*( + m: MockMarketplaceService, onNewRequest: OnNewRequest +): Future[?!void] {.async: (raises: []).} = m.iterRequestsCallback = some(onNewRequest) return success() -method getRequestInfo*(m: MockMarketplaceService, rid: Rid): Future[?RequestInfo] {.async: (raises: []).} = +method getRequestInfo*( + m: MockMarketplaceService, rid: Rid +): Future[?RequestInfo] {.async: (raises: []).} = m.requestInfoRid = rid return m.requestInfoReturns proc createMockMarketplaceService*(): MockMarketplaceService = MockMarketplaceService( - subNewRequestsCallback: none(OnNewRequest), - iterRequestsCallback: none(OnNewRequest) + subNewRequestsCallback: none(OnNewRequest), iterRequestsCallback: none(OnNewRequest) ) diff --git a/tests/codexcrawler/mocks/mockmetrics.nim b/tests/codexcrawler/mocks/mockmetrics.nim index cfe1b01..01787cc 100644 --- a/tests/codexcrawler/mocks/mockmetrics.nim +++ b/tests/codexcrawler/mocks/mockmetrics.nim @@ -6,7 +6,7 @@ type MockMetrics* = ref object of Metrics nok*: int requests*: int slots*: int - totalSize*: int + totalSize*: int64 method setTodoNodes*(m: MockMetrics, value: int) = m.todo = value @@ -23,7 +23,7 @@ method setRequests*(m: MockMetrics, value: int) = method setRequestSlots*(m: MockMetrics, value: int) = m.slots = value -method setTotalSize*(m: MockMetrics, value: int) = +method setTotalSize*(m: MockMetrics, value: int64) = m.totalSize = value proc createMockMetrics*(): MockMetrics = diff --git a/tests/codexcrawler/mocks/mockrequeststore.nim b/tests/codexcrawler/mocks/mockrequeststore.nim index 7005594..075fc42 100644 --- a/tests/codexcrawler/mocks/mockrequeststore.nim +++ b/tests/codexcrawler/mocks/mockrequeststore.nim @@ -10,15 +10,11 @@ type MockRequestStore* = ref object of RequestStore removeRid*: Rid iterateEntries*: seq[RequestEntry] -method update*( - s: MockRequestStore, rid: Rid -): Future[?!void] {.async: (raises: []).} = +method update*(s: MockRequestStore, rid: Rid): Future[?!void] {.async: (raises: []).} = s.updateRid = rid return success() -method remove*( - s: MockRequestStore, rid: Rid -): Future[?!void] {.async: (raises: []).} = +method remove*(s: MockRequestStore, rid: Rid): Future[?!void] {.async: (raises: []).} = s.removeRid = rid return success()