parent
0014ffdef5
commit
11cd2c46ad
|
@ -37,12 +37,12 @@ export peers, pendingblocks, payments, discovery
|
||||||
logScope:
|
logScope:
|
||||||
topics = "codex blockexcengine"
|
topics = "codex blockexcengine"
|
||||||
|
|
||||||
declareCounter(codexBlockExchangeWantHaveListsSent, "codex blockexchange wantHave lists sent")
|
declareCounter(codex_block_exchange_want_have_lists_sent, "codex blockexchange wantHave lists sent")
|
||||||
declareCounter(codexBlockExchangeWantHaveListsReceived, "codex blockexchange wantHave lists received")
|
declareCounter(codex_block_exchange_want_have_lists_received, "codex blockexchange wantHave lists received")
|
||||||
declareCounter(codexBlockExchangeWantBlockListsSent, "codex blockexchange wantBlock lists sent")
|
declareCounter(codex_block_exchange_want_block_lists_sent, "codex blockexchange wantBlock lists sent")
|
||||||
declareCounter(codexBlockExchangeWantBlockListsReceived, "codex blockexchange wantBlock lists received")
|
declareCounter(codex_block_exchange_want_block_lists_received, "codex blockexchange wantBlock lists received")
|
||||||
declareCounter(codexBlockExchangeBlocksSent, "codex blockexchange blocks sent")
|
declareCounter(codex_block_exchange_blocks_sent, "codex blockexchange blocks sent")
|
||||||
declareCounter(codexBlockExchangeBlocksReceived, "codex blockexchange blocks received")
|
declareCounter(codex_block_exchange_blocks_received, "codex blockexchange blocks received")
|
||||||
|
|
||||||
const
|
const
|
||||||
DefaultMaxPeersPerRequest* = 10
|
DefaultMaxPeersPerRequest* = 10
|
||||||
|
@ -198,7 +198,7 @@ proc requestBlock*(
|
||||||
|
|
||||||
await b.sendWantBlock(cid, blockPeer)
|
await b.sendWantBlock(cid, blockPeer)
|
||||||
|
|
||||||
codexBlockExchangeWantBlockListsSent.inc()
|
codex_block_exchange_want_block_lists_sent.inc()
|
||||||
|
|
||||||
if (peers.len - 1) == 0:
|
if (peers.len - 1) == 0:
|
||||||
trace "No peers to send want list to", cid
|
trace "No peers to send want list to", cid
|
||||||
|
@ -207,7 +207,7 @@ proc requestBlock*(
|
||||||
|
|
||||||
await b.sendWantHave(cid, blockPeer, toSeq(b.peers))
|
await b.sendWantHave(cid, blockPeer, toSeq(b.peers))
|
||||||
|
|
||||||
codexBlockExchangeWantHaveListsSent.inc()
|
codex_block_exchange_want_have_lists_sent.inc()
|
||||||
|
|
||||||
return await blk
|
return await blk
|
||||||
|
|
||||||
|
@ -309,7 +309,7 @@ proc blocksHandler*(
|
||||||
trace "Unable to store block", cid = blk.cid
|
trace "Unable to store block", cid = blk.cid
|
||||||
|
|
||||||
await b.resolveBlocks(blocks)
|
await b.resolveBlocks(blocks)
|
||||||
codexBlockExchangeBlocksReceived.inc(blocks.len.int64)
|
codex_block_exchange_blocks_received.inc(blocks.len.int64)
|
||||||
|
|
||||||
let
|
let
|
||||||
peerCtx = b.peers.get(peer)
|
peerCtx = b.peers.get(peer)
|
||||||
|
@ -351,7 +351,7 @@ proc wantListHandler*(
|
||||||
.price.toBytesBE)
|
.price.toBytesBE)
|
||||||
|
|
||||||
if e.wantType == WantType.WantHave:
|
if e.wantType == WantType.WantHave:
|
||||||
codexBlockExchangeWantHaveListsReceived.inc()
|
codex_block_exchange_want_have_lists_received.inc()
|
||||||
|
|
||||||
if not have and e.sendDontHave:
|
if not have and e.sendDontHave:
|
||||||
trace "Adding dont have entry to presence response", cid = e.cid
|
trace "Adding dont have entry to presence response", cid = e.cid
|
||||||
|
@ -370,7 +370,7 @@ proc wantListHandler*(
|
||||||
elif e.wantType == WantType.WantBlock:
|
elif e.wantType == WantType.WantBlock:
|
||||||
trace "Added entry to peer's want blocks list", cid = e.cid
|
trace "Added entry to peer's want blocks list", cid = e.cid
|
||||||
peerCtx.peerWants.add(e)
|
peerCtx.peerWants.add(e)
|
||||||
codexBlockExchangeWantBlockListsReceived.inc()
|
codex_block_exchange_want_block_lists_received.inc()
|
||||||
else:
|
else:
|
||||||
# peer doesn't want this block anymore
|
# peer doesn't want this block anymore
|
||||||
if e.cancel:
|
if e.cancel:
|
||||||
|
@ -485,7 +485,7 @@ proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} =
|
||||||
task.id,
|
task.id,
|
||||||
blocks)
|
blocks)
|
||||||
|
|
||||||
codexBlockExchangeBlocksSent.inc(blocks.len.int64)
|
codex_block_exchange_blocks_sent.inc(blocks.len.int64)
|
||||||
|
|
||||||
trace "About to remove entries from peerWants", blocks = blocks.len, items = task.peerWants.len
|
trace "About to remove entries from peerWants", blocks = blocks.len, items = task.peerWants.len
|
||||||
# Remove successfully sent blocks
|
# Remove successfully sent blocks
|
||||||
|
|
|
@ -24,8 +24,8 @@ import ../../blocktype
|
||||||
logScope:
|
logScope:
|
||||||
topics = "codex pendingblocks"
|
topics = "codex pendingblocks"
|
||||||
|
|
||||||
declareGauge(codexBlockExchangePendingBlockRequests, "codex blockexchange pending block requests")
|
declareGauge(codex_block_exchange_pending_block_requests, "codex blockexchange pending block requests")
|
||||||
declareGauge(codexBlockExchangeRetrievalTimeUs, "codex blockexchange block retrieval time us")
|
declareGauge(codex_block_exchange_retrieval_time_us, "codex blockexchange block retrieval time us")
|
||||||
|
|
||||||
const
|
const
|
||||||
DefaultBlockTimeout* = 10.minutes
|
DefaultBlockTimeout* = 10.minutes
|
||||||
|
@ -40,7 +40,7 @@ type
|
||||||
blocks*: Table[Cid, BlockReq] # pending Block requests
|
blocks*: Table[Cid, BlockReq] # pending Block requests
|
||||||
|
|
||||||
proc updatePendingBlockGauge(p: PendingBlocksManager) =
|
proc updatePendingBlockGauge(p: PendingBlocksManager) =
|
||||||
codexBlockExchangePendingBlockRequests.set(p.blocks.len.int64)
|
codex_block_exchange_pending_block_requests.set(p.blocks.len.int64)
|
||||||
|
|
||||||
proc getWantHandle*(
|
proc getWantHandle*(
|
||||||
p: PendingBlocksManager,
|
p: PendingBlocksManager,
|
||||||
|
@ -88,7 +88,7 @@ proc resolve*(p: PendingBlocksManager,
|
||||||
startTime = pending[].startTime
|
startTime = pending[].startTime
|
||||||
stopTime = getMonoTime().ticks
|
stopTime = getMonoTime().ticks
|
||||||
retrievalDurationUs = (stopTime - startTime) div 1000
|
retrievalDurationUs = (stopTime - startTime) div 1000
|
||||||
codexBlockExchangeRetrievalTimeUs.set(retrievalDurationUs)
|
codex_block_exchange_retrieval_time_us.set(retrievalDurationUs)
|
||||||
trace "Block retrieval time", retrievalDurationUs
|
trace "Block retrieval time", retrievalDurationUs
|
||||||
|
|
||||||
proc setInFlight*(p: PendingBlocksManager,
|
proc setInFlight*(p: PendingBlocksManager,
|
||||||
|
|
|
@ -4,7 +4,7 @@ import ../statemachine
|
||||||
import ./errorhandling
|
import ./errorhandling
|
||||||
import ./error
|
import ./error
|
||||||
|
|
||||||
declareCounter(codexPurchasesCancelled, "codex purchases cancelled")
|
declareCounter(codex_purchases_cancelled, "codex purchases cancelled")
|
||||||
|
|
||||||
logScope:
|
logScope:
|
||||||
topics = "marketplace purchases cancelled"
|
topics = "marketplace purchases cancelled"
|
||||||
|
@ -15,7 +15,7 @@ method `$`*(state: PurchaseCancelled): string =
|
||||||
"cancelled"
|
"cancelled"
|
||||||
|
|
||||||
method run*(state: PurchaseCancelled, machine: Machine): Future[?State] {.async.} =
|
method run*(state: PurchaseCancelled, machine: Machine): Future[?State] {.async.} =
|
||||||
codexPurchasesCancelled.inc()
|
codex_purchases_cancelled.inc()
|
||||||
let purchase = Purchase(machine)
|
let purchase = Purchase(machine)
|
||||||
|
|
||||||
warn "Request cancelled, withdrawing remaining funds", requestId = $purchase.requestId
|
warn "Request cancelled, withdrawing remaining funds", requestId = $purchase.requestId
|
||||||
|
|
|
@ -3,7 +3,7 @@ import pkg/chronicles
|
||||||
import ../statemachine
|
import ../statemachine
|
||||||
import ../../utils/exceptions
|
import ../../utils/exceptions
|
||||||
|
|
||||||
declareCounter(codexPurchasesError, "codex purchases error")
|
declareCounter(codex_purchases_error, "codex purchases error")
|
||||||
|
|
||||||
logScope:
|
logScope:
|
||||||
topics = "marketplace purchases errored"
|
topics = "marketplace purchases errored"
|
||||||
|
@ -15,7 +15,7 @@ method `$`*(state: PurchaseErrored): string =
|
||||||
"errored"
|
"errored"
|
||||||
|
|
||||||
method run*(state: PurchaseErrored, machine: Machine): Future[?State] {.async.} =
|
method run*(state: PurchaseErrored, machine: Machine): Future[?State] {.async.} =
|
||||||
codexPurchasesError.inc()
|
codex_purchases_error.inc()
|
||||||
let purchase = Purchase(machine)
|
let purchase = Purchase(machine)
|
||||||
|
|
||||||
error "Purchasing error", error=state.error.msgDetail, requestId = purchase.requestId
|
error "Purchasing error", error=state.error.msgDetail, requestId = purchase.requestId
|
||||||
|
|
|
@ -2,7 +2,7 @@ import pkg/metrics
|
||||||
import ../statemachine
|
import ../statemachine
|
||||||
import ./error
|
import ./error
|
||||||
|
|
||||||
declareCounter(codexPurchasesFailed, "codex purchases failed")
|
declareCounter(codex_purchases_failed, "codex purchases failed")
|
||||||
|
|
||||||
type
|
type
|
||||||
PurchaseFailed* = ref object of PurchaseState
|
PurchaseFailed* = ref object of PurchaseState
|
||||||
|
@ -11,6 +11,6 @@ method `$`*(state: PurchaseFailed): string =
|
||||||
"failed"
|
"failed"
|
||||||
|
|
||||||
method run*(state: PurchaseFailed, machine: Machine): Future[?State] {.async.} =
|
method run*(state: PurchaseFailed, machine: Machine): Future[?State] {.async.} =
|
||||||
codexPurchasesFailed.inc()
|
codex_purchases_failed.inc()
|
||||||
let error = newException(PurchaseError, "Purchase failed")
|
let error = newException(PurchaseError, "Purchase failed")
|
||||||
return some State(PurchaseErrored(error: error))
|
return some State(PurchaseErrored(error: error))
|
||||||
|
|
|
@ -2,7 +2,7 @@ import pkg/metrics
|
||||||
import pkg/chronicles
|
import pkg/chronicles
|
||||||
import ../statemachine
|
import ../statemachine
|
||||||
|
|
||||||
declareCounter(codexPurchasesFinished, "codex purchases finished")
|
declareCounter(codex_purchases_finished, "codex purchases finished")
|
||||||
|
|
||||||
logScope:
|
logScope:
|
||||||
topics = "marketplace purchases finished"
|
topics = "marketplace purchases finished"
|
||||||
|
@ -13,7 +13,7 @@ method `$`*(state: PurchaseFinished): string =
|
||||||
"finished"
|
"finished"
|
||||||
|
|
||||||
method run*(state: PurchaseFinished, machine: Machine): Future[?State] {.async.} =
|
method run*(state: PurchaseFinished, machine: Machine): Future[?State] {.async.} =
|
||||||
codexPurchasesFinished.inc()
|
codex_purchases_finished.inc()
|
||||||
let purchase = Purchase(machine)
|
let purchase = Purchase(machine)
|
||||||
info "Purchase finished", requestId = purchase.requestId
|
info "Purchase finished", requestId = purchase.requestId
|
||||||
purchase.future.complete()
|
purchase.future.complete()
|
||||||
|
|
|
@ -3,7 +3,7 @@ import ../statemachine
|
||||||
import ./errorhandling
|
import ./errorhandling
|
||||||
import ./submitted
|
import ./submitted
|
||||||
|
|
||||||
declareCounter(codexPurchasesPending, "codex purchases pending")
|
declareCounter(codex_purchases_pending, "codex purchases pending")
|
||||||
|
|
||||||
type PurchasePending* = ref object of ErrorHandlingState
|
type PurchasePending* = ref object of ErrorHandlingState
|
||||||
|
|
||||||
|
@ -11,7 +11,7 @@ method `$`*(state: PurchasePending): string =
|
||||||
"pending"
|
"pending"
|
||||||
|
|
||||||
method run*(state: PurchasePending, machine: Machine): Future[?State] {.async.} =
|
method run*(state: PurchasePending, machine: Machine): Future[?State] {.async.} =
|
||||||
codexPurchasesPending.inc()
|
codex_purchases_pending.inc()
|
||||||
let purchase = Purchase(machine)
|
let purchase = Purchase(machine)
|
||||||
let request = !purchase.request
|
let request = !purchase.request
|
||||||
await purchase.market.requestStorage(request)
|
await purchase.market.requestStorage(request)
|
||||||
|
|
|
@ -5,7 +5,7 @@ import ./errorhandling
|
||||||
import ./finished
|
import ./finished
|
||||||
import ./failed
|
import ./failed
|
||||||
|
|
||||||
declareCounter(codexPurchasesStarted, "codex purchases started")
|
declareCounter(codex_purchases_started, "codex purchases started")
|
||||||
|
|
||||||
logScope:
|
logScope:
|
||||||
topics = "marketplace purchases started"
|
topics = "marketplace purchases started"
|
||||||
|
@ -16,7 +16,7 @@ method `$`*(state: PurchaseStarted): string =
|
||||||
"started"
|
"started"
|
||||||
|
|
||||||
method run*(state: PurchaseStarted, machine: Machine): Future[?State] {.async.} =
|
method run*(state: PurchaseStarted, machine: Machine): Future[?State] {.async.} =
|
||||||
codexPurchasesStarted.inc()
|
codex_purchases_started.inc()
|
||||||
let purchase = Purchase(machine)
|
let purchase = Purchase(machine)
|
||||||
|
|
||||||
let clock = purchase.clock
|
let clock = purchase.clock
|
||||||
|
|
|
@ -8,7 +8,7 @@ import ./cancelled
|
||||||
logScope:
|
logScope:
|
||||||
topics = "marketplace purchases submitted"
|
topics = "marketplace purchases submitted"
|
||||||
|
|
||||||
declareCounter(codexPurchasesSubmitted, "codex purchases submitted")
|
declareCounter(codex_purchases_submitted, "codex purchases submitted")
|
||||||
|
|
||||||
type PurchaseSubmitted* = ref object of ErrorHandlingState
|
type PurchaseSubmitted* = ref object of ErrorHandlingState
|
||||||
|
|
||||||
|
@ -16,7 +16,7 @@ method `$`*(state: PurchaseSubmitted): string =
|
||||||
"submitted"
|
"submitted"
|
||||||
|
|
||||||
method run*(state: PurchaseSubmitted, machine: Machine): Future[?State] {.async.} =
|
method run*(state: PurchaseSubmitted, machine: Machine): Future[?State] {.async.} =
|
||||||
codexPurchasesSubmitted.inc()
|
codex_purchases_submitted.inc()
|
||||||
let purchase = Purchase(machine)
|
let purchase = Purchase(machine)
|
||||||
let request = !purchase.request
|
let request = !purchase.request
|
||||||
let market = purchase.market
|
let market = purchase.market
|
||||||
|
|
|
@ -7,7 +7,7 @@ import ./cancelled
|
||||||
import ./finished
|
import ./finished
|
||||||
import ./failed
|
import ./failed
|
||||||
|
|
||||||
declareCounter(codexPurchasesUnknown, "codex purchases unknown")
|
declareCounter(codex_purchases_unknown, "codex purchases unknown")
|
||||||
|
|
||||||
type PurchaseUnknown* = ref object of ErrorHandlingState
|
type PurchaseUnknown* = ref object of ErrorHandlingState
|
||||||
|
|
||||||
|
@ -15,7 +15,7 @@ method `$`*(state: PurchaseUnknown): string =
|
||||||
"unknown"
|
"unknown"
|
||||||
|
|
||||||
method run*(state: PurchaseUnknown, machine: Machine): Future[?State] {.async.} =
|
method run*(state: PurchaseUnknown, machine: Machine): Future[?State] {.async.} =
|
||||||
codexPurchasesUnknown.inc()
|
codex_purchases_unknown.inc()
|
||||||
let purchase = Purchase(machine)
|
let purchase = Purchase(machine)
|
||||||
if (request =? await purchase.market.getRequest(purchase.requestId)) and
|
if (request =? await purchase.market.getRequest(purchase.requestId)) and
|
||||||
(requestState =? await purchase.market.requestState(purchase.requestId)):
|
(requestState =? await purchase.market.requestState(purchase.requestId)):
|
||||||
|
|
|
@ -43,8 +43,8 @@ import ./json
|
||||||
logScope:
|
logScope:
|
||||||
topics = "codex restapi"
|
topics = "codex restapi"
|
||||||
|
|
||||||
declareCounter(codexApiUploads, "codex API uploads")
|
declareCounter(codex_api_uploads, "codex API uploads")
|
||||||
declareCounter(codexApiDownloads, "codex API downloads")
|
declareCounter(codex_api_downloads, "codex API downloads")
|
||||||
|
|
||||||
proc validate(
|
proc validate(
|
||||||
pattern: string,
|
pattern: string,
|
||||||
|
@ -168,7 +168,7 @@ proc initRestApi*(node: CodexNodeRef, conf: CodexConf): RestRouter =
|
||||||
trace "Sending chunk", size = buff.len
|
trace "Sending chunk", size = buff.len
|
||||||
await resp.sendChunk(addr buff[0], buff.len)
|
await resp.sendChunk(addr buff[0], buff.len)
|
||||||
await resp.finish()
|
await resp.finish()
|
||||||
codexApiDownloads.inc()
|
codex_api_downloads.inc()
|
||||||
except CatchableError as exc:
|
except CatchableError as exc:
|
||||||
trace "Excepting streaming blocks", exc = exc.msg
|
trace "Excepting streaming blocks", exc = exc.msg
|
||||||
return RestApiResponse.error(Http500)
|
return RestApiResponse.error(Http500)
|
||||||
|
@ -243,7 +243,7 @@ proc initRestApi*(node: CodexNodeRef, conf: CodexConf): RestRouter =
|
||||||
trace "Error uploading file", exc = error.msg
|
trace "Error uploading file", exc = error.msg
|
||||||
return RestApiResponse.error(Http500, error.msg)
|
return RestApiResponse.error(Http500, error.msg)
|
||||||
|
|
||||||
codexApiUploads.inc()
|
codex_api_uploads.inc()
|
||||||
trace "Uploaded file", cid
|
trace "Uploaded file", cid
|
||||||
return RestApiResponse.response($cid)
|
return RestApiResponse.response($cid)
|
||||||
except CancelledError:
|
except CancelledError:
|
||||||
|
|
|
@ -31,9 +31,9 @@ export blocktype, cid
|
||||||
logScope:
|
logScope:
|
||||||
topics = "codex repostore"
|
topics = "codex repostore"
|
||||||
|
|
||||||
declareGauge(codexRepostoreBlocks, "codex repostore blocks")
|
declareGauge(codex_repostore_blocks, "codex repostore blocks")
|
||||||
declareGauge(codexRepostoreBytesUsed, "codex repostore bytes used")
|
declareGauge(codex_repostore_bytes_used, "codex repostore bytes used")
|
||||||
declareGauge(codexRepostoreBytesReserved, "codex repostore bytes reserved")
|
declareGauge(codex_repostore_bytes_reserved, "codex repostore bytes reserved")
|
||||||
|
|
||||||
const
|
const
|
||||||
DefaultBlockTtl* = 24.hours
|
DefaultBlockTtl* = 24.hours
|
||||||
|
@ -69,9 +69,9 @@ iterator items*(q: BlockExpirationIter): Future[?BlockExpiration] =
|
||||||
yield q.next()
|
yield q.next()
|
||||||
|
|
||||||
proc updateMetrics(self: RepoStore) =
|
proc updateMetrics(self: RepoStore) =
|
||||||
codexRepostoreBlocks.set(self.totalBlocks.int64)
|
codex_repostore_blocks.set(self.totalBlocks.int64)
|
||||||
codexRepostoreBytesUsed.set(self.quotaUsedBytes.int64)
|
codex_repostore_bytes_used.set(self.quotaUsedBytes.int64)
|
||||||
codexRepostoreBytesReserved.set(self.quotaReservedBytes.int64)
|
codex_repostore_bytes_reserved.set(self.quotaReservedBytes.int64)
|
||||||
|
|
||||||
func totalUsed*(self: RepoStore): uint =
|
func totalUsed*(self: RepoStore): uint =
|
||||||
(self.quotaUsedBytes + self.quotaReservedBytes)
|
(self.quotaUsedBytes + self.quotaReservedBytes)
|
||||||
|
|
Loading…
Reference in New Issue