From 14c5270e836011f8168d2da6a389080d0324645b Mon Sep 17 00:00:00 2001 From: Ben Bierens <39762930+benbierens@users.noreply.github.com> Date: Thu, 20 Jul 2023 09:56:28 +0200 Subject: [PATCH] Add metrics (#478) * Adds metrics to block exchange. * Adds metrics to purchasing * Adds metrics to upload and download API * Adds metrics to the repo store * Fixes exception in repostore start. * Merge managed to mess up indentation. --- codex/blockexchange/engine/engine.nim | 23 +++++++++- codex/blockexchange/engine/pendingblocks.nim | 8 ++++ codex/namespaces.nim | 1 + codex/purchasing/states/cancelled.nim | 4 ++ codex/purchasing/states/error.nim | 4 ++ codex/purchasing/states/failed.nim | 4 ++ codex/purchasing/states/finished.nim | 4 ++ codex/purchasing/states/pending.nim | 4 ++ codex/purchasing/states/started.nim | 4 ++ codex/purchasing/states/submitted.nim | 4 ++ codex/purchasing/states/unknown.nim | 4 ++ codex/rest/api.nim | 6 +++ codex/stores/keyutils.nim | 1 + codex/stores/repostore.nim | 48 ++++++++++++++++++-- 14 files changed, 115 insertions(+), 4 deletions(-) diff --git a/codex/blockexchange/engine/engine.nim b/codex/blockexchange/engine/engine.nim index c517c72a..be4f3da3 100644 --- a/codex/blockexchange/engine/engine.nim +++ b/codex/blockexchange/engine/engine.nim @@ -15,6 +15,7 @@ import std/algorithm import pkg/chronos import pkg/chronicles import pkg/libp2p +import pkg/metrics import pkg/stint import ../../stores/blockstore @@ -36,6 +37,13 @@ export peers, pendingblocks, payments, discovery logScope: topics = "codex blockexcengine" +declareCounter(codexBlockExchangeWantHaveListsSent, "codex blockexchange wantHave lists sent") +declareCounter(codexBlockExchangeWantHaveListsReceived, "codex blockexchange wantHave lists received") +declareCounter(codexBlockExchangeWantBlockListsSent, "codex blockexchange wantBlock lists sent") +declareCounter(codexBlockExchangeWantBlockListsReceived, "codex blockexchange wantBlock lists received") +declareCounter(codexBlockExchangeBlocksSent, "codex blockexchange blocks sent") +declareCounter(codexBlockExchangeBlocksReceived, "codex blockexchange blocks received") + const DefaultMaxPeersPerRequest* = 10 DefaultTaskQueueSize = 100 @@ -190,6 +198,8 @@ proc requestBlock*( await b.sendWantBlock(cid, blockPeer) + codexBlockExchangeWantBlockListsSent.inc() + if (peers.len - 1) == 0: trace "No peers to send want list to", cid b.discovery.queueFindBlocksReq(@[cid]) @@ -197,6 +207,8 @@ proc requestBlock*( await b.sendWantHave(cid, blockPeer, toSeq(b.peers)) + codexBlockExchangeWantHaveListsSent.inc() + return await blk proc blockPresenceHandler*( @@ -297,6 +309,8 @@ proc blocksHandler*( trace "Unable to store block", cid = blk.cid await b.resolveBlocks(blocks) + codexBlockExchangeBlocksReceived.inc(blocks.len.int64) + let peerCtx = b.peers.get(peer) @@ -336,6 +350,9 @@ proc wantListHandler*( b.pricing.get(Pricing(price: 0.u256)) .price.toBytesBE) + if e.wantType == WantType.WantHave: + codexBlockExchangeWantHaveListsReceived.inc() + if not have and e.sendDontHave: trace "Adding dont have entry to presence response", cid = e.cid presence.add( @@ -353,6 +370,7 @@ proc wantListHandler*( elif e.wantType == WantType.WantBlock: trace "Added entry to peer's want blocks list", cid = e.cid peerCtx.peerWants.add(e) + codexBlockExchangeWantBlockListsReceived.inc() else: # peer doesn't want this block anymore if e.cancel: @@ -467,6 +485,9 @@ proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} = task.id, blocks) + codexBlockExchangeBlocksSent.inc(blocks.len.int64) + + trace "About to remove entries from peerWants", blocks = blocks.len, items = task.peerWants.len # Remove successfully sent blocks task.peerWants.keepIf( proc(e: Entry): bool = @@ -500,7 +521,7 @@ proc new*( peersPerRequest = DefaultMaxPeersPerRequest ): BlockExcEngine = ## Create new block exchange engine instance - ## + ## let engine = BlockExcEngine( diff --git a/codex/blockexchange/engine/pendingblocks.nim b/codex/blockexchange/engine/pendingblocks.nim index 171311f4..e897a66f 100644 --- a/codex/blockexchange/engine/pendingblocks.nim +++ b/codex/blockexchange/engine/pendingblocks.nim @@ -16,12 +16,15 @@ push: {.upraises: [].} import pkg/chronicles import pkg/chronos import pkg/libp2p +import pkg/metrics import ../../blocktype logScope: topics = "codex pendingblocks" +declareGauge(codexBlockExchangePendingBlockRequests, "codex blockexchange pending block requests") + const DefaultBlockTimeout* = 10.minutes @@ -33,6 +36,9 @@ type PendingBlocksManager* = ref object of RootObj blocks*: Table[Cid, BlockReq] # pending Block requests +proc updatePendingBlockGauge(p: PendingBlocksManager) = + codexBlockExchangePendingBlockRequests.set(p.blocks.len.int64) + proc getWantHandle*( p: PendingBlocksManager, cid: Cid, @@ -50,6 +56,7 @@ proc getWantHandle*( trace "Adding pending future for block", cid, inFlight = p.blocks[cid].inFlight + p.updatePendingBlockGauge() return await p.blocks[cid].handle.wait(timeout) except CancelledError as exc: trace "Blocks cancelled", exc = exc.msg, cid @@ -60,6 +67,7 @@ proc getWantHandle*( raise exc finally: p.blocks.del(cid) + p.updatePendingBlockGauge() proc resolve*(p: PendingBlocksManager, blocks: seq[Block]) = diff --git a/codex/namespaces.nim b/codex/namespaces.nim index 93d7902c..b7988cdc 100644 --- a/codex/namespaces.nim +++ b/codex/namespaces.nim @@ -11,6 +11,7 @@ const # Namespaces CodexMetaNamespace* = "meta" # meta info stored here CodexRepoNamespace* = "repo" # repository namespace, blocks and manifests are subkeys + CodexBlockTotalNamespace* = CodexMetaNamespace & "/total" # number of blocks in the repo CodexBlocksNamespace* = CodexRepoNamespace & "/blocks" # blocks namespace CodexManifestNamespace* = CodexRepoNamespace & "/manifests" # manifest namespace CodexBlocksTtlNamespace* = # Cid TTL diff --git a/codex/purchasing/states/cancelled.nim b/codex/purchasing/states/cancelled.nim index a0d8315b..cb0be05d 100644 --- a/codex/purchasing/states/cancelled.nim +++ b/codex/purchasing/states/cancelled.nim @@ -1,13 +1,17 @@ +import pkg/metrics import ../statemachine import ./errorhandling import ./error +declareCounter(codexPurchasesCancelled, "codex purchases cancelled") + type PurchaseCancelled* = ref object of ErrorHandlingState method `$`*(state: PurchaseCancelled): string = "cancelled" method run*(state: PurchaseCancelled, machine: Machine): Future[?State] {.async.} = + codexPurchasesCancelled.inc() let purchase = Purchase(machine) await purchase.market.withdrawFunds(purchase.requestId) let error = newException(Timeout, "Purchase cancelled due to timeout") diff --git a/codex/purchasing/states/error.nim b/codex/purchasing/states/error.nim index df1c8d5c..c1fbbc6a 100644 --- a/codex/purchasing/states/error.nim +++ b/codex/purchasing/states/error.nim @@ -1,5 +1,8 @@ +import pkg/metrics import ../statemachine +declareCounter(codexPurchasesError, "codex purchases error") + type PurchaseErrored* = ref object of PurchaseState error*: ref CatchableError @@ -7,5 +10,6 @@ method `$`*(state: PurchaseErrored): string = "errored" method run*(state: PurchaseErrored, machine: Machine): Future[?State] {.async.} = + codexPurchasesError.inc() let purchase = Purchase(machine) purchase.future.fail(state.error) diff --git a/codex/purchasing/states/failed.nim b/codex/purchasing/states/failed.nim index 3fbe36f7..8a814147 100644 --- a/codex/purchasing/states/failed.nim +++ b/codex/purchasing/states/failed.nim @@ -1,6 +1,9 @@ +import pkg/metrics import ../statemachine import ./error +declareCounter(codexPurchasesFailed, "codex purchases failed") + type PurchaseFailed* = ref object of PurchaseState @@ -8,5 +11,6 @@ method `$`*(state: PurchaseFailed): string = "failed" method run*(state: PurchaseFailed, machine: Machine): Future[?State] {.async.} = + codexPurchasesFailed.inc() let error = newException(PurchaseError, "Purchase failed") return some State(PurchaseErrored(error: error)) diff --git a/codex/purchasing/states/finished.nim b/codex/purchasing/states/finished.nim index 93e8b4f0..6e28bec2 100644 --- a/codex/purchasing/states/finished.nim +++ b/codex/purchasing/states/finished.nim @@ -1,10 +1,14 @@ +import pkg/metrics import ../statemachine +declareCounter(codexPurchasesFinished, "codex purchases finished") + type PurchaseFinished* = ref object of PurchaseState method `$`*(state: PurchaseFinished): string = "finished" method run*(state: PurchaseFinished, machine: Machine): Future[?State] {.async.} = + codexPurchasesFinished.inc() let purchase = Purchase(machine) purchase.future.complete() diff --git a/codex/purchasing/states/pending.nim b/codex/purchasing/states/pending.nim index 64a7fdd5..14d1240b 100644 --- a/codex/purchasing/states/pending.nim +++ b/codex/purchasing/states/pending.nim @@ -1,13 +1,17 @@ +import pkg/metrics import ../statemachine import ./errorhandling import ./submitted +declareCounter(codexPurchasesPending, "codex purchases pending") + type PurchasePending* = ref object of ErrorHandlingState method `$`*(state: PurchasePending): string = "pending" method run*(state: PurchasePending, machine: Machine): Future[?State] {.async.} = + codexPurchasesPending.inc() let purchase = Purchase(machine) let request = !purchase.request await purchase.market.requestStorage(request) diff --git a/codex/purchasing/states/started.nim b/codex/purchasing/states/started.nim index 27d28ddf..26cc871d 100644 --- a/codex/purchasing/states/started.nim +++ b/codex/purchasing/states/started.nim @@ -1,14 +1,18 @@ +import pkg/metrics import ../statemachine import ./errorhandling import ./finished import ./failed +declareCounter(codexPurchasesStarted, "codex purchases started") + type PurchaseStarted* = ref object of ErrorHandlingState method `$`*(state: PurchaseStarted): string = "started" method run*(state: PurchaseStarted, machine: Machine): Future[?State] {.async.} = + codexPurchasesStarted.inc() let purchase = Purchase(machine) let clock = purchase.clock diff --git a/codex/purchasing/states/submitted.nim b/codex/purchasing/states/submitted.nim index 5e6dd892..4505ae91 100644 --- a/codex/purchasing/states/submitted.nim +++ b/codex/purchasing/states/submitted.nim @@ -1,14 +1,18 @@ +import pkg/metrics import ../statemachine import ./errorhandling import ./started import ./cancelled +declareCounter(codexPurchasesSubmitted, "codex purchases submitted") + type PurchaseSubmitted* = ref object of ErrorHandlingState method `$`*(state: PurchaseSubmitted): string = "submitted" method run*(state: PurchaseSubmitted, machine: Machine): Future[?State] {.async.} = + codexPurchasesSubmitted.inc() let purchase = Purchase(machine) let request = !purchase.request let market = purchase.market diff --git a/codex/purchasing/states/unknown.nim b/codex/purchasing/states/unknown.nim index 38628334..0431396e 100644 --- a/codex/purchasing/states/unknown.nim +++ b/codex/purchasing/states/unknown.nim @@ -1,3 +1,4 @@ +import pkg/metrics import ../statemachine import ./errorhandling import ./submitted @@ -6,12 +7,15 @@ import ./cancelled import ./finished import ./failed +declareCounter(codexPurchasesUnknown, "codex purchases unknown") + type PurchaseUnknown* = ref object of ErrorHandlingState method `$`*(state: PurchaseUnknown): string = "unknown" method run*(state: PurchaseUnknown, machine: Machine): Future[?State] {.async.} = + codexPurchasesUnknown.inc() let purchase = Purchase(machine) if (request =? await purchase.market.getRequest(purchase.requestId)) and (requestState =? await purchase.market.requestState(purchase.requestId)): diff --git a/codex/rest/api.nim b/codex/rest/api.nim index 77d36f63..651d370d 100644 --- a/codex/rest/api.nim +++ b/codex/rest/api.nim @@ -20,6 +20,7 @@ import pkg/chronicles import pkg/chronos import pkg/presto import pkg/libp2p +import pkg/metrics import pkg/stew/base10 import pkg/stew/byteutils import pkg/confutils @@ -42,6 +43,9 @@ import ./json logScope: topics = "codex restapi" +declareCounter(codexApiUploads, "codex API uploads") +declareCounter(codexApiDownloads, "codex API downloads") + proc validate( pattern: string, value: string): int @@ -164,6 +168,7 @@ proc initRestApi*(node: CodexNodeRef, conf: CodexConf): RestRouter = trace "Sending chunk", size = buff.len await resp.sendChunk(addr buff[0], buff.len) await resp.finish() + codexApiDownloads.inc() except CatchableError as exc: trace "Excepting streaming blocks", exc = exc.msg return RestApiResponse.error(Http500) @@ -238,6 +243,7 @@ proc initRestApi*(node: CodexNodeRef, conf: CodexConf): RestRouter = trace "Error uploading file", exc = error.msg return RestApiResponse.error(Http500, error.msg) + codexApiUploads.inc() trace "Uploaded file", cid return RestApiResponse.response($cid) except CancelledError: diff --git a/codex/stores/keyutils.nim b/codex/stores/keyutils.nim index e4a6c98a..4b8507d0 100644 --- a/codex/stores/keyutils.nim +++ b/codex/stores/keyutils.nim @@ -20,6 +20,7 @@ const CodexMetaKey* = Key.init(CodexMetaNamespace).tryGet CodexRepoKey* = Key.init(CodexRepoNamespace).tryGet CodexBlocksKey* = Key.init(CodexBlocksNamespace).tryGet + CodexTotalBlocksKey* = Key.init(CodexBlockTotalNamespace).tryGet CodexManifestKey* = Key.init(CodexManifestNamespace).tryGet BlocksTtlKey* = Key.init(CodexBlocksTtlNamespace).tryGet QuotaKey* = Key.init(CodexQuotaNamespace).tryGet diff --git a/codex/stores/repostore.nim b/codex/stores/repostore.nim index 6e2731a4..c9c79f36 100644 --- a/codex/stores/repostore.nim +++ b/codex/stores/repostore.nim @@ -14,6 +14,7 @@ push: {.upraises: [].} import pkg/chronos import pkg/chronicles import pkg/libp2p +import pkg/metrics import pkg/questionable import pkg/questionable/results import pkg/datastore @@ -30,6 +31,10 @@ export blocktype, libp2p logScope: topics = "codex repostore" +declareGauge(codexRepostoreBlocks, "codex repostore blocks") +declareGauge(codexRepostoreBytesUsed, "codex repostore bytes used") +declareGauge(codexRepostoreBytesReserved, "codex repostore bytes reserved") + const DefaultBlockTtl* = 24.hours DefaultQuotaBytes* = 1'u shl 33'u # ~8GB @@ -43,6 +48,7 @@ type repoDs*: Datastore metaDs*: Datastore clock: Clock + totalBlocks*: uint # number of blocks in the store quotaMaxBytes*: uint # maximum available bytes quotaUsedBytes*: uint # bytes used by the repo quotaReservedBytes*: uint # bytes reserved by the repo @@ -61,6 +67,11 @@ iterator items*(q: BlockExpirationIter): Future[?BlockExpiration] = while not q.finished: yield q.next() +proc updateMetrics(self: RepoStore) = + codexRepostoreBlocks.set(self.totalBlocks.int64) + codexRepostoreBytesUsed.set(self.quotaUsedBytes.int64) + codexRepostoreBytesReserved.set(self.quotaReservedBytes.int64) + func totalUsed*(self: RepoStore): uint = (self.quotaUsedBytes + self.quotaReservedBytes) @@ -105,6 +116,14 @@ proc getBlockExpirationEntry( let value = self.getBlockExpirationTimestamp(ttl).toBytes return success((key, value)) +proc persistTotalBlocksCount(self: RepoStore): Future[?!void] {.async.} = + if err =? (await self.metaDs.put( + CodexTotalBlocksKey, + @(self.totalBlocks.uint64.toBytesBE))).errorOption: + trace "Error total blocks key!", err = err.msg + return failure(err) + return success() + method putBlock*( self: RepoStore, blk: Block, @@ -156,6 +175,12 @@ method putBlock*( return failure(err) self.quotaUsedBytes = used + inc self.totalBlocks + if isErr (await self.persistTotalBlocksCount()): + trace "Unable to update block total metadata" + return failure("Unable to update block total metadata") + + self.updateMetrics() return success() proc updateQuotaBytesUsed(self: RepoStore, blk: Block): Future[?!void] {.async.} = @@ -166,6 +191,7 @@ proc updateQuotaBytesUsed(self: RepoStore, blk: Block): Future[?!void] {.async.} trace "Error updating quota key!", err = err.msg return failure(err) self.quotaUsedBytes = used + self.updateMetrics() return success() proc removeBlockExpirationEntry(self: RepoStore, cid: Cid): Future[?!void] {.async.} = @@ -195,6 +221,12 @@ method delBlock*(self: RepoStore, cid: Cid): Future[?!void] {.async.} = trace "Deleted block", cid, totalUsed = self.totalUsed + dec self.totalBlocks + if isErr (await self.persistTotalBlocksCount()): + trace "Unable to update block total metadata" + return failure("Unable to update block total metadata") + + self.updateMetrics() return success() method hasBlock*(self: RepoStore, cid: Cid): Future[?!bool] {.async.} = @@ -251,7 +283,7 @@ method getBlockExpirations*( offset: int ): Future[?!BlockExpirationIter] {.async, base.} = ## Get block experiartions from the given RepoStore - ## + ## without query =? createBlockExpirationQuery(maxNumber, offset), err: trace "Unable to format block expirations query" return failure(err) @@ -346,6 +378,7 @@ proc release*(self: RepoStore, bytes: uint): Future[?!void] {.async.} = return failure(err) trace "Released bytes", bytes + self.updateMetrics() return success() proc start*(self: RepoStore): Future[void] {.async.} = @@ -358,6 +391,14 @@ proc start*(self: RepoStore): Future[void] {.async.} = trace "Starting repo" + without total =? await self.metaDs.get(CodexTotalBlocksKey), err: + if not (err of DatastoreKeyNotFound): + error "Unable to read total number of blocks from metadata store", err = err.msg, key = $CodexTotalBlocksKey + + if total.len > 0: + self.totalBlocks = uint64.fromBytesBE(total).uint + trace "Number of blocks in store at start", total = self.totalBlocks + ## load current persist and cache bytes from meta ds without quotaUsedBytes =? await self.metaDs.get(QuotaUsedKey), err: if not (err of DatastoreKeyNotFound): @@ -386,6 +427,7 @@ proc start*(self: RepoStore): Future[void] {.async.} = notice "Current bytes used for persist quota", bytes = self.quotaReservedBytes + self.updateMetrics() self.started = true proc stop*(self: RepoStore): Future[void] {.async.} = @@ -410,8 +452,8 @@ func new*( quotaMaxBytes = DefaultQuotaBytes, blockTtl = DefaultBlockTtl ): RepoStore = - ## Create new instance of a RepoStore - ## + ## Create new instance of a RepoStore + ## RepoStore( repoDs: repoDs, metaDs: metaDs,